Merge pull request #65 from overleaf/bg-use-per-room-channels

listen on separate channels for each project/doc
This commit is contained in:
Brian Gough 2019-07-25 13:41:55 +01:00 committed by GitHub
commit 624cf5589f
13 changed files with 694 additions and 58 deletions

View file

@ -0,0 +1,57 @@
logger = require 'logger-sharelatex'
metrics = require "metrics-sharelatex"
settings = require "settings-sharelatex"
ClientMap = new Map() # for each redis client, store a Map of subscribed channels (channelname -> subscribe promise)
# Manage redis pubsub subscriptions for individual projects and docs, ensuring
# that we never subscribe to a channel multiple times. The socket.io side is
# handled by RoomManager.
module.exports = ChannelManager =
getClientMapEntry: (rclient) ->
# return the per-client channel map if it exists, otherwise create and
# return an empty map for the client.
ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient)
subscribe: (rclient, baseChannel, id) ->
clientChannelMap = @getClientMapEntry(rclient)
channel = "#{baseChannel}:#{id}"
# we track pending subscribes because we want to be sure that the
# channel is active before letting the client join the doc or project,
# so that events are not lost.
if clientChannelMap.has(channel)
logger.warn {channel}, "subscribe already actioned"
# return the existing subscribe promise, so we can wait for it to resolve
return clientChannelMap.get(channel)
else
# get the subscribe promise and return it, the actual subscribe
# completes in the background
subscribePromise = rclient.subscribe channel
clientChannelMap.set(channel, subscribePromise)
logger.log {channel}, "subscribed to new channel"
metrics.inc "subscribe.#{baseChannel}"
return subscribePromise
unsubscribe: (rclient, baseChannel, id) ->
clientChannelMap = @getClientMapEntry(rclient)
channel = "#{baseChannel}:#{id}"
# we don't need to track pending unsubscribes, because we there is no
# harm if events continue to arrive on the channel while the unsubscribe
# command in pending.
if !clientChannelMap.has(channel)
logger.error {channel}, "not subscribed - shouldn't happen"
else
rclient.unsubscribe channel # completes in the background
clientChannelMap.delete(channel)
logger.log {channel}, "unsubscribed from channel"
metrics.inc "unsubscribe.#{baseChannel}"
publish: (rclient, baseChannel, id, data) ->
if id is 'all' or !settings.publishOnIndividualChannels
channel = baseChannel
else
channel = "#{baseChannel}:#{id}"
# we publish on a different client to the subscribe, so we can't
# check for the channel existing here
rclient.publish channel, data

View file

@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager"
SafeJsonParse = require "./SafeJsonParse"
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
RoomManager = require "./RoomManager"
ChannelManager = require "./ChannelManager"
metrics = require "metrics-sharelatex"
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
@ -27,7 +29,18 @@ module.exports = DocumentUpdaterController =
do (i) ->
rclient.on "message", () ->
metrics.inc "rclient-#{i}", 0.001 # per client event rate metric
@handleRoomUpdates(@rclientList)
handleRoomUpdates: (rclientSubList) ->
roomEvents = RoomManager.eventSource()
roomEvents.on 'doc-active', (doc_id) ->
subscribePromises = for rclient in rclientSubList
ChannelManager.subscribe rclient, "applied-ops", doc_id
RoomManager.emitOnCompletion(subscribePromises, "doc-subscribed-#{doc_id}")
roomEvents.on 'doc-empty', (doc_id) ->
for rclient in rclientSubList
ChannelManager.unsubscribe rclient, "applied-ops", doc_id
_processMessageFromDocumentUpdater: (io, channel, message) ->
SafeJsonParse.parse message, (error, message) ->
if error?

View file

