diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index ad6a583979..fa4016f632 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -68,53 +68,74 @@ module.exports = UpdatesManager = logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result? callback() - REDIS_READ_BATCH_SIZE: 100 - processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) -> + # Check whether the updates are temporary (per-project property) + _prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) -> UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) -> return callback(error) if error? - MongoManager.backportProjectId project_id, doc_id, (error) -> + callback(null, temporary) + + # Check for project id on document history (per-document property) + _prepareDocForUpdates: (project_id, doc_id, callback = (error) ->) -> + MongoManager.backportProjectId project_id, doc_id, (error) -> + return callback(error) if error? + callback(null) + + # Apply updates for specific project/doc after preparing at project and doc level + REDIS_READ_BATCH_SIZE: 100 + processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) -> + # get the updates as strings from redis (so we can delete them after they are applied) + RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> + return callback(error) if error? + length = docUpdates.length + # parse the redis strings into ShareJs updates + RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) -> return callback(error) if error? - # get the updates as strings from redis (so we can delete them after they are applied) - RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> + logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis" + UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> return callback(error) if error? - length = docUpdates.length - # parse the redis strings into ShareJs updates - RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) -> + logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" + # delete the applied updates from redis + RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) -> return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis" - UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> - return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" - # delete the applied updates from redis - RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) -> - return callback(error) if error? - if length == UpdatesManager.REDIS_READ_BATCH_SIZE - # There might be more updates - logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" - setTimeout () -> - UpdatesManager.processUncompressedUpdates project_id, doc_id, callback - , 0 - else - logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" - callback() + if length == UpdatesManager.REDIS_READ_BATCH_SIZE + # There might be more updates + logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" + setTimeout () -> + UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, callback + , 0 + else + logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" + callback() + # Process updates for a doc when we flush it individually processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> - LockManager.runWithLock( - "HistoryLock:#{doc_id}", - (releaseLock) -> - UpdatesManager.processUncompressedUpdates project_id, doc_id, releaseLock - callback - ) + UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> + return callback(error) if error? + UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, callback + + # Process updates for a doc when the whole project is flushed (internal method) + _processUncompressedUpdatesForDocWithLock: (project_id, doc_id, temporary, callback = (error) ->) -> + UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) -> + return callback(error) if error? + LockManager.runWithLock( + "HistoryLock:#{doc_id}", + (releaseLock) -> + UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, releaseLock + callback + ) + + # Process all updates for a project, only check project-level information once processUncompressedUpdatesForProject: (project_id, callback = (error) ->) -> RedisManager.getDocIdsWithHistoryOps project_id, (error, doc_ids) -> return callback(error) if error? - jobs = [] - for doc_id in doc_ids - do (doc_id) -> - jobs.push (callback) -> - UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, callback - async.parallelLimit jobs, 5, callback + UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> + jobs = [] + for doc_id in doc_ids + do (doc_id) -> + jobs.push (cb) -> + UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, cb + async.parallelLimit jobs, 5, callback getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) -> diff --git a/services/track-changes/app/coffee/WebApiManager.coffee b/services/track-changes/app/coffee/WebApiManager.coffee index 50409ca43c..0067c87d03 100644 --- a/services/track-changes/app/coffee/WebApiManager.coffee +++ b/services/track-changes/app/coffee/WebApiManager.coffee @@ -3,7 +3,7 @@ logger = require "logger-sharelatex" Settings = require "settings-sharelatex" # Don't let HTTP calls hang for a long time -MAX_HTTP_REQUEST_LENGTH = 15000 # 15 seconds +MAX_HTTP_REQUEST_LENGTH = 30000 # 30 seconds # DEPRECATED! This method of getting user details via track-changes is deprecated # in the way we lay out our services. diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 161a5deb55..4a7d463b37 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -182,17 +182,7 @@ describe "UpdatesManager", -> @updates = ["mock-update"] @RedisManager.getOldestDocUpdates = sinon.stub().callsArgWith(2, null, @updates) @RedisManager.expandDocUpdates = sinon.stub().callsArgWith(1, null, @updates) - @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback - - it "should check if the updates are temporary", -> - @UpdateTrimmer.shouldTrimUpdates - .calledWith(@project_id) - .should.equal true - - it "should backport the project id", -> - @MongoManager.backportProjectId - .calledWith(@project_id, @doc_id) - .should.equal true + @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @temporary, @callback it "should get the oldest updates", -> @RedisManager.getOldestDocUpdates @@ -225,7 +215,7 @@ describe "UpdatesManager", -> @RedisManager.expandDocUpdates = (jsonUpdates, callback) => callback null, jsonUpdates sinon.spy @RedisManager, "expandDocUpdates" - @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) => + @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @temporary, (args...) => @callback(args...) done() @@ -251,10 +241,22 @@ describe "UpdatesManager", -> describe "processCompressedUpdatesWithLock", -> beforeEach -> - @UpdatesManager.processUncompressedUpdates = sinon.stub().callsArg(2) + @UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock") + @MongoManager.backportProjectId = sinon.stub().callsArg(2) + @UpdatesManager._processUncompressedUpdates = sinon.stub().callsArg(3) @LockManager.runWithLock = sinon.stub().callsArg(2) @UpdatesManager.processUncompressedUpdatesWithLock @project_id, @doc_id, @callback + it "should check if the updates are temporary", -> + @UpdateTrimmer.shouldTrimUpdates + .calledWith(@project_id) + .should.equal true + + it "should backport the project id", -> + @MongoManager.backportProjectId + .calledWith(@project_id, @doc_id) + .should.equal true + it "should run processUncompressedUpdates with the lock", -> @LockManager.runWithLock .calledWith( @@ -313,7 +315,9 @@ describe "UpdatesManager", -> describe "processUncompressedUpdatesForProject", -> beforeEach (done) -> @doc_ids = ["mock-id-1", "mock-id-2"] - @UpdatesManager.processUncompressedUpdatesWithLock = sinon.stub().callsArg(2) + @UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock") + @MongoManager.backportProjectId = sinon.stub().callsArg(2) + @UpdatesManager._processUncompressedUpdatesForDocWithLock = sinon.stub().callsArg(3) @RedisManager.getDocIdsWithHistoryOps = sinon.stub().callsArgWith(1, null, @doc_ids) @UpdatesManager.processUncompressedUpdatesForProject @project_id, () => @callback() @@ -326,8 +330,8 @@ describe "UpdatesManager", -> it "should process the doc ops for the each doc_id", -> for doc_id in @doc_ids - @UpdatesManager.processUncompressedUpdatesWithLock - .calledWith(@project_id, doc_id) + @UpdatesManager._processUncompressedUpdatesForDocWithLock + .calledWith(@project_id, doc_id, @temporary) .should.equal true it "should call the callback", -> @@ -798,4 +802,4 @@ describe "UpdatesManager", -> user_ids: [@user_2.id] start_ts: @now end_ts: @now + 10 - }] \ No newline at end of file + }]