Merge pull request #41 from sharelatex/bg-handle-duplicate-events

handle duplicate events
This commit is contained in:
Brian Gough 2019-04-11 15:17:31 +01:00 committed by GitHub
commit 01a1923e1f
5 changed files with 33 additions and 5 deletions

View file

@@ -53,6 +53,11 @@ app.get "/", (req, res, next) ->
app.get "/status", (req, res, next) ->
res.send "real-time-sharelatex is alive"
app.get "/debug/events", (req, res, next) ->
Settings.debugEvents = parseInt(req.query?.count,10) || 20
logger.log {count: Settings.debugEvents}, "starting debug mode"
res.send "debug mode will log next #{Settings.debugEvents} events"
rclient = require("redis-sharelatex").createClient(Settings.redis.realtime)
app.get "/health_check/redis", (req, res, next) ->
rclient.healthCheck (error) ->

View file

@@ -15,6 +15,7 @@ module.exports = DocumentUpdaterController =
listenForUpdatesFromDocumentUpdater: (io) ->
rclient.subscribe "applied-ops"
rclient.on "message", (channel, message) ->
EventLogger.debugEvent(channel, message) if settings.debugEvents > 0
DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message)
_processMessageFromDocumentUpdater: (io, channel, message) ->
@@ -24,7 +25,9 @@ module.exports = DocumentUpdaterController =
return
if message.op?
if message._id?
EventLogger.checkEventOrder("applied-ops", message._id, message)
status = EventLogger.checkEventOrder("applied-ops", message._id, message)
if status is 'duplicate'
return # skip duplicate events
DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op)
else if message.error?
DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message)
@@ -33,6 +36,11 @@ module.exports = DocumentUpdaterController =
_applyUpdateFromDocumentUpdater: (io, doc_id, update) ->
clientList = io.sockets.clients(doc_id)
# avoid unnecessary work if no clients are connected
if clientList.length is 0
return
# send updates to clients
logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, socketIoClients: (client.id for client in clientList), "distributing updates to clients"
seen = {}
# send messages only to unique clients (due to duplicate entries in io.sockets.clients)
for client in clientList when not seen[client.id]

View file

@@ -1,5 +1,6 @@
logger = require 'logger-sharelatex'
metrics = require 'metrics-sharelatex'
settings = require 'settings-sharelatex'
# keep track of message counters to detect duplicate and out of order events
# message ids have the format "UNIQUEHOSTKEY-COUNTER"
@@ -8,10 +9,18 @@ EVENT_LOG_COUNTER = {}
EVENT_LOG_TIMESTAMP = {}
EVENT_LAST_CLEAN_TIMESTAMP = 0
# counter for debug logs
COUNTER = 0
module.exports = EventLogger =
MAX_STALE_TIME_IN_MS: 3600 * 1000
debugEvent: (channel, message) ->
if settings.debugEvents > 0
logger.log {channel:channel, message:message, counter: COUNTER++}, "logging event"
settings.debugEvents--
checkEventOrder: (channel, message_id, message) ->
return if typeof(message_id) isnt 'string'
return if !(result = message_id.match(/^(.*)-(\d+)$/))
@@ -26,11 +35,11 @@ module.exports = EventLogger =
return # order is ok
if (count == previous)
metrics.inc "event.#{channel}.duplicate"
# logger.error {key:key, previous: previous, count:count, message:message}, "duplicate event"
logger.warn {channel:channel, message_id:message_id}, "duplicate event"
return "duplicate"
else
metrics.inc "event.#{channel}.out-of-order"
# logger.error {key:key, previous: previous, count:count, message:message}, "events out of order"
logger.warn {channel:channel, message_id:message_id, key:key, previous: previous, count:count}, "out of order event"
return "out-of-order"
_storeEventCount: (key, count) ->

View file

@@ -27,6 +27,7 @@ module.exports = WebsocketLoadBalancer =
listenForEditorEvents: (io) ->
@rclientSub.subscribe "editor-events"
@rclientSub.on "message", (channel, message) ->
EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0
WebsocketLoadBalancer._processEditorEvent io, channel, message
_processEditorEvent: (io, channel, message) ->
@@ -38,9 +39,14 @@ module.exports = WebsocketLoadBalancer =
io.sockets.emit(message.message, message.payload...)
else if message.room_id?
if message._id?
EventLogger.checkEventOrder("editor-events", message._id, message)
status = EventLogger.checkEventOrder("editor-events", message._id, message)
if status is "duplicate"
return # skip duplicate events
# send messages only to unique clients (due to duplicate entries in io.sockets.clients)
clientList = io.sockets.clients(message.room_id)
# avoid unnecessary work if no clients are connected
return if clientList.length is 0
logger.log {channel:channel, message: message.message, room_id: message.room_id, message_id: message._id, socketIoClients: (client.id for client in clientList)}, "distributing event to clients"
seen = {}
for client in clientList when not seen[client.id]
seen[client.id] = true

View file

@@ -10,7 +10,7 @@ describe 'EventLogger', ->
@start = Date.now()
tk.freeze(new Date(@start))
@EventLogger = SandboxedModule.require modulePath, requires:
"logger-sharelatex": @logger = {error: sinon.stub()}
"logger-sharelatex": @logger = {error: sinon.stub(), warn: sinon.stub()}
"metrics-sharelatex": @metrics = {inc: sinon.stub()}
@channel = "applied-ops"
@id_1 = "random-hostname:abc-1"