Merge pull request #58 from overleaf/bg-support-multiple-redis-pubsub

support multple redis instances for pubsub
This commit is contained in:
Brian Gough 2019-07-09 15:07:42 +01:00 committed by GitHub
commit a50c096eae
5 changed files with 69 additions and 32 deletions

View file

@ -1,7 +1,6 @@
logger = require "logger-sharelatex"
settings = require 'settings-sharelatex'
redis = require("redis-sharelatex")
rclient = redis.createClient(settings.redis.pubsub)
RedisClientManager = require "./RedisClientManager"
SafeJsonParse = require "./SafeJsonParse"
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
@ -12,13 +11,19 @@ MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
module.exports = DocumentUpdaterController =
# DocumentUpdaterController is responsible for updates that come via Redis
# Pub/Sub from the document updater.
rclientList: RedisClientManager.createClientList(settings.redis.pubsub, settings.redis.unusedpubsub)
listenForUpdatesFromDocumentUpdater: (io) ->
rclient.subscribe "applied-ops"
rclient.on "message", (channel, message) ->
metrics.inc "rclient", 0.001 # global event rate metric
EventLogger.debugEvent(channel, message) if settings.debugEvents > 0
DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message)
logger.log {rclients: @rclientList.length}, "listening for applied-ops events"
for rclient, i in @rclientList
rclient.subscribe "applied-ops"
rclient.on "message", (channel, message) ->
metrics.inc "rclient", 0.001 # global event rate metric
EventLogger.debugEvent(channel, message) if settings.debugEvents > 0
DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message)
do (i) ->
rclient.on "message", () ->
metrics.inc "rclient-#{i}", 0.001 # per client event rate metric
_processMessageFromDocumentUpdater: (io, channel, message) ->
SafeJsonParse.parse message, (error, message) ->

View file

@ -0,0 +1,18 @@
redis = require("redis-sharelatex")
logger = require 'logger-sharelatex'
module.exports = RedisClientManager =
createClientList: (configs...) ->
# create a dynamic list of redis clients, excluding any configurations which are not defined
clientList = for x in configs when x?
redisType = if x.cluster?
"cluster"
else if x.sentinels?
"sentinel"
else if x.host?
"single"
else
"unknown"
logger.log {redis: redisType}, "creating redis client"
redis.createClient(x)
return clientList

View file

@ -1,15 +1,13 @@
Settings = require 'settings-sharelatex'
logger = require 'logger-sharelatex'
redis = require("redis-sharelatex")
RedisClientManager = require "./RedisClientManager"
SafeJsonParse = require "./SafeJsonParse"
rclientPub = redis.createClient(Settings.redis.pubsub)
rclientSub = redis.createClient(Settings.redis.pubsub)
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
module.exports = WebsocketLoadBalancer =
rclientPub: rclientPub
rclientSub: rclientSub
rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub)
rclientSubList: RedisClientManager.createClientList(Settings.redis.pubsub, Settings.redis.unusedpubsub)
emitToRoom: (room_id, message, payload...) ->
if !room_id?
@ -20,16 +18,20 @@ module.exports = WebsocketLoadBalancer =
message: message
payload: payload
logger.log {room_id, message, payload, length: data.length}, "emitting to room"
@rclientPub.publish "editor-events", data
for rclientPub in @rclientPubList
rclientPub.publish "editor-events", data
emitToAll: (message, payload...) ->
@emitToRoom "all", message, payload...
listenForEditorEvents: (io) ->
@rclientSub.subscribe "editor-events"
@rclientSub.on "message", (channel, message) ->
EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0
WebsocketLoadBalancer._processEditorEvent io, channel, message
logger.log {rclients: @rclientPubList.length}, "publishing editor events"
logger.log {rclients: @rclientSubList.length}, "listening for editor events"
for rclientSub in @rclientSubList
rclientSub.subscribe "editor-events"
rclientSub.on "message", (channel, message) ->
EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0
WebsocketLoadBalancer._processEditorEvent io, channel, message
_processEditorEvent: (io, channel, message) ->
SafeJsonParse.parse message, (error, message) ->

View file

