From b5f9bc422bd9a5aab14a21347d5da2c309d383f6 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Sat, 29 Jun 2019 12:28:54 +0100 Subject: [PATCH] support multple redis instances for pubsub --- .../coffee/DocumentUpdaterController.coffee | 13 +++++----- .../app/coffee/WebsocketLoadBalancer.coffee | 18 ++++++------- .../DocumentUpdaterControllerTests.coffee | 25 +++++++++++++------ .../coffee/WebsocketLoadBalancerTests.coffee | 13 +++++----- 4 files changed, 41 insertions(+), 28 deletions(-) diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index 490902a837..41e3bb571a 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -1,7 +1,6 @@ logger = require "logger-sharelatex" settings = require 'settings-sharelatex' redis = require("redis-sharelatex") -rclient = redis.createClient(settings.redis.pubsub) SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" @@ -12,13 +11,15 @@ MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb module.exports = DocumentUpdaterController = # DocumentUpdaterController is responsible for updates that come via Redis # Pub/Sub from the document updater. + rclientList: [redis.createClient(settings.redis.pubsub)] listenForUpdatesFromDocumentUpdater: (io) -> - rclient.subscribe "applied-ops" - rclient.on "message", (channel, message) -> - metrics.inc "rclient", 0.001 # global event rate metric - EventLogger.debugEvent(channel, message) if settings.debugEvents > 0 - DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message) + for rclient in @rclientList + rclient.subscribe "applied-ops" + rclient.on "message", (channel, message) -> + metrics.inc "rclient", 0.001 # global event rate metric + EventLogger.debugEvent(channel, message) if settings.debugEvents > 0 + DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message) _processMessageFromDocumentUpdater: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index e4ed673f8a..5c91cd9fa9 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -2,14 +2,12 @@ Settings = require 'settings-sharelatex' logger = require 'logger-sharelatex' redis = require("redis-sharelatex") SafeJsonParse = require "./SafeJsonParse" -rclientPub = redis.createClient(Settings.redis.pubsub) -rclientSub = redis.createClient(Settings.redis.pubsub) EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" module.exports = WebsocketLoadBalancer = - rclientPub: rclientPub - rclientSub: rclientSub + rclientPubList: [redis.createClient(Settings.redis.pubsub)] + rclientSubList: [redis.createClient(Settings.redis.pubsub)] emitToRoom: (room_id, message, payload...) -> if !room_id? @@ -20,16 +18,18 @@ module.exports = WebsocketLoadBalancer = message: message payload: payload logger.log {room_id, message, payload, length: data.length}, "emitting to room" - @rclientPub.publish "editor-events", data + for rclientPub in @rclientPubList + rclientPub.publish "editor-events", data emitToAll: (message, payload...) -> @emitToRoom "all", message, payload... listenForEditorEvents: (io) -> - @rclientSub.subscribe "editor-events" - @rclientSub.on "message", (channel, message) -> - EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0 - WebsocketLoadBalancer._processEditorEvent io, channel, message + for rclientSub in @rclientSubList + rclientSub.subscribe "editor-events" + rclientSub.on "message", (channel, message) -> + EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0 + WebsocketLoadBalancer._processEditorEvent io, channel, message _processEditorEvent: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> diff --git a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee index 5ed274b36c..24551396d9 100644 --- a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee +++ b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee @@ -10,6 +10,7 @@ describe "DocumentUpdaterController", -> @doc_id = "doc-id-123" @callback = sinon.stub() @io = { "mock": "socket.io" } + @rclient = [] @EditorUpdatesController = SandboxedModule.require modulePath, requires: "logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() } "settings-sharelatex": @settings = @@ -17,9 +18,11 @@ describe "DocumentUpdaterController", -> documentupdater: key_schema: pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" - "redis-sharelatex" : - createClient: () => - @rclient = {} + pubsub: null + "redis-sharelatex" : @redis = + createClient: (name) => + @rclient.push(rclientStub = {name:name}) + return rclientStub "./SafeJsonParse": @SafeJsonParse = parse: (data, cb) => cb null, JSON.parse(data) "./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()} @@ -28,15 +31,23 @@ describe "DocumentUpdaterController", -> describe "listenForUpdatesFromDocumentUpdater", -> beforeEach -> - @rclient.subscribe = sinon.stub() - @rclient.on = sinon.stub() + @rclient.length = 0 # clear any existing clients + @EditorUpdatesController.rclientList = [@redis.createClient("first"), @redis.createClient("second")] + @rclient[0].subscribe = sinon.stub() + @rclient[0].on = sinon.stub() + @rclient[1].subscribe = sinon.stub() + @rclient[1].on = sinon.stub() @EditorUpdatesController.listenForUpdatesFromDocumentUpdater() it "should subscribe to the doc-updater stream", -> - @rclient.subscribe.calledWith("applied-ops").should.equal true + @rclient[0].subscribe.calledWith("applied-ops").should.equal true it "should register a callback to handle updates", -> - @rclient.on.calledWith("message").should.equal true + @rclient[0].on.calledWith("message").should.equal true + + it "should subscribe to any additional doc-updater stream", -> + @rclient[1].subscribe.calledWith("applied-ops").should.equal true + @rclient[1].on.calledWith("message").should.equal true describe "_processMessageFromDocumentUpdater", -> describe "with bad JSON", -> diff --git a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee index e786038ab7..132fdc9d5f 100644 --- a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -15,11 +15,12 @@ describe "WebsocketLoadBalancer", -> "./EventLogger": {checkEventOrder: sinon.stub()} "./HealthCheckManager": {check: sinon.stub()} @io = {} - @WebsocketLoadBalancer.rclientPub = publish: sinon.stub() - @WebsocketLoadBalancer.rclientSub = + @WebsocketLoadBalancer.rclientPubList = [{publish: sinon.stub()}] + @WebsocketLoadBalancer.rclientSubList = [{ subscribe: sinon.stub() on: sinon.stub() - + }] + @room_id = "room-id" @message = "message-to-editor" @payload = ["argument one", 42] @@ -29,7 +30,7 @@ describe "WebsocketLoadBalancer", -> @WebsocketLoadBalancer.emitToRoom(@room_id, @message, @payload...) it "should publish the message to redis", -> - @WebsocketLoadBalancer.rclientPub.publish + @WebsocketLoadBalancer.rclientPubList[0].publish .calledWith("editor-events", JSON.stringify( room_id: @room_id, message: @message @@ -53,12 +54,12 @@ describe "WebsocketLoadBalancer", -> @WebsocketLoadBalancer.listenForEditorEvents() it "should subscribe to the editor-events channel", -> - @WebsocketLoadBalancer.rclientSub.subscribe + @WebsocketLoadBalancer.rclientSubList[0].subscribe .calledWith("editor-events") .should.equal true it "should process the events with _processEditorEvent", -> - @WebsocketLoadBalancer.rclientSub.on + @WebsocketLoadBalancer.rclientSubList[0].on .calledWith("message", sinon.match.func) .should.equal true