diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index ad6a583979..0060126e71 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -70,51 +70,79 @@ module.exports = UpdatesManager = REDIS_READ_BATCH_SIZE: 100 processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) -> + UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> + return callback(error) if error? + UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) -> + return callback(error) if error? + UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback + + # Check whether the updates are temporary (per-project property) + _prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) -> UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) -> return callback(error) if error? - MongoManager.backportProjectId project_id, doc_id, (error) -> + callback(null, temporary) + + # Check for project id on document history (per-document property) + _prepareDocForUpdates: (project_id, doc_id, callback = (error) ->) -> + MongoManager.backportProjectId project_id, doc_id, (error) -> + return callback(error) if error? + callback(null) + + # Apply updates for specific project/doc after preparing at project and doc level + _processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) -> + # get the updates as strings from redis (so we can delete them after they are applied) + RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> + return callback(error) if error? + length = docUpdates.length + # parse the redis strings into ShareJs updates + RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) -> return callback(error) if error? - # get the updates as strings from redis (so we can delete them after they are applied) - RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> + logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis" + UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> return callback(error) if error? - length = docUpdates.length - # parse the redis strings into ShareJs updates - RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) -> + logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" + # delete the applied updates from redis + RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) -> return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis" - UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> - return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" - # delete the applied updates from redis - RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) -> - return callback(error) if error? - if length == UpdatesManager.REDIS_READ_BATCH_SIZE - # There might be more updates - logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" - setTimeout () -> - UpdatesManager.processUncompressedUpdates project_id, doc_id, callback - , 0 - else - logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" - callback() + if length == UpdatesManager.REDIS_READ_BATCH_SIZE + # There might be more updates + logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" + setTimeout () -> + UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback + , 0 + else + logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" + callback() + # Process updates for a doc when we flush it individually processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> - LockManager.runWithLock( - "HistoryLock:#{doc_id}", - (releaseLock) -> - UpdatesManager.processUncompressedUpdates project_id, doc_id, releaseLock - callback - ) + UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> + return callback(error) if error? + UpdatesManager._processUncompressedUpdatesForDoc project_id, doc_id, temporary, callback + + # Process updates for a doc when the whole project is flushed (internal method) + _processUncompressedUpdatesForDoc: (project_id, doc_id, temporary, callback = (error) ->) -> + UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) -> + return callback(error) if error? + LockManager.runWithLock( + "HistoryLock:#{doc_id}", + (releaseLock) -> + UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, releaseLock + callback + ) + + # Process all updates for a project, only check project-level information once processUncompressedUpdatesForProject: (project_id, callback = (error) ->) -> RedisManager.getDocIdsWithHistoryOps project_id, (error, doc_ids) -> return callback(error) if error? - jobs = [] - for doc_id in doc_ids - do (doc_id) -> - jobs.push (callback) -> - UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, callback - async.parallelLimit jobs, 5, callback + UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> + jobs = [] + for doc_id in doc_ids + do (doc_id) -> + jobs.push (cb) -> + UpdatesManager._processUncompressedUpdatesForDoc project_id, doc_id, temporary, cb + async.parallelLimit jobs, 5, callback getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) -> diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 161a5deb55..3746cec999 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -251,7 +251,9 @@ describe "UpdatesManager", -> describe "processCompressedUpdatesWithLock", -> beforeEach -> - @UpdatesManager.processUncompressedUpdates = sinon.stub().callsArg(2) + @UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock") + @MongoManager.backportProjectId = sinon.stub().callsArg(2) + @UpdatesManager._processUncompressedUpdates = sinon.stub().callsArg(3) @LockManager.runWithLock = sinon.stub().callsArg(2) @UpdatesManager.processUncompressedUpdatesWithLock @project_id, @doc_id, @callback @@ -313,7 +315,9 @@ describe "UpdatesManager", -> describe "processUncompressedUpdatesForProject", -> beforeEach (done) -> @doc_ids = ["mock-id-1", "mock-id-2"] - @UpdatesManager.processUncompressedUpdatesWithLock = sinon.stub().callsArg(2) + @UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock") + @MongoManager.backportProjectId = sinon.stub().callsArg(2) + @UpdatesManager._processUncompressedUpdatesForDoc = sinon.stub().callsArg(3) @RedisManager.getDocIdsWithHistoryOps = sinon.stub().callsArgWith(1, null, @doc_ids) @UpdatesManager.processUncompressedUpdatesForProject @project_id, () => @callback() @@ -326,8 +330,8 @@ describe "UpdatesManager", -> it "should process the doc ops for the each doc_id", -> for doc_id in @doc_ids - @UpdatesManager.processUncompressedUpdatesWithLock - .calledWith(@project_id, doc_id) + @UpdatesManager._processUncompressedUpdatesForDoc + .calledWith(@project_id, doc_id, @temporary) .should.equal true it "should call the callback", ->