@ -0,0 +1,93 @@
logger = require 'logger-sharelatex'
metrics = require "metrics-sharelatex"
{EventEmitter} = require 'events'
IdMap = new Map() # keep track of whether ids are from projects or docs
RoomEvents = new EventEmitter() # emits {project,doc}-active and {project,doc}-empty events
# Manage socket.io rooms for individual projects and docs
#
# The first time someone joins a project or doc we emit a 'project-active' or
# 'doc-active' event.
#
# When the last person leaves a project or doc, we emit 'project-empty' or
# 'doc-empty' event.
#
# The pubsub side is handled by ChannelManager
module.exports = RoomManager =
joinProject: (client, project_id, callback = () ->) ->
@joinEntity client, "project", project_id, callback
joinDoc: (client, doc_id, callback = () ->) ->
@joinEntity client, "doc", doc_id, callback
leaveDoc: (client, doc_id) ->
@leaveEntity client, "doc", doc_id
leaveProjectAndDocs: (client) ->
# what rooms is this client in? we need to leave them all. socket.io
# will cause us to leave the rooms, so we only need to manage our
# channel subscriptions... but it will be safer if we leave them
# explicitly, and then socket.io will just regard this as a client that
# has not joined any rooms and do a final disconnection.
for id in @_roomsClientIsIn(client)
entity = IdMap.get(id)
@leaveEntity client, entity, id
emitOnCompletion: (promiseList, eventName) ->
result = Promise.all(promiseList)
result.then () -> RoomEvents.emit(eventName)
result.catch (err) -> RoomEvents.emit(eventName, err)
eventSource: () ->
return RoomEvents
joinEntity: (client, entity, id, callback) ->
beforeCount = @_clientsInRoom(client, id)
# is this a new room? if so, subscribe
if beforeCount == 0
logger.log {entity, id}, "room is now active"
RoomEvents.once "#{entity}-subscribed-#{id}", (err) ->
# only allow the client to join when all the relevant channels have subscribed
logger.log {client: client.id, entity, id, beforeCount}, "client joined room after subscribing channel"
client.join id
callback(err)
RoomEvents.emit "#{entity}-active", id
IdMap.set(id, entity)
# keep track of the number of listeners
metrics.gauge "room-listeners", RoomEvents.eventNames().length
else
logger.log {client: client.id, entity, id, beforeCount}, "client joined existing room"
client.join id
callback()
leaveEntity: (client, entity, id) ->
client.leave id
afterCount = @_clientsInRoom(client, id)
logger.log {client: client.id, entity, id, afterCount}, "client left room"
# is the room now empty? if so, unsubscribe
if !entity?
logger.error {entity: id}, "unknown entity when leaving with id"
return
if afterCount == 0
logger.log {entity, id}, "room is now empty"
RoomEvents.emit "#{entity}-empty", id
IdMap.delete(id)
metrics.gauge "room-listeners", RoomEvents.eventNames().length
# internal functions below, these access socket.io rooms data directly and
# will need updating for socket.io v2
_clientsInRoom: (client, room) ->
nsp = client.namespace.name
name = (nsp + '/') + room;
return (client.manager?.rooms?[name] || []).length
_roomsClientIsIn: (client) ->
roomList = for fullRoomPath of client.manager.roomClients?[client.id] when fullRoomPath isnt ''
# strip socket.io prefix from room to get original id
[prefix, room] = fullRoomPath.split('/', 2)
room
return roomList

View file

@ -5,6 +5,7 @@ AuthorizationManager = require "./AuthorizationManager"
DocumentUpdaterManager = require "./DocumentUpdaterManager"
ConnectedUsersManager = require "./ConnectedUsersManager"
WebsocketLoadBalancer = require "./WebsocketLoadBalancer"
RoomManager = require "./RoomManager"
Utils = require "./Utils"
module.exports = WebsocketController =
@ -24,8 +25,7 @@ module.exports = WebsocketController =
err = new Error("not authorized")
logger.warn {err, project_id, user_id, client_id: client.id}, "user is not authorized to join project"
return callback(err)
client.join project_id
client.set("privilege_level", privilegeLevel)
client.set("user_id", user_id)
@ -38,8 +38,9 @@ module.exports = WebsocketController =
client.set("signup_date", user?.signUpDate)
client.set("login_count", user?.loginCount)
callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION
logger.log {user_id, project_id, client_id: client.id}, "user joined project"
RoomManager.joinProject client, project_id, (err) ->
logger.log {user_id, project_id, client_id: client.id}, "user joined project"
callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION
# No need to block for setting the user as connected in the cursor tracking
ConnectedUsersManager.updateUserPosition project_id, client.id, user, null, () ->
@ -71,6 +72,7 @@ module.exports = WebsocketController =
if err?
logger.error {err, project_id, user_id, client_id: client.id}, "error marking client as disconnected"
RoomManager.leaveProjectAndDocs(client)
setTimeout () ->
remainingClients = io.sockets.clients(project_id)
if remainingClients.length == 0
@ -90,41 +92,44 @@ module.exports = WebsocketController =
AuthorizationManager.assertClientCanViewProject client, (error) ->
return callback(error) if error?
DocumentUpdaterManager.getDocument project_id, doc_id, fromVersion, (error, lines, version, ranges, ops) ->
# ensure the per-doc applied-ops channel is subscribed before sending the
# doc to the client, so that no events are missed.
RoomManager.joinDoc client, doc_id, (error) ->
return callback(error) if error?
DocumentUpdaterManager.getDocument project_id, doc_id, fromVersion, (error, lines, version, ranges, ops) ->
return callback(error) if error?
# Encode any binary bits of data so it can go via WebSockets
# See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html
encodeForWebsockets = (text) -> unescape(encodeURIComponent(text))
escapedLines = []
for line in lines
try
line = encodeForWebsockets(line)
catch err
logger.err {err, project_id, doc_id, fromVersion, line, client_id: client.id}, "error encoding line uri component"
return callback(err)
escapedLines.push line
if options.encodeRanges
try
for comment in ranges?.comments or []
comment.op.c = encodeForWebsockets(comment.op.c) if comment.op.c?
for change in ranges?.changes or []
change.op.i = encodeForWebsockets(change.op.i) if change.op.i?
change.op.d = encodeForWebsockets(change.op.d) if change.op.d?
catch err
logger.err {err, project_id, doc_id, fromVersion, ranges, client_id: client.id}, "error encoding range uri component"
return callback(err)
# Encode any binary bits of data so it can go via WebSockets
# See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html
encodeForWebsockets = (text) -> unescape(encodeURIComponent(text))
escapedLines = []
for line in lines
try
line = encodeForWebsockets(line)
catch err
logger.err {err, project_id, doc_id, fromVersion, line, client_id: client.id}, "error encoding line uri component"
return callback(err)
escapedLines.push line
if options.encodeRanges
try
for comment in ranges?.comments or []
comment.op.c = encodeForWebsockets(comment.op.c) if comment.op.c?
for change in ranges?.changes or []
change.op.i = encodeForWebsockets(change.op.i) if change.op.i?
change.op.d = encodeForWebsockets(change.op.d) if change.op.d?
catch err
logger.err {err, project_id, doc_id, fromVersion, ranges, client_id: client.id}, "error encoding range uri component"
return callback(err)
AuthorizationManager.addAccessToDoc client, doc_id
client.join(doc_id)
callback null, escapedLines, version, ops, ranges
logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc"
AuthorizationManager.addAccessToDoc client, doc_id
logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc"
callback null, escapedLines, version, ops, ranges
leaveDoc: (client, doc_id, callback = (error) ->) ->
metrics.inc "editor.leave-doc"
Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) ->
logger.log {user_id, project_id, doc_id, client_id: client.id}, "client leaving doc"
client.leave doc_id
RoomManager.leaveDoc(client, doc_id)
# we could remove permission when user leaves a doc, but because
# the connection is per-project, we continue to allow access
# after the initial joinDoc since we know they are already authorised.

