diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index dcdd8d142c..a2bd3f3e77 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -4,6 +4,7 @@ redis = require("redis-sharelatex") rclient = redis.createClient(settings.redis.documentupdater) SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" +metrics = require "metrics-sharelatex" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb @@ -31,13 +32,20 @@ module.exports = DocumentUpdaterController = logger.debug {message}, "got health check message in applied ops channel" _applyUpdateFromDocumentUpdater: (io, doc_id, update) -> - for client in io.sockets.clients(doc_id) + clientList = io.sockets.clients(doc_id) + seen = {} + # send messages only to unique clients (due to duplicate entries in io.sockets.clients) + for client in clientList when not seen[client.id] + seen[client.id] = true if client.id == update.meta.source logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, "distributing update to sender" client.emit "otUpdateApplied", v: update.v, doc: update.doc else if !update.dup # Duplicate ops should just be sent back to sending client for acknowledgement logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, client_id: client.id, "distributing update to collaborator" client.emit "otUpdateApplied", update + if Object.keys(seen).length < clientList.length + metrics.inc "socket-io.duplicate-clients", 0.1 + logger.log doc_id: doc_id, socketIoClients: (client.id for client in clientList), "discarded duplicate clients" _processErrorFromDocumentUpdater: (io, doc_id, error, message) -> for client in io.sockets.clients(doc_id) @@ -46,5 +54,3 @@ module.exports = DocumentUpdaterController = client.disconnect() - - diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index eeedb25916..4095e36e68 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -39,7 +39,12 @@ module.exports = WebsocketLoadBalancer = else if message.room_id? if message._id? EventLogger.checkEventOrder("editor-events", message._id, message) - io.sockets.in(message.room_id).emit(message.message, message.payload...) + # send messages only to unique clients (due to duplicate entries in io.sockets.clients) + clientList = io.sockets.clients(message.room_id) + seen = {} + for client in clientList when not seen[client.id] + seen[client.id] = true + client.emit(message.message, message.payload...) else if message.health_check? logger.debug {message}, "got health check message in editor events channel" diff --git a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee index a2aee89cc0..c104646696 100644 --- a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee +++ b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee @@ -23,6 +23,7 @@ describe "DocumentUpdaterController", -> "./SafeJsonParse": @SafeJsonParse = parse: (data, cb) => cb null, JSON.parse(data) "./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()} + "metrics-sharelatex": @metrics = {inc: sinon.stub()} describe "listenForUpdatesFromDocumentUpdater", -> beforeEach -> @@ -81,7 +82,7 @@ describe "DocumentUpdaterController", -> v: @version = 42 doc: @doc_id @io.sockets = - clients: sinon.stub().returns([@sourceClient, @otherClients...]) + clients: sinon.stub().returns([@sourceClient, @otherClients..., @sourceClient]) # include a duplicate client describe "normally", -> beforeEach -> @@ -91,6 +92,7 @@ describe "DocumentUpdaterController", -> @sourceClient.emit .calledWith("otUpdateApplied", v: @version, doc: @doc_id) .should.equal true + @sourceClient.emit.calledOnce.should.equal true it "should get the clients connected to the document", -> @io.sockets.clients diff --git a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee index ad0c70832c..38c317e9fe 100644 --- a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -73,18 +73,24 @@ describe "WebsocketLoadBalancer", -> describe "with a designated room", -> beforeEach -> @io.sockets = - in: sinon.stub().returns(emit: @emit = sinon.stub()) + clients: sinon.stub().returns([ + {id: 'client-id-1', emit: @emit1 = sinon.stub()} + {id: 'client-id-2', emit: @emit2 = sinon.stub()} + {id: 'client-id-1', emit: @emit3 = sinon.stub()} # duplicate client + ]) data = JSON.stringify room_id: @room_id message: @message payload: @payload @WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data) - it "should send the message to all clients in the room", -> - @io.sockets.in + it "should send the message to all (unique) clients in the room", -> + @io.sockets.clients .calledWith(@room_id) .should.equal true - @emit.calledWith(@message, @payload...).should.equal true + @emit1.calledWith(@message, @payload...).should.equal true + @emit2.calledWith(@message, @payload...).should.equal true + @emit3.called.should.equal false # duplicate client should be ignored describe "when emitting to all", -> beforeEach ->