Merge pull request #38 from sharelatex/bg-check-message-ids

add check for duplicate events
This commit is contained in:
Brian Gough 2019-03-21 15:57:22 +00:00 committed by GitHub
commit a4d4bf2bf7
5 changed files with 109 additions and 0 deletions

View file

@ -3,6 +3,7 @@ settings = require 'settings-sharelatex'
redis = require("redis-sharelatex")
rclient = redis.createClient(settings.redis.documentupdater)
SafeJsonParse = require "./SafeJsonParse"
EventLogger = require "./EventLogger"
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
@ -21,6 +22,8 @@ module.exports = DocumentUpdaterController =
logger.error {err: error, channel}, "error parsing JSON"
return
if message.op?
if message._id?
EventLogger.checkEventOrder("applied-ops", message._id, message)
DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op)
else if message.error?
DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message)

View file

@ -0,0 +1,50 @@
logger = require 'logger-sharelatex'
metrics = require 'metrics-sharelatex'

# Track the most recent message counter seen from each publishing host so
# duplicate and out-of-order delivery can be detected and reported via metrics.
# Message ids have the format "UNIQUEHOSTKEY-COUNTER".
EVENT_LOG_COUNTER = {}
EVENT_LOG_TIMESTAMP = {}
EVENT_LAST_CLEAN_TIMESTAMP = 0

module.exports = EventLogger =

	# entries older than this are considered stale and eligible for clean-up
	MAX_STALE_TIME_IN_MS: 3600 * 1000

	# Compare the counter in message_id against the last one seen for the
	# sending host. Returns "duplicate" when the counter repeats, otherwise
	# undefined (both for in-order and out-of-order events); bumps a metric
	# for each case on the given channel.
	checkEventOrder: (channel, message_id, message) ->
		return if typeof(message_id) isnt 'string'
		[hostKey, counter] = message_id.split("-", 2)
		counter = parseInt(counter, 10)
		# NaN fails this comparison, so ids without a numeric counter are ignored
		return if !(counter >= 0)
		# remember this counter, and fetch the previous one for the host
		previous = EventLogger._storeEventCount(hostKey, counter)
		if !previous? || counter == (previous + 1)
			metrics.inc "event.#{channel}.valid", 0.001
			return # order is ok
		if (counter == previous)
			metrics.inc "event.#{channel}.duplicate"
			return "duplicate"
		else
			metrics.inc "event.#{channel}.out-of-order"
			return # out of order

	# Record the latest counter and timestamp for a host, returning the
	# previously stored counter (undefined for a host not seen before).
	_storeEventCount: (key, count) ->
		previous = EVENT_LOG_COUNTER[key]
		now = Date.now()
		EVENT_LOG_COUNTER[key] = count
		EVENT_LOG_TIMESTAMP[key] = now
		# periodically discard counters for hosts we have not heard from recently
		if (now - EVENT_LAST_CLEAN_TIMESTAMP) > EventLogger.MAX_STALE_TIME_IN_MS
			EventLogger._cleanEventStream(now)
			EVENT_LAST_CLEAN_TIMESTAMP = now
		return previous

	# Drop counter/timestamp entries that have gone stale.
	_cleanEventStream: (now) ->
		for key, timestamp of EVENT_LOG_TIMESTAMP
			if (now - timestamp) > EventLogger.MAX_STALE_TIME_IN_MS
				delete EVENT_LOG_COUNTER[key]
				delete EVENT_LOG_TIMESTAMP[key]

View file

@ -4,6 +4,7 @@ redis = require("redis-sharelatex")
SafeJsonParse = require "./SafeJsonParse"
rclientPub = redis.createClient(Settings.redis.realtime)
rclientSub = redis.createClient(Settings.redis.realtime)
EventLogger = require "./EventLogger"
module.exports = WebsocketLoadBalancer =
rclientPub: rclientPub
@ -36,6 +37,8 @@ module.exports = WebsocketLoadBalancer =
if message.room_id == "all"
io.sockets.emit(message.message, message.payload...)
else if message.room_id?
if message._id?
EventLogger.checkEventOrder("editor-events", message._id, message)
io.sockets.in(message.room_id).emit(message.message, message.payload...)
else if message.health_check?
logger.debug {message}, "got health check message in editor events channel"

View file

@ -0,0 +1,52 @@
require('chai').should()
expect = require("chai").expect
SandboxedModule = require('sandboxed-module')
modulePath = '../../../app/js/EventLogger'
sinon = require("sinon")
tk = require "timekeeper"

describe 'EventLogger', ->
	beforeEach ->
		@start = Date.now()
		tk.freeze(new Date(@start))
		@EventLogger = SandboxedModule.require modulePath, requires:
			"logger-sharelatex": @logger = {error: sinon.stub()}
			"metrics-sharelatex": @metrics = {inc: sinon.stub()}
		# checkEventOrder takes (channel, message_id, message); ids are
		# "HOSTKEY-COUNTER" so both ids below share the host key "abc"
		@channel = "test-channel"
		@id_1 = "abc-1"
		@message_1 = "message-1"
		@id_2 = "abc-2"
		@message_2 = "message-2"

	afterEach ->
		tk.reset()

	describe 'checkEventOrder', ->

		it 'should accept events in order', ->
			@EventLogger.checkEventOrder(@channel, @id_1, @message_1)
			status = @EventLogger.checkEventOrder(@channel, @id_2, @message_2)
			expect(status).to.be.undefined

		it 'should return "duplicate" for the same event', ->
			@EventLogger.checkEventOrder(@channel, @id_1, @message_1)
			status = @EventLogger.checkEventOrder(@channel, @id_1, @message_1)
			expect(status).to.equal "duplicate"

		it 'should return undefined for out of order events', ->
			@EventLogger.checkEventOrder(@channel, @id_1, @message_1)
			@EventLogger.checkEventOrder(@channel, @id_2, @message_2)
			status = @EventLogger.checkEventOrder(@channel, @id_1, @message_1)
			expect(status).to.be.undefined

		it 'should flush old entries', ->
			@EventLogger.checkEventOrder(@channel, @id_1, @message_1)
			for i in [1..8]
				status = @EventLogger.checkEventOrder(@channel, @id_1, @message_1)
				expect(status).to.equal "duplicate"
			# shorten the stale time and advance the clock past it so the
			# next event triggers a clean of the entries stored above
			@EventLogger.MAX_STALE_TIME_IN_MS = 1000
			tk.freeze(new Date(@start + 5 * 1000))
			# an event from a different host triggers the clean-up
			@EventLogger.checkEventOrder(@channel, "other-1", @message_2)
			# because the stale "abc" entry was flushed, replaying the first
			# event is no longer reported as a duplicate
			status = @EventLogger.checkEventOrder(@channel, @id_1, @message_1)
			expect(status).to.be.undefined

View file

@ -12,6 +12,7 @@ describe "WebsocketLoadBalancer", ->
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./SafeJsonParse": @SafeJsonParse =
parse: (data, cb) => cb null, JSON.parse(data)
"./EventLogger": {checkEventOrder: sinon.stub()}
@io = {}
@WebsocketLoadBalancer.rclientPub = publish: sinon.stub()
@WebsocketLoadBalancer.rclientSub =