diff --git a/services/real-time/app/coffee/DrainManager.coffee b/services/real-time/app/coffee/DrainManager.coffee new file mode 100644 index 0000000000..08caac3c28 --- /dev/null +++ b/services/real-time/app/coffee/DrainManager.coffee @@ -0,0 +1,26 @@ +logger = require "logger-sharelatex" + +module.exports = + startDrain: (io, rate) -> + # Clear out any old interval + clearInterval @interval + if rate == 0 + return + @interval = setInterval () => + @reconnectNClients(io, rate) + , 1000 + + RECONNECTED_CLIENTS: {} + reconnectNClients: (io, N) -> + drainedCount = 0 + for client in io.sockets.clients() + logger.log {client_id: client.id, already_reconnecting: @RECONNECTED_CLIENTS[client.id]}, "Considering client" + if !@RECONNECTED_CLIENTS[client.id] + @RECONNECTED_CLIENTS[client.id] = true + logger.log {client_id: client.id}, "Asking client to reconnect gracefully" + client.emit "reconnectGracefully" + drainedCount++ + if drainedCount == N + break + if drainedCount < N + logger.log "All clients have been told to reconnectGracefully" \ No newline at end of file diff --git a/services/real-time/app/coffee/HttpApiController.coffee b/services/real-time/app/coffee/HttpApiController.coffee index 1082a11b34..a2a9d4d23c 100644 --- a/services/real-time/app/coffee/HttpApiController.coffee +++ b/services/real-time/app/coffee/HttpApiController.coffee @@ -1,4 +1,5 @@ WebsocketLoadBalancer = require "./WebsocketLoadBalancer" +DrainManager = require "./DrainManager" logger = require "logger-sharelatex" module.exports = HttpApiController = @@ -9,4 +10,12 @@ module.exports = HttpApiController = WebsocketLoadBalancer.emitToRoom req.params.project_id, req.params.message, payload else WebsocketLoadBalancer.emitToRoom req.params.project_id, req.params.message, req.body - res.send 204 # No content \ No newline at end of file + res.send 204 # No content + + startDrain: (req, res, next) -> + io = req.app.get("io") + rate = req.query.rate or "4" + rate = parseInt(rate, 10) + logger.log {rate}, "setting client drain rate" + DrainManager.startDrain io, rate + res.send 204 \ No newline at end of file diff --git a/services/real-time/app/coffee/Router.coffee b/services/real-time/app/coffee/Router.coffee index 64fdb6931d..2f56020578 100644 --- a/services/real-time/app/coffee/Router.coffee +++ b/services/real-time/app/coffee/Router.coffee @@ -35,6 +35,8 @@ module.exports = Router = app.get "/clients/:client_id", HttpController.getConnectedClient app.post "/project/:project_id/message/:message", httpAuth, bodyParser.json(limit: "5mb"), HttpApiController.sendMessage + + app.post "/drain", httpAuth, HttpApiController.startDrain session.on 'connection', (error, client, session) -> if error? diff --git a/services/real-time/test/unit/coffee/DrainManagerTests.coffee b/services/real-time/test/unit/coffee/DrainManagerTests.coffee new file mode 100644 index 0000000000..b3cdaf1ca0 --- /dev/null +++ b/services/real-time/test/unit/coffee/DrainManagerTests.coffee @@ -0,0 +1,64 @@ +should = require('chai').should() +sinon = require "sinon" +SandboxedModule = require('sandboxed-module') +path = require "path" +modulePath = path.join __dirname, "../../../app/js/DrainManager" + +describe "DrainManager", -> + beforeEach -> + @DrainManager = SandboxedModule.require modulePath, requires: + "logger-sharelatex": @logger = log: sinon.stub() + @io = + sockets: + clients: sinon.stub() + + describe "reconnectNClients", -> + beforeEach -> + @clients = [] + for i in [0..9] + @clients[i] = { + id: i + emit: sinon.stub() + } + @io.sockets.clients.returns @clients + + describe "after first pass", -> + beforeEach -> + @DrainManager.reconnectNClients(@io, 3) + + it "should reconnect the first 3 clients", -> + for i in [0..2] + @clients[i].emit.calledWith("reconnectGracefully").should.equal true + + it "should not reconnect any more clients", -> + for i in [3..9] + @clients[i].emit.calledWith("reconnectGracefully").should.equal false + + describe "after second pass", -> + beforeEach -> + @DrainManager.reconnectNClients(@io, 3) + + it "should reconnect the next 3 clients", -> + for i in [3..5] + @clients[i].emit.calledWith("reconnectGracefully").should.equal true + + it "should not reconnect any more clients", -> + for i in [6..9] + @clients[i].emit.calledWith("reconnectGracefully").should.equal false + + it "should not reconnect the first 3 clients again", -> + for i in [0..2] + @clients[i].emit.calledOnce.should.equal true + + describe "after final pass", -> + beforeEach -> + @DrainManager.reconnectNClients(@io, 100) + + it "should not reconnect the first 6 clients again", -> + for i in [0..5] + @clients[i].emit.calledOnce.should.equal true + + it "should log out that it reached the end", -> + @logger.log + .calledWith("All clients have been told to reconnectGracefully") + .should.equal true