mirror of
https://github.com/overleaf/overleaf.git
synced 2025-01-27 04:41:49 +00:00
put a rate limit on worker calls
This commit is contained in:
parent
95046b1f91
commit
891ffda3bf
4 changed files with 137 additions and 8 deletions
|
@ -5,9 +5,10 @@ redis = require("redis-sharelatex")
|
|||
|
||||
UpdateManager = require('./UpdateManager')
|
||||
Metrics = require('./Metrics')
|
||||
RateLimitManager = require('./RateLimitManager')
|
||||
|
||||
module.exports = DispatchManager =
|
||||
createDispatcher: () ->
|
||||
createDispatcher: (RateLimiter) ->
|
||||
client = redis.createClient(Settings.redis.realtime)
|
||||
worker = {
|
||||
client: client
|
||||
|
@ -20,11 +21,11 @@ module.exports = DispatchManager =
|
|||
[list_name, doc_key] = result
|
||||
[project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key)
|
||||
# Dispatch this in the background
|
||||
Metrics.gauge "processingUpdates", "+1" # increments/decrements gauge with +/- sign
|
||||
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
|
||||
Metrics.gauge "processingUpdates", "-1"
|
||||
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
|
||||
callback()
|
||||
backgroundTask = (cb) ->
|
||||
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
|
||||
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
|
||||
cb()
|
||||
RateLimiter.run backgroundTask, callback
|
||||
|
||||
run: () ->
|
||||
return if Settings.shuttingDown
|
||||
|
@ -39,6 +40,7 @@ module.exports = DispatchManager =
|
|||
return worker
|
||||
|
||||
createAndStartDispatchers: (number) ->
|
||||
RateLimiter = new RateLimitManager(number)
|
||||
for i in [1..number]
|
||||
worker = DispatchManager.createDispatcher()
|
||||
worker = DispatchManager.createDispatcher(RateLimiter)
|
||||
worker.run()
|
||||
|
|
36
services/document-updater/app/coffee/RateLimitManager.coffee
Normal file
36
services/document-updater/app/coffee/RateLimitManager.coffee
Normal file
|
@ -0,0 +1,36 @@
|
|||
Settings = require('settings-sharelatex')
|
||||
logger = require('logger-sharelatex')
|
||||
Metrics = require('./Metrics')
|
||||
|
||||
module.exports = class RateLimiter
|
||||
|
||||
constructor: (number = 10) ->
|
||||
@ActiveWorkerCount = 0
|
||||
@CurrentWorkerLimit = number
|
||||
@BaseWorkerCount = number
|
||||
|
||||
_adjustLimitUp: () ->
|
||||
@CurrentWorkerLimit += 0.1 # allow target worker limit to increase gradually
|
||||
|
||||
_adjustLimitDown: () ->
|
||||
@CurrentWorkerLimit = Math.max @BaseWorkerCount, (@CurrentWorkerLimit * 0.9)
|
||||
logger.log {currentLimit: Math.ceil(@CurrentWorkerLimit)}, "reducing rate limit"
|
||||
|
||||
_trackAndRun: (task, callback = () ->) ->
|
||||
@ActiveWorkerCount++
|
||||
Metrics.gauge "processingUpdates", "+1" # increments/decrements gauge with +/- sign
|
||||
task (err) =>
|
||||
@ActiveWorkerCount--
|
||||
Metrics.gauge "processingUpdates", "-1"
|
||||
callback(err)
|
||||
|
||||
run: (task, callback) ->
|
||||
if @ActiveWorkerCount < @CurrentWorkerLimit
|
||||
@_trackAndRun task # below the limit, just put the task in the background
|
||||
callback() # return immediately
|
||||
if @CurrentWorkerLimit > @BaseWorkerCount
|
||||
@_adjustLimitDown()
|
||||
else
|
||||
logger.log {active: @ActiveWorkerCount, currentLimit: Math.ceil(@CurrentWorkerLimit)}, "hit rate limit"
|
||||
@_trackAndRun task, callback # only return after task completes
|
||||
@_adjustLimitUp()
|
|
@ -14,6 +14,7 @@ describe "DispatchManager", ->
|
|||
realtime: {}
|
||||
"redis-sharelatex": @redis = {}
|
||||
@callback = sinon.stub()
|
||||
@RateLimiter = { run: (task,cb) -> task(cb) } # run task without rate limit
|
||||
|
||||
describe "each worker", ->
|
||||
beforeEach ->
|
||||
|
@ -21,7 +22,7 @@ describe "DispatchManager", ->
|
|||
auth: sinon.stub()
|
||||
@redis.createClient = sinon.stub().returns @client
|
||||
|
||||
@worker = @DispatchManager.createDispatcher()
|
||||
@worker = @DispatchManager.createDispatcher(@RateLimiter)
|
||||
|
||||
it "should create a new redis client", ->
|
||||
@redis.createClient.called.should.equal true
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
sinon = require('sinon')
|
||||
chai = require('chai')
|
||||
should = chai.should()
|
||||
expect = chai.expect
|
||||
modulePath = "../../../../app/js/RateLimitManager.js"
|
||||
SandboxedModule = require('sandboxed-module')
|
||||
|
||||
describe "RateLimitManager", ->
|
||||
beforeEach ->
|
||||
@RateLimitManager = SandboxedModule.require modulePath, requires:
|
||||
"logger-sharelatex": @logger = { log: sinon.stub() }
|
||||
"settings-sharelatex": @settings =
|
||||
redis:
|
||||
realtime: {}
|
||||
"./Metrics": @Metrics =
|
||||
Timer: class Timer
|
||||
done: sinon.stub()
|
||||
gauge: sinon.stub()
|
||||
@callback = sinon.stub()
|
||||
@RateLimiter = new @RateLimitManager(1)
|
||||
|
||||
describe "for a single task", ->
|
||||
beforeEach ->
|
||||
@task = sinon.stub()
|
||||
@RateLimiter.run @task, @callback
|
||||
|
||||
it "should execute the task in the background", ->
|
||||
@task.called.should.equal true
|
||||
|
||||
it "should call the callback", ->
|
||||
@callback.called.should.equal true
|
||||
|
||||
it "should finish with a worker count of one", ->
|
||||
# because it's in the background
|
||||
expect(@RateLimiter.ActiveWorkerCount).to.equal 1
|
||||
|
||||
describe "for multiple tasks", ->
|
||||
beforeEach (done) ->
|
||||
@task = sinon.stub()
|
||||
@finalTask = sinon.stub()
|
||||
task = (cb) =>
|
||||
@task()
|
||||
setTimeout cb, 100
|
||||
finalTask = (cb) =>
|
||||
@finalTask()
|
||||
setTimeout cb, 100
|
||||
@RateLimiter.run task, @callback
|
||||
@RateLimiter.run task, @callback
|
||||
@RateLimiter.run task, @callback
|
||||
@RateLimiter.run finalTask, (err) =>
|
||||
@callback(err)
|
||||
done()
|
||||
|
||||
it "should execute the first three tasks", ->
|
||||
@task.calledThrice.should.equal true
|
||||
|
||||
it "should execute the final task", ->
|
||||
@finalTask.called.should.equal true
|
||||
|
||||
it "should call the callback", ->
|
||||
@callback.called.should.equal true
|
||||
|
||||
it "should finish with worker count of zero", ->
|
||||
expect(@RateLimiter.ActiveWorkerCount).to.equal 0
|
||||
|
||||
describe "for a mixture of long-running tasks", ->
|
||||
beforeEach (done) ->
|
||||
@task = sinon.stub()
|
||||
@finalTask = sinon.stub()
|
||||
finalTask = (cb) =>
|
||||
@finalTask()
|
||||
setTimeout cb, 100
|
||||
@RateLimiter.run @task, @callback
|
||||
@RateLimiter.run @task, @callback
|
||||
@RateLimiter.run @task, @callback
|
||||
@RateLimiter.run finalTask, (err) =>
|
||||
@callback(err)
|
||||
done()
|
||||
|
||||
it "should execute the first three tasks", ->
|
||||
@task.calledThrice.should.equal true
|
||||
|
||||
it "should execute the final task", ->
|
||||
@finalTask.called.should.equal true
|
||||
|
||||
it "should call the callback", ->
|
||||
@callback.called.should.equal true
|
||||
|
||||
it "should finish with worker count of three", ->
|
||||
expect(@RateLimiter.ActiveWorkerCount).to.equal 3
|
Loading…
Reference in a new issue