mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
Merge pull request #40 from sharelatex/bg-handle-duplicate-clients
handle duplicate entries in io.sockets.clients
This commit is contained in:
commit
9a32c2eebb
4 changed files with 28 additions and 9 deletions
|
@ -4,6 +4,7 @@ redis = require("redis-sharelatex")
|
||||||
rclient = redis.createClient(settings.redis.documentupdater)
|
rclient = redis.createClient(settings.redis.documentupdater)
|
||||||
SafeJsonParse = require "./SafeJsonParse"
|
SafeJsonParse = require "./SafeJsonParse"
|
||||||
EventLogger = require "./EventLogger"
|
EventLogger = require "./EventLogger"
|
||||||
|
metrics = require "metrics-sharelatex"
|
||||||
|
|
||||||
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
|
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
|
||||||
|
|
||||||
|
@ -31,13 +32,20 @@ module.exports = DocumentUpdaterController =
|
||||||
logger.debug {message}, "got health check message in applied ops channel"
|
logger.debug {message}, "got health check message in applied ops channel"
|
||||||
|
|
||||||
_applyUpdateFromDocumentUpdater: (io, doc_id, update) ->
|
_applyUpdateFromDocumentUpdater: (io, doc_id, update) ->
|
||||||
for client in io.sockets.clients(doc_id)
|
clientList = io.sockets.clients(doc_id)
|
||||||
|
seen = {}
|
||||||
|
# send messages only to unique clients (due to duplicate entries in io.sockets.clients)
|
||||||
|
for client in clientList when not seen[client.id]
|
||||||
|
seen[client.id] = true
|
||||||
if client.id == update.meta.source
|
if client.id == update.meta.source
|
||||||
logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, "distributing update to sender"
|
logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, "distributing update to sender"
|
||||||
client.emit "otUpdateApplied", v: update.v, doc: update.doc
|
client.emit "otUpdateApplied", v: update.v, doc: update.doc
|
||||||
else if !update.dup # Duplicate ops should just be sent back to sending client for acknowledgement
|
else if !update.dup # Duplicate ops should just be sent back to sending client for acknowledgement
|
||||||
logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, client_id: client.id, "distributing update to collaborator"
|
logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, client_id: client.id, "distributing update to collaborator"
|
||||||
client.emit "otUpdateApplied", update
|
client.emit "otUpdateApplied", update
|
||||||
|
if Object.keys(seen).length < clientList.length
|
||||||
|
metrics.inc "socket-io.duplicate-clients", 0.1
|
||||||
|
logger.log doc_id: doc_id, socketIoClients: (client.id for client in clientList), "discarded duplicate clients"
|
||||||
|
|
||||||
_processErrorFromDocumentUpdater: (io, doc_id, error, message) ->
|
_processErrorFromDocumentUpdater: (io, doc_id, error, message) ->
|
||||||
for client in io.sockets.clients(doc_id)
|
for client in io.sockets.clients(doc_id)
|
||||||
|
@ -46,5 +54,3 @@ module.exports = DocumentUpdaterController =
|
||||||
client.disconnect()
|
client.disconnect()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,12 @@ module.exports = WebsocketLoadBalancer =
|
||||||
else if message.room_id?
|
else if message.room_id?
|
||||||
if message._id?
|
if message._id?
|
||||||
EventLogger.checkEventOrder("editor-events", message._id, message)
|
EventLogger.checkEventOrder("editor-events", message._id, message)
|
||||||
io.sockets.in(message.room_id).emit(message.message, message.payload...)
|
# send messages only to unique clients (due to duplicate entries in io.sockets.clients)
|
||||||
|
clientList = io.sockets.clients(message.room_id)
|
||||||
|
seen = {}
|
||||||
|
for client in clientList when not seen[client.id]
|
||||||
|
seen[client.id] = true
|
||||||
|
client.emit(message.message, message.payload...)
|
||||||
else if message.health_check?
|
else if message.health_check?
|
||||||
logger.debug {message}, "got health check message in editor events channel"
|
logger.debug {message}, "got health check message in editor events channel"
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ describe "DocumentUpdaterController", ->
|
||||||
"./SafeJsonParse": @SafeJsonParse =
|
"./SafeJsonParse": @SafeJsonParse =
|
||||||
parse: (data, cb) => cb null, JSON.parse(data)
|
parse: (data, cb) => cb null, JSON.parse(data)
|
||||||
"./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()}
|
"./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()}
|
||||||
|
"metrics-sharelatex": @metrics = {inc: sinon.stub()}
|
||||||
|
|
||||||
describe "listenForUpdatesFromDocumentUpdater", ->
|
describe "listenForUpdatesFromDocumentUpdater", ->
|
||||||
beforeEach ->
|
beforeEach ->
|
||||||
|
@ -81,7 +82,7 @@ describe "DocumentUpdaterController", ->
|
||||||
v: @version = 42
|
v: @version = 42
|
||||||
doc: @doc_id
|
doc: @doc_id
|
||||||
@io.sockets =
|
@io.sockets =
|
||||||
clients: sinon.stub().returns([@sourceClient, @otherClients...])
|
clients: sinon.stub().returns([@sourceClient, @otherClients..., @sourceClient]) # include a duplicate client
|
||||||
|
|
||||||
describe "normally", ->
|
describe "normally", ->
|
||||||
beforeEach ->
|
beforeEach ->
|
||||||
|
@ -91,6 +92,7 @@ describe "DocumentUpdaterController", ->
|
||||||
@sourceClient.emit
|
@sourceClient.emit
|
||||||
.calledWith("otUpdateApplied", v: @version, doc: @doc_id)
|
.calledWith("otUpdateApplied", v: @version, doc: @doc_id)
|
||||||
.should.equal true
|
.should.equal true
|
||||||
|
@sourceClient.emit.calledOnce.should.equal true
|
||||||
|
|
||||||
it "should get the clients connected to the document", ->
|
it "should get the clients connected to the document", ->
|
||||||
@io.sockets.clients
|
@io.sockets.clients
|
||||||
|
|
|
@ -73,18 +73,24 @@ describe "WebsocketLoadBalancer", ->
|
||||||
describe "with a designated room", ->
|
describe "with a designated room", ->
|
||||||
beforeEach ->
|
beforeEach ->
|
||||||
@io.sockets =
|
@io.sockets =
|
||||||
in: sinon.stub().returns(emit: @emit = sinon.stub())
|
clients: sinon.stub().returns([
|
||||||
|
{id: 'client-id-1', emit: @emit1 = sinon.stub()}
|
||||||
|
{id: 'client-id-2', emit: @emit2 = sinon.stub()}
|
||||||
|
{id: 'client-id-1', emit: @emit3 = sinon.stub()} # duplicate client
|
||||||
|
])
|
||||||
data = JSON.stringify
|
data = JSON.stringify
|
||||||
room_id: @room_id
|
room_id: @room_id
|
||||||
message: @message
|
message: @message
|
||||||
payload: @payload
|
payload: @payload
|
||||||
@WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data)
|
@WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data)
|
||||||
|
|
||||||
it "should send the message to all clients in the room", ->
|
it "should send the message to all (unique) clients in the room", ->
|
||||||
@io.sockets.in
|
@io.sockets.clients
|
||||||
.calledWith(@room_id)
|
.calledWith(@room_id)
|
||||||
.should.equal true
|
.should.equal true
|
||||||
@emit.calledWith(@message, @payload...).should.equal true
|
@emit1.calledWith(@message, @payload...).should.equal true
|
||||||
|
@emit2.calledWith(@message, @payload...).should.equal true
|
||||||
|
@emit3.called.should.equal false # duplicate client should be ignored
|
||||||
|
|
||||||
describe "when emitting to all", ->
|
describe "when emitting to all", ->
|
||||||
beforeEach ->
|
beforeEach ->
|
||||||
|
|
Loading…
Reference in a new issue