ensure redis channel is subscribed when joining room

This commit is contained in:
Brian Gough 2019-07-23 17:02:09 +01:00
parent 84e6ff616f
commit 159b39c491
7 changed files with 109 additions and 69 deletions

View file

@ -2,7 +2,7 @@ logger = require 'logger-sharelatex'
metrics = require "metrics-sharelatex"
settings = require "settings-sharelatex"
ClientMap = new Map() # for each redis client, stores a Set of subscribed channels
ClientMap = new Map() # for each redis client, stores a Map of subscribed channels (channelname -> subscribe promise)
# Manage redis pubsub subscriptions for individual projects and docs, ensuring
# that we never subscribe to a channel multiple times. The socket.io side is
@ -12,18 +12,23 @@ module.exports = ChannelManager =
getClientMapEntry: (rclient) ->
# return the rclient channel set if it exists, otherwise create and
# return an empty set for the client.
ClientMap.get(rclient) || ClientMap.set(rclient, new Set()).get(rclient)
ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient)
subscribe: (rclient, baseChannel, id) ->
existingChannelSet = @getClientMapEntry(rclient)
channel = "#{baseChannel}:#{id}"
if existingChannelSet.has(channel)
logger.error {channel}, "already subscribed - shouldn't happen"
# return the subscribe promise, so we can wait for it to resolve
return existingChannelSet.get(channel)
else
rclient.subscribe channel # completes in the background
existingChannelSet.add(channel)
# get the subscribe promise and return it, the actual subscribe
# completes in the background
subscribePromise = rclient.subscribe channel
existingChannelSet.set(channel, subscribePromise)
logger.log {channel}, "subscribed to new channel"
metrics.inc "subscribe.#{baseChannel}"
return subscribePromise
unsubscribe: (rclient, baseChannel, id) ->
existingChannelSet = @getClientMapEntry(rclient)

View file

@ -7,6 +7,7 @@ HealthCheckManager = require "./HealthCheckManager"
RoomManager = require "./RoomManager"
ChannelManager = require "./ChannelManager"
metrics = require "metrics-sharelatex"
util = require "util"
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
@ -29,15 +30,20 @@ module.exports = DocumentUpdaterController =
do (i) ->
rclient.on "message", () ->
metrics.inc "rclient-#{i}", 0.001 # per client event rate metric
for rclient in @rclientList
@handleRoomUpdates(rclient)
@handleRoomUpdates(@rclientList)
handleRoomUpdates: (rclientSub) ->
handleRoomUpdates: (rclientSubList) ->
roomEvents = RoomManager.eventSource()
roomEvents.on 'doc-active', (doc_id) ->
ChannelManager.subscribe rclientSub, "applied-ops", doc_id
subscribePromises = for rclient in rclientSubList
ChannelManager.subscribe rclient, "applied-ops", doc_id
subscribeResult = Promise.all(subscribePromises)
emitResult = (err) => this.emit("doc-subscribed-#{doc_id}", err)
subscribeResult.then () -> emitResult()
subscribeResult.catch (err) -> emitResult(err)
roomEvents.on 'doc-empty', (doc_id) ->
ChannelManager.unsubscribe rclientSub, "applied-ops", doc_id
for rclient in rclientSubList
ChannelManager.unsubscribe rclient, "applied-ops", doc_id
_processMessageFromDocumentUpdater: (io, channel, message) ->
SafeJsonParse.parse message, (error, message) ->

View file

@ -16,11 +16,11 @@ RoomEvents = new EventEmitter()
module.exports = RoomManager =
joinProject: (client, project_id) ->
@joinEntity client, "project", project_id
joinProject: (client, project_id, callback = () ->) ->
@joinEntity client, "project", project_id, callback
joinDoc: (client, doc_id) ->
@joinEntity client, "doc", doc_id
joinDoc: (client, doc_id, callback = () ->) ->
@joinEntity client, "doc", doc_id, callback
leaveDoc: (client, doc_id) ->
@leaveEntity client, "doc", doc_id
@ -38,27 +38,31 @@ module.exports = RoomManager =
eventSource: () ->
return RoomEvents
joinEntity: (client, entity, id) ->
joinEntity: (client, entity, id, callback) ->
beforeCount = @_clientsInRoom(client, id)
client.join id
afterCount = @_clientsInRoom(client, id)
logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client joined room"
# is this a new room? if so, subscribe
if beforeCount == 0 and afterCount == 1
if beforeCount == 0
logger.log {entity, id}, "room is now active"
RoomEvents.once "#{entity}-subscribed-#{id}", (err) ->
logger.log {client: client.id, entity, id, beforeCount}, "client joined room after subscribing channel"
client.join id
callback(err)
RoomEvents.emit "#{entity}-active", id
IdMap.set(id, entity)
else
logger.log {client: client.id, entity, id, beforeCount}, "client joined existing room"
client.join id
callback()
leaveEntity: (client, entity, id) ->
beforeCount = @_clientsInRoom(client, id)
client.leave id
afterCount = @_clientsInRoom(client, id)
logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client left room"
logger.log {client: client.id, entity, id, afterCount}, "client left room"
# is the room now empty? if so, unsubscribe
if !entity?
logger.error {entity: id}, "unknown entity when leaving with id"
return
if beforeCount == 1 and afterCount == 0
if afterCount == 0
logger.log {entity, id}, "room is now empty"
RoomEvents.emit "#{entity}-empty", id
IdMap.delete(id)

