From 86195ce7c3df1fe345140a722326159a6961739a Mon Sep 17 00:00:00 2001 From: James Allen Date: Fri, 28 Feb 2014 19:09:29 +0000 Subject: [PATCH] Add in load throttling based on a redis key --- .../app/coffee/RedisKeyBuilder.coffee | 1 + .../app/coffee/RedisManager.coffee | 6 ++ .../app/coffee/TrackChangesManager.coffee | 29 +++++++--- .../coffee/ApplyingUpdatesToADocTests.coffee | 36 ++++++++---- .../getHistoryLoadManagerThreshold.coffee | 43 ++++++++++++++ .../TrackChangesManagerTests.coffee | 57 +++++++++++++------ 6 files changed, 136 insertions(+), 36 deletions(-) create mode 100644 services/document-updater/test/unit/coffee/RedisManager/getHistoryLoadManagerThreshold.coffee diff --git a/services/document-updater/app/coffee/RedisKeyBuilder.coffee b/services/document-updater/app/coffee/RedisKeyBuilder.coffee index 2bd1ed08c8..de2bc85443 100644 --- a/services/document-updater/app/coffee/RedisKeyBuilder.coffee +++ b/services/document-updater/app/coffee/RedisKeyBuilder.coffee @@ -25,6 +25,7 @@ module.exports = docsWithPendingUpdates : DOCIDSWITHPENDINGUPDATES combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}" splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":") + historyLoadManagerThreshold: "HistoryLoadManagerThreshold" now : (key)-> d = new Date() d.getDate()+":"+(d.getMonth()+1)+":"+d.getFullYear()+":"+key diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 5f6c880cee..3d8efdbc70 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -164,6 +164,12 @@ module.exports = getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback + + getHistoryLoadManagerThreshold: (callback = (error, threshold) ->) -> + rclient.get keys.historyLoadManagerThreshold, (error, value) -> + return callback(error) if error? + return callback null, 0 if !value? + callback null, parseInt(value, 10) getDocumentsProjectId = (doc_id, callback)-> diff --git a/services/document-updater/app/coffee/TrackChangesManager.coffee b/services/document-updater/app/coffee/TrackChangesManager.coffee index 489694fbf4..0aca12792b 100644 --- a/services/document-updater/app/coffee/TrackChangesManager.coffee +++ b/services/document-updater/app/coffee/TrackChangesManager.coffee @@ -2,6 +2,7 @@ settings = require "settings-sharelatex" request = require "request" logger = require "logger-sharelatex" RedisManager = require "./RedisManager" +crypto = require("crypto") module.exports = TrackChangesManager = flushDocChanges: (doc_id, callback = (error) ->) -> @@ -22,12 +23,22 @@ module.exports = TrackChangesManager = FLUSH_EVERY_N_OPS: 50 pushUncompressedHistoryOp: (doc_id, op, callback = (error) ->) -> - RedisManager.pushUncompressedHistoryOp doc_id, op, (error, length) -> - if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0 - # Do this in the background since it uses HTTP and so may be too - # slow to wait for when processing a doc update. - logger.log length: length, doc_id: doc_id, "flushing track changes api" - TrackChangesManager.flushDocChanges doc_id, (error) -> - if error? - logger.error err: error, project_id: project_id, doc_id: doc_id, "error flushing doc to track changes api" - callback() + RedisManager.getHistoryLoadManagerThreshold (error, threshold) -> + return callback(error) if error? + if TrackChangesManager.getLoadManagerBucket(doc_id) < threshold + RedisManager.pushUncompressedHistoryOp doc_id, op, (error, length) -> + return callback(error) if error? + if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0 + # Do this in the background since it uses HTTP and so may be too + # slow to wait for when processing a doc update. + logger.log length: length, doc_id: doc_id, "flushing track changes api" + TrackChangesManager.flushDocChanges doc_id, (error) -> + if error? + logger.error err: error, project_id: project_id, doc_id: doc_id, "error flushing doc to track changes api" + callback() + else + callback() + + getLoadManagerBucket: (doc_id) -> + hash = crypto.createHash("md5").update(doc_id).digest("hex") + return parseInt(hash.slice(0,4), 16) % 100 diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee index 5b867b5807..7d5ac144e7 100644 --- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee +++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee @@ -12,7 +12,7 @@ MockWebApi = require "./helpers/MockWebApi" DocUpdaterClient = require "./helpers/DocUpdaterClient" describe "Applying updates to a doc", -> - before -> + before (done) -> @lines = ["one", "two", "three"] @update = doc: @doc_id @@ -22,6 +22,8 @@ describe "Applying updates to a doc", -> }] v: 0 @result = ["one", "one and a half", "two", "three"] + rclient.set "HistoryLoadManagerThreshold", 100, (error) => + done() describe "when the document is not loaded", -> before (done) -> @@ -233,7 +235,7 @@ describe "Applying updates to a doc", -> done() describe "with enough updates to flush to the track changes api", -> - before (done) -> + beforeEach -> [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()] MockWebApi.insertDoc @project_id, @doc_id, { lines: @lines @@ -247,13 +249,27 @@ describe "Applying updates to a doc", -> sinon.spy MockTrackChangesApi, "flushDoc" - DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) => - throw error if error? - setTimeout done, 200 - - after -> + afterEach -> MockTrackChangesApi.flushDoc.restore() - it "should flush the doc twice", -> - console.log MockTrackChangesApi.flushDoc.args - MockTrackChangesApi.flushDoc.calledTwice.should.equal true + describe "when under the load manager threshold", -> + beforeEach (done) -> + rclient.set "HistoryLoadManagerThreshold", 100, (error) => + throw error if error? + DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) => + throw error if error? + setTimeout done, 200 + + it "should flush the doc twice", -> + MockTrackChangesApi.flushDoc.calledTwice.should.equal true + + describe "when over the load manager threshold", -> + beforeEach (done) -> + rclient.set "HistoryLoadManagerThreshold", 0, (error) => + throw error if error? + DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) => + throw error if error? + setTimeout done, 200 + + it "should not flush the doc", -> + MockTrackChangesApi.flushDoc.called.should.equal false diff --git a/services/document-updater/test/unit/coffee/RedisManager/getHistoryLoadManagerThreshold.coffee b/services/document-updater/test/unit/coffee/RedisManager/getHistoryLoadManagerThreshold.coffee new file mode 100644 index 0000000000..d69cec370c --- /dev/null +++ b/services/document-updater/test/unit/coffee/RedisManager/getHistoryLoadManagerThreshold.coffee @@ -0,0 +1,43 @@ +sinon = require('sinon') +chai = require('chai') +should = chai.should() +modulePath = "../../../../app/js/RedisManager.js" +SandboxedModule = require('sandboxed-module') + +describe "RedisManager.getHistoryLoadManagerThreshold", -> + beforeEach -> + @RedisManager = SandboxedModule.require modulePath, requires: + "redis": createClient: () => + @rclient = + auth: () -> + "logger-sharelatex": @logger = {log: sinon.stub()} + @callback = sinon.stub() + + describe "with no value", -> + beforeEach -> + @rclient.get = sinon.stub().callsArgWith(1, null, null) + @RedisManager.getHistoryLoadManagerThreshold @callback + + it "should get the value", -> + @rclient.get + .calledWith("HistoryLoadManagerThreshold") + .should.equal true + + it "should call the callback with 0", -> + @callback.calledWith(null, 0).should.equal true + + describe "with a value", -> + beforeEach -> + @rclient.get = sinon.stub().callsArgWith(1, null, "42") + @RedisManager.getHistoryLoadManagerThreshold @callback + + it "should get the value", -> + @rclient.get + .calledWith("HistoryLoadManagerThreshold") + .should.equal true + + it "should call the callback with the numeric value", -> + @callback.calledWith(null, 42).should.equal true + + + diff --git a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee index 6ff83d7414..0d696730b9 100644 --- a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee @@ -44,30 +44,53 @@ describe "TrackChangesManager", -> @op = "mock-op" @TrackChangesManager.flushDocChanges = sinon.stub().callsArg(1) - describe "pushing the op", -> + describe "when the doc is under the load manager threshold", -> beforeEach -> + @RedisManager.getHistoryLoadManagerThreshold = sinon.stub().callsArgWith(0, null, 40) + @TrackChangesManager.getLoadManagerBucket = sinon.stub().returns(30) + + describe "pushing the op", -> + beforeEach -> + @RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1) + @TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback + + it "should push the op into redis", -> + @RedisManager.pushUncompressedHistoryOp + .calledWith(@doc_id, @op) + .should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + it "should not try to flush the op", -> + @TrackChangesManager.flushDocChanges.called.should.equal false + + describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", -> + beforeEach -> + @RedisManager.pushUncompressedHistoryOp = + sinon.stub().callsArgWith(2, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) + @TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback + + it "should tell the track changes api to flush", -> + @TrackChangesManager.flushDocChanges + .calledWith(@doc_id) + .should.equal true + + + describe "when the doc is over the load manager threshold", -> + beforeEach -> + @RedisManager.getHistoryLoadManagerThreshold = sinon.stub().callsArgWith(0, null, 40) + @TrackChangesManager.getLoadManagerBucket = sinon.stub().returns(50) @RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1) @TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback - it "should push the op into redis", -> - @RedisManager.pushUncompressedHistoryOp - .calledWith(@doc_id, @op) - .should.equal true - - it "should call the callback", -> - @callback.called.should.equal true + it "should not push the op", -> + @RedisManager.pushUncompressedHistoryOp.called.should.equal false it "should not try to flush the op", -> @TrackChangesManager.flushDocChanges.called.should.equal false - describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", -> - beforeEach -> - @RedisManager.pushUncompressedHistoryOp = - sinon.stub().callsArgWith(2, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) - @TrackChangesManager.pushUncompressedHistoryOp @doc_id, @op, @callback + it "should call the callback", -> + @callback.called.should.equal true - it "should tell the track changes api to flush", -> - @TrackChangesManager.flushDocChanges - .calledWith(@doc_id) - .should.equal true