@ -10,6 +10,7 @@ describe "DocumentUpdaterController", ->
@doc_id = "doc-id-123"
@callback = sinon.stub()
@io = { "mock": "socket.io" }
@rclient = []
@EditorUpdatesController = SandboxedModule.require modulePath, requires:
"logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() }
"settings-sharelatex": @settings =
@ -17,9 +18,11 @@ describe "DocumentUpdaterController", ->
documentupdater:
key_schema:
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
"redis-sharelatex" :
createClient: () =>
@rclient = {}
pubsub: null
"redis-sharelatex" : @redis =
createClient: (name) =>
@rclient.push(rclientStub = {name:name})
return rclientStub
"./SafeJsonParse": @SafeJsonParse =
parse: (data, cb) => cb null, JSON.parse(data)
"./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()}
@ -28,15 +31,23 @@ describe "DocumentUpdaterController", ->
describe "listenForUpdatesFromDocumentUpdater", ->
beforeEach ->
@rclient.subscribe = sinon.stub()
@rclient.on = sinon.stub()
@rclient.length = 0 # clear any existing clients
@EditorUpdatesController.rclientList = [@redis.createClient("first"), @redis.createClient("second")]
@rclient[0].subscribe = sinon.stub()
@rclient[0].on = sinon.stub()
@rclient[1].subscribe = sinon.stub()
@rclient[1].on = sinon.stub()
@EditorUpdatesController.listenForUpdatesFromDocumentUpdater()
it "should subscribe to the doc-updater stream", ->
@rclient.subscribe.calledWith("applied-ops").should.equal true
@rclient[0].subscribe.calledWith("applied-ops").should.equal true
it "should register a callback to handle updates", ->
@rclient.on.calledWith("message").should.equal true
@rclient[0].on.calledWith("message").should.equal true
it "should subscribe to any additional doc-updater stream", ->
@rclient[1].subscribe.calledWith("applied-ops").should.equal true
@rclient[1].on.calledWith("message").should.equal true
describe "_processMessageFromDocumentUpdater", ->
describe "with bad JSON", ->

View file

@ -7,19 +7,20 @@ describe "WebsocketLoadBalancer", ->
beforeEach ->
@rclient = {}
@WebsocketLoadBalancer = SandboxedModule.require modulePath, requires:
"redis-sharelatex":
createClient: () => @rclient
"./RedisClientManager":
createClientList: () => []
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./SafeJsonParse": @SafeJsonParse =
parse: (data, cb) => cb null, JSON.parse(data)
"./EventLogger": {checkEventOrder: sinon.stub()}
"./HealthCheckManager": {check: sinon.stub()}
@io = {}
@WebsocketLoadBalancer.rclientPub = publish: sinon.stub()
@WebsocketLoadBalancer.rclientSub =
@WebsocketLoadBalancer.rclientPubList = [{publish: sinon.stub()}]
@WebsocketLoadBalancer.rclientSubList = [{
subscribe: sinon.stub()
on: sinon.stub()
}]
@room_id = "room-id"
@message = "message-to-editor"
@payload = ["argument one", 42]
@ -29,7 +30,7 @@ describe "WebsocketLoadBalancer", ->
@WebsocketLoadBalancer.emitToRoom(@room_id, @message, @payload...)
it "should publish the message to redis", ->
@WebsocketLoadBalancer.rclientPub.publish
@WebsocketLoadBalancer.rclientPubList[0].publish
.calledWith("editor-events", JSON.stringify(
room_id: @room_id,
message: @message
@ -53,12 +54,12 @@ describe "WebsocketLoadBalancer", ->
@WebsocketLoadBalancer.listenForEditorEvents()
it "should subscribe to the editor-events channel", ->
@WebsocketLoadBalancer.rclientSub.subscribe
@WebsocketLoadBalancer.rclientSubList[0].subscribe
.calledWith("editor-events")
.should.equal true
it "should process the events with _processEditorEvent", ->
@WebsocketLoadBalancer.rclientSub.on
@WebsocketLoadBalancer.rclientSubList[0].on
.calledWith("message", sinon.match.func)
.should.equal true