[ChannelManager] rework (un)subscribing to redis

- send a subscribe request on every request
- wait for a pending unsubscribe request before subscribing
- wait for a pending subscribe request before unsubscribing

Co-Authored-By: Brian Gough <brian.gough@overleaf.com>
This commit is contained in:
Jakob Ackermann 2020-05-15 11:34:07 +02:00
parent 1095851dfe
commit 41debfae0f

View file

@ -17,35 +17,48 @@ module.exports = ChannelManager =
subscribe: (rclient, baseChannel, id) ->
clientChannelMap = @getClientMapEntry(rclient)
channel = "#{baseChannel}:#{id}"
# we track pending subscribes because we want to be sure that the
# channel is active before letting the client join the doc or project,
# so that events are not lost.
if clientChannelMap.has(channel)
logger.warn {channel}, "subscribe already actioned"
# return the existing subscribe promise, so we can wait for it to resolve
return clientChannelMap.get(channel)
else
# get the subscribe promise and return it, the actual subscribe
# completes in the background
subscribePromise = rclient.subscribe channel
clientChannelMap.set(channel, subscribePromise)
logger.log {channel}, "subscribed to new channel"
metrics.inc "subscribe.#{baseChannel}"
return subscribePromise
actualSubscribe = () ->
# subscribe is happening in the foreground and it should reject
p = rclient.subscribe(channel)
p.finally () ->
if clientChannelMap.get(channel) is subscribePromise
clientChannelMap.delete(channel)
.then () ->
logger.log {channel}, "subscribed to channel"
metrics.inc "subscribe.#{baseChannel}"
.catch (err) ->
logger.error {channel, err}, "failed to subscribe to channel"
metrics.inc "subscribe.failed.#{baseChannel}"
return p
pendingActions = clientChannelMap.get(channel) || Promise.resolve()
subscribePromise = pendingActions.then(actualSubscribe, actualSubscribe)
clientChannelMap.set(channel, subscribePromise)
logger.log {channel}, "planned to subscribe to channel"
return subscribePromise
unsubscribe: (rclient, baseChannel, id) ->
clientChannelMap = @getClientMapEntry(rclient)
channel = "#{baseChannel}:#{id}"
# we don't need to track pending unsubscribes, because we there is no
# harm if events continue to arrive on the channel while the unsubscribe
# command in pending.
if !clientChannelMap.has(channel)
logger.error {channel}, "not subscribed - shouldn't happen"
else
rclient.unsubscribe channel # completes in the background
clientChannelMap.delete(channel)
logger.log {channel}, "unsubscribed from channel"
metrics.inc "unsubscribe.#{baseChannel}"
actualUnsubscribe = () ->
# unsubscribe is happening in the background, it should not reject
p = rclient.unsubscribe(channel)
.finally () ->
if clientChannelMap.get(channel) is unsubscribePromise
clientChannelMap.delete(channel)
.then () ->
logger.log {channel}, "unsubscribed from channel"
metrics.inc "unsubscribe.#{baseChannel}"
.catch (err) ->
logger.error {channel, err}, "unsubscribed from channel"
metrics.inc "unsubscribe.failed.#{baseChannel}"
return p
pendingActions = clientChannelMap.get(channel) || Promise.resolve()
unsubscribePromise = pendingActions.then(actualUnsubscribe, actualUnsubscribe)
clientChannelMap.set(channel, unsubscribePromise)
logger.log {channel}, "planned to unsubscribe from channel"
return unsubscribePromise
publish: (rclient, baseChannel, id, data) ->
metrics.summary "redis.publish.#{baseChannel}", data.length