support multple redis instances for pubsub

This commit is contained in:
Brian Gough 2019-06-29 12:28:54 +01:00
parent 6d4649c842
commit b5f9bc422b
4 changed files with 41 additions and 28 deletions

View file

@ -1,7 +1,6 @@
logger = require "logger-sharelatex"
settings = require 'settings-sharelatex'
redis = require("redis-sharelatex")
rclient = redis.createClient(settings.redis.pubsub)
SafeJsonParse = require "./SafeJsonParse"
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
@ -12,13 +11,15 @@ MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
module.exports = DocumentUpdaterController =
# DocumentUpdaterController is responsible for updates that come via Redis
# Pub/Sub from the document updater.
rclientList: [redis.createClient(settings.redis.pubsub)]
listenForUpdatesFromDocumentUpdater: (io) ->
rclient.subscribe "applied-ops"
rclient.on "message", (channel, message) ->
metrics.inc "rclient", 0.001 # global event rate metric
EventLogger.debugEvent(channel, message) if settings.debugEvents > 0
DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message)
for rclient in @rclientList
rclient.subscribe "applied-ops"
rclient.on "message", (channel, message) ->
metrics.inc "rclient", 0.001 # global event rate metric
EventLogger.debugEvent(channel, message) if settings.debugEvents > 0
DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message)
_processMessageFromDocumentUpdater: (io, channel, message) ->
SafeJsonParse.parse message, (error, message) ->

View file

@ -2,14 +2,12 @@ Settings = require 'settings-sharelatex'
logger = require 'logger-sharelatex'
redis = require("redis-sharelatex")
SafeJsonParse = require "./SafeJsonParse"
rclientPub = redis.createClient(Settings.redis.pubsub)
rclientSub = redis.createClient(Settings.redis.pubsub)
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
module.exports = WebsocketLoadBalancer =
rclientPub: rclientPub
rclientSub: rclientSub
rclientPubList: [redis.createClient(Settings.redis.pubsub)]
rclientSubList: [redis.createClient(Settings.redis.pubsub)]
emitToRoom: (room_id, message, payload...) ->
if !room_id?
@ -20,16 +18,18 @@ module.exports = WebsocketLoadBalancer =
message: message
payload: payload
logger.log {room_id, message, payload, length: data.length}, "emitting to room"
@rclientPub.publish "editor-events", data
for rclientPub in @rclientPubList
rclientPub.publish "editor-events", data
emitToAll: (message, payload...) ->
@emitToRoom "all", message, payload...
listenForEditorEvents: (io) ->
@rclientSub.subscribe "editor-events"
@rclientSub.on "message", (channel, message) ->
EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0
WebsocketLoadBalancer._processEditorEvent io, channel, message
for rclientSub in @rclientSubList
rclientSub.subscribe "editor-events"
rclientSub.on "message", (channel, message) ->
EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0
WebsocketLoadBalancer._processEditorEvent io, channel, message
_processEditorEvent: (io, channel, message) ->
SafeJsonParse.parse message, (error, message) ->

View file

@ -10,6 +10,7 @@ describe "DocumentUpdaterController", ->
@doc_id = "doc-id-123"
@callback = sinon.stub()
@io = { "mock": "socket.io" }
@rclient = []
@EditorUpdatesController = SandboxedModule.require modulePath, requires:
"logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() }
"settings-sharelatex": @settings =
@ -17,9 +18,11 @@ describe "DocumentUpdaterController", ->
documentupdater:
key_schema:
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
"redis-sharelatex" :
createClient: () =>
@rclient = {}
pubsub: null
"redis-sharelatex" : @redis =
createClient: (name) =>
@rclient.push(rclientStub = {name:name})
return rclientStub
"./SafeJsonParse": @SafeJsonParse =
parse: (data, cb) => cb null, JSON.parse(data)
"./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()}
@ -28,15 +31,23 @@ describe "DocumentUpdaterController", ->
describe "listenForUpdatesFromDocumentUpdater", ->
beforeEach ->
@rclient.subscribe = sinon.stub()
@rclient.on = sinon.stub()
@rclient.length = 0 # clear any existing clients
@EditorUpdatesController.rclientList = [@redis.createClient("first"), @redis.createClient("second")]
@rclient[0].subscribe = sinon.stub()
@rclient[0].on = sinon.stub()
@rclient[1].subscribe = sinon.stub()
@rclient[1].on = sinon.stub()
@EditorUpdatesController.listenForUpdatesFromDocumentUpdater()
it "should subscribe to the doc-updater stream", ->
@rclient.subscribe.calledWith("applied-ops").should.equal true
@rclient[0].subscribe.calledWith("applied-ops").should.equal true
it "should register a callback to handle updates", ->
@rclient.on.calledWith("message").should.equal true
@rclient[0].on.calledWith("message").should.equal true
it "should subscribe to any additional doc-updater stream", ->
@rclient[1].subscribe.calledWith("applied-ops").should.equal true
@rclient[1].on.calledWith("message").should.equal true
describe "_processMessageFromDocumentUpdater", ->
describe "with bad JSON", ->

View file

@ -15,11 +15,12 @@ describe "WebsocketLoadBalancer", ->
"./EventLogger": {checkEventOrder: sinon.stub()}
"./HealthCheckManager": {check: sinon.stub()}
@io = {}
@WebsocketLoadBalancer.rclientPub = publish: sinon.stub()
@WebsocketLoadBalancer.rclientSub =
@WebsocketLoadBalancer.rclientPubList = [{publish: sinon.stub()}]
@WebsocketLoadBalancer.rclientSubList = [{
subscribe: sinon.stub()
on: sinon.stub()
}]
@room_id = "room-id"
@message = "message-to-editor"
@payload = ["argument one", 42]
@ -29,7 +30,7 @@ describe "WebsocketLoadBalancer", ->
@WebsocketLoadBalancer.emitToRoom(@room_id, @message, @payload...)
it "should publish the message to redis", ->
@WebsocketLoadBalancer.rclientPub.publish
@WebsocketLoadBalancer.rclientPubList[0].publish
.calledWith("editor-events", JSON.stringify(
room_id: @room_id,
message: @message
@ -53,12 +54,12 @@ describe "WebsocketLoadBalancer", ->
@WebsocketLoadBalancer.listenForEditorEvents()
it "should subscribe to the editor-events channel", ->
@WebsocketLoadBalancer.rclientSub.subscribe
@WebsocketLoadBalancer.rclientSubList[0].subscribe
.calledWith("editor-events")
.should.equal true
it "should process the events with _processEditorEvent", ->
@WebsocketLoadBalancer.rclientSub.on
@WebsocketLoadBalancer.rclientSubList[0].on
.calledWith("message", sinon.match.func)
.should.equal true