Add in load throttling based on a redis key

This commit is contained in:
James Allen 2014-02-28 19:09:29 +00:00
parent 3d70f9126e
commit 86195ce7c3
6 changed files with 136 additions and 36 deletions

View file

@ -25,6 +25,7 @@ module.exports =
docsWithPendingUpdates : DOCIDSWITHPENDINGUPDATES docsWithPendingUpdates : DOCIDSWITHPENDINGUPDATES
combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}" combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}"
splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":") splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":")
historyLoadManagerThreshold: "HistoryLoadManagerThreshold"
now : (key)-> now : (key)->
d = new Date() d = new Date()
d.getDate()+":"+(d.getMonth()+1)+":"+d.getFullYear()+":"+key d.getDate()+":"+(d.getMonth()+1)+":"+d.getFullYear()+":"+key

View file

@ -164,6 +164,12 @@ module.exports =
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
rclient.smembers keys.docsInProject(project_id: project_id), callback rclient.smembers keys.docsInProject(project_id: project_id), callback
getHistoryLoadManagerThreshold: (callback = (error, threshold) ->) ->
rclient.get keys.historyLoadManagerThreshold, (error, value) ->
return callback(error) if error?
return callback null, 0 if !value?
callback null, parseInt(value, 10)
getDocumentsProjectId = (doc_id, callback)-> getDocumentsProjectId = (doc_id, callback)->

View file

@ -2,6 +2,7 @@ settings = require "settings-sharelatex"
request = require "request" request = require "request"
logger = require "logger-sharelatex" logger = require "logger-sharelatex"
RedisManager = require "./RedisManager" RedisManager = require "./RedisManager"
crypto = require("crypto")
module.exports = TrackChangesManager = module.exports = TrackChangesManager =
flushDocChanges: (doc_id, callback = (error) ->) -> flushDocChanges: (doc_id, callback = (error) ->) ->
@ -22,12 +23,22 @@ module.exports = TrackChangesManager =
FLUSH_EVERY_N_OPS: 50 FLUSH_EVERY_N_OPS: 50
pushUncompressedHistoryOp: (doc_id, op, callback = (error) ->) -> pushUncompressedHistoryOp: (doc_id, op, callback = (error) ->) ->
RedisManager.pushUncompressedHistoryOp doc_id, op, (error, length) -> RedisManager.getHistoryLoadManagerThreshold (error, threshold) ->
if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0 return callback(error) if error?
# Do this in the background since it uses HTTP and so may be too if TrackChangesManager.getLoadManagerBucket(doc_id) < threshold
# slow to wait for when processing a doc update. RedisManager.pushUncompressedHistoryOp doc_id, op, (error, length) ->
logger.log length: length, doc_id: doc_id, "flushing track changes api" return callback(error) if error?
TrackChangesManager.flushDocChanges doc_id, (error) -> if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0
if error? # Do this in the background since it uses HTTP and so may be too
logger.error err: error, project_id: project_id, doc_id: doc_id, "error flushing doc to track changes api" # slow to wait for when processing a doc update.
callback() logger.log length: length, doc_id: doc_id, "flushing track changes api"
TrackChangesManager.flushDocChanges doc_id, (error) ->
if error?
logger.error err: error, project_id: project_id, doc_id: doc_id, "error flushing doc to track changes api"
callback()
else
callback()
getLoadManagerBucket: (doc_id) ->
hash = crypto.createHash("md5").update(doc_id).digest("hex")
return parseInt(hash.slice(0,4), 16) % 100

View file

