From 679582093307483d985d4ba1c7abb00f7e54fd54 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 21 Mar 2017 16:49:23 +0000 Subject: [PATCH 1/4] move lock inside web http calls --- .../app/coffee/UpdatesManager.coffee | 96 ++++++++++++------- .../UpdatesManager/UpdatesManagerTests.coffee | 12 ++- 2 files changed, 70 insertions(+), 38 deletions(-) diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index ad6a583979..0060126e71 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -70,51 +70,79 @@ module.exports = UpdatesManager = REDIS_READ_BATCH_SIZE: 100 processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) -> + UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> + return callback(error) if error? + UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) -> + return callback(error) if error? + UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback + + # Check whether the updates are temporary (per-project property) + _prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) -> UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) -> return callback(error) if error? - MongoManager.backportProjectId project_id, doc_id, (error) -> + callback(null, temporary) + + # Check for project id on document history (per-document property) + _prepareDocForUpdates: (project_id, doc_id, callback = (error) ->) -> + MongoManager.backportProjectId project_id, doc_id, (error) -> + return callback(error) if error? + callback(null) + + # Apply updates for specific project/doc after preparing at project and doc level + _processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) -> + # get the updates as strings from redis (so we can delete them after they are applied) + RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> + return callback(error) if error? + length = docUpdates.length + # parse the redis strings into ShareJs updates + RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) -> return callback(error) if error? - # get the updates as strings from redis (so we can delete them after they are applied) - RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> + logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis" + UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> return callback(error) if error? - length = docUpdates.length - # parse the redis strings into ShareJs updates - RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) -> + logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" + # delete the applied updates from redis + RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) -> return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis" - UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> - return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" - # delete the applied updates from redis - RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) -> - return callback(error) if error? - if length == UpdatesManager.REDIS_READ_BATCH_SIZE - # There might be more updates - logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" - setTimeout () -> - UpdatesManager.processUncompressedUpdates project_id, doc_id, callback - , 0 - else - logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" - callback() + if length == UpdatesManager.REDIS_READ_BATCH_SIZE + # There might be more updates + logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" + setTimeout () -> + UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback + , 0 + else + logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" + callback() + # Process updates for a doc when we flush it individually processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> - LockManager.runWithLock( - "HistoryLock:#{doc_id}", - (releaseLock) -> - UpdatesManager.processUncompressedUpdates project_id, doc_id, releaseLock - callback - ) + UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> + return callback(error) if error? + UpdatesManager._processUncompressedUpdatesForDoc project_id, doc_id, temporary, callback + + # Process updates for a doc when the whole project is flushed (internal method) + _processUncompressedUpdatesForDoc: (project_id, doc_id, temporary, callback = (error) ->) -> + UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) -> + return callback(error) if error? + LockManager.runWithLock( + "HistoryLock:#{doc_id}", + (releaseLock) -> + UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, releaseLock + callback + ) + + # Process all updates for a project, only check project-level information once processUncompressedUpdatesForProject: (project_id, callback = (error) ->) -> RedisManager.getDocIdsWithHistoryOps project_id, (error, doc_ids) -> return callback(error) if error? - jobs = [] - for doc_id in doc_ids - do (doc_id) -> - jobs.push (callback) -> - UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, callback - async.parallelLimit jobs, 5, callback + UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> + jobs = [] + for doc_id in doc_ids + do (doc_id) -> + jobs.push (cb) -> + UpdatesManager._processUncompressedUpdatesForDoc project_id, doc_id, temporary, cb + async.parallelLimit jobs, 5, callback getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) -> diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 161a5deb55..3746cec999 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -251,7 +251,9 @@ describe "UpdatesManager", -> describe "processCompressedUpdatesWithLock", -> beforeEach -> - @UpdatesManager.processUncompressedUpdates = sinon.stub().callsArg(2) + @UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock") + @MongoManager.backportProjectId = sinon.stub().callsArg(2) + @UpdatesManager._processUncompressedUpdates = sinon.stub().callsArg(3) @LockManager.runWithLock = sinon.stub().callsArg(2) @UpdatesManager.processUncompressedUpdatesWithLock @project_id, @doc_id, @callback @@ -313,7 +315,9 @@ describe "UpdatesManager", -> describe "processUncompressedUpdatesForProject", -> beforeEach (done) -> @doc_ids = ["mock-id-1", "mock-id-2"] - @UpdatesManager.processUncompressedUpdatesWithLock = sinon.stub().callsArg(2) + @UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock") + @MongoManager.backportProjectId = sinon.stub().callsArg(2) + @UpdatesManager._processUncompressedUpdatesForDoc = sinon.stub().callsArg(3) @RedisManager.getDocIdsWithHistoryOps = sinon.stub().callsArgWith(1, null, @doc_ids) @UpdatesManager.processUncompressedUpdatesForProject @project_id, () => @callback() @@ -326,8 +330,8 @@ describe "UpdatesManager", -> it "should process the doc ops for the each doc_id", -> for doc_id in @doc_ids - @UpdatesManager.processUncompressedUpdatesWithLock - .calledWith(@project_id, doc_id) + @UpdatesManager._processUncompressedUpdatesForDoc + .calledWith(@project_id, doc_id, @temporary) .should.equal true it "should call the callback", -> From 0dd668416dccebe3170eea94092bc0b50e14103a Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 22 Mar 2017 16:02:50 +0000 Subject: [PATCH 2/4] increase request timeout now it is outside lock --- services/track-changes/app/coffee/WebApiManager.coffee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/track-changes/app/coffee/WebApiManager.coffee b/services/track-changes/app/coffee/WebApiManager.coffee index 50409ca43c..0067c87d03 100644 --- a/services/track-changes/app/coffee/WebApiManager.coffee +++ b/services/track-changes/app/coffee/WebApiManager.coffee @@ -3,7 +3,7 @@ logger = require "logger-sharelatex" Settings = require "settings-sharelatex" # Don't let HTTP calls hang for a long time -MAX_HTTP_REQUEST_LENGTH = 15000 # 15 seconds +MAX_HTTP_REQUEST_LENGTH = 30000 # 30 seconds # DEPRECATED! This method of getting user details via track-changes is deprecated # in the way we lay out our services. From b1c0ebbaaef9dadd757e51a6061a897631693f85 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 22 Mar 2017 16:16:04 +0000 Subject: [PATCH 3/4] add withLock to processUncompressedUpdatesForDoc --- services/track-changes/app/coffee/UpdatesManager.coffee | 6 +++--- .../unit/coffee/UpdatesManager/UpdatesManagerTests.coffee | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index 0060126e71..6b581bcd53 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -118,11 +118,11 @@ module.exports = UpdatesManager = processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> return callback(error) if error? - UpdatesManager._processUncompressedUpdatesForDoc project_id, doc_id, temporary, callback + UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, callback # Process updates for a doc when the whole project is flushed (internal method) - _processUncompressedUpdatesForDoc: (project_id, doc_id, temporary, callback = (error) ->) -> + _processUncompressedUpdatesForDocWithLock: (project_id, doc_id, temporary, callback = (error) ->) -> UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) -> return callback(error) if error? LockManager.runWithLock( @@ -141,7 +141,7 @@ module.exports = UpdatesManager = for doc_id in doc_ids do (doc_id) -> jobs.push (cb) -> - UpdatesManager._processUncompressedUpdatesForDoc project_id, doc_id, temporary, cb + UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, cb async.parallelLimit jobs, 5, callback getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 3746cec999..2c3e06832f 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -317,7 +317,7 @@ describe "UpdatesManager", -> @doc_ids = ["mock-id-1", "mock-id-2"] @UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock") @MongoManager.backportProjectId = sinon.stub().callsArg(2) - @UpdatesManager._processUncompressedUpdatesForDoc = sinon.stub().callsArg(3) + @UpdatesManager._processUncompressedUpdatesForDocWithLock = sinon.stub().callsArg(3) @RedisManager.getDocIdsWithHistoryOps = sinon.stub().callsArgWith(1, null, @doc_ids) @UpdatesManager.processUncompressedUpdatesForProject @project_id, () => @callback() @@ -330,7 +330,7 @@ describe "UpdatesManager", -> it "should process the doc ops for the each doc_id", -> for doc_id in @doc_ids - @UpdatesManager._processUncompressedUpdatesForDoc + @UpdatesManager._processUncompressedUpdatesForDocWithLock .calledWith(@project_id, doc_id, @temporary) .should.equal true @@ -802,4 +802,4 @@ describe "UpdatesManager", -> user_ids: [@user_2.id] start_ts: @now end_ts: @now + 10 - }] \ No newline at end of file + }] From bc7815f7fce2fb1601f7e8892c87ca5420f7e404 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 22 Mar 2017 16:59:52 +0000 Subject: [PATCH 4/4] remove old processUncompressedUpdates method replace with new per doc method --- .../app/coffee/UpdatesManager.coffee | 15 ++++-------- .../UpdatesManager/UpdatesManagerTests.coffee | 24 +++++++++---------- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index 6b581bcd53..fa4016f632 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -68,14 +68,6 @@ module.exports = UpdatesManager = logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result? callback() - REDIS_READ_BATCH_SIZE: 100 - processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) -> - UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) -> - return callback(error) if error? - UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) -> - return callback(error) if error? - UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback - # Check whether the updates are temporary (per-project property) _prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) -> UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) -> @@ -89,7 +81,8 @@ module.exports = UpdatesManager = callback(null) # Apply updates for specific project/doc after preparing at project and doc level - _processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) -> + REDIS_READ_BATCH_SIZE: 100 + processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) -> # get the updates as strings from redis (so we can delete them after they are applied) RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> return callback(error) if error? @@ -108,7 +101,7 @@ module.exports = UpdatesManager = # There might be more updates logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" setTimeout () -> - UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback + UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, callback , 0 else logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" @@ -128,7 +121,7 @@ module.exports = UpdatesManager = LockManager.runWithLock( "HistoryLock:#{doc_id}", (releaseLock) -> - UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, releaseLock + UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, releaseLock callback ) diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 2c3e06832f..4a7d463b37 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -182,17 +182,7 @@ describe "UpdatesManager", -> @updates = ["mock-update"] @RedisManager.getOldestDocUpdates = sinon.stub().callsArgWith(2, null, @updates) @RedisManager.expandDocUpdates = sinon.stub().callsArgWith(1, null, @updates) - @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback - - it "should check if the updates are temporary", -> - @UpdateTrimmer.shouldTrimUpdates - .calledWith(@project_id) - .should.equal true - - it "should backport the project id", -> - @MongoManager.backportProjectId - .calledWith(@project_id, @doc_id) - .should.equal true + @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @temporary, @callback it "should get the oldest updates", -> @RedisManager.getOldestDocUpdates @@ -225,7 +215,7 @@ describe "UpdatesManager", -> @RedisManager.expandDocUpdates = (jsonUpdates, callback) => callback null, jsonUpdates sinon.spy @RedisManager, "expandDocUpdates" - @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) => + @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @temporary, (args...) => @callback(args...) done() @@ -257,6 +247,16 @@ describe "UpdatesManager", -> @LockManager.runWithLock = sinon.stub().callsArg(2) @UpdatesManager.processUncompressedUpdatesWithLock @project_id, @doc_id, @callback + it "should check if the updates are temporary", -> + @UpdateTrimmer.shouldTrimUpdates + .calledWith(@project_id) + .should.equal true + + it "should backport the project id", -> + @MongoManager.backportProjectId + .calledWith(@project_id, @doc_id) + .should.equal true + it "should run processUncompressedUpdates with the lock", -> @LockManager.runWithLock .calledWith(