View file

@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager"
SafeJsonParse = require "./SafeJsonParse"
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
RoomManager = require "./RoomManager"
ChannelManager = require "./ChannelManager"
module.exports = WebsocketLoadBalancer =
rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub)
@ -18,8 +20,9 @@ module.exports = WebsocketLoadBalancer =
message: message
payload: payload
logger.log {room_id, message, payload, length: data.length}, "emitting to room"
for rclientPub in @rclientPubList
rclientPub.publish "editor-events", data
ChannelManager.publish rclientPub, "editor-events", room_id, data
emitToAll: (message, payload...) ->
@emitToRoom "all", message, payload...
@ -32,6 +35,17 @@ module.exports = WebsocketLoadBalancer =
rclientSub.on "message", (channel, message) ->
EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0
WebsocketLoadBalancer._processEditorEvent io, channel, message
@handleRoomUpdates(@rclientSubList)
handleRoomUpdates: (rclientSubList) ->
roomEvents = RoomManager.eventSource()
roomEvents.on 'project-active', (project_id) ->
subscribePromises = for rclient in rclientSubList
ChannelManager.subscribe rclient, "editor-events", project_id
RoomManager.emitOnCompletion(subscribePromises, "project-subscribed-#{project_id}")
roomEvents.on 'project-empty', (project_id) ->
for rclient in rclientSubList
ChannelManager.unsubscribe rclient, "editor-events", project_id
_processEditorEvent: (io, channel, message) ->
SafeJsonParse.parse message, (error, message) ->

View file

@ -54,6 +54,8 @@ settings =
checkEventOrder: process.env['CHECK_EVENT_ORDER'] or false
publishOnIndividualChannels: process.env['PUBLISH_ON_INDIVIDUAL_CHANNELS'] or false
sentry:
dsn: process.env.SENTRY_DSN

View file

