add check for duplicate events

This commit is contained in:
Brian Gough 2019-03-19 10:55:12 +00:00
parent b9e3853a47
commit befe4be517
4 changed files with 102 additions and 0 deletions

View file

@ -0,0 +1,46 @@
logger = require 'logger-sharelatex'
# keep track of message counters to detect duplicate and out of order events
# messsage ids have the format "UNIQUEHOSTKEY-COUNTER"
EVENT_LOG_COUNTER = {}
EVENT_LOG_TIMESTAMP = {}
EVENT_COUNT = 0
module.exports = EventLogger =
MAX_EVENTS_BEFORE_CLEAN: 100000
MAX_STALE_TIME_IN_MS: 3600 * 1000
checkEventOrder: (message_id, message) ->
return if typeof(message_id) isnt 'string'
[key, count] = message_id.split("-", 2)
count = parseInt(count, 10)
if !count # ignore checks if counter is not present
return
# store the last count in a hash for each host
previous = EventLogger._storeEventCount(key, count)
if !previous? || count == (previous + 1)
return # order is ok
if (count == previous)
logger.error {key:key, previous: previous, count:count, message:message}, "duplicate event"
return "duplicate"
else
logger.error {key:key, previous: previous, count:count, message:message}, "events out of order"
return # out of order
_storeEventCount: (key, count) ->
previous = EVENT_LOG_COUNTER[key]
now = Date.now()
EVENT_LOG_COUNTER[key] = count
EVENT_LOG_TIMESTAMP[key] = now
# periodically remove old counts
if (++EVENT_COUNT % EventLogger.MAX_EVENTS_BEFORE_CLEAN) == 0
EventLogger._cleanEventStream(now)
return previous
_cleanEventStream: (now) ->
for key, timestamp of EVENT_LOG_TIMESTAMP
if (now - timestamp) > EventLogger.MAX_STALE_TIME_IN_MS
delete EVENT_LOG_COUNTER[key]
delete EVENT_LOG_TIMESTAMP[key]

View file

@ -4,6 +4,7 @@ redis = require("redis-sharelatex")
SafeJsonParse = require "./SafeJsonParse"
rclientPub = redis.createClient(Settings.redis.realtime)
rclientSub = redis.createClient(Settings.redis.realtime)
EventLogger = require "./EventLogger"
module.exports = WebsocketLoadBalancer =
rclientPub: rclientPub
@ -36,6 +37,8 @@ module.exports = WebsocketLoadBalancer =
if message.room_id == "all"
io.sockets.emit(message.message, message.payload...)
else if message.room_id?
if message._id?
EventLogger.checkEventOrder(message._id, message)
io.sockets.in(message.room_id).emit(message.message, message.payload...)
else if message.health_check?
logger.debug {message}, "got health check message in editor events channel"

View file

@ -0,0 +1,52 @@
require('chai').should()
expect = require("chai").expect
SandboxedModule = require('sandboxed-module')
modulePath = '../../../app/js/EventLogger'
sinon = require("sinon")
tk = require "timekeeper"
describe 'EventLogger', ->
beforeEach ->
@start = Date.now()
tk.freeze(new Date(@start))
@EventLogger = SandboxedModule.require modulePath, requires:
"logger-sharelatex": @logger = {error: sinon.stub()}
@id_1 = "abc-1"
@message_1 = "message-1"
@id_2 = "abc-2"
@message_2 = "message-2"
afterEach ->
tk.reset()
describe 'checkEventOrder', ->
it 'should accept events in order', ->
@EventLogger.checkEventOrder(@id_1, @message_1)
status = @EventLogger.checkEventOrder(@id_2, @message_2)
expect(status).to.be.undefined
it 'should return "duplicate" for the same event', ->
@EventLogger.checkEventOrder(@id_1, @message_1)
status = @EventLogger.checkEventOrder(@id_1, @message_1)
expect(status).to.equal "duplicate"
it 'should log an error for out of order events', ->
@EventLogger.checkEventOrder(@id_1, @message_1)
@EventLogger.checkEventOrder(@id_2, @message_2)
status = @EventLogger.checkEventOrder(@id_1, @message_1)
expect(status).to.be.undefined
it 'should flush old entries', ->
@EventLogger.MAX_EVENTS_BEFORE_CLEAN = 10
@EventLogger.checkEventOrder(@id_1, @message_1)
for i in [1..8]
status = @EventLogger.checkEventOrder(@id_1, @message_1)
expect(status).to.equal "duplicate"
# the next event should flush the old entries aboce
@EventLogger.MAX_STALE_TIME_IN_MS=1000
tk.freeze(new Date(@start + 5 * 1000))
# because we flushed the entries this should not be a duplicate
@EventLogger.checkEventOrder('other-1', @message_2)
status = @EventLogger.checkEventOrder(@id_1, @message_1)
expect(status).to.be.undefined

View file

@ -12,6 +12,7 @@ describe "WebsocketLoadBalancer", ->
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./SafeJsonParse": @SafeJsonParse =
parse: (data, cb) => cb null, JSON.parse(data)
"./EventLogger": {checkEventOrder: sinon.stub()}
@io = {}
@WebsocketLoadBalancer.rclientPub = publish: sinon.stub()
@WebsocketLoadBalancer.rclientSub =