From 2d82d56f934a0a9e9ea0f811799ba21e00259b38 Mon Sep 17 00:00:00 2001 From: James Allen Date: Fri, 9 Sep 2016 11:01:14 +0100 Subject: [PATCH] Process updates one at a time and write into redis after each one --- .../app/coffee/DocumentManager.coffee | 2 +- .../app/coffee/ShareJsUpdateManager.coffee | 31 ++++++---------- .../app/coffee/UpdateManager.coffee | 14 +++---- .../coffee/GettingADocumentTests.coffee | 3 +- .../coffee/DocumentManager/setDocTests.coffee | 8 ++-- .../ShareJsUpdateManagerTests.coffee | 21 +++++------ .../UpdateManager/ApplyingUpdates.coffee | 37 ++++++++++--------- 7 files changed, 54 insertions(+), 62 deletions(-) diff --git a/services/document-updater/app/coffee/DocumentManager.coffee b/services/document-updater/app/coffee/DocumentManager.coffee index 63df6d0d79..ebbdc3a66e 100644 --- a/services/document-updater/app/coffee/DocumentManager.coffee +++ b/services/document-updater/app/coffee/DocumentManager.coffee @@ -68,7 +68,7 @@ module.exports = DocumentManager = type: "external" source: source user_id: user_id - UpdateManager.applyUpdates project_id, doc_id, [update], (error) -> + UpdateManager.applyUpdate project_id, doc_id, update, (error) -> return callback(error) if error? # If the document was loaded already, then someone has it open # in a project, and the usual flushing mechanism will happen. diff --git a/services/document-updater/app/coffee/ShareJsUpdateManager.coffee b/services/document-updater/app/coffee/ShareJsUpdateManager.coffee index ca00a04ea9..985d03094a 100644 --- a/services/document-updater/app/coffee/ShareJsUpdateManager.coffee +++ b/services/document-updater/app/coffee/ShareJsUpdateManager.coffee @@ -21,8 +21,8 @@ module.exports = ShareJsUpdateManager = model.db = db return model - applyUpdates: (project_id, doc_id, updates, callback = (error, updatedDocLines) ->) -> - logger.log project_id: project_id, doc_id: doc_id, updates: updates, "applying sharejs updates" + applyUpdate: (project_id, doc_id, update, callback = (error, updatedDocLines) ->) -> + logger.log project_id: project_id, doc_id: doc_id, update: update, "applying sharejs updates" jobs = [] # We could use a global model for all docs, but we're hitting issues with the @@ -33,26 +33,19 @@ module.exports = ShareJsUpdateManager = model = @getNewShareJsModel() @_listenForOps(model) doc_key = Keys.combineProjectIdAndDocId(project_id, doc_id) - for update in updates - do (update) => - jobs.push (callback) => - model.applyOp doc_key, update, (error) -> - if error == "Op already submitted" - logger.warn {project_id, doc_id, update}, "op has already been submitted" - update.dup = true - ShareJsUpdateManager._sendOp(project_id, doc_id, update) - callback() - else - callback(error) - - async.series jobs, (error) => - logger.log project_id: project_id, doc_id: doc_id, error: error, "applied updates" + model.applyOp doc_key, update, (error) -> if error? - @_sendError(project_id, doc_id, error) - return callback(error) + if error == "Op already submitted" + logger.warn {project_id, doc_id, update}, "op has already been submitted" + update.dup = true + ShareJsUpdateManager._sendOp(project_id, doc_id, update) + else + ShareJsUpdateManager._sendError(project_id, doc_id, error) + return callback(error) + logger.log project_id: project_id, doc_id: doc_id, error: error, "applied update" model.getSnapshot doc_key, (error, data) => if error? - @_sendError(project_id, doc_id, error) + ShareJsUpdateManager._sendError(project_id, doc_id, error) return callback(error) docLines = data.snapshot.split(/\r\n|\n|\r/) callback(null, docLines, data.v, model.db.appliedOps[doc_key] or []) diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index 0b5da21c8f..bcd0baf8b6 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -39,15 +39,15 @@ module.exports = UpdateManager = return callback(error) if error? if updates.length == 0 return callback() - UpdateManager.applyUpdates project_id, doc_id, updates, callback + async.mapSeries updates, + (update, cb) -> UpdateManager.applyUpdate project_id, doc_id, update, cb + callback - applyUpdates: (project_id, doc_id, updates, callback = (error) ->) -> - for update in updates or [] - UpdateManager._sanitizeUpdate update - ShareJsUpdateManager.applyUpdates project_id, doc_id, updates, (error, updatedDocLines, version, appliedOps) -> + applyUpdate: (project_id, doc_id, update, callback = (error) ->) -> + UpdateManager._sanitizeUpdate update + ShareJsUpdateManager.applyUpdate project_id, doc_id, update, (error, updatedDocLines, version, appliedOps) -> return callback(error) if error? - logger.log doc_id: doc_id, version: version, "updating doc via sharejs" - # TODO: Do these in parallel? Worry about consistency here? + logger.log doc_id: doc_id, version: version, "updating doc in redis" RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, (error) -> return callback(error) if error? TrackChangesManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback diff --git a/services/document-updater/test/acceptance/coffee/GettingADocumentTests.coffee b/services/document-updater/test/acceptance/coffee/GettingADocumentTests.coffee index 210502ae45..0823b8483a 100644 --- a/services/document-updater/test/acceptance/coffee/GettingADocumentTests.coffee +++ b/services/document-updater/test/acceptance/coffee/GettingADocumentTests.coffee @@ -8,9 +8,10 @@ MockWebApi = require "./helpers/MockWebApi" DocUpdaterClient = require "./helpers/DocUpdaterClient" describe "Getting a document", -> - beforeEach -> + before (done) -> @lines = ["one", "two", "three"] @version = 42 + setTimeout done, 200 # Give MockWebApi a chance to start describe "when the document is not loaded", -> before (done) -> diff --git a/services/document-updater/test/unit/coffee/DocumentManager/setDocTests.coffee b/services/document-updater/test/unit/coffee/DocumentManager/setDocTests.coffee index 9307c42feb..360d939b9f 100644 --- a/services/document-updater/test/unit/coffee/DocumentManager/setDocTests.coffee +++ b/services/document-updater/test/unit/coffee/DocumentManager/setDocTests.coffee @@ -32,7 +32,7 @@ describe "DocumentManager.setDoc", -> @afterLines = ["after", "lines"] @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @beforeLines, @version, true) @DiffCodec.diffAsShareJsOp = sinon.stub().callsArgWith(2, null, @ops) - @UpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null) + @UpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null) @DocumentManager.flushDocIfLoaded = sinon.stub().callsArg(2) @DocumentManager.flushAndDeleteDoc = sinon.stub().callsArg(2) @@ -51,11 +51,11 @@ describe "DocumentManager.setDoc", -> .should.equal true it "should apply the diff as a ShareJS op", -> - @UpdateManager.applyUpdates + @UpdateManager.applyUpdate .calledWith( @project_id, @doc_id, - [ + { doc: @doc_id, v: @version, op: @ops, @@ -64,7 +64,7 @@ describe "DocumentManager.setDoc", -> source: @source user_id: @user_id } - ] + } ) .should.equal true diff --git a/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee index 8d967ec2ee..94806a1a9d 100644 --- a/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee @@ -20,7 +20,7 @@ describe "ShareJsUpdateManager", -> globals: clearTimeout: @clearTimeout = sinon.stub() - describe "applyUpdates", -> + describe "applyUpdate", -> beforeEach -> @version = 34 @model = @@ -31,17 +31,14 @@ describe "ShareJsUpdateManager", -> @ShareJsUpdateManager.getNewShareJsModel = sinon.stub().returns(@model) @ShareJsUpdateManager._listenForOps = sinon.stub() @ShareJsUpdateManager.removeDocFromCache = sinon.stub().callsArg(1) - @updates = [ - {p: 4, t: "foo"} - {p: 6, t: "bar"} - ] + @update = {p: 4, t: "foo"} @updatedDocLines = ["one", "two"] describe "successfully", -> beforeEach (done) -> @model.getSnapshot.callsArgWith(1, null, {snapshot: @updatedDocLines.join("\n"), v: @version}) @model.db.appliedOps["#{@project_id}:#{@doc_id}"] = @appliedOps = ["mock-ops"] - @ShareJsUpdateManager.applyUpdates @project_id, @doc_id, @updates, (err, docLines, version, appliedOps) => + @ShareJsUpdateManager.applyUpdate @project_id, @doc_id, @update, (err, docLines, version, appliedOps) => @callback(err, docLines, version, appliedOps) done() @@ -54,10 +51,10 @@ describe "ShareJsUpdateManager", -> .calledWith(@model) .should.equal true - it "should send each update to ShareJs", -> - for update in @updates - @model.applyOp - .calledWith("#{@project_id}:#{@doc_id}", update).should.equal true + it "should send the update to ShareJs", -> + @model.applyOp + .calledWith("#{@project_id}:#{@doc_id}", @update) + .should.equal true it "should get the updated doc lines", -> @model.getSnapshot @@ -72,7 +69,7 @@ describe "ShareJsUpdateManager", -> @error = new Error("Something went wrong") @ShareJsUpdateManager._sendError = sinon.stub() @model.applyOp = sinon.stub().callsArgWith(2, @error) - @ShareJsUpdateManager.applyUpdates @project_id, @doc_id, @updates, (err, docLines, version) => + @ShareJsUpdateManager.applyUpdate @project_id, @doc_id, @update, (err, docLines, version) => @callback(err, docLines, version) done() @@ -89,7 +86,7 @@ describe "ShareJsUpdateManager", -> @error = new Error("Something went wrong") @ShareJsUpdateManager._sendError = sinon.stub() @model.getSnapshot.callsArgWith(1, @error) - @ShareJsUpdateManager.applyUpdates @project_id, @doc_id, @updates, (err, docLines, version) => + @ShareJsUpdateManager.applyUpdate @project_id, @doc_id, @update, (err, docLines, version) => @callback(err, docLines, version) done() diff --git a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee index e5c4cf9118..43786f4b98 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee @@ -121,16 +121,17 @@ describe "UpdateManager", -> @updatedDocLines = ["updated", "lines"] @version = 34 @WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates) - @UpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version) + @UpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version) @UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback it "should get the pending updates", -> @WebRedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true it "should apply the updates", -> - @UpdateManager.applyUpdates - .calledWith(@project_id, @doc_id, @updates) - .should.equal true + for update in @updates + @UpdateManager.applyUpdate + .calledWith(@project_id, @doc_id, update) + .should.equal true it "should call the callback", -> @callback.called.should.equal true @@ -139,33 +140,33 @@ describe "UpdateManager", -> beforeEach -> @updates = [] @WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates) - @UpdateManager.applyUpdates = sinon.stub() + @UpdateManager.applyUpdate = sinon.stub() @RedisManager.setDocument = sinon.stub() @UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback - it "should not call applyUpdates", -> - @UpdateManager.applyUpdates.called.should.equal false + it "should not call applyUpdate", -> + @UpdateManager.applyUpdate.called.should.equal false it "should call the callback", -> @callback.called.should.equal true - describe "applyUpdates", -> + describe "applyUpdate", -> beforeEach -> - @updates = [{op: [{p: 42, i: "foo"}]}] + @update = {op: [{p: 42, i: "foo"}]} @updatedDocLines = ["updated", "lines"] @version = 34 @appliedOps = ["mock-applied-ops"] - @ShareJsUpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version, @appliedOps) + @ShareJsUpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version, @appliedOps) @RedisManager.updateDocument = sinon.stub().callsArg(4) @TrackChangesManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3) describe "normally", -> beforeEach -> - @UpdateManager.applyUpdates @project_id, @doc_id, @updates, @callback + @UpdateManager.applyUpdate @project_id, @doc_id, @update, @callback it "should apply the updates via ShareJS", -> - @ShareJsUpdateManager.applyUpdates - .calledWith(@project_id, @doc_id, @updates) + @ShareJsUpdateManager.applyUpdate + .calledWith(@project_id, @doc_id, @update) .should.equal true it "should save the document", -> @@ -183,14 +184,14 @@ describe "UpdateManager", -> describe "with UTF-16 surrogate pairs in the update", -> beforeEach -> - @updates = [{op: [{p: 42, i: "\uD835\uDC00"}]}] - @UpdateManager.applyUpdates @project_id, @doc_id, @updates, @callback + @update = {op: [{p: 42, i: "\uD835\uDC00"}]} + @UpdateManager.applyUpdate @project_id, @doc_id, @update, @callback it "should apply the update but with surrogate pairs removed", -> - @ShareJsUpdateManager.applyUpdates - .calledWith(@project_id, @doc_id, @updates) + @ShareJsUpdateManager.applyUpdate + .calledWith(@project_id, @doc_id, @update) .should.equal true # \uFFFD is 'replacement character' - @updates[0].op[0].i.should.equal "\uFFFD\uFFFD" + @update.op[0].i.should.equal "\uFFFD\uFFFD"