@ -356,18 +356,6 @@
"resolved": "https://registry.npmjs.org/bunyan/-/bunyan-0.22.3.tgz",
"dev": true
},
"buster-core": {
"version": "0.6.4",
"from": "buster-core@0.6.4",
"resolved": "https://registry.npmjs.org/buster-core/-/buster-core-0.6.4.tgz",
"dev": true
},
"buster-format": {
"version": "0.5.6",
"from": "buster-format@>=0.5.0 <0.6.0",
"resolved": "https://registry.npmjs.org/buster-format/-/buster-format-0.5.6.tgz",
"dev": true
},
"bytes": {
"version": "3.0.0",
"from": "bytes@3.0.0",
@ -484,6 +472,12 @@
"resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-0.1.3.tgz",
"dev": true
},
"define-properties": {
"version": "1.1.3",
"from": "define-properties@>=1.1.3 <2.0.0",
"resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.1.3.tgz",
"dev": true
},
"delay": {
"version": "4.3.0",
"from": "delay@>=4.0.1 <5.0.0",
@ -556,6 +550,18 @@
"from": "ent@>=2.2.0 <3.0.0",
"resolved": "https://registry.npmjs.org/ent/-/ent-2.2.0.tgz"
},
"es-abstract": {
"version": "1.13.0",
"from": "es-abstract@>=1.12.0 <2.0.0",
"resolved": "https://registry.npmjs.org/es-abstract/-/es-abstract-1.13.0.tgz",
"dev": true
},
"es-to-primitive": {
"version": "1.2.0",
"from": "es-to-primitive@>=1.2.0 <2.0.0",
"resolved": "https://registry.npmjs.org/es-to-primitive/-/es-to-primitive-1.2.0.tgz",
"dev": true
},
"es6-promise": {
"version": "4.2.8",
"from": "es6-promise@>=4.0.3 <5.0.0",
@ -717,6 +723,12 @@
"from": "form-data@>=2.3.2 <2.4.0",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-2.3.3.tgz"
},
"formatio": {
"version": "1.1.1",
"from": "formatio@1.1.1",
"resolved": "https://registry.npmjs.org/formatio/-/formatio-1.1.1.tgz",
"dev": true
},
"forwarded": {
"version": "0.1.2",
"from": "forwarded@>=0.1.2 <0.2.0",
@ -733,6 +745,12 @@
"resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz",
"dev": true
},
"function-bind": {
"version": "1.1.1",
"from": "function-bind@>=1.1.1 <2.0.0",
"resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz",
"dev": true
},
"gaxios": {
"version": "1.8.4",
"from": "gaxios@>=1.2.1 <2.0.0",
@ -800,6 +818,18 @@
"from": "har-validator@>=5.1.0 <5.2.0",
"resolved": "https://registry.npmjs.org/har-validator/-/har-validator-5.1.3.tgz"
},
"has": {
"version": "1.0.3",
"from": "has@>=1.0.3 <2.0.0",
"resolved": "https://registry.npmjs.org/has/-/has-1.0.3.tgz",
"dev": true
},
"has-symbols": {
"version": "1.0.0",
"from": "has-symbols@>=1.0.0 <2.0.0",
"resolved": "https://registry.npmjs.org/has-symbols/-/has-symbols-1.0.0.tgz",
"dev": true
},
"he": {
"version": "1.1.1",
"from": "he@1.1.1",
@ -880,11 +910,47 @@
"from": "is@>=3.2.0 <4.0.0",
"resolved": "https://registry.npmjs.org/is/-/is-3.3.0.tgz"
},
"is-arguments": {
"version": "1.0.4",
"from": "is-arguments@>=1.0.4 <2.0.0",
"resolved": "https://registry.npmjs.org/is-arguments/-/is-arguments-1.0.4.tgz",
"dev": true
},
"is-buffer": {
"version": "2.0.3",
"from": "is-buffer@>=2.0.2 <3.0.0",
"resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-2.0.3.tgz"
},
"is-callable": {
"version": "1.1.4",
"from": "is-callable@>=1.1.4 <2.0.0",
"resolved": "https://registry.npmjs.org/is-callable/-/is-callable-1.1.4.tgz",
"dev": true
},
"is-date-object": {
"version": "1.0.1",
"from": "is-date-object@>=1.0.1 <2.0.0",
"resolved": "https://registry.npmjs.org/is-date-object/-/is-date-object-1.0.1.tgz",
"dev": true
},
"is-generator-function": {
"version": "1.0.7",
"from": "is-generator-function@>=1.0.7 <2.0.0",
"resolved": "https://registry.npmjs.org/is-generator-function/-/is-generator-function-1.0.7.tgz",
"dev": true
},
"is-regex": {
"version": "1.0.4",
"from": "is-regex@>=1.0.4 <2.0.0",
"resolved": "https://registry.npmjs.org/is-regex/-/is-regex-1.0.4.tgz",
"dev": true
},
"is-symbol": {
"version": "1.0.2",
"from": "is-symbol@>=1.0.2 <2.0.0",
"resolved": "https://registry.npmjs.org/is-symbol/-/is-symbol-1.0.2.tgz",
"dev": true
},
"is-typedarray": {
"version": "1.0.0",
"from": "is-typedarray@>=1.0.0 <1.1.0",
@ -985,6 +1051,12 @@
}
}
},
"lolex": {
"version": "1.3.2",
"from": "lolex@1.3.2",
"resolved": "https://registry.npmjs.org/lolex/-/lolex-1.3.2.tgz",
"dev": true
},
"long": {
"version": "4.0.0",
"from": "long@>=4.0.0 <5.0.0",
@ -1176,6 +1248,18 @@
"from": "oauth-sign@>=0.9.0 <0.10.0",
"resolved": "https://registry.npmjs.org/oauth-sign/-/oauth-sign-0.9.0.tgz"
},
"object-keys": {
"version": "1.1.1",
"from": "object-keys@>=1.0.12 <2.0.0",
"resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz",
"dev": true
},
"object.entries": {
"version": "1.1.0",
"from": "object.entries@>=1.1.0 <2.0.0",
"resolved": "https://registry.npmjs.org/object.entries/-/object.entries-1.1.0.tgz",
"dev": true
},
"on-finished": {
"version": "2.3.0",
"from": "on-finished@>=2.3.0 <2.4.0",
@ -1469,6 +1553,12 @@
"from": "safer-buffer@>=2.1.2 <3.0.0",
"resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz"
},
"samsam": {
"version": "1.1.2",
"from": "samsam@1.1.2",
"resolved": "https://registry.npmjs.org/samsam/-/samsam-1.1.2.tgz",
"dev": true
},
"sandboxed-module": {
"version": "0.3.0",
"from": "sandboxed-module@>=0.3.0 <0.4.0",
@ -1533,9 +1623,9 @@
"resolved": "https://registry.npmjs.org/shimmer/-/shimmer-1.2.1.tgz"
},
"sinon": {
"version": "1.5.2",
"from": "sinon@>=1.5.2 <1.6.0",
"resolved": "https://registry.npmjs.org/sinon/-/sinon-1.5.2.tgz",
"version": "1.17.7",
"from": "sinon@1.17.7",
"resolved": "https://registry.npmjs.org/sinon/-/sinon-1.17.7.tgz",
"dev": true
},
"socket.io": {
@ -1715,6 +1805,20 @@
}
}
},
"util": {
"version": "0.12.1",
"from": "util@>=0.10.3 <1.0.0",
"resolved": "https://registry.npmjs.org/util/-/util-0.12.1.tgz",
"dev": true,
"dependencies": {
"safe-buffer": {
"version": "5.2.0",
"from": "safe-buffer@>=5.1.2 <6.0.0",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.0.tgz",
"dev": true
}
}
},
"util-deprecate": {
"version": "1.0.2",
"from": "util-deprecate@>=1.0.1 <1.1.0",

View file

@ -42,9 +42,9 @@
"chai": "~1.9.1",
"cookie-signature": "^1.0.5",
"sandboxed-module": "~0.3.0",
"sinon": "~1.5.2",
"sinon": "^1.5.2",
"mocha": "^4.0.1",
"uid-safe": "^1.0.1",
"timekeeper": "0.0.4"
}
}
}

