Merge pull request #47 from sharelatex/bg-limit-worker-calls

limit worker calls
This commit is contained in:
Brian Gough 2017-05-30 13:58:54 +01:00 committed by GitHub
commit 9a8c0fda10
4 changed files with 137 additions and 8 deletions

View file

@ -5,9 +5,10 @@ redis = require("redis-sharelatex")
UpdateManager = require('./UpdateManager')
Metrics = require('./Metrics')
RateLimitManager = require('./RateLimitManager')
module.exports = DispatchManager =
createDispatcher: () ->
createDispatcher: (RateLimiter) ->
client = redis.createClient(Settings.redis.realtime)
worker = {
client: client
@ -20,11 +21,11 @@ module.exports = DispatchManager =
[list_name, doc_key] = result
[project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key)
# Dispatch this in the background
Metrics.gauge "processingUpdates", "+1" # increments/decrements gauge with +/- sign
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
Metrics.gauge "processingUpdates", "-1"
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
callback()
backgroundTask = (cb) ->
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
cb()
RateLimiter.run backgroundTask, callback
run: () ->
return if Settings.shuttingDown
@ -39,6 +40,7 @@ module.exports = DispatchManager =
return worker
createAndStartDispatchers: (number) ->
RateLimiter = new RateLimitManager(number)
for i in [1..number]
worker = DispatchManager.createDispatcher()
worker = DispatchManager.createDispatcher(RateLimiter)
worker.run()

View file

@ -0,0 +1,36 @@
Settings = require('settings-sharelatex')
logger = require('logger-sharelatex')
Metrics = require('./Metrics')
module.exports = class RateLimiter
constructor: (number = 10) ->
@ActiveWorkerCount = 0
@CurrentWorkerLimit = number
@BaseWorkerCount = number
_adjustLimitUp: () ->
@CurrentWorkerLimit += 0.1 # allow target worker limit to increase gradually
_adjustLimitDown: () ->
@CurrentWorkerLimit = Math.max @BaseWorkerCount, (@CurrentWorkerLimit * 0.9)
logger.log {currentLimit: Math.ceil(@CurrentWorkerLimit)}, "reducing rate limit"
_trackAndRun: (task, callback = () ->) ->
@ActiveWorkerCount++
Metrics.gauge "processingUpdates", "+1" # increments/decrements gauge with +/- sign
task (err) =>
@ActiveWorkerCount--
Metrics.gauge "processingUpdates", "-1"
callback(err)
run: (task, callback) ->
if @ActiveWorkerCount < @CurrentWorkerLimit
@_trackAndRun task # below the limit, just put the task in the background
callback() # return immediately
if @CurrentWorkerLimit > @BaseWorkerCount
@_adjustLimitDown()
else
logger.log {active: @ActiveWorkerCount, currentLimit: Math.ceil(@CurrentWorkerLimit)}, "hit rate limit"
@_trackAndRun task, callback # only return after task completes
@_adjustLimitUp()

View file

@ -14,6 +14,7 @@ describe "DispatchManager", ->
realtime: {}
"redis-sharelatex": @redis = {}
@callback = sinon.stub()
@RateLimiter = { run: (task,cb) -> task(cb) } # run task without rate limit
describe "each worker", ->
beforeEach ->
@ -21,7 +22,7 @@ describe "DispatchManager", ->
auth: sinon.stub()
@redis.createClient = sinon.stub().returns @client
@worker = @DispatchManager.createDispatcher()
@worker = @DispatchManager.createDispatcher(@RateLimiter)
it "should create a new redis client", ->
@redis.createClient.called.should.equal true

View file

@ -0,0 +1,90 @@
sinon = require('sinon')
chai = require('chai')
should = chai.should()
expect = chai.expect
modulePath = "../../../../app/js/RateLimitManager.js"
SandboxedModule = require('sandboxed-module')
describe "RateLimitManager", ->
beforeEach ->
@RateLimitManager = SandboxedModule.require modulePath, requires:
"logger-sharelatex": @logger = { log: sinon.stub() }
"settings-sharelatex": @settings =
redis:
realtime: {}
"./Metrics": @Metrics =
Timer: class Timer
done: sinon.stub()
gauge: sinon.stub()
@callback = sinon.stub()
@RateLimiter = new @RateLimitManager(1)
describe "for a single task", ->
beforeEach ->
@task = sinon.stub()
@RateLimiter.run @task, @callback
it "should execute the task in the background", ->
@task.called.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
it "should finish with a worker count of one", ->
# because it's in the background
expect(@RateLimiter.ActiveWorkerCount).to.equal 1
describe "for multiple tasks", ->
beforeEach (done) ->
@task = sinon.stub()
@finalTask = sinon.stub()
task = (cb) =>
@task()
setTimeout cb, 100
finalTask = (cb) =>
@finalTask()
setTimeout cb, 100
@RateLimiter.run task, @callback
@RateLimiter.run task, @callback
@RateLimiter.run task, @callback
@RateLimiter.run finalTask, (err) =>
@callback(err)
done()
it "should execute the first three tasks", ->
@task.calledThrice.should.equal true
it "should execute the final task", ->
@finalTask.called.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
it "should finish with worker count of zero", ->
expect(@RateLimiter.ActiveWorkerCount).to.equal 0
describe "for a mixture of long-running tasks", ->
beforeEach (done) ->
@task = sinon.stub()
@finalTask = sinon.stub()
finalTask = (cb) =>
@finalTask()
setTimeout cb, 100
@RateLimiter.run @task, @callback
@RateLimiter.run @task, @callback
@RateLimiter.run @task, @callback
@RateLimiter.run finalTask, (err) =>
@callback(err)
done()
it "should execute the first three tasks", ->
@task.calledThrice.should.equal true
it "should execute the final task", ->
@finalTask.called.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
it "should finish with worker count of three", ->
expect(@RateLimiter.ActiveWorkerCount).to.equal 3