View file

@ -25,8 +25,7 @@ module.exports = WebsocketController =
err = new Error("not authorized")
logger.warn {err, project_id, user_id, client_id: client.id}, "user is not authorized to join project"
return callback(err)
RoomManager.joinProject(client, project_id)
client.set("privilege_level", privilegeLevel)
client.set("user_id", user_id)
@ -39,8 +38,9 @@ module.exports = WebsocketController =
client.set("signup_date", user?.signUpDate)
client.set("login_count", user?.loginCount)
callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION
logger.log {user_id, project_id, client_id: client.id}, "user joined project"
RoomManager.joinProject client, project_id, (err) ->
logger.log {user_id, project_id, client_id: client.id}, "user joined project"
callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION
# No need to block for setting the user as connected in the cursor tracking
ConnectedUsersManager.updateUserPosition project_id, client.id, user, null, () ->
@ -118,9 +118,9 @@ module.exports = WebsocketController =
return callback(err)
AuthorizationManager.addAccessToDoc client, doc_id
RoomManager.joinDoc(client, doc_id)
callback null, escapedLines, version, ops, ranges
logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc"
RoomManager.joinDoc client, doc_id, (err) ->
logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc"
callback null, escapedLines, version, ops, ranges
leaveDoc: (client, doc_id, callback = (error) ->) ->
metrics.inc "editor.leave-doc"

View file

@ -35,15 +35,20 @@ module.exports = WebsocketLoadBalancer =
rclientSub.on "message", (channel, message) ->
EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0
WebsocketLoadBalancer._processEditorEvent io, channel, message
for rclientSub in @rclientSubList
@handleRoomUpdates(rclientSub)
@handleRoomUpdates(@rclientSubList)
handleRoomUpdates: (rclientSub) ->
handleRoomUpdates: (rclientSubList) ->
roomEvents = RoomManager.eventSource()
roomEvents.on 'project-active', (project_id) ->
ChannelManager.subscribe rclientSub, "editor-events", project_id
subscribePromises = for rclient in rclientSubList
ChannelManager.subscribe rclient, "editor-events", project_id
subscribeResult = Promise.all(subscribePromises)
emitResult = (err) => this.emit("project-subscribed-#{project_id}", err)
subscribeResult.then () -> emitResult()
subscribeResult.catch (err) -> emitResult(err)
roomEvents.on 'project-empty', (project_id) ->
ChannelManager.unsubscribe rclientSub, "editor-events", project_id
for rclient in rclientSubList
ChannelManager.unsubscribe rclient, "editor-events", project_id
_processEditorEvent: (io, channel, message) ->
SafeJsonParse.parse message, (error, message) ->

View file