View file

@ -0,0 +1,108 @@
chai = require('chai')
should = chai.should()
sinon = require("sinon")
modulePath = "../../../app/js/ChannelManager.js"
SandboxedModule = require('sandboxed-module')
describe 'ChannelManager', ->
beforeEach ->
@rclient = {}
@other_rclient = {}
@ChannelManager = SandboxedModule.require modulePath, requires:
"settings-sharelatex": @settings = {}
"metrics-sharelatex": @metrics = {inc: sinon.stub()}
"logger-sharelatex": @logger = { log: sinon.stub(), warn: sinon.stub(), error: sinon.stub() }
describe "subscribe", ->
describe "when there is no existing subscription for this redis client", ->
beforeEach ->
@rclient.subscribe = sinon.stub()
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
it "should subscribe to the redis channel", ->
@rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true
describe "when there is an existing subscription for this redis client", ->
beforeEach ->
@rclient.subscribe = sinon.stub()
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
@rclient.subscribe = sinon.stub() # discard the original stub
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
it "should not subscribe to the redis channel", ->
@rclient.subscribe.called.should.equal false
describe "when there is an existing subscription for another redis client but not this one", ->
beforeEach ->
@other_rclient.subscribe = sinon.stub()
@ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef"
@rclient.subscribe = sinon.stub() # discard the original stub
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
it "should subscribe to the redis channel on this redis client", ->
@rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true
describe "unsubscribe", ->
describe "when there is no existing subscription for this redis client", ->
beforeEach ->
@rclient.unsubscribe = sinon.stub()
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
it "should not unsubscribe from the redis channel", ->
@rclient.unsubscribe.called.should.equal false
describe "when there is an existing subscription for this another redis client but not this one", ->
beforeEach ->
@other_rclient.subscribe = sinon.stub()
@rclient.unsubscribe = sinon.stub()
@ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef"
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
it "should not unsubscribe from the redis channel on this client", ->
@rclient.unsubscribe.called.should.equal false
describe "when there is an existing subscription for this redis client", ->
beforeEach ->
@rclient.subscribe = sinon.stub()
@rclient.unsubscribe = sinon.stub()
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
it "should unsubscribe from the redis channel", ->
@rclient.unsubscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true
describe "publish", ->
describe "when the channel is 'all'", ->
beforeEach ->
@rclient.publish = sinon.stub()
@ChannelManager.publish @rclient, "applied-ops", "all", "random-message"
it "should publish on the base channel", ->
@rclient.publish.calledWithExactly("applied-ops", "random-message").should.equal true
describe "when the channel has an specific id", ->
describe "when the individual channel setting is false", ->
beforeEach ->
@rclient.publish = sinon.stub()
@settings.publishOnIndividualChannels = false
@ChannelManager.publish @rclient, "applied-ops", "1234567890abcdef", "random-message"
it "should publish on the per-id channel", ->
@rclient.publish.calledWithExactly("applied-ops", "random-message").should.equal true
@rclient.publish.calledOnce.should.equal true
describe "when the individual channel setting is true", ->
beforeEach ->
@rclient.publish = sinon.stub()
@settings.publishOnIndividualChannels = true
@ChannelManager.publish @rclient, "applied-ops", "1234567890abcdef", "random-message"
it "should publish on the per-id channel", ->
@rclient.publish.calledWithExactly("applied-ops:1234567890abcdef", "random-message").should.equal true
@rclient.publish.calledOnce.should.equal true

