From 962520fca8e2e485e6872e1c2e553ca2f59c6d82 Mon Sep 17 00:00:00 2001 From: Hayden Faulds Date: Thu, 5 Oct 2017 13:45:29 +0100 Subject: [PATCH] flush track-changes and project-history in HistoryManager --- .../app/coffee/DocumentManager.coffee | 7 +- .../app/coffee/HistoryManager.coffee | 86 +++++--- .../app/coffee/UpdateManager.coffee | 7 +- .../DocumentManagerTests.coffee | 4 +- .../HistoryManager/HistoryManagerTests.coffee | 205 ++++++++++++------ .../UpdateManager/UpdateManagerTests.coffee | 10 +- 6 files changed, 205 insertions(+), 114 deletions(-) diff --git a/services/document-updater/app/coffee/DocumentManager.coffee b/services/document-updater/app/coffee/DocumentManager.coffee index e74e436fde..5ddca2e6a8 100644 --- a/services/document-updater/app/coffee/DocumentManager.coffee +++ b/services/document-updater/app/coffee/DocumentManager.coffee @@ -116,11 +116,8 @@ module.exports = DocumentManager = DocumentManager.flushDocIfLoaded project_id, doc_id, (error) -> return callback(error) if error? - # Flush in the background since it requires and http request - # to track changes - HistoryManager.flushDocChanges project_id, doc_id, (err) -> - if err? - logger.err {err, project_id, doc_id}, "error flushing to track changes" + # Flush in the background since it requires a http request + HistoryManager.flushChangesAsync project_id, doc_id RedisManager.removeDocFromMemory project_id, doc_id, (error) -> return callback(error) if error? 
diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index 9f78b5af4b..c693a6a599 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -1,45 +1,73 @@ -settings = require "settings-sharelatex" +Settings = require "settings-sharelatex" request = require "request" logger = require "logger-sharelatex" async = require "async" HistoryRedisManager = require "./HistoryRedisManager" module.exports = HistoryManager = - flushDocChanges: (project_id, doc_id, callback = (error) ->) -> - if !settings.apis?.trackchanges? - logger.warn doc_id: doc_id, "track changes API is not configured, so not flushing" - return callback() + flushChangesAsync: (project_id, doc_id) -> + HistoryManager._flushDocChangesAsync project_id, doc_id + if Settings.apis?.project_history?.enabled + HistoryManager._flushProjectChangesAsync project_id - url = "#{settings.apis.trackchanges.url}/project/#{project_id}/doc/#{doc_id}/flush" - logger.log project_id: project_id, doc_id: doc_id, url: url, "flushing doc in track changes api" + _flushDocChangesAsync: (project_id, doc_id) -> + if !Settings.apis?.trackchanges? + logger.warn { doc_id }, "track changes API is not configured, so not flushing" + return + + url = "#{Settings.apis.trackchanges.url}/project/#{project_id}/doc/#{doc_id}/flush" + logger.log { project_id, doc_id, url }, "flushing doc in track changes api" request.post url, (error, res, body)-> if error? 
- return callback(error) - else if res.statusCode >= 200 and res.statusCode < 300 - return callback(null) - else - error = new Error("track changes api returned a failure status code: #{res.statusCode}") - return callback(error) + logger.error( + { error, doc_id, project_id}, + "error flushing doc to track changes api" + ) + else if res.statusCode < 200 or res.statusCode >= 300 # any non-2xx status is a failure + logger.error( + { doc_id, project_id }, + "track changes api returned a failure status code: #{res.statusCode}" + ) + + _flushProjectChangesAsync: (project_id) -> + return if !Settings.apis?.project_history? + + url = "#{Settings.apis.project_history.url}/project/#{project_id}/flush" + logger.log { project_id, url }, "flushing doc in project history api" + request.post url, (error, res, body)-> + if error? + logger.error { error, project_id}, "error flushing project to project history api" + else if res.statusCode < 200 or res.statusCode >= 300 # any non-2xx status is a failure + logger.error { project_id }, "project history api returned a failure status code: #{res.statusCode}" FLUSH_EVERY_N_OPS: 100 - recordAndFlushHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) -> + recordAndFlushHistoryOps: (project_id, doc_id, ops = [], doc_ops_length, project_ops_length, callback = (error) ->) -> if ops.length == 0 return callback() - HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) -> - return callback(error) if error? - return callback() if not length? # don't flush unless we know the length - # We want to flush every 100 ops, i.e. 100, 200, 300, etc - # Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these - # ops. If we've changed, then we've gone over a multiple of 100 and should flush. 
- # (Most of the time, we will only hit 100 and then flushing will put us back to 0) - previousLength = length - ops.length - prevBlock = Math.floor(previousLength / HistoryManager.FLUSH_EVERY_N_OPS) - newBlock = Math.floor(length / HistoryManager.FLUSH_EVERY_N_OPS) - if newBlock != prevBlock + + if Settings.apis?.project_history?.enabled + if HistoryManager._shouldFlushHistoryOps(project_ops_length, ops, HistoryManager.FLUSH_EVERY_N_OPS) # Do this in the background since it uses HTTP and so may be too # slow to wait for when processing a doc update. - logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api" - HistoryManager.flushDocChanges project_id, doc_id, (error) -> - if error? - logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api" + logger.log { project_ops_length, project_id }, "flushing project history api" + HistoryManager._flushProjectChangesAsync project_id + + HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) -> + return callback(error) if error? + if HistoryManager._shouldFlushHistoryOps(doc_ops_length, ops, HistoryManager.FLUSH_EVERY_N_OPS) + # Do this in the background since it uses HTTP and so may be too + # slow to wait for when processing a doc update. + logger.log { doc_ops_length, doc_id, project_id }, "flushing track changes api" + HistoryManager._flushDocChangesAsync project_id, doc_id callback() + + _shouldFlushHistoryOps: (length, ops, threshold) -> + return false if !length # don't flush unless we know the length + # We want to flush every 100 ops, i.e. 100, 200, 300, etc + # Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these + # ops. If we've changed, then we've gone over a multiple of 100 and should flush. 
+ # (Most of the time, we will only hit 100 and then flushing will put us back to 0) + previousLength = length - ops.length + prevBlock = Math.floor(previousLength / threshold) + newBlock = Math.floor(length / threshold) + return newBlock != prevBlock diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index b5ced9544d..e821926015 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -80,13 +80,12 @@ module.exports = UpdateManager = RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) -> profile.log("RangesManager.applyUpdate") return callback(error) if error? - RedisManager.updateDocument project_id, doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) -> + RedisManager.updateDocument project_id, doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, doc_ops_length, project_ops_length) -> profile.log("RedisManager.updateDocument") return callback(error) if error? - HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, historyOpsLength, (error) -> + HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, doc_ops_length, project_ops_length, (error) -> profile.log("recordAndFlushHistoryOps") - return callback(error) if error? 
- callback() + callback(error) lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) -> profile = new Profiler("lockUpdatesAndDo", {project_id, doc_id}) diff --git a/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee b/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee index 3703058693..ac0601b34b 100644 --- a/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee @@ -35,7 +35,7 @@ describe "DocumentManager", -> beforeEach -> @RedisManager.removeDocFromMemory = sinon.stub().callsArg(2) @DocumentManager.flushDocIfLoaded = sinon.stub().callsArgWith(2) - @HistoryManager.flushDocChanges = sinon.stub().callsArg(2) + @HistoryManager.flushChangesAsync = sinon.stub() @DocumentManager.flushAndDeleteDoc @project_id, @doc_id, @callback it "should flush the doc", -> @@ -55,7 +55,7 @@ describe "DocumentManager", -> @Metrics.Timer::done.called.should.equal true it "should flush to the history api", -> - @HistoryManager.flushDocChanges + @HistoryManager.flushChangesAsync .calledWith(@project_id, @doc_id) .should.equal true diff --git a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee index 37e35ca285..4956a410b2 100644 --- a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee @@ -7,106 +7,171 @@ describe "HistoryManager", -> beforeEach -> @HistoryManager = SandboxedModule.require modulePath, requires: "request": @request = {} - "settings-sharelatex": @Settings = {} + "settings-sharelatex": @Settings = { + apis: + project_history: + enabled: true + url: "http://project_history.example.com" + trackchanges: + url: 
"http://trackchanges.example.com" + } "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } "./HistoryRedisManager": @HistoryRedisManager = {} @project_id = "mock-project-id" @doc_id = "mock-doc-id" @callback = sinon.stub() - describe "flushDocChanges", -> + describe "flushChangesAsync", -> beforeEach -> - @Settings.apis = - trackchanges: url: "http://trackchanges.example.com" + @HistoryManager._flushDocChangesAsync = sinon.stub() + @HistoryManager._flushProjectChangesAsync = sinon.stub() - describe "successfully", -> - beforeEach -> - @request.post = sinon.stub().callsArgWith(1, null, statusCode: 204) - @HistoryManager.flushDocChanges @project_id, @doc_id, @callback + @HistoryManager.flushChangesAsync(@project_id, @doc_id) - it "should send a request to the track changes api", -> - @request.post - .calledWith("#{@Settings.apis.trackchanges.url}/project/#{@project_id}/doc/#{@doc_id}/flush") - .should.equal true + it "flushes doc changes", -> + @HistoryManager._flushDocChangesAsync + .calledWith(@project_id, @doc_id) + .should.equal true - it "should return the callback", -> - @callback.calledWith(null).should.equal true + it "flushes project changes", -> + @HistoryManager._flushProjectChangesAsync + .calledWith(@project_id) + .should.equal true - describe "when the track changes api returns an error", -> - beforeEach -> - @request.post = sinon.stub().callsArgWith(1, null, statusCode: 500) - @HistoryManager.flushDocChanges @project_id, @doc_id, @callback + describe "_flushDocChangesAsync", -> + beforeEach -> + @request.post = sinon.stub().callsArgWith(1, null, statusCode: 204) - it "should return the callback with an error", -> - @callback.calledWith(new Error("track changes api return non-success code: 500")).should.equal true + @HistoryManager._flushDocChangesAsync @project_id, @doc_id + + it "should send a request to the track changes api", -> + @request.post + 
.calledWith("#{@Settings.apis.trackchanges.url}/project/#{@project_id}/doc/#{@doc_id}/flush") + .should.equal true + + describe "_flushProjectChangesAsync", -> + beforeEach -> + @request.post = sinon.stub().callsArgWith(1, null, statusCode: 204) + + @HistoryManager._flushProjectChangesAsync @project_id + + it "should send a request to the project history api", -> + @request.post + .calledWith("#{@Settings.apis.project_history.url}/project/#{@project_id}/flush") + .should.equal true describe "recordAndFlushHistoryOps", -> beforeEach -> - @ops = ["mock-ops"] - @HistoryManager.flushDocChanges = sinon.stub().callsArg(2) + @ops = [ 'mock-ops' ] + @project_ops_length = 10 + @doc_ops_length = 5 - describe "pushing the op", -> + @HistoryManager._flushProjectChangesAsync = sinon.stub() + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArg(3) + @HistoryManager._flushDocChangesAsync = sinon.stub() + + describe "with no ops", -> beforeEach -> - @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) - @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 1, @callback + @HistoryManager.recordAndFlushHistoryOps( + @project_id, @doc_id, [], @doc_ops_length, @project_ops_length, @callback + ) - it "should push the ops into redis", -> + it "should not flush project changes", -> + @HistoryManager._flushProjectChangesAsync.called.should.equal false + + it "should not record doc has history ops", -> + @HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false + + it "should not flush doc changes", -> + @HistoryManager._flushDocChangesAsync.called.should.equal false + + it "should call the callback", -> + @callback.called.should.equal true + + describe "with enough ops to flush project changes", -> + beforeEach -> + @HistoryManager._shouldFlushHistoryOps = sinon.stub() + @HistoryManager._shouldFlushHistoryOps.withArgs(@project_ops_length).returns(true) + 
@HistoryManager._shouldFlushHistoryOps.withArgs(@doc_ops_length).returns(false) + + @HistoryManager.recordAndFlushHistoryOps( + @project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback + ) + + it "should flush project changes", -> + @HistoryManager._flushProjectChangesAsync + .calledWith(@project_id) + .should.equal true + + it "should record doc has history ops", -> @HistoryRedisManager.recordDocHasHistoryOps .calledWith(@project_id, @doc_id, @ops) + + it "should not flush doc changes", -> + @HistoryManager._flushDocChangesAsync.called.should.equal false + + it "should call the callback", -> + @callback.called.should.equal true + + describe "with enough ops to flush doc changes", -> + beforeEach -> + @HistoryManager._shouldFlushHistoryOps = sinon.stub() + @HistoryManager._shouldFlushHistoryOps.withArgs(@project_ops_length).returns(false) + @HistoryManager._shouldFlushHistoryOps.withArgs(@doc_ops_length).returns(true) + + @HistoryManager.recordAndFlushHistoryOps( + @project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback + ) + + it "should not flush project changes", -> + @HistoryManager._flushProjectChangesAsync.called.should.equal false + + it "should record doc has history ops", -> + @HistoryRedisManager.recordDocHasHistoryOps + .calledWith(@project_id, @doc_id, @ops) + + it "should flush doc changes", -> + @HistoryManager._flushDocChangesAsync + .calledWith(@project_id, @doc_id) .should.equal true it "should call the callback", -> @callback.called.should.equal true - it "should not try to flush the op", -> - @HistoryManager.flushDocChanges.called.should.equal false - - describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", -> + describe "when recording doc has history ops errors", -> beforeEach -> + @error = new Error("error") @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null) - @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * 
@HistoryManager.FLUSH_EVERY_N_OPS,@callback + sinon.stub().callsArgWith(3, @error) - it "should tell the track changes api to flush", -> - @HistoryManager.flushDocChanges - .calledWith(@project_id, @doc_id) - .should.equal true + @HistoryManager.recordAndFlushHistoryOps( + @project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback + ) - describe "when we go over a multiple of FLUSH_EVERY_N_OPS ops", -> - beforeEach -> - @ops = ["op1", "op2", "op3"] - @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null) - @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback + it "should not flush doc changes", -> + @HistoryManager._flushDocChangesAsync.called.should.equal false - it "should tell the track changes api to flush", -> - @HistoryManager.flushDocChanges - .calledWith(@project_id, @doc_id) - .should.equal true + it "should call the callback with the error", -> + @callback.calledWith(@error).should.equal true - describe "when HistoryManager errors", -> - beforeEach -> - @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null) - @HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) - @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback + describe "_shouldFlushHistoryOps", -> + it "should return false if the number of ops is not known", -> + @HistoryManager._shouldFlushHistoryOps(null, ['a', 'b', 'c'], 1).should.equal false - it "should log out the error", -> - @logger.error - .calledWith( - err: @error - doc_id: @doc_id - project_id: @project_id - "error flushing doc to track changes api" - ) - .should.equal true - - describe "with no ops", -> - beforeEach -> - @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) - @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, [], 1, @callback - - it "should 
not call HistoryRedisManager.recordDocHasHistoryOps", -> - @HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false - + it "should return false if the updates didn't take us past the threshold", -> + # Currently there are 14 ops + # Previously we were on 11 ops + # We didn't pass over a multiple of 5 + @HistoryManager._shouldFlushHistoryOps(14, ['a', 'b', 'c'], 5).should.equal false + it "should return true if the updates took to the threshold", -> + # Currently there are 15 ops + # Previously we were on 12 ops + # We've reached a new multiple of 5 + @HistoryManager._shouldFlushHistoryOps(15, ['a', 'b', 'c'], 5).should.equal true + + it "should return true if the updates took past the threshold", -> + # Currently there are 17 ops + # Previously we were on 14 ops + # We passed over a multiple of 5 (15) + @HistoryManager._shouldFlushHistoryOps(17, ['a', 'b', 'c'], 5).should.equal true diff --git a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee index 45653f99b5..b68698bc49 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee @@ -19,7 +19,7 @@ describe "UpdateManager", -> "./Metrics": @Metrics = Timer: class Timer done: sinon.stub() - "settings-sharelatex": Settings = {} + "settings-sharelatex": @Settings = {} "./DocumentManager": @DocumentManager = {} "./RangesManager": @RangesManager = {} "./Profiler": class Profiler @@ -164,12 +164,14 @@ describe "UpdateManager", -> @ranges = { entries: "mock", comments: "mock" } @updated_ranges = { entries: "updated", comments: "updated" } @appliedOps = ["mock-applied-ops"] + @doc_ops_length = sinon.stub() + @project_ops_length = sinon.stub() @DocumentManager.getDoc = sinon.stub().yields(null, @lines, @version, @ranges) @RangesManager.applyUpdate = 
sinon.stub().yields(null, @updated_ranges) @ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps) - @RedisManager.updateDocument = sinon.stub().yields() + @RedisManager.updateDocument = sinon.stub().yields(null, @doc_ops_length, @project_ops_length) @RealTimeRedisManager.sendData = sinon.stub() - @HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(4) + @HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(5) describe "normally", -> beforeEach -> @@ -192,7 +194,7 @@ describe "UpdateManager", -> it "should push the applied ops into the history queue", -> @HistoryManager.recordAndFlushHistoryOps - .calledWith(@project_id, @doc_id, @appliedOps) + .calledWith(@project_id, @doc_id, @appliedOps, @doc_ops_length, @project_ops_length) .should.equal true it "should call the callback", ->