mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
Merge pull request #137 from overleaf/jpa-backport-strict-sequence
[misc] backport strict sequence of redis (un)subscribe
This commit is contained in:
commit
79ea042959
4 changed files with 430 additions and 64 deletions
|
@ -17,35 +17,48 @@ module.exports = ChannelManager =
|
|||
subscribe: (rclient, baseChannel, id) ->
|
||||
clientChannelMap = @getClientMapEntry(rclient)
|
||||
channel = "#{baseChannel}:#{id}"
|
||||
# we track pending subscribes because we want to be sure that the
|
||||
# channel is active before letting the client join the doc or project,
|
||||
# so that events are not lost.
|
||||
if clientChannelMap.has(channel)
|
||||
logger.warn {channel}, "subscribe already actioned"
|
||||
# return the existing subscribe promise, so we can wait for it to resolve
|
||||
return clientChannelMap.get(channel)
|
||||
else
|
||||
# get the subscribe promise and return it, the actual subscribe
|
||||
# completes in the background
|
||||
subscribePromise = rclient.subscribe channel
|
||||
clientChannelMap.set(channel, subscribePromise)
|
||||
logger.log {channel}, "subscribed to new channel"
|
||||
metrics.inc "subscribe.#{baseChannel}"
|
||||
return subscribePromise
|
||||
actualSubscribe = () ->
|
||||
# subscribe is happening in the foreground and it should reject
|
||||
p = rclient.subscribe(channel)
|
||||
p.finally () ->
|
||||
if clientChannelMap.get(channel) is subscribePromise
|
||||
clientChannelMap.delete(channel)
|
||||
.then () ->
|
||||
logger.log {channel}, "subscribed to channel"
|
||||
metrics.inc "subscribe.#{baseChannel}"
|
||||
.catch (err) ->
|
||||
logger.error {channel, err}, "failed to subscribe to channel"
|
||||
metrics.inc "subscribe.failed.#{baseChannel}"
|
||||
return p
|
||||
|
||||
pendingActions = clientChannelMap.get(channel) || Promise.resolve()
|
||||
subscribePromise = pendingActions.then(actualSubscribe, actualSubscribe)
|
||||
clientChannelMap.set(channel, subscribePromise)
|
||||
logger.log {channel}, "planned to subscribe to channel"
|
||||
return subscribePromise
|
||||
|
||||
unsubscribe: (rclient, baseChannel, id) ->
|
||||
clientChannelMap = @getClientMapEntry(rclient)
|
||||
channel = "#{baseChannel}:#{id}"
|
||||
# we don't need to track pending unsubscribes, because we there is no
|
||||
# harm if events continue to arrive on the channel while the unsubscribe
|
||||
# command in pending.
|
||||
if !clientChannelMap.has(channel)
|
||||
logger.error {channel}, "not subscribed - shouldn't happen"
|
||||
else
|
||||
rclient.unsubscribe channel # completes in the background
|
||||
clientChannelMap.delete(channel)
|
||||
logger.log {channel}, "unsubscribed from channel"
|
||||
metrics.inc "unsubscribe.#{baseChannel}"
|
||||
actualUnsubscribe = () ->
|
||||
# unsubscribe is happening in the background, it should not reject
|
||||
p = rclient.unsubscribe(channel)
|
||||
.finally () ->
|
||||
if clientChannelMap.get(channel) is unsubscribePromise
|
||||
clientChannelMap.delete(channel)
|
||||
.then () ->
|
||||
logger.log {channel}, "unsubscribed from channel"
|
||||
metrics.inc "unsubscribe.#{baseChannel}"
|
||||
.catch (err) ->
|
||||
logger.error {channel, err}, "unsubscribed from channel"
|
||||
metrics.inc "unsubscribe.failed.#{baseChannel}"
|
||||
return p
|
||||
|
||||
pendingActions = clientChannelMap.get(channel) || Promise.resolve()
|
||||
unsubscribePromise = pendingActions.then(actualUnsubscribe, actualUnsubscribe)
|
||||
clientChannelMap.set(channel, unsubscribePromise)
|
||||
logger.log {channel}, "planned to unsubscribe from channel"
|
||||
return unsubscribePromise
|
||||
|
||||
publish: (rclient, baseChannel, id, data) ->
|
||||
metrics.summary "redis.publish.#{baseChannel}", data.length
|
||||
|
|
|
@ -4,10 +4,14 @@ FixturesManager = require "./helpers/FixturesManager"
|
|||
|
||||
async = require "async"
|
||||
|
||||
settings = require "settings-sharelatex"
|
||||
redis = require "redis-sharelatex"
|
||||
rclient = redis.createClient(settings.redis.pubsub)
|
||||
|
||||
describe "leaveProject", ->
|
||||
before (done) ->
|
||||
MockDocUpdaterServer.run done
|
||||
|
||||
|
||||
describe "with other clients in the project", ->
|
||||
before (done) ->
|
||||
async.series [
|
||||
|
@ -18,53 +22,76 @@ describe "leaveProject", ->
|
|||
name: "Test Project"
|
||||
}
|
||||
}, (e, {@project_id, @user_id}) => cb()
|
||||
|
||||
|
||||
(cb) =>
|
||||
@clientA = RealTimeClient.connect()
|
||||
@clientA.on "connectionAccepted", cb
|
||||
|
||||
|
||||
(cb) =>
|
||||
@clientB = RealTimeClient.connect()
|
||||
@clientB.on "connectionAccepted", cb
|
||||
|
||||
|
||||
@clientBDisconnectMessages = []
|
||||
@clientB.on "clientTracking.clientDisconnected", (data) =>
|
||||
@clientBDisconnectMessages.push data
|
||||
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) =>
|
||||
cb(error)
|
||||
|
||||
|
||||
(cb) =>
|
||||
@clientB.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) =>
|
||||
cb(error)
|
||||
|
||||
|
||||
(cb) =>
|
||||
FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) =>
|
||||
cb(e)
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinDoc", @doc_id, cb
|
||||
(cb) =>
|
||||
@clientB.emit "joinDoc", @doc_id, cb
|
||||
|
||||
(cb) =>
|
||||
# leaveProject is called when the client disconnects
|
||||
@clientA.on "disconnect", () -> cb()
|
||||
@clientA.disconnect()
|
||||
|
||||
|
||||
(cb) =>
|
||||
# The API waits a little while before flushing changes
|
||||
setTimeout done, 1000
|
||||
|
||||
|
||||
], done
|
||||
|
||||
it "should emit a disconnect message to the room", ->
|
||||
@clientBDisconnectMessages.should.deep.equal [@clientA.socket.sessionid]
|
||||
|
||||
|
||||
it "should no longer list the client in connected users", (done) ->
|
||||
@clientB.emit "clientTracking.getConnectedUsers", (error, users) =>
|
||||
for user in users
|
||||
if user.client_id == @clientA.socket.sessionid
|
||||
throw "Expected clientA to not be listed in connected users"
|
||||
return done()
|
||||
|
||||
|
||||
it "should not flush the project to the document updater", ->
|
||||
MockDocUpdaterServer.deleteProject
|
||||
.calledWith(@project_id)
|
||||
.should.equal false
|
||||
|
||||
it "should remain subscribed to the editor-events channels", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.include "editor-events:#{@project_id}"
|
||||
done()
|
||||
return null
|
||||
|
||||
it "should remain subscribed to the applied-ops channels", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.include "applied-ops:#{@doc_id}"
|
||||
done()
|
||||
return null
|
||||
|
||||
describe "with no other clients in the project", ->
|
||||
before (done) ->
|
||||
async.series [
|
||||
|
@ -75,20 +102,26 @@ describe "leaveProject", ->
|
|||
name: "Test Project"
|
||||
}
|
||||
}, (e, {@project_id, @user_id}) => cb()
|
||||
|
||||
|
||||
(cb) =>
|
||||
@clientA = RealTimeClient.connect()
|
||||
@clientA.on "connect", cb
|
||||
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) =>
|
||||
cb(error)
|
||||
|
||||
|
||||
(cb) =>
|
||||
FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) =>
|
||||
cb(e)
|
||||
(cb) =>
|
||||
@clientA.emit "joinDoc", @doc_id, cb
|
||||
|
||||
(cb) =>
|
||||
# leaveProject is called when the client disconnects
|
||||
@clientA.on "disconnect", () -> cb()
|
||||
@clientA.disconnect()
|
||||
|
||||
|
||||
(cb) =>
|
||||
# The API waits a little while before flushing changes
|
||||
setTimeout done, 1000
|
||||
|
@ -98,3 +131,17 @@ describe "leaveProject", ->
|
|||
MockDocUpdaterServer.deleteProject
|
||||
.calledWith(@project_id)
|
||||
.should.equal true
|
||||
|
||||
it "should not subscribe to the editor-events channels anymore", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.not.include "editor-events:#{@project_id}"
|
||||
done()
|
||||
return null
|
||||
|
||||
it "should not subscribe to the applied-ops channels anymore", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.not.include "applied-ops:#{@doc_id}"
|
||||
done()
|
||||
return null
|
||||
|
|
205
services/real-time/test/acceptance/coffee/PubSubRace.coffee
Normal file
205
services/real-time/test/acceptance/coffee/PubSubRace.coffee
Normal file
|
@ -0,0 +1,205 @@
|
|||
RealTimeClient = require "./helpers/RealTimeClient"
|
||||
MockDocUpdaterServer = require "./helpers/MockDocUpdaterServer"
|
||||
FixturesManager = require "./helpers/FixturesManager"
|
||||
|
||||
async = require "async"
|
||||
|
||||
settings = require "settings-sharelatex"
|
||||
redis = require "redis-sharelatex"
|
||||
rclient = redis.createClient(settings.redis.pubsub)
|
||||
|
||||
describe "PubSubRace", ->
|
||||
before (done) ->
|
||||
MockDocUpdaterServer.run done
|
||||
|
||||
describe "when the client leaves a doc before joinDoc completes", ->
|
||||
before (done) ->
|
||||
async.series [
|
||||
(cb) =>
|
||||
FixturesManager.setUpProject {
|
||||
privilegeLevel: "owner"
|
||||
project: {
|
||||
name: "Test Project"
|
||||
}
|
||||
}, (e, {@project_id, @user_id}) => cb()
|
||||
|
||||
(cb) =>
|
||||
@clientA = RealTimeClient.connect()
|
||||
@clientA.on "connect", cb
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) =>
|
||||
cb(error)
|
||||
|
||||
(cb) =>
|
||||
FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) =>
|
||||
cb(e)
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
# leave before joinDoc completes
|
||||
@clientA.emit "leaveDoc", @doc_id, cb
|
||||
|
||||
(cb) =>
|
||||
# wait for subscribe and unsubscribe
|
||||
setTimeout cb, 100
|
||||
], done
|
||||
|
||||
it "should not subscribe to the applied-ops channels anymore", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.not.include "applied-ops:#{@doc_id}"
|
||||
done()
|
||||
return null
|
||||
|
||||
describe "when the client emits joinDoc and leaveDoc requests frequently and leaves eventually", ->
|
||||
before (done) ->
|
||||
async.series [
|
||||
(cb) =>
|
||||
FixturesManager.setUpProject {
|
||||
privilegeLevel: "owner"
|
||||
project: {
|
||||
name: "Test Project"
|
||||
}
|
||||
}, (e, {@project_id, @user_id}) => cb()
|
||||
|
||||
(cb) =>
|
||||
@clientA = RealTimeClient.connect()
|
||||
@clientA.on "connect", cb
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) =>
|
||||
cb(error)
|
||||
|
||||
(cb) =>
|
||||
FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) =>
|
||||
cb(e)
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, () ->
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, () ->
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, () ->
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, () ->
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, cb
|
||||
|
||||
(cb) =>
|
||||
# wait for subscribe and unsubscribe
|
||||
setTimeout cb, 100
|
||||
], done
|
||||
|
||||
it "should not subscribe to the applied-ops channels anymore", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.not.include "applied-ops:#{@doc_id}"
|
||||
done()
|
||||
return null
|
||||
|
||||
describe "when the client emits joinDoc and leaveDoc requests frequently and remains in the doc", ->
|
||||
before (done) ->
|
||||
async.series [
|
||||
(cb) =>
|
||||
FixturesManager.setUpProject {
|
||||
privilegeLevel: "owner"
|
||||
project: {
|
||||
name: "Test Project"
|
||||
}
|
||||
}, (e, {@project_id, @user_id}) => cb()
|
||||
|
||||
(cb) =>
|
||||
@clientA = RealTimeClient.connect()
|
||||
@clientA.on "connect", cb
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) =>
|
||||
cb(error)
|
||||
|
||||
(cb) =>
|
||||
FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) =>
|
||||
cb(e)
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, () ->
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, () ->
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, () ->
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
@clientA.emit "leaveDoc", @doc_id, () ->
|
||||
@clientA.emit "joinDoc", @doc_id, cb
|
||||
|
||||
(cb) =>
|
||||
# wait for subscribe and unsubscribe
|
||||
setTimeout cb, 100
|
||||
], done
|
||||
|
||||
it "should subscribe to the applied-ops channels", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.include "applied-ops:#{@doc_id}"
|
||||
done()
|
||||
return null
|
||||
|
||||
describe "when the client disconnects before joinDoc completes", ->
|
||||
before (done) ->
|
||||
async.series [
|
||||
(cb) =>
|
||||
FixturesManager.setUpProject {
|
||||
privilegeLevel: "owner"
|
||||
project: {
|
||||
name: "Test Project"
|
||||
}
|
||||
}, (e, {@project_id, @user_id}) => cb()
|
||||
|
||||
(cb) =>
|
||||
@clientA = RealTimeClient.connect()
|
||||
@clientA.on "connect", cb
|
||||
|
||||
(cb) =>
|
||||
@clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) =>
|
||||
cb(error)
|
||||
|
||||
(cb) =>
|
||||
FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) =>
|
||||
cb(e)
|
||||
|
||||
(cb) =>
|
||||
joinDocCompleted = false
|
||||
@clientA.emit "joinDoc", @doc_id, () ->
|
||||
joinDocCompleted = true
|
||||
# leave before joinDoc completes
|
||||
setTimeout () =>
|
||||
if joinDocCompleted
|
||||
return cb(new Error('joinDocCompleted -- lower timeout'))
|
||||
@clientA.on "disconnect", () -> cb()
|
||||
@clientA.disconnect()
|
||||
# socket.io processes joinDoc and disconnect with different delays:
|
||||
# - joinDoc goes through two process.nextTick
|
||||
# - disconnect goes through one process.nextTick
|
||||
# We have to inject the disconnect event into a different event loop
|
||||
# cycle.
|
||||
, 3
|
||||
|
||||
(cb) =>
|
||||
# wait for subscribe and unsubscribe
|
||||
setTimeout cb, 100
|
||||
], done
|
||||
|
||||
it "should not subscribe to the editor-events channels anymore", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.not.include "editor-events:#{@project_id}"
|
||||
done()
|
||||
return null
|
||||
|
||||
it "should not subscribe to the applied-ops channels anymore", (done) ->
|
||||
rclient.pubsub 'CHANNELS', (err, resp) =>
|
||||
return done(err) if err
|
||||
resp.should.not.include "applied-ops:#{@doc_id}"
|
||||
done()
|
||||
return null
|
|
@ -1,5 +1,6 @@
|
|||
chai = require('chai')
|
||||
should = chai.should()
|
||||
expect = chai.expect
|
||||
sinon = require("sinon")
|
||||
modulePath = "../../../app/js/ChannelManager.js"
|
||||
SandboxedModule = require('sandboxed-module')
|
||||
|
@ -11,34 +12,82 @@ describe 'ChannelManager', ->
|
|||
@ChannelManager = SandboxedModule.require modulePath, requires:
|
||||
"settings-sharelatex": @settings = {}
|
||||
"metrics-sharelatex": @metrics = {inc: sinon.stub(), summary: sinon.stub()}
|
||||
"logger-sharelatex": @logger = { log: sinon.stub(), warn: sinon.stub(), error: sinon.stub() }
|
||||
|
||||
|
||||
describe "subscribe", ->
|
||||
|
||||
describe "when there is no existing subscription for this redis client", ->
|
||||
beforeEach ->
|
||||
@rclient.subscribe = sinon.stub()
|
||||
beforeEach (done) ->
|
||||
@rclient.subscribe = sinon.stub().resolves()
|
||||
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
setTimeout done
|
||||
|
||||
it "should subscribe to the redis channel", ->
|
||||
@rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true
|
||||
|
||||
describe "when there is an existing subscription for this redis client", ->
|
||||
beforeEach ->
|
||||
@rclient.subscribe = sinon.stub()
|
||||
beforeEach (done) ->
|
||||
@rclient.subscribe = sinon.stub().resolves()
|
||||
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
@rclient.subscribe = sinon.stub() # discard the original stub
|
||||
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
setTimeout done
|
||||
|
||||
it "should not subscribe to the redis channel", ->
|
||||
@rclient.subscribe.called.should.equal false
|
||||
it "should subscribe to the redis channel again", ->
|
||||
@rclient.subscribe.callCount.should.equal 2
|
||||
|
||||
describe "when subscribe errors", ->
|
||||
beforeEach (done) ->
|
||||
@rclient.subscribe = sinon.stub()
|
||||
.onFirstCall().rejects(new Error("some redis error"))
|
||||
.onSecondCall().resolves()
|
||||
p = @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
p.then () ->
|
||||
done(new Error('should not subscribe but fail'))
|
||||
.catch (err) =>
|
||||
err.message.should.equal "some redis error"
|
||||
@ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef").should.equal false
|
||||
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
# subscribe is wrapped in Promise, delay other assertions
|
||||
setTimeout done
|
||||
return null
|
||||
|
||||
it "should have recorded the error", ->
|
||||
expect(@metrics.inc.calledWithExactly("subscribe.failed.applied-ops")).to.equal(true)
|
||||
|
||||
it "should subscribe again", ->
|
||||
@rclient.subscribe.callCount.should.equal 2
|
||||
|
||||
it "should cleanup", ->
|
||||
@ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef").should.equal false
|
||||
|
||||
describe "when subscribe errors and the clientChannelMap entry was replaced", ->
|
||||
beforeEach (done) ->
|
||||
@rclient.subscribe = sinon.stub()
|
||||
.onFirstCall().rejects(new Error("some redis error"))
|
||||
.onSecondCall().resolves()
|
||||
@first = @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
# ignore error
|
||||
@first.catch((()->))
|
||||
expect(@ChannelManager.getClientMapEntry(@rclient).get("applied-ops:1234567890abcdef")).to.equal @first
|
||||
|
||||
@rclient.unsubscribe = sinon.stub().resolves()
|
||||
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
@second = @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
# should get replaced immediately
|
||||
expect(@ChannelManager.getClientMapEntry(@rclient).get("applied-ops:1234567890abcdef")).to.equal @second
|
||||
|
||||
# let the first subscribe error -> unsubscribe -> subscribe
|
||||
setTimeout done
|
||||
|
||||
it "should cleanup the second subscribePromise", ->
|
||||
expect(@ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef")).to.equal false
|
||||
|
||||
describe "when there is an existing subscription for another redis client but not this one", ->
|
||||
beforeEach ->
|
||||
@other_rclient.subscribe = sinon.stub()
|
||||
beforeEach (done) ->
|
||||
@other_rclient.subscribe = sinon.stub().resolves()
|
||||
@ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef"
|
||||
@rclient.subscribe = sinon.stub() # discard the original stub
|
||||
@rclient.subscribe = sinon.stub().resolves() # discard the original stub
|
||||
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
setTimeout done
|
||||
|
||||
it "should subscribe to the redis channel on this redis client", ->
|
||||
@rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true
|
||||
|
@ -46,30 +95,82 @@ describe 'ChannelManager', ->
|
|||
describe "unsubscribe", ->
|
||||
|
||||
describe "when there is no existing subscription for this redis client", ->
|
||||
beforeEach ->
|
||||
@rclient.unsubscribe = sinon.stub()
|
||||
beforeEach (done) ->
|
||||
@rclient.unsubscribe = sinon.stub().resolves()
|
||||
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
setTimeout done
|
||||
|
||||
it "should not unsubscribe from the redis channel", ->
|
||||
@rclient.unsubscribe.called.should.equal false
|
||||
it "should unsubscribe from the redis channel", ->
|
||||
@rclient.unsubscribe.called.should.equal true
|
||||
|
||||
|
||||
describe "when there is an existing subscription for this another redis client but not this one", ->
|
||||
beforeEach ->
|
||||
@other_rclient.subscribe = sinon.stub()
|
||||
@rclient.unsubscribe = sinon.stub()
|
||||
beforeEach (done) ->
|
||||
@other_rclient.subscribe = sinon.stub().resolves()
|
||||
@rclient.unsubscribe = sinon.stub().resolves()
|
||||
@ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef"
|
||||
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
setTimeout done
|
||||
|
||||
it "should not unsubscribe from the redis channel on this client", ->
|
||||
@rclient.unsubscribe.called.should.equal false
|
||||
it "should still unsubscribe from the redis channel on this client", ->
|
||||
@rclient.unsubscribe.called.should.equal true
|
||||
|
||||
describe "when unsubscribe errors and completes", ->
|
||||
beforeEach (done) ->
|
||||
@rclient.subscribe = sinon.stub().resolves()
|
||||
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
@rclient.unsubscribe = sinon.stub().rejects(new Error("some redis error"))
|
||||
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
setTimeout done
|
||||
return null
|
||||
|
||||
it "should have cleaned up", ->
|
||||
@ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef").should.equal false
|
||||
|
||||
it "should not error out when subscribing again", (done) ->
|
||||
p = @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
p.then () ->
|
||||
done()
|
||||
.catch done
|
||||
return null
|
||||
|
||||
describe "when unsubscribe errors and another client subscribes at the same time", ->
|
||||
beforeEach (done) ->
|
||||
@rclient.subscribe = sinon.stub().resolves()
|
||||
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
rejectSubscribe = undefined
|
||||
@rclient.unsubscribe = () ->
|
||||
return new Promise (resolve, reject) ->
|
||||
rejectSubscribe = reject
|
||||
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
|
||||
setTimeout () =>
|
||||
# delay, actualUnsubscribe should not see the new subscribe request
|
||||
@ChannelManager.subscribe(@rclient, "applied-ops", "1234567890abcdef")
|
||||
.then () ->
|
||||
setTimeout done
|
||||
.catch done
|
||||
setTimeout ->
|
||||
# delay, rejectSubscribe is not defined immediately
|
||||
rejectSubscribe(new Error("redis error"))
|
||||
return null
|
||||
|
||||
it "should have recorded the error", ->
|
||||
expect(@metrics.inc.calledWithExactly("unsubscribe.failed.applied-ops")).to.equal(true)
|
||||
|
||||
it "should have subscribed", ->
|
||||
@rclient.subscribe.called.should.equal true
|
||||
|
||||
it "should have discarded the finished Promise", ->
|
||||
@ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef").should.equal false
|
||||
|
||||
describe "when there is an existing subscription for this redis client", ->
|
||||
beforeEach ->
|
||||
@rclient.subscribe = sinon.stub()
|
||||
@rclient.unsubscribe = sinon.stub()
|
||||
beforeEach (done) ->
|
||||
@rclient.subscribe = sinon.stub().resolves()
|
||||
@rclient.unsubscribe = sinon.stub().resolves()
|
||||
@ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
@ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef"
|
||||
setTimeout done
|
||||
|
||||
it "should unsubscribe from the redis channel", ->
|
||||
@rclient.unsubscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true
|
||||
|
|
Loading…
Reference in a new issue