From 804f4c2bd289e33e9fb28af0adee111848029a83 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 18 Jul 2019 11:25:10 +0100 Subject: [PATCH] listen on separate channels for each project/doc --- .../app/coffee/ChannelManager.coffee | 43 ++++++++++++ .../coffee/DocumentUpdaterController.coffee | 13 +++- .../real-time/app/coffee/RoomManager.coffee | 69 +++++++++++++++++++ .../app/coffee/WebsocketController.coffee | 8 ++- .../app/coffee/WebsocketLoadBalancer.coffee | 14 +++- 5 files changed, 142 insertions(+), 5 deletions(-) create mode 100644 services/real-time/app/coffee/ChannelManager.coffee create mode 100644 services/real-time/app/coffee/RoomManager.coffee diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee new file mode 100644 index 0000000000..0e6b2fb98e --- /dev/null +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -0,0 +1,43 @@ +logger = require 'logger-sharelatex' +metrics = require "metrics-sharelatex" + +ClientMap = new Map() # for each redis client, stores a Set of subscribed channels + +# Manage redis pubsub subscriptions for individual projects and docs, ensuring +# that we never subscribe to a channel multiple times. The socket.io side is +# handled by RoomManager. + +module.exports = ChannelManager = + _createNewClientEntry: (rclient) -> + ClientMap.set(rclient, new Set()).get(rclient) + + subscribe: (rclient, baseChannel, id) -> + existingChannelSet = ClientMap.get(rclient) || @_createNewClientEntry(rclient) + channel = "#{baseChannel}:#{id}" + if existingChannelSet.has(channel) + logger.error {channel}, "already subscribed" + else + rclient.subscribe channel + existingChannelSet.add(channel) + logger.log {channel}, "subscribed to new channel" + metrics.inc "subscribe.#{baseChannel}" + + unsubscribe: (rclient, baseChannel, id) -> + existingChannelSet = ClientMap.get(rclient) + channel = "#{baseChannel}:#{id}" + if !existingChannelSet.has(channel) + logger.error {channel}, "not subscribed, cannot unsubscribe" + else + rclient.unsubscribe channel + existingChannelSet.delete(channel) + logger.log {channel}, "unsubscribed from channel" + metrics.inc "unsubscribe.#{baseChannel}" + + publish: (rclient, baseChannel, id, data) -> + if id is 'all' + channel = baseChannel + else + channel = "#{baseChannel}:#{id}" + # we publish on a different client to the subscribe, so we can't + # check for the channel existing here + rclient.publish channel, data \ No newline at end of file diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index 05b95e5fac..0cc5751d7c 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager" SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" +RoomManager = require "./RoomManager" +ChannelManager = require "./ChannelManager" metrics = require "metrics-sharelatex" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb @@ -27,7 +29,16 @@ module.exports = DocumentUpdaterController = do (i) -> rclient.on "message", () -> metrics.inc "rclient-#{i}", 0.001 # per client event rate metric - + for rclient in @rclientList + @handleRoomUpdates(rclient) + + handleRoomUpdates: (rclientSub) -> + roomEvents = RoomManager.eventSource() + roomEvents.on 'doc-active', (doc_id) -> + ChannelManager.subscribe rclientSub, "applied-ops", doc_id + roomEvents.on 'doc-empty', (doc_id) -> + ChannelManager.unsubscribe rclientSub, "applied-ops", doc_id + _processMessageFromDocumentUpdater: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> if error? diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee new file mode 100644 index 0000000000..e9787aa1df --- /dev/null +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -0,0 +1,69 @@ +logger = require 'logger-sharelatex' +{EventEmitter} = require 'events' + +IdMap = new Map() # keep track of whether ids are from projects or docs +RoomEvents = new EventEmitter() + +# Manage socket.io rooms for individual projects and docs +# +# The first time someone joins a project or doc we emit a 'project-active' or +# 'doc-active' event. +# +# When the last person leaves a project or doc, we emit 'project-empty' or +# 'doc-empty' event. +# +# The pubsub side is handled by ChannelManager + +module.exports = RoomManager = + + joinProject: (client, project_id) -> + @_join client, "project", project_id + + joinDoc: (client, doc_id) -> + @_join client, "doc", doc_id + + leaveDoc: (client, doc_id) -> + @_leave client, "doc", doc_id + + leaveProjectAndDocs: (client) -> + # what rooms is this client in? we need to leave them all + for id in @_roomsClientIsIn(client) + entity = IdMap.get(id) + @_leave client, entity, id + + eventSource: () -> + return RoomEvents + + _clientsInRoom: (client, room) -> + nsp = client.namespace.name + name = (nsp + '/') + room; + return (client.manager?.rooms?[name] || []).length + + _roomsClientIsIn: (client) -> + roomList = for fullRoomPath of client.manager.roomClients?[client.id] when fullRoomPath isnt '' + # strip socket.io prefix from room to get original id + [prefix, room] = fullRoomPath.split('/', 2) + room + return roomList + + _join: (client, entity, id) -> + beforeCount = @_clientsInRoom(client, id) + client.join id + afterCount = @_clientsInRoom(client, id) + logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client joined room" + # is this a new room? if so, subscribe + if beforeCount == 0 and afterCount == 1 + logger.log {entity, id}, "room is now active" + RoomEvents.emit "#{entity}-active", id + IdMap.set(id, entity) + + _leave: (client, entity, id) -> + beforeCount = @_clientsInRoom(client, id) + client.leave id + afterCount = @_clientsInRoom(client, id) + logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client left room" + # is the room now empty? if so, unsubscribe + if beforeCount == 1 and afterCount == 0 + logger.log {entity, id}, "room is now empty" + RoomEvents.emit "#{entity}-empty", id + IdMap.delete(id) \ No newline at end of file diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 470b1f2a52..6d8965883f 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -5,6 +5,7 @@ AuthorizationManager = require "./AuthorizationManager" DocumentUpdaterManager = require "./DocumentUpdaterManager" ConnectedUsersManager = require "./ConnectedUsersManager" WebsocketLoadBalancer = require "./WebsocketLoadBalancer" +RoomManager = require "./RoomManager" Utils = require "./Utils" module.exports = WebsocketController = @@ -25,7 +26,7 @@ module.exports = WebsocketController = logger.warn {err, project_id, user_id, client_id: client.id}, "user is not authorized to join project" return callback(err) - client.join project_id + RoomManager.joinProject(client, project_id) client.set("privilege_level", privilegeLevel) client.set("user_id", user_id) @@ -71,6 +72,7 @@ module.exports = WebsocketController = if err? logger.error {err, project_id, user_id, client_id: client.id}, "error marking client as disconnected" + RoomManager.leaveProjectAndDocs(client) setTimeout () -> remainingClients = io.sockets.clients(project_id) if remainingClients.length == 0 @@ -116,7 +118,7 @@ module.exports = WebsocketController = return callback(err) AuthorizationManager.addAccessToDoc client, doc_id - client.join(doc_id) + RoomManager.joinDoc(client, doc_id) callback null, escapedLines, version, ops, ranges logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" @@ -124,7 +126,7 @@ module.exports = WebsocketController = metrics.inc "editor.leave-doc" Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) -> logger.log {user_id, project_id, doc_id, client_id: client.id}, "client leaving doc" - client.leave doc_id + RoomManager.leaveDoc(client, doc_id) # we could remove permission when user leaves a doc, but because # the connection is per-project, we continue to allow access # after the initial joinDoc since we know they are already authorised. diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index a9d5052410..1bb74c6a3e 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager" SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" +RoomManager = require "./RoomManager" +ChannelManager = require "./ChannelManager" module.exports = WebsocketLoadBalancer = rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub) @@ -18,8 +20,9 @@ module.exports = WebsocketLoadBalancer = message: message payload: payload logger.log {room_id, message, payload, length: data.length}, "emitting to room" + for rclientPub in @rclientPubList - rclientPub.publish "editor-events", data + ChannelManager.publish rclientPub, "editor-events", room_id, data emitToAll: (message, payload...) -> @emitToRoom "all", message, payload... @@ -32,6 +35,15 @@ module.exports = WebsocketLoadBalancer = rclientSub.on "message", (channel, message) -> EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0 WebsocketLoadBalancer._processEditorEvent io, channel, message + for rclientSub in @rclientSubList + @handleRoomUpdates(rclientSub) + + handleRoomUpdates: (rclientSub) -> + roomEvents = RoomManager.eventSource() + roomEvents.on 'project-active', (project_id) -> + ChannelManager.subscribe rclientSub, "editor-events", project_id + roomEvents.on 'project-empty', (project_id) -> + ChannelManager.unsubscribe rclientSub, "editor-events", project_id _processEditorEvent: (io, channel, message) -> SafeJsonParse.parse message, (error, message) ->