View file

@ -11,6 +11,7 @@ describe "DocumentUpdaterController", ->
@callback = sinon.stub()
@io = { "mock": "socket.io" }
@rclient = []
@RoomEvents = { on: sinon.stub() }
@EditorUpdatesController = SandboxedModule.require modulePath, requires:
"logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() }
"settings-sharelatex": @settings =
@ -28,6 +29,8 @@ describe "DocumentUpdaterController", ->
"./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()}
"./HealthCheckManager": {check: sinon.stub()}
"metrics-sharelatex": @metrics = {inc: sinon.stub()}
"./RoomManager" : @RoomManager = { eventSource: sinon.stub().returns @RoomEvents}
"./ChannelManager": @ChannelManager = {}
describe "listenForUpdatesFromDocumentUpdater", ->
beforeEach ->

View file

@ -0,0 +1,225 @@
chai = require('chai')
should = chai.should()
sinon = require("sinon")
modulePath = "../../../app/js/RoomManager.js"
SandboxedModule = require('sandboxed-module')
describe 'RoomManager', ->
beforeEach ->
@project_id = "project-id-123"
@doc_id = "doc-id-456"
@other_doc_id = "doc-id-789"
@client = {namespace: {name: ''}, id: "first-client"}
@RoomManager = SandboxedModule.require modulePath, requires:
"settings-sharelatex": @settings = {}
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"metrics-sharelatex": @metrics = { gauge: sinon.stub() }
@RoomManager._clientsInRoom = sinon.stub()
@RoomEvents = @RoomManager.eventSource()
sinon.spy(@RoomEvents, 'emit')
sinon.spy(@RoomEvents, 'once')
describe "joinProject", ->
describe "when the project room is empty", ->
beforeEach (done) ->
@RoomManager._clientsInRoom
.withArgs(@client, @project_id)
.onFirstCall().returns(0)
@client.join = sinon.stub()
@callback = sinon.stub()
@RoomEvents.on 'project-active', (id) =>
setTimeout () =>
@RoomEvents.emit "project-subscribed-#{id}"
, 100
@RoomManager.joinProject @client, @project_id, (err) =>
@callback(err)
done()
it "should emit a 'project-active' event with the id", ->
@RoomEvents.emit.calledWithExactly('project-active', @project_id).should.equal true
it "should listen for the 'project-subscribed-id' event", ->
@RoomEvents.once.calledWith("project-subscribed-#{@project_id}").should.equal true
it "should join the room using the id", ->
@client.join.calledWithExactly(@project_id).should.equal true
describe "when there are other clients in the project room", ->
beforeEach ->
@RoomManager._clientsInRoom
.withArgs(@client, @project_id)
.onFirstCall().returns(123)
.onSecondCall().returns(124)
@client.join = sinon.stub()
@RoomManager.joinProject @client, @project_id
it "should join the room using the id", ->
@client.join.called.should.equal true
it "should not emit any events", ->
@RoomEvents.emit.called.should.equal false
describe "joinDoc", ->
describe "when the doc room is empty", ->
beforeEach (done) ->
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onFirstCall().returns(0)
@client.join = sinon.stub()
@callback = sinon.stub()
@RoomEvents.on 'doc-active', (id) =>
setTimeout () =>
@RoomEvents.emit "doc-subscribed-#{id}"
, 100
@RoomManager.joinDoc @client, @doc_id, (err) =>
@callback(err)
done()
it "should emit a 'doc-active' event with the id", ->
@RoomEvents.emit.calledWithExactly('doc-active', @doc_id).should.equal true
it "should listen for the 'doc-subscribed-id' event", ->
@RoomEvents.once.calledWith("doc-subscribed-#{@doc_id}").should.equal true
it "should join the room using the id", ->
@client.join.calledWithExactly(@doc_id).should.equal true
describe "when there are other clients in the doc room", ->
beforeEach ->
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onFirstCall().returns(123)
.onSecondCall().returns(124)
@client.join = sinon.stub()
@RoomManager.joinDoc @client, @doc_id
it "should join the room using the id", ->
@client.join.called.should.equal true
it "should not emit any events", ->
@RoomEvents.emit.called.should.equal false
describe "leaveDoc", ->
describe "when doc room will be empty after this client has left", ->
beforeEach ->
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onCall(0).returns(0)
@client.leave = sinon.stub()
@RoomManager.leaveDoc @client, @doc_id
it "should leave the room using the id", ->
@client.leave.calledWithExactly(@doc_id).should.equal true
it "should emit a 'doc-empty' event with the id", ->
@RoomEvents.emit.calledWithExactly('doc-empty', @doc_id).should.equal true
describe "when there are other clients in the doc room", ->
beforeEach ->
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onCall(0).returns(123)
@client.leave = sinon.stub()
@RoomManager.leaveDoc @client, @doc_id
it "should leave the room using the id", ->
@client.leave.calledWithExactly(@doc_id).should.equal true
it "should not emit any events", ->
@RoomEvents.emit.called.should.equal false
describe "leaveProjectAndDocs", ->
describe "when the client is connected to the project and multiple docs", ->
beforeEach ->
@RoomManager._roomsClientIsIn = sinon.stub().returns [@project_id, @doc_id, @other_doc_id]
@client.join = sinon.stub()
@client.leave = sinon.stub()
describe "when this is the only client connected", ->
beforeEach (done) ->
# first call is for the join,
# second for the leave
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onCall(0).returns(0)
.onCall(1).returns(0)
@RoomManager._clientsInRoom
.withArgs(@client, @other_doc_id)
.onCall(0).returns(0)
.onCall(1).returns(0)
@RoomManager._clientsInRoom
.withArgs(@client, @project_id)
.onCall(0).returns(0)
.onCall(1).returns(0)
@RoomEvents.on 'project-active', (id) =>
setTimeout () =>
@RoomEvents.emit "project-subscribed-#{id}"
, 100
@RoomEvents.on 'doc-active', (id) =>
setTimeout () =>
@RoomEvents.emit "doc-subscribed-#{id}"
, 100
# put the client in the rooms
@RoomManager.joinProject @client, @project_id, () =>
@RoomManager.joinDoc @client, @doc_id, () =>
@RoomManager.joinDoc @client, @other_doc_id, () =>
# now leave the project
@RoomManager.leaveProjectAndDocs @client
done()
it "should leave all the docs", ->
@client.leave.calledWithExactly(@doc_id).should.equal true
@client.leave.calledWithExactly(@other_doc_id).should.equal true
it "should leave the project", ->
@client.leave.calledWithExactly(@project_id).should.equal true
it "should emit a 'doc-empty' event with the id for each doc", ->
@RoomEvents.emit.calledWithExactly('doc-empty', @doc_id).should.equal true
@RoomEvents.emit.calledWithExactly('doc-empty', @other_doc_id).should.equal true
it "should emit a 'project-empty' event with the id for the project", ->
@RoomEvents.emit.calledWithExactly('project-empty', @project_id).should.equal true
describe "when other clients are still connected", ->
beforeEach ->
@RoomManager._clientsInRoom
.withArgs(@client, @doc_id)
.onFirstCall().returns(123)
.onSecondCall().returns(122)
@RoomManager._clientsInRoom
.withArgs(@client, @other_doc_id)
.onFirstCall().returns(123)
.onSecondCall().returns(122)
@RoomManager._clientsInRoom
.withArgs(@client, @project_id)
.onFirstCall().returns(123)
.onSecondCall().returns(122)
@RoomManager.leaveProjectAndDocs @client
it "should leave all the docs", ->
@client.leave.calledWithExactly(@doc_id).should.equal true
@client.leave.calledWithExactly(@other_doc_id).should.equal true
it "should leave the project", ->
@client.leave.calledWithExactly(@project_id).should.equal true
it "should not emit any events", ->
@RoomEvents.emit.called.should.equal false

