diff --git a/services/real-time/test/acceptance/coffee/LeaveProjectTests.coffee b/services/real-time/test/acceptance/coffee/LeaveProjectTests.coffee index 7ef8b35c64..5925472245 100644 --- a/services/real-time/test/acceptance/coffee/LeaveProjectTests.coffee +++ b/services/real-time/test/acceptance/coffee/LeaveProjectTests.coffee @@ -4,10 +4,14 @@ FixturesManager = require "./helpers/FixturesManager" async = require "async" +settings = require "settings-sharelatex" +redis = require "redis-sharelatex" +rclient = redis.createClient(settings.redis.pubsub) + describe "leaveProject", -> before (done) -> MockDocUpdaterServer.run done - + describe "with other clients in the project", -> before (done) -> async.series [ @@ -18,53 +22,76 @@ describe "leaveProject", -> name: "Test Project" } }, (e, {@project_id, @user_id}) => cb() - + (cb) => @clientA = RealTimeClient.connect() @clientA.on "connectionAccepted", cb - + (cb) => @clientB = RealTimeClient.connect() @clientB.on "connectionAccepted", cb - + @clientBDisconnectMessages = [] @clientB.on "clientTracking.clientDisconnected", (data) => @clientBDisconnectMessages.push data - + (cb) => @clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => cb(error) - + (cb) => @clientB.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => cb(error) - + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + + (cb) => + @clientA.emit "joinDoc", @doc_id, cb + (cb) => + @clientB.emit "joinDoc", @doc_id, cb + (cb) => # leaveProject is called when the client disconnects @clientA.on "disconnect", () -> cb() @clientA.disconnect() - + (cb) => # The API waits a little while before flushing changes setTimeout done, 1000 - + ], done it "should emit a disconnect message to the room", -> @clientBDisconnectMessages.should.deep.equal [@clientA.socket.sessionid] - + it "should no longer list the client in connected users", (done) -> @clientB.emit "clientTracking.getConnectedUsers", (error, users) => for user in users if user.client_id == @clientA.socket.sessionid throw "Expected clientA to not be listed in connected users" return done() - + it "should not flush the project to the document updater", -> MockDocUpdaterServer.deleteProject .calledWith(@project_id) .should.equal false + it "should remain subscribed to the editor-events channels", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.include "editor-events:#{@project_id}" + done() + return null + + it "should remain subscribed to the applied-ops channels", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.include "applied-ops:#{@doc_id}" + done() + return null + describe "with no other clients in the project", -> before (done) -> async.series [ @@ -75,20 +102,26 @@ describe "leaveProject", -> name: "Test Project" } }, (e, {@project_id, @user_id}) => cb() - + (cb) => @clientA = RealTimeClient.connect() @clientA.on "connect", cb - + (cb) => @clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => cb(error) - + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + (cb) => + @clientA.emit "joinDoc", @doc_id, cb + (cb) => # leaveProject is called when the client disconnects @clientA.on "disconnect", () -> cb() @clientA.disconnect() - + (cb) => # The API waits a little while before flushing changes setTimeout done, 1000 @@ -98,3 +131,17 @@ describe "leaveProject", -> MockDocUpdaterServer.deleteProject .calledWith(@project_id) .should.equal true + + it "should not subscribe to the editor-events channels anymore", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.not.include "editor-events:#{@project_id}" + done() + return null + + it "should not subscribe to the applied-ops channels anymore", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.not.include "applied-ops:#{@doc_id}" + done() + return null diff --git a/services/real-time/test/acceptance/coffee/PubSubRace.coffee b/services/real-time/test/acceptance/coffee/PubSubRace.coffee new file mode 100644 index 0000000000..d5e6653fac --- /dev/null +++ b/services/real-time/test/acceptance/coffee/PubSubRace.coffee @@ -0,0 +1,205 @@ +RealTimeClient = require "./helpers/RealTimeClient" +MockDocUpdaterServer = require "./helpers/MockDocUpdaterServer" +FixturesManager = require "./helpers/FixturesManager" + +async = require "async" + +settings = require "settings-sharelatex" +redis = require "redis-sharelatex" +rclient = redis.createClient(settings.redis.pubsub) + +describe "PubSubRace", -> + before (done) -> + MockDocUpdaterServer.run done + + describe "when the client leaves a doc before joinDoc completes", -> + before (done) -> + async.series [ + (cb) => + FixturesManager.setUpProject { + privilegeLevel: "owner" + project: { + name: "Test Project" + } + }, (e, {@project_id, @user_id}) => cb() + + (cb) => + @clientA = RealTimeClient.connect() + @clientA.on "connect", cb + + (cb) => + @clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => + cb(error) + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + + (cb) => + @clientA.emit "joinDoc", @doc_id, () -> + # leave before joinDoc completes + @clientA.emit "leaveDoc", @doc_id, cb + + (cb) => + # wait for subscribe and unsubscribe + setTimeout cb, 100 + ], done + + it "should not subscribe to the applied-ops channels anymore", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.not.include "applied-ops:#{@doc_id}" + done() + return null + + describe "when the client emits joinDoc and leaveDoc requests frequently and leaves eventually", -> + before (done) -> + async.series [ + (cb) => + FixturesManager.setUpProject { + privilegeLevel: "owner" + project: { + name: "Test Project" + } + }, (e, {@project_id, @user_id}) => cb() + + (cb) => + @clientA = RealTimeClient.connect() + @clientA.on "connect", cb + + (cb) => + @clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => + cb(error) + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + + (cb) => + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, () -> + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, () -> + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, () -> + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, () -> + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, cb + + (cb) => + # wait for subscribe and unsubscribe + setTimeout cb, 100 + ], done + + it "should not subscribe to the applied-ops channels anymore", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.not.include "applied-ops:#{@doc_id}" + done() + return null + + describe "when the client emits joinDoc and leaveDoc requests frequently and remains in the doc", -> + before (done) -> + async.series [ + (cb) => + FixturesManager.setUpProject { + privilegeLevel: "owner" + project: { + name: "Test Project" + } + }, (e, {@project_id, @user_id}) => cb() + + (cb) => + @clientA = RealTimeClient.connect() + @clientA.on "connect", cb + + (cb) => + @clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => + cb(error) + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + + (cb) => + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, () -> + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, () -> + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, () -> + @clientA.emit "joinDoc", @doc_id, () -> + @clientA.emit "leaveDoc", @doc_id, () -> + @clientA.emit "joinDoc", @doc_id, cb + + (cb) => + # wait for subscribe and unsubscribe + setTimeout cb, 100 + ], done + + it "should subscribe to the applied-ops channels", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.include "applied-ops:#{@doc_id}" + done() + return null + + describe "when the client disconnects before joinDoc completes", -> + before (done) -> + async.series [ + (cb) => + FixturesManager.setUpProject { + privilegeLevel: "owner" + project: { + name: "Test Project" + } + }, (e, {@project_id, @user_id}) => cb() + + (cb) => + @clientA = RealTimeClient.connect() + @clientA.on "connect", cb + + (cb) => + @clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => + cb(error) + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + + (cb) => + joinDocCompleted = false + @clientA.emit "joinDoc", @doc_id, () -> + joinDocCompleted = true + # leave before joinDoc completes + setTimeout () => + if joinDocCompleted + return cb(new Error('joinDocCompleted -- lower timeout')) + @clientA.on "disconnect", () -> cb() + @clientA.disconnect() + # socket.io processes joinDoc and disconnect with different delays: + # - joinDoc goes through two process.nextTick + # - disconnect goes through one process.nextTick + # We have to inject the disconnect event into a different event loop + # cycle. + , 3 + + (cb) => + # wait for subscribe and unsubscribe + setTimeout cb, 100 + ], done + + it "should not subscribe to the editor-events channels anymore", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.not.include "editor-events:#{@project_id}" + done() + return null + + it "should not subscribe to the applied-ops channels anymore", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + resp.should.not.include "applied-ops:#{@doc_id}" + done() + return null diff --git a/services/real-time/test/unit/coffee/ChannelManagerTests.coffee b/services/real-time/test/unit/coffee/ChannelManagerTests.coffee index edde3e8170..cb77991f58 100644 --- a/services/real-time/test/unit/coffee/ChannelManagerTests.coffee +++ b/services/real-time/test/unit/coffee/ChannelManagerTests.coffee @@ -1,5 +1,6 @@ chai = require('chai') should = chai.should() +expect = chai.expect sinon = require("sinon") modulePath = "../../../app/js/ChannelManager.js" SandboxedModule = require('sandboxed-module') @@ -11,34 +12,82 @@ describe 'ChannelManager', -> @ChannelManager = SandboxedModule.require modulePath, requires: "settings-sharelatex": @settings = {} "metrics-sharelatex": @metrics = {inc: sinon.stub(), summary: sinon.stub()} - "logger-sharelatex": @logger = { log: sinon.stub(), warn: sinon.stub(), error: sinon.stub() } - + describe "subscribe", -> describe "when there is no existing subscription for this redis client", -> - beforeEach -> - @rclient.subscribe = sinon.stub() + beforeEach (done) -> + @rclient.subscribe = sinon.stub().resolves() @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + setTimeout done it "should subscribe to the redis channel", -> @rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true describe "when there is an existing subscription for this redis client", -> - beforeEach -> - @rclient.subscribe = sinon.stub() + beforeEach (done) -> + @rclient.subscribe = sinon.stub().resolves() @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" - @rclient.subscribe = sinon.stub() # discard the original stub @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + setTimeout done - it "should not subscribe to the redis channel", -> - @rclient.subscribe.called.should.equal false + it "should subscribe to the redis channel again", -> + @rclient.subscribe.callCount.should.equal 2 + + describe "when subscribe errors", -> + beforeEach (done) -> + @rclient.subscribe = sinon.stub() + .onFirstCall().rejects(new Error("some redis error")) + .onSecondCall().resolves() + p = @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + p.then () -> + done(new Error('should not subscribe but fail')) + .catch (err) => + err.message.should.equal "some redis error" + @ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef").should.equal false + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + # subscribe is wrapped in Promise, delay other assertions + setTimeout done + return null + + it "should have recorded the error", -> + expect(@metrics.inc.calledWithExactly("subscribe.failed.applied-ops")).to.equal(true) + + it "should subscribe again", -> + @rclient.subscribe.callCount.should.equal 2 + + it "should cleanup", -> + @ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef").should.equal false + + describe "when subscribe errors and the clientChannelMap entry was replaced", -> + beforeEach (done) -> + @rclient.subscribe = sinon.stub() + .onFirstCall().rejects(new Error("some redis error")) + .onSecondCall().resolves() + @first = @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + # ignore error + @first.catch((()->)) + expect(@ChannelManager.getClientMapEntry(@rclient).get("applied-ops:1234567890abcdef")).to.equal @first + + @rclient.unsubscribe = sinon.stub().resolves() + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + @second = @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + # should get replaced immediately + expect(@ChannelManager.getClientMapEntry(@rclient).get("applied-ops:1234567890abcdef")).to.equal @second + + # let the first subscribe error -> unsubscribe -> subscribe + setTimeout done + + it "should cleanup the second subscribePromise", -> + expect(@ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef")).to.equal false describe "when there is an existing subscription for another redis client but not this one", -> - beforeEach -> - @other_rclient.subscribe = sinon.stub() + beforeEach (done) -> + @other_rclient.subscribe = sinon.stub().resolves() @ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef" - @rclient.subscribe = sinon.stub() # discard the original stub + @rclient.subscribe = sinon.stub().resolves() # discard the original stub @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + setTimeout done it "should subscribe to the redis channel on this redis client", -> @rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true @@ -46,30 +95,82 @@ describe 'ChannelManager', -> describe "unsubscribe", -> describe "when there is no existing subscription for this redis client", -> - beforeEach -> - @rclient.unsubscribe = sinon.stub() + beforeEach (done) -> + @rclient.unsubscribe = sinon.stub().resolves() @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + setTimeout done - it "should not unsubscribe from the redis channel", -> - @rclient.unsubscribe.called.should.equal false + it "should unsubscribe from the redis channel", -> + @rclient.unsubscribe.called.should.equal true describe "when there is an existing subscription for this another redis client but not this one", -> - beforeEach -> - @other_rclient.subscribe = sinon.stub() - @rclient.unsubscribe = sinon.stub() + beforeEach (done) -> + @other_rclient.subscribe = sinon.stub().resolves() + @rclient.unsubscribe = sinon.stub().resolves() @ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef" @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + setTimeout done - it "should not unsubscribe from the redis channel on this client", -> - @rclient.unsubscribe.called.should.equal false + it "should still unsubscribe from the redis channel on this client", -> + @rclient.unsubscribe.called.should.equal true + + describe "when unsubscribe errors and completes", -> + beforeEach (done) -> + @rclient.subscribe = sinon.stub().resolves() + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + @rclient.unsubscribe = sinon.stub().rejects(new Error("some redis error")) + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + setTimeout done + return null + + it "should have cleaned up", -> + @ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef").should.equal false + + it "should not error out when subscribing again", (done) -> + p = @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + p.then () -> + done() + .catch done + return null + + describe "when unsubscribe errors and another client subscribes at the same time", -> + beforeEach (done) -> + @rclient.subscribe = sinon.stub().resolves() + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + rejectSubscribe = undefined + @rclient.unsubscribe = () -> + return new Promise (resolve, reject) -> + rejectSubscribe = reject + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + + setTimeout () => + # delay, actualUnsubscribe should not see the new subscribe request + @ChannelManager.subscribe(@rclient, "applied-ops", "1234567890abcdef") + .then () -> + setTimeout done + .catch done + setTimeout -> + # delay, rejectSubscribe is not defined immediately + rejectSubscribe(new Error("redis error")) + return null + + it "should have recorded the error", -> + expect(@metrics.inc.calledWithExactly("unsubscribe.failed.applied-ops")).to.equal(true) + + it "should have subscribed", -> + @rclient.subscribe.called.should.equal true + + it "should have discarded the finished Promise", -> + @ChannelManager.getClientMapEntry(@rclient).has("applied-ops:1234567890abcdef").should.equal false describe "when there is an existing subscription for this redis client", -> - beforeEach -> - @rclient.subscribe = sinon.stub() - @rclient.unsubscribe = sinon.stub() + beforeEach (done) -> + @rclient.subscribe = sinon.stub().resolves() + @rclient.unsubscribe = sinon.stub().resolves() @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + setTimeout done it "should unsubscribe from the redis channel", -> @rclient.unsubscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true