From 159b39c4915ec1821b39528e363014e30c8fba12 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 23 Jul 2019 17:02:09 +0100 Subject: [PATCH] ensure redis channel is subscribed when joining room --- .../app/coffee/ChannelManager.coffee | 13 ++- .../coffee/DocumentUpdaterController.coffee | 16 ++-- .../real-time/app/coffee/RoomManager.coffee | 28 +++--- .../app/coffee/WebsocketController.coffee | 14 +-- .../app/coffee/WebsocketLoadBalancer.coffee | 15 ++-- .../test/unit/coffee/RoomManagerTests.coffee | 88 ++++++++++++------- .../coffee/WebsocketControllerTests.coffee | 4 +- 7 files changed, 109 insertions(+), 69 deletions(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 0efef6ce96..900fd764f5 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -2,7 +2,7 @@ logger = require 'logger-sharelatex' metrics = require "metrics-sharelatex" settings = require "settings-sharelatex" -ClientMap = new Map() # for each redis client, stores a Set of subscribed channels +ClientMap = new Map() # for each redis client, stores a Map of subscribed channels (channelname -> subscribe promise) # Manage redis pubsub subscriptions for individual projects and docs, ensuring # that we never subscribe to a channel multiple times. The socket.io side is @@ -12,18 +12,23 @@ module.exports = ChannelManager = getClientMapEntry: (rclient) -> # return the rclient channel set if it exists, otherwise create and # return an empty set for the client. - ClientMap.get(rclient) || ClientMap.set(rclient, new Set()).get(rclient) + ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient) subscribe: (rclient, baseChannel, id) -> existingChannelSet = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" if existingChannelSet.has(channel) logger.error {channel}, "already subscribed - shouldn't happen" + # return the subscribe promise, so we can wait for it to resolve + return existingChannelSet.get(channel) else - rclient.subscribe channel # completes in the background - existingChannelSet.add(channel) + # get the subscribe promise and return it, the actual subscribe + # completes in the background + subscribePromise = rclient.subscribe channel + existingChannelSet.set(channel, subscribePromise) logger.log {channel}, "subscribed to new channel" metrics.inc "subscribe.#{baseChannel}" + return subscribePromise unsubscribe: (rclient, baseChannel, id) -> existingChannelSet = @getClientMapEntry(rclient) diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index 0cc5751d7c..e2d27fc343 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -7,6 +7,7 @@ HealthCheckManager = require "./HealthCheckManager" RoomManager = require "./RoomManager" ChannelManager = require "./ChannelManager" metrics = require "metrics-sharelatex" +util = require "util" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb @@ -29,15 +30,20 @@ module.exports = DocumentUpdaterController = do (i) -> rclient.on "message", () -> metrics.inc "rclient-#{i}", 0.001 # per client event rate metric - for rclient in @rclientList - @handleRoomUpdates(rclient) + @handleRoomUpdates(@rclientList) - handleRoomUpdates: (rclientSub) -> + handleRoomUpdates: (rclientSubList) -> roomEvents = RoomManager.eventSource() roomEvents.on 'doc-active', (doc_id) -> - ChannelManager.subscribe rclientSub, "applied-ops", doc_id + subscribePromises = for rclient in rclientSubList + ChannelManager.subscribe rclient, "applied-ops", doc_id + subscribeResult = Promise.all(subscribePromises) + emitResult = (err) => this.emit("doc-subscribed-#{doc_id}", err) + subscribeResult.then () -> emitResult() + subscribeResult.catch (err) -> emitResult(err) roomEvents.on 'doc-empty', (doc_id) -> - ChannelManager.unsubscribe rclientSub, "applied-ops", doc_id + for rclient in rclientSubList + ChannelManager.unsubscribe rclient, "applied-ops", doc_id _processMessageFromDocumentUpdater: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee index 225dd37f6d..08e65b5e52 100644 --- a/services/real-time/app/coffee/RoomManager.coffee +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -16,11 +16,11 @@ RoomEvents = new EventEmitter() module.exports = RoomManager = - joinProject: (client, project_id) -> - @joinEntity client, "project", project_id + joinProject: (client, project_id, callback = () ->) -> + @joinEntity client, "project", project_id, callback - joinDoc: (client, doc_id) -> - @joinEntity client, "doc", doc_id + joinDoc: (client, doc_id, callback = () ->) -> + @joinEntity client, "doc", doc_id, callback leaveDoc: (client, doc_id) -> @leaveEntity client, "doc", doc_id @@ -38,27 +38,31 @@ module.exports = RoomManager = eventSource: () -> return RoomEvents - joinEntity: (client, entity, id) -> + joinEntity: (client, entity, id, callback) -> beforeCount = @_clientsInRoom(client, id) - client.join id - afterCount = @_clientsInRoom(client, id) - logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client joined room" # is this a new room? if so, subscribe - if beforeCount == 0 and afterCount == 1 + if beforeCount == 0 logger.log {entity, id}, "room is now active" + RoomEvents.once "#{entity}-subscribed-#{id}", (err) -> + logger.log {client: client.id, entity, id, beforeCount}, "client joined room after subscribing channel" + client.join id + callback(err) RoomEvents.emit "#{entity}-active", id IdMap.set(id, entity) + else + logger.log {client: client.id, entity, id, beforeCount}, "client joined existing room" + client.join id + callback() leaveEntity: (client, entity, id) -> - beforeCount = @_clientsInRoom(client, id) client.leave id afterCount = @_clientsInRoom(client, id) - logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client left room" + logger.log {client: client.id, entity, id, afterCount}, "client left room" # is the room now empty? if so, unsubscribe if !entity? logger.error {entity: id}, "unknown entity when leaving with id" return - if beforeCount == 1 and afterCount == 0 + if afterCount == 0 logger.log {entity, id}, "room is now empty" RoomEvents.emit "#{entity}-empty", id IdMap.delete(id) diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 6d8965883f..22ea7e7a0c 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -25,8 +25,7 @@ module.exports = WebsocketController = err = new Error("not authorized") logger.warn {err, project_id, user_id, client_id: client.id}, "user is not authorized to join project" return callback(err) - - RoomManager.joinProject(client, project_id) + client.set("privilege_level", privilegeLevel) client.set("user_id", user_id) @@ -39,8 +38,9 @@ module.exports = WebsocketController = client.set("signup_date", user?.signUpDate) client.set("login_count", user?.loginCount) - callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION - logger.log {user_id, project_id, client_id: client.id}, "user joined project" + RoomManager.joinProject client, project_id, (err) -> + logger.log {user_id, project_id, client_id: client.id}, "user joined project" + callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION # No need to block for setting the user as connected in the cursor tracking ConnectedUsersManager.updateUserPosition project_id, client.id, user, null, () -> @@ -118,9 +118,9 @@ module.exports = WebsocketController = return callback(err) AuthorizationManager.addAccessToDoc client, doc_id - RoomManager.joinDoc(client, doc_id) - callback null, escapedLines, version, ops, ranges - logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" + RoomManager.joinDoc client, doc_id, (err) -> + logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" + callback null, escapedLines, version, ops, ranges leaveDoc: (client, doc_id, callback = (error) ->) -> metrics.inc "editor.leave-doc" diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index 1bb74c6a3e..e8ff88aa7b 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -35,15 +35,20 @@ module.exports = WebsocketLoadBalancer = rclientSub.on "message", (channel, message) -> EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0 WebsocketLoadBalancer._processEditorEvent io, channel, message - for rclientSub in @rclientSubList - @handleRoomUpdates(rclientSub) + @handleRoomUpdates(@rclientSubList) - handleRoomUpdates: (rclientSub) -> + handleRoomUpdates: (rclientSubList) -> roomEvents = RoomManager.eventSource() roomEvents.on 'project-active', (project_id) -> - ChannelManager.subscribe rclientSub, "editor-events", project_id + subscribePromises = for rclient in rclientSubList + ChannelManager.subscribe rclient, "editor-events", project_id + subscribeResult = Promise.all(subscribePromises) + emitResult = (err) => this.emit("project-subscribed-#{project_id}", err) + subscribeResult.then () -> emitResult() + subscribeResult.catch (err) -> emitResult(err) roomEvents.on 'project-empty', (project_id) -> - ChannelManager.unsubscribe rclientSub, "editor-events", project_id + for rclient in rclientSubList + ChannelManager.unsubscribe rclient, "editor-events", project_id _processEditorEvent: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> diff --git a/services/real-time/test/unit/coffee/RoomManagerTests.coffee b/services/real-time/test/unit/coffee/RoomManagerTests.coffee index 2f78b33c52..f9fde4dd83 100644 --- a/services/real-time/test/unit/coffee/RoomManagerTests.coffee +++ b/services/real-time/test/unit/coffee/RoomManagerTests.coffee @@ -15,26 +15,36 @@ describe 'RoomManager', -> "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } @RoomManager._clientsInRoom = sinon.stub() @RoomEvents = @RoomManager.eventSource() - sinon.spy(@RoomEvents, 'emit') + sinon.spy(@RoomEvents, 'emit') + sinon.spy(@RoomEvents, 'once') describe "joinProject", -> describe "when the project room is empty", -> - beforeEach -> + beforeEach (done) -> @RoomManager._clientsInRoom .withArgs(@client, @project_id) .onFirstCall().returns(0) - .onSecondCall().returns(1) @client.join = sinon.stub() - @RoomManager.joinProject @client, @project_id - - it "should join the room using the id", -> - @client.join.calledWithExactly(@project_id).should.equal true + @callback = sinon.stub() + @RoomEvents.on 'project-active', (id) => + setTimeout () => + @RoomEvents.emit "project-subscribed-#{id}" + , 100 + @RoomManager.joinProject @client, @project_id, (err) => + @callback(err) + done() it "should emit a 'project-active' event with the id", -> @RoomEvents.emit.calledWithExactly('project-active', @project_id).should.equal true + it "should listen for the 'project-subscribed-id' event", -> + @RoomEvents.once.calledWith("project-subscribed-#{@project_id}").should.equal true + + it "should join the room using the id", -> + @client.join.calledWithExactly(@project_id).should.equal true + describe "when there are other clients in the project room", -> beforeEach -> @@ -56,20 +66,29 @@ describe 'RoomManager', -> describe "when the doc room is empty", -> - beforeEach -> + beforeEach (done) -> @RoomManager._clientsInRoom .withArgs(@client, @doc_id) .onFirstCall().returns(0) - .onSecondCall().returns(1) @client.join = sinon.stub() - @RoomManager.joinDoc @client, @doc_id - - it "should join the room using the id", -> - @client.join.calledWithExactly(@doc_id).should.equal true + @callback = sinon.stub() + @RoomEvents.on 'doc-active', (id) => + setTimeout () => + @RoomEvents.emit "doc-subscribed-#{id}" + , 100 + @RoomManager.joinDoc @client, @doc_id, (err) => + @callback(err) + done() it "should emit a 'doc-active' event with the id", -> @RoomEvents.emit.calledWithExactly('doc-active', @doc_id).should.equal true + it "should listen for the 'doc-subscribed-id' event", -> + @RoomEvents.once.calledWith("doc-subscribed-#{@doc_id}").should.equal true + + it "should join the room using the id", -> + @client.join.calledWithExactly(@doc_id).should.equal true + describe "when there are other clients in the doc room", -> beforeEach -> @@ -94,8 +113,7 @@ describe 'RoomManager', -> beforeEach -> @RoomManager._clientsInRoom .withArgs(@client, @doc_id) - .onFirstCall().returns(1) - .onSecondCall().returns(0) + .onCall(0).returns(0) @client.leave = sinon.stub() @RoomManager.leaveDoc @client, @doc_id @@ -111,8 +129,7 @@ describe 'RoomManager', -> beforeEach -> @RoomManager._clientsInRoom .withArgs(@client, @doc_id) - .onFirstCall().returns(123) - .onSecondCall().returns(122) + .onCall(0).returns(123) @client.leave = sinon.stub() @RoomManager.leaveDoc @client, @doc_id @@ -134,33 +151,36 @@ describe 'RoomManager', -> describe "when this is the only client connected", -> - beforeEach -> - # first and secondc calls are for the join, - # calls 2 and 3 are for the leave + beforeEach (done) -> + # first call is for the join, + # second for the leave @RoomManager._clientsInRoom .withArgs(@client, @doc_id) .onCall(0).returns(0) - .onSecondCall().returns(1) - .onCall(2).returns(1) - .onCall(3).returns(0) + .onCall(1).returns(0) @RoomManager._clientsInRoom .withArgs(@client, @other_doc_id) .onCall(0).returns(0) - .onCall(1).returns(1) - .onCall(2).returns(1) - .onCall(3).returns(0) + .onCall(1).returns(0) @RoomManager._clientsInRoom .withArgs(@client, @project_id) .onCall(0).returns(0) - .onCall(1).returns(1) - .onCall(2).returns(1) - .onCall(3).returns(0) + .onCall(1).returns(0) + @RoomEvents.on 'project-active', (id) => + setTimeout () => + @RoomEvents.emit "project-subscribed-#{id}" + , 100 + @RoomEvents.on 'doc-active', (id) => + setTimeout () => + @RoomEvents.emit "doc-subscribed-#{id}" + , 100 # put the client in the rooms - @RoomManager.joinProject(@client, @project_id) - @RoomManager.joinDoc(@client, @doc_id) - @RoomManager.joinDoc(@client, @other_doc_id) - # now leave the project - @RoomManager.leaveProjectAndDocs @client + @RoomManager.joinProject @client, @project_id, () => + @RoomManager.joinDoc @client, @doc_id, () => + @RoomManager.joinDoc @client, @other_doc_id, () => + # now leave the project + @RoomManager.leaveProjectAndDocs @client + done() it "should leave all the docs", -> @client.leave.calledWithExactly(@doc_id).should.equal true diff --git a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee index ab442006c2..d0dad108e7 100644 --- a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee @@ -54,7 +54,7 @@ describe 'WebsocketController', -> @privilegeLevel = "owner" @ConnectedUsersManager.updateUserPosition = sinon.stub().callsArg(4) @WebApiManager.joinProject = sinon.stub().callsArgWith(2, null, @project, @privilegeLevel) - @RoomManager.joinProject = sinon.stub() + @RoomManager.joinProject = sinon.stub().callsArg(2) @WebsocketController.joinProject @client, @user, @project_id, @callback it "should load the project from web", -> @@ -237,7 +237,7 @@ describe 'WebsocketController', -> @AuthorizationManager.addAccessToDoc = sinon.stub() @AuthorizationManager.assertClientCanViewProject = sinon.stub().callsArgWith(1, null) @DocumentUpdaterManager.getDocument = sinon.stub().callsArgWith(3, null, @doc_lines, @version, @ranges, @ops) - @RoomManager.joinDoc = sinon.stub() + @RoomManager.joinDoc = sinon.stub().callsArg(2) describe "works", -> beforeEach ->