2019-07-18 10:25:10 +00:00
|
|
|
logger = require 'logger-sharelatex'
|
|
|
|
metrics = require "metrics-sharelatex"
|
2019-07-18 11:40:14 +00:00
|
|
|
settings = require "settings-sharelatex"
|
2019-07-18 10:25:10 +00:00
|
|
|
|
2019-07-24 13:17:19 +00:00
|
|
|
# Module-level registry: keyed by redis client object, each value is itself a
# Map from channel name to the promise returned by that client's subscribe call.
ClientMap = new Map() # for each redis client, store a Map of subscribed channels (channelname -> subscribe promise)
|
2019-07-18 10:25:10 +00:00
|
|
|
|
|
|
|
# Manage redis pubsub subscriptions for individual projects and docs, ensuring
|
|
|
|
# that we never subscribe to a channel multiple times. The socket.io side is
|
|
|
|
# handled by RoomManager.
|
|
|
|
|
|
|
|
module.exports = ChannelManager =
    # Return the per-client channel map (channel name -> subscribe promise)
    # for `rclient`. If the client has no entry yet, an empty map is created,
    # stored in ClientMap and returned.
    getClientMapEntry: (rclient) ->
        ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient)

    # Subscribe `rclient` to the channel "<baseChannel>:<id>".
    #
    # Returns the promise produced by `rclient.subscribe`. If a subscription
    # for the channel has already been requested on this client, the stored
    # promise is returned instead and no second subscribe is issued.
    subscribe: (rclient, baseChannel, id) ->
        clientChannelMap = @getClientMapEntry(rclient)
        channel = "#{baseChannel}:#{id}"
        # we track pending subscribes because we want to be sure that the
        # channel is active before letting the client join the doc or project,
        # so that events are not lost.
        if clientChannelMap.has(channel)
            logger.warn {channel}, "subscribe already actioned"
            # return the existing subscribe promise, so we can wait for it to resolve
            return clientChannelMap.get(channel)
        # get the subscribe promise and return it, the actual subscribe
        # completes in the background
        subscribePromise = rclient.subscribe channel
        clientChannelMap.set(channel, subscribePromise)
        # A rejected promise must not stay cached: otherwise every later
        # subscribe for this channel would hit the "already actioned" branch
        # and get the same failure back without ever retrying. Only remove the
        # entry if it is still this promise, so that a newer subscription made
        # after an unsubscribe is left alone. The caller still receives the
        # original promise and therefore still sees the rejection.
        subscribePromise.catch (err) ->
            logger.error {channel, err}, "failed to subscribe to channel"
            if clientChannelMap.get(channel) is subscribePromise
                clientChannelMap.delete(channel)
        logger.log {channel}, "subscribed to new channel"
        metrics.inc "subscribe.#{baseChannel}"
        return subscribePromise

    # Unsubscribe `rclient` from the channel "<baseChannel>:<id>" and forget
    # its stored subscribe promise. Logs an error and does nothing else when
    # the channel is not recorded as subscribed for this client.
    unsubscribe: (rclient, baseChannel, id) ->
        clientChannelMap = @getClientMapEntry(rclient)
        channel = "#{baseChannel}:#{id}"
        # we don't need to track pending unsubscribes, because there is no
        # harm if events continue to arrive on the channel while the unsubscribe
        # command is pending.
        if !clientChannelMap.has(channel)
            logger.error {channel}, "not subscribed - shouldn't happen"
        else
            rclient.unsubscribe channel # completes in the background
            clientChannelMap.delete(channel)
            logger.log {channel}, "unsubscribed from channel"
            metrics.inc "unsubscribe.#{baseChannel}"

    # Publish `data` via `rclient`. The shared `baseChannel` is used when `id`
    # is 'all' or when settings.publishOnIndividualChannels is not set;
    # otherwise the per-id channel "<baseChannel>:<id>" is used.
    # Returns whatever `rclient.publish` returns.
    publish: (rclient, baseChannel, id, data) ->
        if id is 'all' or !settings.publishOnIndividualChannels
            channel = baseChannel
        else
            channel = "#{baseChannel}:#{id}"
        # we publish on a different client to the subscribe, so we can't
        # check for the channel existing here
        rclient.publish channel, data
|