@ -15,26 +15,36 @@ describe 'RoomManager', ->
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
@RoomManager._clientsInRoom = sinon.stub()
@RoomEvents = @RoomManager.eventSource()
sinon.spy(@RoomEvents, 'emit')
sinon.spy(@RoomEvents, 'emit')
sinon.spy(@RoomEvents, 'once')
describe "joinProject", ->
describe "when the project room is empty", ->
beforeEach ->
beforeEach (done) ->
@RoomManager._clientsInRoom
.withArgs(@client, @project_id)
.onFirstCall().returns(0)
.onSecondCall().returns(1)
@client.join = sinon.stub()
@RoomManager.joinProject @client, @project_id
it "should join the room using the id", ->
@client.join.calledWithExactly(@project_id).should.equal true
@callback = sinon.stub()
@RoomEvents.on 'project-active', (id) =>
setTimeout () =>
@RoomEvents.emit "project-subscribed-#{id}"
, 100
@RoomManager.joinProject @client, @project_id, (err) =>
@callback(err)
done()
it "should emit a 'project-active' event with the id", ->
@RoomEvents.emit.calledWithExactly('project-active', @project_id).should.equal true
it "should listen for the 'project-subscribed-id' event", ->
@RoomEvents.once.calledWith("project-subscribed-#{@project_id}").should.equal true
it "should join the room using the id", ->
@client.join.calledWithExactly(@project_id).should.equal true
describe "when there are other clients in the project room", ->
beforeEach ->
@ -56,20 +66,29 @@ describe 'RoomManager', ->
describe "when the doc room is empty", ->
beforeEach ->
beforeEach (done) ->
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onFirstCall().returns(0)
.onSecondCall().returns(1)
@client.join = sinon.stub()
@RoomManager.joinDoc @client, @doc_id
it "should join the room using the id", ->
@client.join.calledWithExactly(@doc_id).should.equal true
@callback = sinon.stub()
@RoomEvents.on 'doc-active', (id) =>
setTimeout () =>
@RoomEvents.emit "doc-subscribed-#{id}"
, 100
@RoomManager.joinDoc @client, @doc_id, (err) =>
@callback(err)
done()
it "should emit a 'doc-active' event with the id", ->
@RoomEvents.emit.calledWithExactly('doc-active', @doc_id).should.equal true
it "should listen for the 'doc-subscribed-id' event", ->
@RoomEvents.once.calledWith("doc-subscribed-#{@doc_id}").should.equal true
it "should join the room using the id", ->
@client.join.calledWithExactly(@doc_id).should.equal true
describe "when there are other clients in the doc room", ->
beforeEach ->
@ -94,8 +113,7 @@ describe 'RoomManager', ->
beforeEach ->
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onFirstCall().returns(1)
.onSecondCall().returns(0)
.onCall(0).returns(0)
@client.leave = sinon.stub()
@RoomManager.leaveDoc @client, @doc_id
@ -111,8 +129,7 @@ describe 'RoomManager', ->
beforeEach ->
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onFirstCall().returns(123)
.onSecondCall().returns(122)
.onCall(0).returns(123)
@client.leave = sinon.stub()
@RoomManager.leaveDoc @client, @doc_id
@ -134,33 +151,36 @@ describe 'RoomManager', ->
describe "when this is the only client connected", ->
beforeEach ->
# first and secondc calls are for the join,
# calls 2 and 3 are for the leave
beforeEach (done) ->
# first call is for the join,
# second for the leave
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onCall(0).returns(0)
.onSecondCall().returns(1)
.onCall(2).returns(1)
.onCall(3).returns(0)
.onCall(1).returns(0)
@RoomManager._clientsInRoom
.withArgs(@client, @other_doc_id)
.onCall(0).returns(0)
.onCall(1).returns(1)
.onCall(2).returns(1)
.onCall(3).returns(0)
.onCall(1).returns(0)
@RoomManager._clientsInRoom
.withArgs(@client, @project_id)
.onCall(0).returns(0)
.onCall(1).returns(1)
.onCall(2).returns(1)
.onCall(3).returns(0)
.onCall(1).returns(0)
@RoomEvents.on 'project-active', (id) =>
setTimeout () =>
@RoomEvents.emit "project-subscribed-#{id}"
, 100
@RoomEvents.on 'doc-active', (id) =>
setTimeout () =>
@RoomEvents.emit "doc-subscribed-#{id}"
, 100
# put the client in the rooms
@RoomManager.joinProject(@client, @project_id)
@RoomManager.joinDoc(@client, @doc_id)
@RoomManager.joinDoc(@client, @other_doc_id)
# now leave the project
@RoomManager.leaveProjectAndDocs @client
@RoomManager.joinProject @client, @project_id, () =>
@RoomManager.joinDoc @client, @doc_id, () =>
@RoomManager.joinDoc @client, @other_doc_id, () =>
# now leave the project
@RoomManager.leaveProjectAndDocs @client
done()
it "should leave all the docs", ->
@client.leave.calledWithExactly(@doc_id).should.equal true

View file

@ -54,7 +54,7 @@ describe 'WebsocketController', ->
@privilegeLevel = "owner"
@ConnectedUsersManager.updateUserPosition = sinon.stub().callsArg(4)
@WebApiManager.joinProject = sinon.stub().callsArgWith(2, null, @project, @privilegeLevel)
@RoomManager.joinProject = sinon.stub()
@RoomManager.joinProject = sinon.stub().callsArg(2)
@WebsocketController.joinProject @client, @user, @project_id, @callback
it "should load the project from web", ->
@ -237,7 +237,7 @@ describe 'WebsocketController', ->
@AuthorizationManager.addAccessToDoc = sinon.stub()
@AuthorizationManager.assertClientCanViewProject = sinon.stub().callsArgWith(1, null)
@DocumentUpdaterManager.getDocument = sinon.stub().callsArgWith(3, null, @doc_lines, @version, @ranges, @ops)
@RoomManager.joinDoc = sinon.stub()
@RoomManager.joinDoc = sinon.stub().callsArg(2)
describe "works", ->
beforeEach ->