mirror of
https://github.com/overleaf/overleaf.git
synced 2025-04-22 10:56:59 +00:00
listen on separate channels for each project/doc
This commit is contained in:
parent
ae512dc9fb
commit
804f4c2bd2
5 changed files with 142 additions and 5 deletions
43
services/real-time/app/coffee/ChannelManager.coffee
Normal file
43
services/real-time/app/coffee/ChannelManager.coffee
Normal file
|
@ -0,0 +1,43 @@
|
|||
logger = require 'logger-sharelatex'
|
||||
metrics = require "metrics-sharelatex"
|
||||
|
||||
ClientMap = new Map() # for each redis client, stores a Set of subscribed channels
|
||||
|
||||
# Manage redis pubsub subscriptions for individual projects and docs, ensuring
|
||||
# that we never subscribe to a channel multiple times. The socket.io side is
|
||||
# handled by RoomManager.
|
||||
|
||||
module.exports = ChannelManager =
|
||||
_createNewClientEntry: (rclient) ->
|
||||
ClientMap.set(rclient, new Set()).get(rclient)
|
||||
|
||||
subscribe: (rclient, baseChannel, id) ->
|
||||
existingChannelSet = ClientMap.get(rclient) || @_createNewClientEntry(rclient)
|
||||
channel = "#{baseChannel}:#{id}"
|
||||
if existingChannelSet.has(channel)
|
||||
logger.error {channel}, "already subscribed"
|
||||
else
|
||||
rclient.subscribe channel
|
||||
existingChannelSet.add(channel)
|
||||
logger.log {channel}, "subscribed to new channel"
|
||||
metrics.inc "subscribe.#{baseChannel}"
|
||||
|
||||
unsubscribe: (rclient, baseChannel, id) ->
|
||||
existingChannelSet = ClientMap.get(rclient)
|
||||
channel = "#{baseChannel}:#{id}"
|
||||
if !existingChannelSet.has(channel)
|
||||
logger.error {channel}, "not subscribed, cannot unsubscribe"
|
||||
else
|
||||
rclient.unsubscribe channel
|
||||
existingChannelSet.delete(channel)
|
||||
logger.log {channel}, "unsubscribed from channel"
|
||||
metrics.inc "unsubscribe.#{baseChannel}"
|
||||
|
||||
publish: (rclient, baseChannel, id, data) ->
|
||||
if id is 'all'
|
||||
channel = baseChannel
|
||||
else
|
||||
channel = "#{baseChannel}:#{id}"
|
||||
# we publish on a different client to the subscribe, so we can't
|
||||
# check for the channel existing here
|
||||
rclient.publish channel, data
|
|
@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager"
|
|||
SafeJsonParse = require "./SafeJsonParse"
|
||||
EventLogger = require "./EventLogger"
|
||||
HealthCheckManager = require "./HealthCheckManager"
|
||||
RoomManager = require "./RoomManager"
|
||||
ChannelManager = require "./ChannelManager"
|
||||
metrics = require "metrics-sharelatex"
|
||||
|
||||
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
|
||||
|
@ -27,7 +29,16 @@ module.exports = DocumentUpdaterController =
|
|||
do (i) ->
|
||||
rclient.on "message", () ->
|
||||
metrics.inc "rclient-#{i}", 0.001 # per client event rate metric
|
||||
|
||||
for rclient in @rclientList
|
||||
@handleRoomUpdates(rclient)
|
||||
|
||||
handleRoomUpdates: (rclientSub) ->
|
||||
roomEvents = RoomManager.eventSource()
|
||||
roomEvents.on 'doc-active', (doc_id) ->
|
||||
ChannelManager.subscribe rclientSub, "applied-ops", doc_id
|
||||
roomEvents.on 'doc-empty', (doc_id) ->
|
||||
ChannelManager.unsubscribe rclientSub, "applied-ops", doc_id
|
||||
|
||||
_processMessageFromDocumentUpdater: (io, channel, message) ->
|
||||
SafeJsonParse.parse message, (error, message) ->
|
||||
if error?
|
||||
|
|
69
services/real-time/app/coffee/RoomManager.coffee
Normal file
69
services/real-time/app/coffee/RoomManager.coffee
Normal file
|
@ -0,0 +1,69 @@
|
|||
logger = require 'logger-sharelatex'
|
||||
{EventEmitter} = require 'events'
|
||||
|
||||
IdMap = new Map() # keep track of whether ids are from projects or docs
|
||||
RoomEvents = new EventEmitter()
|
||||
|
||||
# Manage socket.io rooms for individual projects and docs
|
||||
#
|
||||
# The first time someone joins a project or doc we emit a 'project-active' or
|
||||
# 'doc-active' event.
|
||||
#
|
||||
# When the last person leaves a project or doc, we emit 'project-empty' or
|
||||
# 'doc-empty' event.
|
||||
#
|
||||
# The pubsub side is handled by ChannelManager
|
||||
|
||||
module.exports = RoomManager =
|
||||
|
||||
joinProject: (client, project_id) ->
|
||||
@_join client, "project", project_id
|
||||
|
||||
joinDoc: (client, doc_id) ->
|
||||
@_join client, "doc", doc_id
|
||||
|
||||
leaveDoc: (client, doc_id) ->
|
||||
@_leave client, "doc", doc_id
|
||||
|
||||
leaveProjectAndDocs: (client) ->
|
||||
# what rooms is this client in? we need to leave them all
|
||||
for id in @_roomsClientIsIn(client)
|
||||
entity = IdMap.get(id)
|
||||
@_leave client, entity, id
|
||||
|
||||
eventSource: () ->
|
||||
return RoomEvents
|
||||
|
||||
_clientsInRoom: (client, room) ->
|
||||
nsp = client.namespace.name
|
||||
name = (nsp + '/') + room;
|
||||
return (client.manager?.rooms?[name] || []).length
|
||||
|
||||
_roomsClientIsIn: (client) ->
|
||||
roomList = for fullRoomPath of client.manager.roomClients?[client.id] when fullRoomPath isnt ''
|
||||
# strip socket.io prefix from room to get original id
|
||||
[prefix, room] = fullRoomPath.split('/', 2)
|
||||
room
|
||||
return roomList
|
||||
|
||||
_join: (client, entity, id) ->
|
||||
beforeCount = @_clientsInRoom(client, id)
|
||||
client.join id
|
||||
afterCount = @_clientsInRoom(client, id)
|
||||
logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client joined room"
|
||||
# is this a new room? if so, subscribe
|
||||
if beforeCount == 0 and afterCount == 1
|
||||
logger.log {entity, id}, "room is now active"
|
||||
RoomEvents.emit "#{entity}-active", id
|
||||
IdMap.set(id, entity)
|
||||
|
||||
_leave: (client, entity, id) ->
|
||||
beforeCount = @_clientsInRoom(client, id)
|
||||
client.leave id
|
||||
afterCount = @_clientsInRoom(client, id)
|
||||
logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client left room"
|
||||
# is the room now empty? if so, unsubscribe
|
||||
if beforeCount == 1 and afterCount == 0
|
||||
logger.log {entity, id}, "room is now empty"
|
||||
RoomEvents.emit "#{entity}-empty", id
|
||||
IdMap.delete(id)
|
|
@ -5,6 +5,7 @@ AuthorizationManager = require "./AuthorizationManager"
|
|||
DocumentUpdaterManager = require "./DocumentUpdaterManager"
|
||||
ConnectedUsersManager = require "./ConnectedUsersManager"
|
||||
WebsocketLoadBalancer = require "./WebsocketLoadBalancer"
|
||||
RoomManager = require "./RoomManager"
|
||||
Utils = require "./Utils"
|
||||
|
||||
module.exports = WebsocketController =
|
||||
|
@ -25,7 +26,7 @@ module.exports = WebsocketController =
|
|||
logger.warn {err, project_id, user_id, client_id: client.id}, "user is not authorized to join project"
|
||||
return callback(err)
|
||||
|
||||
client.join project_id
|
||||
RoomManager.joinProject(client, project_id)
|
||||
|
||||
client.set("privilege_level", privilegeLevel)
|
||||
client.set("user_id", user_id)
|
||||
|
@ -71,6 +72,7 @@ module.exports = WebsocketController =
|
|||
if err?
|
||||
logger.error {err, project_id, user_id, client_id: client.id}, "error marking client as disconnected"
|
||||
|
||||
RoomManager.leaveProjectAndDocs(client)
|
||||
setTimeout () ->
|
||||
remainingClients = io.sockets.clients(project_id)
|
||||
if remainingClients.length == 0
|
||||
|
@ -116,7 +118,7 @@ module.exports = WebsocketController =
|
|||
return callback(err)
|
||||
|
||||
AuthorizationManager.addAccessToDoc client, doc_id
|
||||
client.join(doc_id)
|
||||
RoomManager.joinDoc(client, doc_id)
|
||||
callback null, escapedLines, version, ops, ranges
|
||||
logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc"
|
||||
|
||||
|
@ -124,7 +126,7 @@ module.exports = WebsocketController =
|
|||
metrics.inc "editor.leave-doc"
|
||||
Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) ->
|
||||
logger.log {user_id, project_id, doc_id, client_id: client.id}, "client leaving doc"
|
||||
client.leave doc_id
|
||||
RoomManager.leaveDoc(client, doc_id)
|
||||
# we could remove permission when user leaves a doc, but because
|
||||
# the connection is per-project, we continue to allow access
|
||||
# after the initial joinDoc since we know they are already authorised.
|
||||
|
|
|
@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager"
|
|||
SafeJsonParse = require "./SafeJsonParse"
|
||||
EventLogger = require "./EventLogger"
|
||||
HealthCheckManager = require "./HealthCheckManager"
|
||||
RoomManager = require "./RoomManager"
|
||||
ChannelManager = require "./ChannelManager"
|
||||
|
||||
module.exports = WebsocketLoadBalancer =
|
||||
rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub)
|
||||
|
@ -18,8 +20,9 @@ module.exports = WebsocketLoadBalancer =
|
|||
message: message
|
||||
payload: payload
|
||||
logger.log {room_id, message, payload, length: data.length}, "emitting to room"
|
||||
|
||||
for rclientPub in @rclientPubList
|
||||
rclientPub.publish "editor-events", data
|
||||
ChannelManager.publish rclientPub, "editor-events", room_id, data
|
||||
|
||||
emitToAll: (message, payload...) ->
|
||||
@emitToRoom "all", message, payload...
|
||||
|
@ -32,6 +35,15 @@ module.exports = WebsocketLoadBalancer =
|
|||
rclientSub.on "message", (channel, message) ->
|
||||
EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0
|
||||
WebsocketLoadBalancer._processEditorEvent io, channel, message
|
||||
for rclientSub in @rclientSubList
|
||||
@handleRoomUpdates(rclientSub)
|
||||
|
||||
handleRoomUpdates: (rclientSub) ->
|
||||
roomEvents = RoomManager.eventSource()
|
||||
roomEvents.on 'project-active', (project_id) ->
|
||||
ChannelManager.subscribe rclientSub, "editor-events", project_id
|
||||
roomEvents.on 'project-empty', (project_id) ->
|
||||
ChannelManager.unsubscribe rclientSub, "editor-events", project_id
|
||||
|
||||
_processEditorEvent: (io, channel, message) ->
|
||||
SafeJsonParse.parse message, (error, message) ->
|
||||
|
|
Loading…
Add table
Reference in a new issue