diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee new file mode 100644 index 0000000000..3ea5c2e71e --- /dev/null +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -0,0 +1,57 @@ +logger = require 'logger-sharelatex' +metrics = require "metrics-sharelatex" +settings = require "settings-sharelatex" + +ClientMap = new Map() # for each redis client, store a Map of subscribed channels (channelname -> subscribe promise) + +# Manage redis pubsub subscriptions for individual projects and docs, ensuring +# that we never subscribe to a channel multiple times. The socket.io side is +# handled by RoomManager. + +module.exports = ChannelManager = + getClientMapEntry: (rclient) -> + # return the per-client channel map if it exists, otherwise create and + # return an empty map for the client. + ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient) + + subscribe: (rclient, baseChannel, id) -> + clientChannelMap = @getClientMapEntry(rclient) + channel = "#{baseChannel}:#{id}" + # we track pending subscribes because we want to be sure that the + # channel is active before letting the client join the doc or project, + # so that events are not lost. + if clientChannelMap.has(channel) + logger.warn {channel}, "subscribe already actioned" + # return the existing subscribe promise, so we can wait for it to resolve + return clientChannelMap.get(channel) + else + # get the subscribe promise and return it, the actual subscribe + # completes in the background + subscribePromise = rclient.subscribe channel + clientChannelMap.set(channel, subscribePromise) + logger.log {channel}, "subscribed to new channel" + metrics.inc "subscribe.#{baseChannel}" + return subscribePromise + + unsubscribe: (rclient, baseChannel, id) -> + clientChannelMap = @getClientMapEntry(rclient) + channel = "#{baseChannel}:#{id}" + # we don't need to track pending unsubscribes, because we there is no + # harm if events continue to arrive on the channel while the unsubscribe + # command in pending. + if !clientChannelMap.has(channel) + logger.error {channel}, "not subscribed - shouldn't happen" + else + rclient.unsubscribe channel # completes in the background + clientChannelMap.delete(channel) + logger.log {channel}, "unsubscribed from channel" + metrics.inc "unsubscribe.#{baseChannel}" + + publish: (rclient, baseChannel, id, data) -> + if id is 'all' or !settings.publishOnIndividualChannels + channel = baseChannel + else + channel = "#{baseChannel}:#{id}" + # we publish on a different client to the subscribe, so we can't + # check for the channel existing here + rclient.publish channel, data diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index 05b95e5fac..2611d484ad 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager" SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" +RoomManager = require "./RoomManager" +ChannelManager = require "./ChannelManager" metrics = require "metrics-sharelatex" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb @@ -27,7 +29,18 @@ module.exports = DocumentUpdaterController = do (i) -> rclient.on "message", () -> metrics.inc "rclient-#{i}", 0.001 # per client event rate metric - + @handleRoomUpdates(@rclientList) + + handleRoomUpdates: (rclientSubList) -> + roomEvents = RoomManager.eventSource() + roomEvents.on 'doc-active', (doc_id) -> + subscribePromises = for rclient in rclientSubList + ChannelManager.subscribe rclient, "applied-ops", doc_id + RoomManager.emitOnCompletion(subscribePromises, "doc-subscribed-#{doc_id}") + roomEvents.on 'doc-empty', (doc_id) -> + for rclient in rclientSubList + ChannelManager.unsubscribe rclient, "applied-ops", doc_id + _processMessageFromDocumentUpdater: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> if error? diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee new file mode 100644 index 0000000000..adf472e26c --- /dev/null +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -0,0 +1,93 @@ +logger = require 'logger-sharelatex' +metrics = require "metrics-sharelatex" +{EventEmitter} = require 'events' + +IdMap = new Map() # keep track of whether ids are from projects or docs +RoomEvents = new EventEmitter() # emits {project,doc}-active and {project,doc}-empty events + +# Manage socket.io rooms for individual projects and docs +# +# The first time someone joins a project or doc we emit a 'project-active' or +# 'doc-active' event. +# +# When the last person leaves a project or doc, we emit 'project-empty' or +# 'doc-empty' event. +# +# The pubsub side is handled by ChannelManager + +module.exports = RoomManager = + + joinProject: (client, project_id, callback = () ->) -> + @joinEntity client, "project", project_id, callback + + joinDoc: (client, doc_id, callback = () ->) -> + @joinEntity client, "doc", doc_id, callback + + leaveDoc: (client, doc_id) -> + @leaveEntity client, "doc", doc_id + + leaveProjectAndDocs: (client) -> + # what rooms is this client in? we need to leave them all. socket.io + # will cause us to leave the rooms, so we only need to manage our + # channel subscriptions... but it will be safer if we leave them + # explicitly, and then socket.io will just regard this as a client that + # has not joined any rooms and do a final disconnection. + for id in @_roomsClientIsIn(client) + entity = IdMap.get(id) + @leaveEntity client, entity, id + + emitOnCompletion: (promiseList, eventName) -> + result = Promise.all(promiseList) + result.then () -> RoomEvents.emit(eventName) + result.catch (err) -> RoomEvents.emit(eventName, err) + + eventSource: () -> + return RoomEvents + + joinEntity: (client, entity, id, callback) -> + beforeCount = @_clientsInRoom(client, id) + # is this a new room? if so, subscribe + if beforeCount == 0 + logger.log {entity, id}, "room is now active" + RoomEvents.once "#{entity}-subscribed-#{id}", (err) -> + # only allow the client to join when all the relevant channels have subscribed + logger.log {client: client.id, entity, id, beforeCount}, "client joined room after subscribing channel" + client.join id + callback(err) + RoomEvents.emit "#{entity}-active", id + IdMap.set(id, entity) + # keep track of the number of listeners + metrics.gauge "room-listeners", RoomEvents.eventNames().length + else + logger.log {client: client.id, entity, id, beforeCount}, "client joined existing room" + client.join id + callback() + + leaveEntity: (client, entity, id) -> + client.leave id + afterCount = @_clientsInRoom(client, id) + logger.log {client: client.id, entity, id, afterCount}, "client left room" + # is the room now empty? if so, unsubscribe + if !entity? + logger.error {entity: id}, "unknown entity when leaving with id" + return + if afterCount == 0 + logger.log {entity, id}, "room is now empty" + RoomEvents.emit "#{entity}-empty", id + IdMap.delete(id) + metrics.gauge "room-listeners", RoomEvents.eventNames().length + + # internal functions below, these access socket.io rooms data directly and + # will need updating for socket.io v2 + + _clientsInRoom: (client, room) -> + nsp = client.namespace.name + name = (nsp + '/') + room; + return (client.manager?.rooms?[name] || []).length + + _roomsClientIsIn: (client) -> + roomList = for fullRoomPath of client.manager.roomClients?[client.id] when fullRoomPath isnt '' + # strip socket.io prefix from room to get original id + [prefix, room] = fullRoomPath.split('/', 2) + room + return roomList diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 470b1f2a52..fce505c1bc 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -5,6 +5,7 @@ AuthorizationManager = require "./AuthorizationManager" DocumentUpdaterManager = require "./DocumentUpdaterManager" ConnectedUsersManager = require "./ConnectedUsersManager" WebsocketLoadBalancer = require "./WebsocketLoadBalancer" +RoomManager = require "./RoomManager" Utils = require "./Utils" module.exports = WebsocketController = @@ -24,8 +25,7 @@ module.exports = WebsocketController = err = new Error("not authorized") logger.warn {err, project_id, user_id, client_id: client.id}, "user is not authorized to join project" return callback(err) - - client.join project_id + client.set("privilege_level", privilegeLevel) client.set("user_id", user_id) @@ -38,8 +38,9 @@ module.exports = WebsocketController = client.set("signup_date", user?.signUpDate) client.set("login_count", user?.loginCount) - callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION - logger.log {user_id, project_id, client_id: client.id}, "user joined project" + RoomManager.joinProject client, project_id, (err) -> + logger.log {user_id, project_id, client_id: client.id}, "user joined project" + callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION # No need to block for setting the user as connected in the cursor tracking ConnectedUsersManager.updateUserPosition project_id, client.id, user, null, () -> @@ -71,6 +72,7 @@ module.exports = WebsocketController = if err? logger.error {err, project_id, user_id, client_id: client.id}, "error marking client as disconnected" + RoomManager.leaveProjectAndDocs(client) setTimeout () -> remainingClients = io.sockets.clients(project_id) if remainingClients.length == 0 @@ -90,41 +92,44 @@ module.exports = WebsocketController = AuthorizationManager.assertClientCanViewProject client, (error) -> return callback(error) if error? - DocumentUpdaterManager.getDocument project_id, doc_id, fromVersion, (error, lines, version, ranges, ops) -> + # ensure the per-doc applied-ops channel is subscribed before sending the + # doc to the client, so that no events are missed. + RoomManager.joinDoc client, doc_id, (error) -> return callback(error) if error? + DocumentUpdaterManager.getDocument project_id, doc_id, fromVersion, (error, lines, version, ranges, ops) -> + return callback(error) if error? - # Encode any binary bits of data so it can go via WebSockets - # See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html - encodeForWebsockets = (text) -> unescape(encodeURIComponent(text)) - escapedLines = [] - for line in lines - try - line = encodeForWebsockets(line) - catch err - logger.err {err, project_id, doc_id, fromVersion, line, client_id: client.id}, "error encoding line uri component" - return callback(err) - escapedLines.push line - if options.encodeRanges - try - for comment in ranges?.comments or [] - comment.op.c = encodeForWebsockets(comment.op.c) if comment.op.c? - for change in ranges?.changes or [] - change.op.i = encodeForWebsockets(change.op.i) if change.op.i? - change.op.d = encodeForWebsockets(change.op.d) if change.op.d? - catch err - logger.err {err, project_id, doc_id, fromVersion, ranges, client_id: client.id}, "error encoding range uri component" - return callback(err) + # Encode any binary bits of data so it can go via WebSockets + # See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html + encodeForWebsockets = (text) -> unescape(encodeURIComponent(text)) + escapedLines = [] + for line in lines + try + line = encodeForWebsockets(line) + catch err + logger.err {err, project_id, doc_id, fromVersion, line, client_id: client.id}, "error encoding line uri component" + return callback(err) + escapedLines.push line + if options.encodeRanges + try + for comment in ranges?.comments or [] + comment.op.c = encodeForWebsockets(comment.op.c) if comment.op.c? + for change in ranges?.changes or [] + change.op.i = encodeForWebsockets(change.op.i) if change.op.i? + change.op.d = encodeForWebsockets(change.op.d) if change.op.d? + catch err + logger.err {err, project_id, doc_id, fromVersion, ranges, client_id: client.id}, "error encoding range uri component" + return callback(err) - AuthorizationManager.addAccessToDoc client, doc_id - client.join(doc_id) - callback null, escapedLines, version, ops, ranges - logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" + AuthorizationManager.addAccessToDoc client, doc_id + logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" + callback null, escapedLines, version, ops, ranges leaveDoc: (client, doc_id, callback = (error) ->) -> metrics.inc "editor.leave-doc" Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) -> logger.log {user_id, project_id, doc_id, client_id: client.id}, "client leaving doc" - client.leave doc_id + RoomManager.leaveDoc(client, doc_id) # we could remove permission when user leaves a doc, but because # the connection is per-project, we continue to allow access # after the initial joinDoc since we know they are already authorised. diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index a9d5052410..865249c63e 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager" SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" +RoomManager = require "./RoomManager" +ChannelManager = require "./ChannelManager" module.exports = WebsocketLoadBalancer = rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub) @@ -18,8 +20,9 @@ module.exports = WebsocketLoadBalancer = message: message payload: payload logger.log {room_id, message, payload, length: data.length}, "emitting to room" + for rclientPub in @rclientPubList - rclientPub.publish "editor-events", data + ChannelManager.publish rclientPub, "editor-events", room_id, data emitToAll: (message, payload...) -> @emitToRoom "all", message, payload... @@ -32,6 +35,17 @@ module.exports = WebsocketLoadBalancer = rclientSub.on "message", (channel, message) -> EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0 WebsocketLoadBalancer._processEditorEvent io, channel, message + @handleRoomUpdates(@rclientSubList) + + handleRoomUpdates: (rclientSubList) -> + roomEvents = RoomManager.eventSource() + roomEvents.on 'project-active', (project_id) -> + subscribePromises = for rclient in rclientSubList + ChannelManager.subscribe rclient, "editor-events", project_id + RoomManager.emitOnCompletion(subscribePromises, "project-subscribed-#{project_id}") + roomEvents.on 'project-empty', (project_id) -> + for rclient in rclientSubList + ChannelManager.unsubscribe rclient, "editor-events", project_id _processEditorEvent: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> diff --git a/services/real-time/config/settings.defaults.coffee b/services/real-time/config/settings.defaults.coffee index ceeb65191d..a3128e84d1 100644 --- a/services/real-time/config/settings.defaults.coffee +++ b/services/real-time/config/settings.defaults.coffee @@ -54,6 +54,8 @@ settings = checkEventOrder: process.env['CHECK_EVENT_ORDER'] or false + publishOnIndividualChannels: process.env['PUBLISH_ON_INDIVIDUAL_CHANNELS'] or false + sentry: dsn: process.env.SENTRY_DSN diff --git a/services/real-time/npm-shrinkwrap.json b/services/real-time/npm-shrinkwrap.json index 4f45764207..a146f50e1f 100644 --- a/services/real-time/npm-shrinkwrap.json +++ b/services/real-time/npm-shrinkwrap.json @@ -356,18 +356,6 @@ "resolved": "https://registry.npmjs.org/bunyan/-/bunyan-0.22.3.tgz", "dev": true }, - "buster-core": { - "version": "0.6.4", - "from": "buster-core@0.6.4", - "resolved": "https://registry.npmjs.org/buster-core/-/buster-core-0.6.4.tgz", - "dev": true - }, - "buster-format": { - "version": "0.5.6", - "from": "buster-format@>=0.5.0 <0.6.0", - "resolved": "https://registry.npmjs.org/buster-format/-/buster-format-0.5.6.tgz", - "dev": true - }, "bytes": { "version": "3.0.0", "from": "bytes@3.0.0", @@ -484,6 +472,12 @@ "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-0.1.3.tgz", "dev": true }, + "define-properties": { + "version": "1.1.3", + "from": "define-properties@>=1.1.3 <2.0.0", + "resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.1.3.tgz", + "dev": true + }, "delay": { "version": "4.3.0", "from": "delay@>=4.0.1 <5.0.0", @@ -556,6 +550,18 @@ "from": "ent@>=2.2.0 <3.0.0", "resolved": "https://registry.npmjs.org/ent/-/ent-2.2.0.tgz" }, + "es-abstract": { + "version": "1.13.0", + "from": "es-abstract@>=1.12.0 <2.0.0", + "resolved": "https://registry.npmjs.org/es-abstract/-/es-abstract-1.13.0.tgz", + "dev": true + }, + "es-to-primitive": { + "version": "1.2.0", + "from": "es-to-primitive@>=1.2.0 <2.0.0", + "resolved": "https://registry.npmjs.org/es-to-primitive/-/es-to-primitive-1.2.0.tgz", + "dev": true + }, "es6-promise": { "version": "4.2.8", "from": "es6-promise@>=4.0.3 <5.0.0", @@ -717,6 +723,12 @@ "from": "form-data@>=2.3.2 <2.4.0", "resolved": "https://registry.npmjs.org/form-data/-/form-data-2.3.3.tgz" }, + "formatio": { + "version": "1.1.1", + "from": "formatio@1.1.1", + "resolved": "https://registry.npmjs.org/formatio/-/formatio-1.1.1.tgz", + "dev": true + }, "forwarded": { "version": "0.1.2", "from": "forwarded@>=0.1.2 <0.2.0", @@ -733,6 +745,12 @@ "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", "dev": true }, + "function-bind": { + "version": "1.1.1", + "from": "function-bind@>=1.1.1 <2.0.0", + "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz", + "dev": true + }, "gaxios": { "version": "1.8.4", "from": "gaxios@>=1.2.1 <2.0.0", @@ -800,6 +818,18 @@ "from": "har-validator@>=5.1.0 <5.2.0", "resolved": "https://registry.npmjs.org/har-validator/-/har-validator-5.1.3.tgz" }, + "has": { + "version": "1.0.3", + "from": "has@>=1.0.3 <2.0.0", + "resolved": "https://registry.npmjs.org/has/-/has-1.0.3.tgz", + "dev": true + }, + "has-symbols": { + "version": "1.0.0", + "from": "has-symbols@>=1.0.0 <2.0.0", + "resolved": "https://registry.npmjs.org/has-symbols/-/has-symbols-1.0.0.tgz", + "dev": true + }, "he": { "version": "1.1.1", "from": "he@1.1.1", @@ -880,11 +910,47 @@ "from": "is@>=3.2.0 <4.0.0", "resolved": "https://registry.npmjs.org/is/-/is-3.3.0.tgz" }, + "is-arguments": { + "version": "1.0.4", + "from": "is-arguments@>=1.0.4 <2.0.0", + "resolved": "https://registry.npmjs.org/is-arguments/-/is-arguments-1.0.4.tgz", + "dev": true + }, "is-buffer": { "version": "2.0.3", "from": "is-buffer@>=2.0.2 <3.0.0", "resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-2.0.3.tgz" }, + "is-callable": { + "version": "1.1.4", + "from": "is-callable@>=1.1.4 <2.0.0", + "resolved": "https://registry.npmjs.org/is-callable/-/is-callable-1.1.4.tgz", + "dev": true + }, + "is-date-object": { + "version": "1.0.1", + "from": "is-date-object@>=1.0.1 <2.0.0", + "resolved": "https://registry.npmjs.org/is-date-object/-/is-date-object-1.0.1.tgz", + "dev": true + }, + "is-generator-function": { + "version": "1.0.7", + "from": "is-generator-function@>=1.0.7 <2.0.0", + "resolved": "https://registry.npmjs.org/is-generator-function/-/is-generator-function-1.0.7.tgz", + "dev": true + }, + "is-regex": { + "version": "1.0.4", + "from": "is-regex@>=1.0.4 <2.0.0", + "resolved": "https://registry.npmjs.org/is-regex/-/is-regex-1.0.4.tgz", + "dev": true + }, + "is-symbol": { + "version": "1.0.2", + "from": "is-symbol@>=1.0.2 <2.0.0", + "resolved": "https://registry.npmjs.org/is-symbol/-/is-symbol-1.0.2.tgz", + "dev": true + }, "is-typedarray": { "version": "1.0.0", "from": "is-typedarray@>=1.0.0 <1.1.0", @@ -985,6 +1051,12 @@ } } }, + "lolex": { + "version": "1.3.2", + "from": "lolex@1.3.2", + "resolved": "https://registry.npmjs.org/lolex/-/lolex-1.3.2.tgz", + "dev": true + }, "long": { "version": "4.0.0", "from": "long@>=4.0.0 <5.0.0", @@ -1176,6 +1248,18 @@ "from": "oauth-sign@>=0.9.0 <0.10.0", "resolved": "https://registry.npmjs.org/oauth-sign/-/oauth-sign-0.9.0.tgz" }, + "object-keys": { + "version": "1.1.1", + "from": "object-keys@>=1.0.12 <2.0.0", + "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz", + "dev": true + }, + "object.entries": { + "version": "1.1.0", + "from": "object.entries@>=1.1.0 <2.0.0", + "resolved": "https://registry.npmjs.org/object.entries/-/object.entries-1.1.0.tgz", + "dev": true + }, "on-finished": { "version": "2.3.0", "from": "on-finished@>=2.3.0 <2.4.0", @@ -1469,6 +1553,12 @@ "from": "safer-buffer@>=2.1.2 <3.0.0", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz" }, + "samsam": { + "version": "1.1.2", + "from": "samsam@1.1.2", + "resolved": "https://registry.npmjs.org/samsam/-/samsam-1.1.2.tgz", + "dev": true + }, "sandboxed-module": { "version": "0.3.0", "from": "sandboxed-module@>=0.3.0 <0.4.0", @@ -1533,9 +1623,9 @@ "resolved": "https://registry.npmjs.org/shimmer/-/shimmer-1.2.1.tgz" }, "sinon": { - "version": "1.5.2", - "from": "sinon@>=1.5.2 <1.6.0", - "resolved": "https://registry.npmjs.org/sinon/-/sinon-1.5.2.tgz", + "version": "1.17.7", + "from": "sinon@1.17.7", + "resolved": "https://registry.npmjs.org/sinon/-/sinon-1.17.7.tgz", "dev": true }, "socket.io": { @@ -1715,6 +1805,20 @@ } } }, + "util": { + "version": "0.12.1", + "from": "util@>=0.10.3 <1.0.0", + "resolved": "https://registry.npmjs.org/util/-/util-0.12.1.tgz", + "dev": true, + "dependencies": { + "safe-buffer": { + "version": "5.2.0", + "from": "safe-buffer@>=5.1.2 <6.0.0", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.0.tgz", + "dev": true + } + } + }, "util-deprecate": { "version": "1.0.2", "from": "util-deprecate@>=1.0.1 <1.1.0", diff --git a/services/real-time/package.json b/services/real-time/package.json index f4548a98d7..dd2d6f0198 100644 --- a/services/real-time/package.json +++ b/services/real-time/package.json @@ -42,9 +42,9 @@ "chai": "~1.9.1", "cookie-signature": "^1.0.5", "sandboxed-module": "~0.3.0", - "sinon": "~1.5.2", + "sinon": "^1.5.2", "mocha": "^4.0.1", "uid-safe": "^1.0.1", "timekeeper": "0.0.4" } -} +} \ No newline at end of file diff --git a/services/real-time/test/unit/coffee/ChannelManagerTests.coffee b/services/real-time/test/unit/coffee/ChannelManagerTests.coffee new file mode 100644 index 0000000000..e550e963d4 --- /dev/null +++ b/services/real-time/test/unit/coffee/ChannelManagerTests.coffee @@ -0,0 +1,108 @@ +chai = require('chai') +should = chai.should() +sinon = require("sinon") +modulePath = "../../../app/js/ChannelManager.js" +SandboxedModule = require('sandboxed-module') + +describe 'ChannelManager', -> + beforeEach -> + @rclient = {} + @other_rclient = {} + @ChannelManager = SandboxedModule.require modulePath, requires: + "settings-sharelatex": @settings = {} + "metrics-sharelatex": @metrics = {inc: sinon.stub()} + "logger-sharelatex": @logger = { log: sinon.stub(), warn: sinon.stub(), error: sinon.stub() } + + describe "subscribe", -> + + describe "when there is no existing subscription for this redis client", -> + beforeEach -> + @rclient.subscribe = sinon.stub() + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should subscribe to the redis channel", -> + @rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true + + describe "when there is an existing subscription for this redis client", -> + beforeEach -> + @rclient.subscribe = sinon.stub() + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + @rclient.subscribe = sinon.stub() # discard the original stub + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should not subscribe to the redis channel", -> + @rclient.subscribe.called.should.equal false + + describe "when there is an existing subscription for another redis client but not this one", -> + beforeEach -> + @other_rclient.subscribe = sinon.stub() + @ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef" + @rclient.subscribe = sinon.stub() # discard the original stub + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should subscribe to the redis channel on this redis client", -> + @rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true + + describe "unsubscribe", -> + + describe "when there is no existing subscription for this redis client", -> + beforeEach -> + @rclient.unsubscribe = sinon.stub() + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should not unsubscribe from the redis channel", -> + @rclient.unsubscribe.called.should.equal false + + + describe "when there is an existing subscription for this another redis client but not this one", -> + beforeEach -> + @other_rclient.subscribe = sinon.stub() + @rclient.unsubscribe = sinon.stub() + @ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef" + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should not unsubscribe from the redis channel on this client", -> + @rclient.unsubscribe.called.should.equal false + + describe "when there is an existing subscription for this redis client", -> + beforeEach -> + @rclient.subscribe = sinon.stub() + @rclient.unsubscribe = sinon.stub() + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should unsubscribe from the redis channel", -> + @rclient.unsubscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true + + describe "publish", -> + + describe "when the channel is 'all'", -> + beforeEach -> + @rclient.publish = sinon.stub() + @ChannelManager.publish @rclient, "applied-ops", "all", "random-message" + + it "should publish on the base channel", -> + @rclient.publish.calledWithExactly("applied-ops", "random-message").should.equal true + + describe "when the channel has an specific id", -> + + describe "when the individual channel setting is false", -> + beforeEach -> + @rclient.publish = sinon.stub() + @settings.publishOnIndividualChannels = false + @ChannelManager.publish @rclient, "applied-ops", "1234567890abcdef", "random-message" + + it "should publish on the per-id channel", -> + @rclient.publish.calledWithExactly("applied-ops", "random-message").should.equal true + @rclient.publish.calledOnce.should.equal true + + describe "when the individual channel setting is true", -> + beforeEach -> + @rclient.publish = sinon.stub() + @settings.publishOnIndividualChannels = true + @ChannelManager.publish @rclient, "applied-ops", "1234567890abcdef", "random-message" + + it "should publish on the per-id channel", -> + @rclient.publish.calledWithExactly("applied-ops:1234567890abcdef", "random-message").should.equal true + @rclient.publish.calledOnce.should.equal true + diff --git a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee index 24551396d9..b5574c8d16 100644 --- a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee +++ b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee @@ -11,6 +11,7 @@ describe "DocumentUpdaterController", -> @callback = sinon.stub() @io = { "mock": "socket.io" } @rclient = [] + @RoomEvents = { on: sinon.stub() } @EditorUpdatesController = SandboxedModule.require modulePath, requires: "logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() } "settings-sharelatex": @settings = @@ -28,6 +29,8 @@ describe "DocumentUpdaterController", -> "./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()} "./HealthCheckManager": {check: sinon.stub()} "metrics-sharelatex": @metrics = {inc: sinon.stub()} + "./RoomManager" : @RoomManager = { eventSource: sinon.stub().returns @RoomEvents} + "./ChannelManager": @ChannelManager = {} describe "listenForUpdatesFromDocumentUpdater", -> beforeEach -> diff --git a/services/real-time/test/unit/coffee/RoomManagerTests.coffee b/services/real-time/test/unit/coffee/RoomManagerTests.coffee new file mode 100644 index 0000000000..ee46a3ef04 --- /dev/null +++ b/services/real-time/test/unit/coffee/RoomManagerTests.coffee @@ -0,0 +1,225 @@ +chai = require('chai') +should = chai.should() +sinon = require("sinon") +modulePath = "../../../app/js/RoomManager.js" +SandboxedModule = require('sandboxed-module') + +describe 'RoomManager', -> + beforeEach -> + @project_id = "project-id-123" + @doc_id = "doc-id-456" + @other_doc_id = "doc-id-789" + @client = {namespace: {name: ''}, id: "first-client"} + @RoomManager = SandboxedModule.require modulePath, requires: + "settings-sharelatex": @settings = {} + "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } + "metrics-sharelatex": @metrics = { gauge: sinon.stub() } + @RoomManager._clientsInRoom = sinon.stub() + @RoomEvents = @RoomManager.eventSource() + sinon.spy(@RoomEvents, 'emit') + sinon.spy(@RoomEvents, 'once') + + describe "joinProject", -> + + describe "when the project room is empty", -> + + beforeEach (done) -> + @RoomManager._clientsInRoom + .withArgs(@client, @project_id) + .onFirstCall().returns(0) + @client.join = sinon.stub() + @callback = sinon.stub() + @RoomEvents.on 'project-active', (id) => + setTimeout () => + @RoomEvents.emit "project-subscribed-#{id}" + , 100 + @RoomManager.joinProject @client, @project_id, (err) => + @callback(err) + done() + + it "should emit a 'project-active' event with the id", -> + @RoomEvents.emit.calledWithExactly('project-active', @project_id).should.equal true + + it "should listen for the 'project-subscribed-id' event", -> + @RoomEvents.once.calledWith("project-subscribed-#{@project_id}").should.equal true + + it "should join the room using the id", -> + @client.join.calledWithExactly(@project_id).should.equal true + + describe "when there are other clients in the project room", -> + + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @project_id) + .onFirstCall().returns(123) + .onSecondCall().returns(124) + @client.join = sinon.stub() + @RoomManager.joinProject @client, @project_id + + it "should join the room using the id", -> + @client.join.called.should.equal true + + it "should not emit any events", -> + @RoomEvents.emit.called.should.equal false + + + describe "joinDoc", -> + + describe "when the doc room is empty", -> + + beforeEach (done) -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onFirstCall().returns(0) + @client.join = sinon.stub() + @callback = sinon.stub() + @RoomEvents.on 'doc-active', (id) => + setTimeout () => + @RoomEvents.emit "doc-subscribed-#{id}" + , 100 + @RoomManager.joinDoc @client, @doc_id, (err) => + @callback(err) + done() + + it "should emit a 'doc-active' event with the id", -> + @RoomEvents.emit.calledWithExactly('doc-active', @doc_id).should.equal true + + it "should listen for the 'doc-subscribed-id' event", -> + @RoomEvents.once.calledWith("doc-subscribed-#{@doc_id}").should.equal true + + it "should join the room using the id", -> + @client.join.calledWithExactly(@doc_id).should.equal true + + describe "when there are other clients in the doc room", -> + + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onFirstCall().returns(123) + .onSecondCall().returns(124) + @client.join = sinon.stub() + @RoomManager.joinDoc @client, @doc_id + + it "should join the room using the id", -> + @client.join.called.should.equal true + + it "should not emit any events", -> + @RoomEvents.emit.called.should.equal false + + + describe "leaveDoc", -> + + describe "when doc room will be empty after this client has left", -> + + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onCall(0).returns(0) + @client.leave = sinon.stub() + @RoomManager.leaveDoc @client, @doc_id + + it "should leave the room using the id", -> + @client.leave.calledWithExactly(@doc_id).should.equal true + + it "should emit a 'doc-empty' event with the id", -> + @RoomEvents.emit.calledWithExactly('doc-empty', @doc_id).should.equal true + + + describe "when there are other clients in the doc room", -> + + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onCall(0).returns(123) + @client.leave = sinon.stub() + @RoomManager.leaveDoc @client, @doc_id + + it "should leave the room using the id", -> + @client.leave.calledWithExactly(@doc_id).should.equal true + + it "should not emit any events", -> + @RoomEvents.emit.called.should.equal false + + + describe "leaveProjectAndDocs", -> + + describe "when the client is connected to the project and multiple docs", -> + + beforeEach -> + @RoomManager._roomsClientIsIn = sinon.stub().returns [@project_id, @doc_id, @other_doc_id] + @client.join = sinon.stub() + @client.leave = sinon.stub() + + describe "when this is the only client connected", -> + + beforeEach (done) -> + # first call is for the join, + # second for the leave + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onCall(0).returns(0) + .onCall(1).returns(0) + @RoomManager._clientsInRoom + .withArgs(@client, @other_doc_id) + .onCall(0).returns(0) + .onCall(1).returns(0) + @RoomManager._clientsInRoom + .withArgs(@client, @project_id) + .onCall(0).returns(0) + .onCall(1).returns(0) + @RoomEvents.on 'project-active', (id) => + setTimeout () => + @RoomEvents.emit "project-subscribed-#{id}" + , 100 + @RoomEvents.on 'doc-active', (id) => + setTimeout () => + @RoomEvents.emit "doc-subscribed-#{id}" + , 100 + # put the client in the rooms + @RoomManager.joinProject @client, @project_id, () => + @RoomManager.joinDoc @client, @doc_id, () => + @RoomManager.joinDoc @client, @other_doc_id, () => + # now leave the project + @RoomManager.leaveProjectAndDocs @client + done() + + it "should leave all the docs", -> + @client.leave.calledWithExactly(@doc_id).should.equal true + @client.leave.calledWithExactly(@other_doc_id).should.equal true + + it "should leave the project", -> + @client.leave.calledWithExactly(@project_id).should.equal true + + it "should emit a 'doc-empty' event with the id for each doc", -> + @RoomEvents.emit.calledWithExactly('doc-empty', @doc_id).should.equal true + @RoomEvents.emit.calledWithExactly('doc-empty', @other_doc_id).should.equal true + + it "should emit a 'project-empty' event with the id for the project", -> + @RoomEvents.emit.calledWithExactly('project-empty', @project_id).should.equal true + + describe "when other clients are still connected", -> + + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onFirstCall().returns(123) + .onSecondCall().returns(122) + @RoomManager._clientsInRoom + .withArgs(@client, @other_doc_id) + .onFirstCall().returns(123) + .onSecondCall().returns(122) + @RoomManager._clientsInRoom + .withArgs(@client, @project_id) + .onFirstCall().returns(123) + .onSecondCall().returns(122) + @RoomManager.leaveProjectAndDocs @client + + it "should leave all the docs", -> + @client.leave.calledWithExactly(@doc_id).should.equal true + @client.leave.calledWithExactly(@other_doc_id).should.equal true + + it "should leave the project", -> + @client.leave.calledWithExactly(@project_id).should.equal true + + it "should not emit any events", -> + @RoomEvents.emit.called.should.equal false \ No newline at end of file diff --git a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee index 1ddf4e6359..d0dad108e7 100644 --- a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee @@ -36,7 +36,7 @@ describe 'WebsocketController', -> "metrics-sharelatex": @metrics = inc: sinon.stub() set: sinon.stub() - + "./RoomManager": @RoomManager = {} afterEach -> tk.reset() @@ -54,6 +54,7 @@ describe 'WebsocketController', -> @privilegeLevel = "owner" @ConnectedUsersManager.updateUserPosition = sinon.stub().callsArg(4) @WebApiManager.joinProject = sinon.stub().callsArgWith(2, null, @project, @privilegeLevel) + @RoomManager.joinProject = sinon.stub().callsArg(2) @WebsocketController.joinProject @client, @user, @project_id, @callback it "should load the project from web", -> @@ -62,7 +63,7 @@ describe 'WebsocketController', -> .should.equal true it "should join the project room", -> - @client.join.calledWith(@project_id).should.equal true + @RoomManager.joinProject.calledWith(@client, @project_id).should.equal true it "should set the privilege level on the client", -> @client.set.calledWith("privilege_level", @privilegeLevel).should.equal true @@ -125,6 +126,7 @@ describe 'WebsocketController', -> @DocumentUpdaterManager.flushProjectToMongoAndDelete = sinon.stub().callsArg(1) @ConnectedUsersManager.markUserAsDisconnected = sinon.stub().callsArg(2) @WebsocketLoadBalancer.emitToRoom = sinon.stub() + @RoomManager.leaveProjectAndDocs = sinon.stub() @clientsInRoom = [] @io = sockets: @@ -160,6 +162,11 @@ describe 'WebsocketController', -> it "should increment the leave-project metric", -> @metrics.inc.calledWith("editor.leave-project").should.equal true + it "should track the disconnection in RoomManager", -> + @RoomManager.leaveProjectAndDocs + .calledWith(@client) + .should.equal true + describe "when the project is not empty", -> beforeEach -> @clientsInRoom = ["mock-remaining-client"] @@ -230,6 +237,7 @@ describe 'WebsocketController', -> @AuthorizationManager.addAccessToDoc = sinon.stub() @AuthorizationManager.assertClientCanViewProject = sinon.stub().callsArgWith(1, null) @DocumentUpdaterManager.getDocument = sinon.stub().callsArgWith(3, null, @doc_lines, @version, @ranges, @ops) + @RoomManager.joinDoc = sinon.stub().callsArg(2) describe "works", -> beforeEach -> @@ -251,8 +259,8 @@ describe 'WebsocketController', -> .should.equal true it "should join the client to room for the doc_id", -> - @client.join - .calledWith(@doc_id) + @RoomManager.joinDoc + .calledWith(@client, @doc_id) .should.equal true it "should call the callback with the lines, version, ranges and ops", -> @@ -330,11 +338,12 @@ describe 'WebsocketController', -> beforeEach -> @doc_id = "doc-id-123" @client.params.project_id = @project_id + @RoomManager.leaveDoc = sinon.stub() @WebsocketController.leaveDoc @client, @doc_id, @callback it "should remove the client from the doc_id room", -> - @client.leave - .calledWith(@doc_id).should.equal true + @RoomManager.leaveDoc + .calledWith(@client, @doc_id).should.equal true it "should call the callback", -> @callback.called.should.equal true diff --git a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee index 14df2df851..c4f4519790 100644 --- a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -6,6 +6,7 @@ modulePath = require('path').join __dirname, '../../../app/js/WebsocketLoadBalan describe "WebsocketLoadBalancer", -> beforeEach -> @rclient = {} + @RoomEvents = {on: sinon.stub()} @WebsocketLoadBalancer = SandboxedModule.require modulePath, requires: "./RedisClientManager": createClientList: () => [] @@ -14,6 +15,8 @@ describe "WebsocketLoadBalancer", -> parse: (data, cb) => cb null, JSON.parse(data) "./EventLogger": {checkEventOrder: sinon.stub()} "./HealthCheckManager": {check: sinon.stub()} + "./RoomManager" : @RoomManager = {eventSource: sinon.stub().returns @RoomEvents} + "./ChannelManager": @ChannelManager = {publish: sinon.stub()} @io = {} @WebsocketLoadBalancer.rclientPubList = [{publish: sinon.stub()}] @WebsocketLoadBalancer.rclientSubList = [{ @@ -30,8 +33,8 @@ describe "WebsocketLoadBalancer", -> @WebsocketLoadBalancer.emitToRoom(@room_id, @message, @payload...) it "should publish the message to redis", -> - @WebsocketLoadBalancer.rclientPubList[0].publish - .calledWith("editor-events", JSON.stringify( + @ChannelManager.publish + .calledWith(@WebsocketLoadBalancer.rclientPubList[0], "editor-events", @room_id, JSON.stringify( room_id: @room_id, message: @message payload: @payload