diff --git a/services/document-updater/app/coffee/DispatchManager.coffee b/services/document-updater/app/coffee/DispatchManager.coffee index 4444c0183b..e60d226660 100644 --- a/services/document-updater/app/coffee/DispatchManager.coffee +++ b/services/document-updater/app/coffee/DispatchManager.coffee @@ -5,9 +5,10 @@ redis = require("redis-sharelatex") UpdateManager = require('./UpdateManager') Metrics = require('./Metrics') +RateLimitManager = require('./RateLimitManager') module.exports = DispatchManager = - createDispatcher: () -> + createDispatcher: (RateLimiter) -> client = redis.createClient(Settings.redis.realtime) worker = { client: client @@ -20,11 +21,11 @@ module.exports = DispatchManager = [list_name, doc_key] = result [project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key) # Dispatch this in the background - Metrics.gauge "processingUpdates", "+1" # increments/decrements gauge with +/- sign - UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) -> - Metrics.gauge "processingUpdates", "-1" - logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error? - callback() + backgroundTask = (cb) -> + UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) -> + logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error? + cb() + RateLimiter.run backgroundTask, callback run: () -> return if Settings.shuttingDown @@ -39,6 +40,7 @@ module.exports = DispatchManager = return worker createAndStartDispatchers: (number) -> + RateLimiter = new RateLimitManager(number) for i in [1..number] - worker = DispatchManager.createDispatcher() + worker = DispatchManager.createDispatcher(RateLimiter) worker.run() diff --git a/services/document-updater/app/coffee/RateLimitManager.coffee b/services/document-updater/app/coffee/RateLimitManager.coffee new file mode 100644 index 0000000000..ce61232af3 --- /dev/null +++ b/services/document-updater/app/coffee/RateLimitManager.coffee @@ -0,0 +1,36 @@ +Settings = require('settings-sharelatex') +logger = require('logger-sharelatex') +Metrics = require('./Metrics') + +module.exports = class RateLimiter + + constructor: (number = 10) -> + @ActiveWorkerCount = 0 + @CurrentWorkerLimit = number + @BaseWorkerCount = number + + _adjustLimitUp: () -> + @CurrentWorkerLimit += 0.1 # allow target worker limit to increase gradually + + _adjustLimitDown: () -> + @CurrentWorkerLimit = Math.max @BaseWorkerCount, (@CurrentWorkerLimit * 0.9) + logger.log {currentLimit: Math.ceil(@CurrentWorkerLimit)}, "reducing rate limit" + + _trackAndRun: (task, callback = () ->) -> + @ActiveWorkerCount++ + Metrics.gauge "processingUpdates", "+1" # increments/decrements gauge with +/- sign + task (err) => + @ActiveWorkerCount-- + Metrics.gauge "processingUpdates", "-1" + callback(err) + + run: (task, callback) -> + if @ActiveWorkerCount < @CurrentWorkerLimit + @_trackAndRun task # below the limit, just put the task in the background + callback() # return immediately + if @CurrentWorkerLimit > @BaseWorkerCount + @_adjustLimitDown() + else + logger.log {active: @ActiveWorkerCount, currentLimit: Math.ceil(@CurrentWorkerLimit)}, "hit rate limit" + @_trackAndRun task, callback # only return after task completes + @_adjustLimitUp() diff --git a/services/document-updater/test/unit/coffee/DispatchManager/DispatchManagerTests.coffee b/services/document-updater/test/unit/coffee/DispatchManager/DispatchManagerTests.coffee index a82a40af04..dcd643fcfe 100644 --- a/services/document-updater/test/unit/coffee/DispatchManager/DispatchManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/DispatchManager/DispatchManagerTests.coffee @@ -14,6 +14,7 @@ describe "DispatchManager", -> realtime: {} "redis-sharelatex": @redis = {} @callback = sinon.stub() + @RateLimiter = { run: (task,cb) -> task(cb) } # run task without rate limit describe "each worker", -> beforeEach -> @@ -21,7 +22,7 @@ describe "DispatchManager", -> auth: sinon.stub() @redis.createClient = sinon.stub().returns @client - @worker = @DispatchManager.createDispatcher() + @worker = @DispatchManager.createDispatcher(@RateLimiter) it "should create a new redis client", -> @redis.createClient.called.should.equal true diff --git a/services/document-updater/test/unit/coffee/RateLimitManager/RateLimitManager.coffee b/services/document-updater/test/unit/coffee/RateLimitManager/RateLimitManager.coffee new file mode 100644 index 0000000000..866532a4da --- /dev/null +++ b/services/document-updater/test/unit/coffee/RateLimitManager/RateLimitManager.coffee @@ -0,0 +1,90 @@ +sinon = require('sinon') +chai = require('chai') +should = chai.should() +expect = chai.expect +modulePath = "../../../../app/js/RateLimitManager.js" +SandboxedModule = require('sandboxed-module') + +describe "RateLimitManager", -> + beforeEach -> + @RateLimitManager = SandboxedModule.require modulePath, requires: + "logger-sharelatex": @logger = { log: sinon.stub() } + "settings-sharelatex": @settings = + redis: + realtime: {} + "./Metrics": @Metrics = + Timer: class Timer + done: sinon.stub() + gauge: sinon.stub() + @callback = sinon.stub() + @RateLimiter = new @RateLimitManager(1) + + describe "for a single task", -> + beforeEach -> + @task = sinon.stub() + @RateLimiter.run @task, @callback + + it "should execute the task in the background", -> + @task.called.should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + it "should finish with a worker count of one", -> + # because it's in the background + expect(@RateLimiter.ActiveWorkerCount).to.equal 1 + + describe "for multiple tasks", -> + beforeEach (done) -> + @task = sinon.stub() + @finalTask = sinon.stub() + task = (cb) => + @task() + setTimeout cb, 100 + finalTask = (cb) => + @finalTask() + setTimeout cb, 100 + @RateLimiter.run task, @callback + @RateLimiter.run task, @callback + @RateLimiter.run task, @callback + @RateLimiter.run finalTask, (err) => + @callback(err) + done() + + it "should execute the first three tasks", -> + @task.calledThrice.should.equal true + + it "should execute the final task", -> + @finalTask.called.should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + it "should finish with worker count of zero", -> + expect(@RateLimiter.ActiveWorkerCount).to.equal 0 + + describe "for a mixture of long-running tasks", -> + beforeEach (done) -> + @task = sinon.stub() + @finalTask = sinon.stub() + finalTask = (cb) => + @finalTask() + setTimeout cb, 100 + @RateLimiter.run @task, @callback + @RateLimiter.run @task, @callback + @RateLimiter.run @task, @callback + @RateLimiter.run finalTask, (err) => + @callback(err) + done() + + it "should execute the first three tasks", -> + @task.calledThrice.should.equal true + + it "should execute the final task", -> + @finalTask.called.should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + it "should finish with worker count of three", -> + expect(@RateLimiter.ActiveWorkerCount).to.equal 3