View file

@ -36,7 +36,7 @@ describe 'WebsocketController', ->
"metrics-sharelatex": @metrics =
inc: sinon.stub()
set: sinon.stub()
"./RoomManager": @RoomManager = {}
afterEach ->
tk.reset()
@ -54,6 +54,7 @@ describe 'WebsocketController', ->
@privilegeLevel = "owner"
@ConnectedUsersManager.updateUserPosition = sinon.stub().callsArg(4)
@WebApiManager.joinProject = sinon.stub().callsArgWith(2, null, @project, @privilegeLevel)
@RoomManager.joinProject = sinon.stub().callsArg(2)
@WebsocketController.joinProject @client, @user, @project_id, @callback
it "should load the project from web", ->
@ -62,7 +63,7 @@ describe 'WebsocketController', ->
.should.equal true
it "should join the project room", ->
@client.join.calledWith(@project_id).should.equal true
@RoomManager.joinProject.calledWith(@client, @project_id).should.equal true
it "should set the privilege level on the client", ->
@client.set.calledWith("privilege_level", @privilegeLevel).should.equal true
@ -125,6 +126,7 @@ describe 'WebsocketController', ->
@DocumentUpdaterManager.flushProjectToMongoAndDelete = sinon.stub().callsArg(1)
@ConnectedUsersManager.markUserAsDisconnected = sinon.stub().callsArg(2)
@WebsocketLoadBalancer.emitToRoom = sinon.stub()
@RoomManager.leaveProjectAndDocs = sinon.stub()
@clientsInRoom = []
@io =
sockets:
@ -160,6 +162,11 @@ describe 'WebsocketController', ->
it "should increment the leave-project metric", ->
@metrics.inc.calledWith("editor.leave-project").should.equal true
it "should track the disconnection in RoomManager", ->
@RoomManager.leaveProjectAndDocs
.calledWith(@client)
.should.equal true
describe "when the project is not empty", ->
beforeEach ->
@clientsInRoom = ["mock-remaining-client"]
@ -230,6 +237,7 @@ describe 'WebsocketController', ->
@AuthorizationManager.addAccessToDoc = sinon.stub()
@AuthorizationManager.assertClientCanViewProject = sinon.stub().callsArgWith(1, null)
@DocumentUpdaterManager.getDocument = sinon.stub().callsArgWith(3, null, @doc_lines, @version, @ranges, @ops)
@RoomManager.joinDoc = sinon.stub().callsArg(2)
describe "works", ->
beforeEach ->
@ -251,8 +259,8 @@ describe 'WebsocketController', ->
.should.equal true
it "should join the client to room for the doc_id", ->
@client.join
.calledWith(@doc_id)
@RoomManager.joinDoc
.calledWith(@client, @doc_id)
.should.equal true
it "should call the callback with the lines, version, ranges and ops", ->
@ -330,11 +338,12 @@ describe 'WebsocketController', ->
beforeEach ->
@doc_id = "doc-id-123"
@client.params.project_id = @project_id
@RoomManager.leaveDoc = sinon.stub()
@WebsocketController.leaveDoc @client, @doc_id, @callback
it "should remove the client from the doc_id room", ->
@client.leave
.calledWith(@doc_id).should.equal true
@RoomManager.leaveDoc
.calledWith(@client, @doc_id).should.equal true
it "should call the callback", ->
@callback.called.should.equal true