@ -12,7 +12,7 @@ MockWebApi = require "./helpers/MockWebApi"
DocUpdaterClient = require "./helpers/DocUpdaterClient" DocUpdaterClient = require "./helpers/DocUpdaterClient"
describe "Applying updates to a doc", -> describe "Applying updates to a doc", ->
before -> before (done) ->
@lines = ["one", "two", "three"] @lines = ["one", "two", "three"]
@update = @update =
doc: @doc_id doc: @doc_id
@ -22,6 +22,8 @@ describe "Applying updates to a doc", ->
}] }]
v: 0 v: 0
@result = ["one", "one and a half", "two", "three"] @result = ["one", "one and a half", "two", "three"]
rclient.set "HistoryLoadManagerThreshold", 100, (error) =>
done()
describe "when the document is not loaded", -> describe "when the document is not loaded", ->
before (done) -> before (done) ->
@ -233,7 +235,7 @@ describe "Applying updates to a doc", ->
done() done()
describe "with enough updates to flush to the track changes api", -> describe "with enough updates to flush to the track changes api", ->
before (done) -> beforeEach ->
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()] [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
MockWebApi.insertDoc @project_id, @doc_id, { MockWebApi.insertDoc @project_id, @doc_id, {
lines: @lines lines: @lines
@ -247,13 +249,27 @@ describe "Applying updates to a doc", ->
sinon.spy MockTrackChangesApi, "flushDoc" sinon.spy MockTrackChangesApi, "flushDoc"
DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) => afterEach ->
throw error if error?
setTimeout done, 200
after ->
MockTrackChangesApi.flushDoc.restore() MockTrackChangesApi.flushDoc.restore()
it "should flush the doc twice", -> describe "when under the load manager threshold", ->
console.log MockTrackChangesApi.flushDoc.args beforeEach (done) ->
MockTrackChangesApi.flushDoc.calledTwice.should.equal true rclient.set "HistoryLoadManagerThreshold", 100, (error) =>
throw error if error?
DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) =>
throw error if error?
setTimeout done, 200
it "should flush the doc twice", ->
MockTrackChangesApi.flushDoc.calledTwice.should.equal true
describe "when over the load manager threshold", ->
beforeEach (done) ->
rclient.set "HistoryLoadManagerThreshold", 0, (error) =>
throw error if error?
DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) =>
throw error if error?
setTimeout done, 200
it "should not flush the doc", ->
MockTrackChangesApi.flushDoc.called.should.equal false

View file

@ -0,0 +1,43 @@
sinon = require('sinon')
chai = require('chai')
should = chai.should()
modulePath = "../../../../app/js/RedisManager.js"
SandboxedModule = require('sandboxed-module')
describe "RedisManager.getHistoryLoadManagerThreshold", ->
beforeEach ->
@RedisManager = SandboxedModule.require modulePath, requires:
"redis": createClient: () =>
@rclient =
auth: () ->
"logger-sharelatex": @logger = {log: sinon.stub()}
@callback = sinon.stub()
describe "with no value", ->
beforeEach ->
@rclient.get = sinon.stub().callsArgWith(1, null, null)
@RedisManager.getHistoryLoadManagerThreshold @callback
it "should get the value", ->
@rclient.get
.calledWith("HistoryLoadManagerThreshold")
.should.equal true
it "should call the callback with 0", ->
@callback.calledWith(null, 0).should.equal true
describe "with a value", ->
beforeEach ->
@rclient.get = sinon.stub().callsArgWith(1, null, "42")
@RedisManager.getHistoryLoadManagerThreshold @callback
it "should get the value", ->
@rclient.get
.calledWith("HistoryLoadManagerThreshold")
.should.equal true
it "should call the callback with the numeric value", ->
@callback.calledWith(null, 42).should.equal true

View file

@ -44,30 +44,53 @@ describe "TrackChangesManager", ->
@op = "mock-op" @op = "mock-op"
@TrackChangesManager.flushDocChanges = sinon.stub().callsArg(1) @TrackChangesManager.flushDocChanges = sinon.stub().callsArg(1)
describe "pushing the op", -> describe "when the doc is under the load manager threshold", ->
beforeEach -> beforeEach ->
@RedisManager.getHistoryLoadManagerThreshold = sinon.stub().callsArgWith(0, null, 40)
@TrackChangesManager.getLoadManagerBucket = sinon.stub().returns(30)
describe "pushing the op", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1)
@TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback
it "should push the op into redis", ->
@RedisManager.pushUncompressedHistoryOp
.calledWith(@doc_id, @op)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
it "should not try to flush the op", ->
@TrackChangesManager.flushDocChanges.called.should.equal false
describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(2, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback
it "should tell the track changes api to flush", ->
@TrackChangesManager.flushDocChanges
.calledWith(@doc_id)
.should.equal true
describe "when the doc is over the load manager threshold", ->
beforeEach ->
@RedisManager.getHistoryLoadManagerThreshold = sinon.stub().callsArgWith(0, null, 40)
@TrackChangesManager.getLoadManagerBucket = sinon.stub().returns(50)
@RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1) @RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1)
@TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback @TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback
it "should push the op into redis", -> it "should not push the op", ->
@RedisManager.pushUncompressedHistoryOp @RedisManager.pushUncompressedHistoryOp.called.should.equal false
.calledWith(@doc_id, @op)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
it "should not try to flush the op", -> it "should not try to flush the op", ->
@TrackChangesManager.flushDocChanges.called.should.equal false @TrackChangesManager.flushDocChanges.called.should.equal false
describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", -> it "should call the callback", ->
beforeEach -> @callback.called.should.equal true
@RedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(2, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback
it "should tell the track changes api to flush", ->
@TrackChangesManager.flushDocChanges
.calledWith(@doc_id)
.should.equal true