From befe4be5171392e7a92c87cece2501e68890228b Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 19 Mar 2019 10:55:12 +0000 Subject: [PATCH] add check for duplicate events --- .../real-time/app/coffee/EventLogger.coffee | 46 ++++++++++++++++ .../app/coffee/WebsocketLoadBalancer.coffee | 3 ++ .../test/unit/coffee/EventLoggerTests.coffee | 52 +++++++++++++++++++ .../coffee/WebsocketLoadBalancerTests.coffee | 1 + 4 files changed, 102 insertions(+) create mode 100644 services/real-time/app/coffee/EventLogger.coffee create mode 100644 services/real-time/test/unit/coffee/EventLoggerTests.coffee diff --git a/services/real-time/app/coffee/EventLogger.coffee b/services/real-time/app/coffee/EventLogger.coffee new file mode 100644 index 0000000000..773c9dfa9b --- /dev/null +++ b/services/real-time/app/coffee/EventLogger.coffee @@ -0,0 +1,46 @@ +logger = require 'logger-sharelatex' + +# keep track of message counters to detect duplicate and out of order events +# messsage ids have the format "UNIQUEHOSTKEY-COUNTER" + +EVENT_LOG_COUNTER = {} +EVENT_LOG_TIMESTAMP = {} +EVENT_COUNT = 0 + +module.exports = EventLogger = + + MAX_EVENTS_BEFORE_CLEAN: 100000 + MAX_STALE_TIME_IN_MS: 3600 * 1000 + + checkEventOrder: (message_id, message) -> + return if typeof(message_id) isnt 'string' + [key, count] = message_id.split("-", 2) + count = parseInt(count, 10) + if !count # ignore checks if counter is not present + return + # store the last count in a hash for each host + previous = EventLogger._storeEventCount(key, count) + if !previous? || count == (previous + 1) + return # order is ok + if (count == previous) + logger.error {key:key, previous: previous, count:count, message:message}, "duplicate event" + return "duplicate" + else + logger.error {key:key, previous: previous, count:count, message:message}, "events out of order" + return # out of order + + _storeEventCount: (key, count) -> + previous = EVENT_LOG_COUNTER[key] + now = Date.now() + EVENT_LOG_COUNTER[key] = count + EVENT_LOG_TIMESTAMP[key] = now + # periodically remove old counts + if (++EVENT_COUNT % EventLogger.MAX_EVENTS_BEFORE_CLEAN) == 0 + EventLogger._cleanEventStream(now) + return previous + + _cleanEventStream: (now) -> + for key, timestamp of EVENT_LOG_TIMESTAMP + if (now - timestamp) > EventLogger.MAX_STALE_TIME_IN_MS + delete EVENT_LOG_COUNTER[key] + delete EVENT_LOG_TIMESTAMP[key] \ No newline at end of file diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index c30a3d3e85..13137805fd 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -4,6 +4,7 @@ redis = require("redis-sharelatex") SafeJsonParse = require "./SafeJsonParse" rclientPub = redis.createClient(Settings.redis.realtime) rclientSub = redis.createClient(Settings.redis.realtime) +EventLogger = require "./EventLogger" module.exports = WebsocketLoadBalancer = rclientPub: rclientPub @@ -36,6 +37,8 @@ module.exports = WebsocketLoadBalancer = if message.room_id == "all" io.sockets.emit(message.message, message.payload...) else if message.room_id? + if message._id? + EventLogger.checkEventOrder(message._id, message) io.sockets.in(message.room_id).emit(message.message, message.payload...) else if message.health_check? logger.debug {message}, "got health check message in editor events channel" diff --git a/services/real-time/test/unit/coffee/EventLoggerTests.coffee b/services/real-time/test/unit/coffee/EventLoggerTests.coffee new file mode 100644 index 0000000000..ce955a8e7d --- /dev/null +++ b/services/real-time/test/unit/coffee/EventLoggerTests.coffee @@ -0,0 +1,52 @@ +require('chai').should() +expect = require("chai").expect +SandboxedModule = require('sandboxed-module') +modulePath = '../../../app/js/EventLogger' +sinon = require("sinon") +tk = require "timekeeper" + +describe 'EventLogger', -> + beforeEach -> + @start = Date.now() + tk.freeze(new Date(@start)) + @EventLogger = SandboxedModule.require modulePath, requires: + "logger-sharelatex": @logger = {error: sinon.stub()} + @id_1 = "abc-1" + @message_1 = "message-1" + @id_2 = "abc-2" + @message_2 = "message-2" + + afterEach -> + tk.reset() + + describe 'checkEventOrder', -> + + it 'should accept events in order', -> + @EventLogger.checkEventOrder(@id_1, @message_1) + status = @EventLogger.checkEventOrder(@id_2, @message_2) + expect(status).to.be.undefined + + it 'should return "duplicate" for the same event', -> + @EventLogger.checkEventOrder(@id_1, @message_1) + status = @EventLogger.checkEventOrder(@id_1, @message_1) + expect(status).to.equal "duplicate" + + it 'should log an error for out of order events', -> + @EventLogger.checkEventOrder(@id_1, @message_1) + @EventLogger.checkEventOrder(@id_2, @message_2) + status = @EventLogger.checkEventOrder(@id_1, @message_1) + expect(status).to.be.undefined + + it 'should flush old entries', -> + @EventLogger.MAX_EVENTS_BEFORE_CLEAN = 10 + @EventLogger.checkEventOrder(@id_1, @message_1) + for i in [1..8] + status = @EventLogger.checkEventOrder(@id_1, @message_1) + expect(status).to.equal "duplicate" + # the next event should flush the old entries aboce + @EventLogger.MAX_STALE_TIME_IN_MS=1000 + tk.freeze(new Date(@start + 5 * 1000)) + # because we flushed the entries this should not be a duplicate + @EventLogger.checkEventOrder('other-1', @message_2) + status = @EventLogger.checkEventOrder(@id_1, @message_1) + expect(status).to.be.undefined \ No newline at end of file diff --git a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee index 5cae81a31d..ad0c70832c 100644 --- a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -12,6 +12,7 @@ describe "WebsocketLoadBalancer", -> "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } "./SafeJsonParse": @SafeJsonParse = parse: (data, cb) => cb null, JSON.parse(data) + "./EventLogger": {checkEventOrder: sinon.stub()} @io = {} @WebsocketLoadBalancer.rclientPub = publish: sinon.stub() @WebsocketLoadBalancer.rclientSub =