View file

@ -6,6 +6,7 @@ modulePath = require('path').join __dirname, '../../../app/js/WebsocketLoadBalan
describe "WebsocketLoadBalancer", ->
beforeEach ->
@rclient = {}
@RoomEvents = {on: sinon.stub()}
@WebsocketLoadBalancer = SandboxedModule.require modulePath, requires:
"./RedisClientManager":
createClientList: () => []
@ -14,6 +15,8 @@ describe "WebsocketLoadBalancer", ->
parse: (data, cb) => cb null, JSON.parse(data)
"./EventLogger": {checkEventOrder: sinon.stub()}
"./HealthCheckManager": {check: sinon.stub()}
"./RoomManager" : @RoomManager = {eventSource: sinon.stub().returns @RoomEvents}
"./ChannelManager": @ChannelManager = {publish: sinon.stub()}
@io = {}
@WebsocketLoadBalancer.rclientPubList = [{publish: sinon.stub()}]
@WebsocketLoadBalancer.rclientSubList = [{
@ -30,8 +33,8 @@ describe "WebsocketLoadBalancer", ->
@WebsocketLoadBalancer.emitToRoom(@room_id, @message, @payload...)
it "should publish the message to redis", ->
@WebsocketLoadBalancer.rclientPubList[0].publish
.calledWith("editor-events", JSON.stringify(
@ChannelManager.publish
.calledWith(@WebsocketLoadBalancer.rclientPubList[0], "editor-events", @room_id, JSON.stringify(
room_id: @room_id,
message: @message
payload: @payload