Process updates one at a time and write into redis after each one

This commit is contained in:
James Allen 2016-09-09 11:01:14 +01:00
parent c85fd7ef41
commit 2d82d56f93
7 changed files with 54 additions and 62 deletions

View file

@ -68,7 +68,7 @@ module.exports = DocumentManager =
type: "external"
source: source
user_id: user_id
UpdateManager.applyUpdates project_id, doc_id, [update], (error) ->
UpdateManager.applyUpdate project_id, doc_id, update, (error) ->
return callback(error) if error?
# If the document was loaded already, then someone has it open
# in a project, and the usual flushing mechanism will happen.

View file

@ -21,8 +21,8 @@ module.exports = ShareJsUpdateManager =
model.db = db
return model
applyUpdates: (project_id, doc_id, updates, callback = (error, updatedDocLines) ->) ->
logger.log project_id: project_id, doc_id: doc_id, updates: updates, "applying sharejs updates"
applyUpdate: (project_id, doc_id, update, callback = (error, updatedDocLines) ->) ->
logger.log project_id: project_id, doc_id: doc_id, update: update, "applying sharejs updates"
jobs = []
# We could use a global model for all docs, but we're hitting issues with the
@ -33,26 +33,19 @@ module.exports = ShareJsUpdateManager =
model = @getNewShareJsModel()
@_listenForOps(model)
doc_key = Keys.combineProjectIdAndDocId(project_id, doc_id)
for update in updates
do (update) =>
jobs.push (callback) =>
model.applyOp doc_key, update, (error) ->
if error == "Op already submitted"
logger.warn {project_id, doc_id, update}, "op has already been submitted"
update.dup = true
ShareJsUpdateManager._sendOp(project_id, doc_id, update)
callback()
else
callback(error)
async.series jobs, (error) =>
logger.log project_id: project_id, doc_id: doc_id, error: error, "applied updates"
model.applyOp doc_key, update, (error) ->
if error?
@_sendError(project_id, doc_id, error)
return callback(error)
if error == "Op already submitted"
logger.warn {project_id, doc_id, update}, "op has already been submitted"
update.dup = true
ShareJsUpdateManager._sendOp(project_id, doc_id, update)
else
ShareJsUpdateManager._sendError(project_id, doc_id, error)
return callback(error)
logger.log project_id: project_id, doc_id: doc_id, error: error, "applied update"
model.getSnapshot doc_key, (error, data) =>
if error?
@_sendError(project_id, doc_id, error)
ShareJsUpdateManager._sendError(project_id, doc_id, error)
return callback(error)
docLines = data.snapshot.split(/\r\n|\n|\r/)
callback(null, docLines, data.v, model.db.appliedOps[doc_key] or [])

View file

@ -39,15 +39,15 @@ module.exports = UpdateManager =
return callback(error) if error?
if updates.length == 0
return callback()
UpdateManager.applyUpdates project_id, doc_id, updates, callback
async.mapSeries updates,
(update, cb) -> UpdateManager.applyUpdate project_id, doc_id, update, cb
callback
applyUpdates: (project_id, doc_id, updates, callback = (error) ->) ->
for update in updates or []
UpdateManager._sanitizeUpdate update
ShareJsUpdateManager.applyUpdates project_id, doc_id, updates, (error, updatedDocLines, version, appliedOps) ->
applyUpdate: (project_id, doc_id, update, callback = (error) ->) ->
UpdateManager._sanitizeUpdate update
ShareJsUpdateManager.applyUpdate project_id, doc_id, update, (error, updatedDocLines, version, appliedOps) ->
return callback(error) if error?
logger.log doc_id: doc_id, version: version, "updating doc via sharejs"
# TODO: Do these in parallel? Worry about consistency here?
logger.log doc_id: doc_id, version: version, "updating doc in redis"
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, (error) ->
return callback(error) if error?
TrackChangesManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback

View file

@ -8,9 +8,10 @@ MockWebApi = require "./helpers/MockWebApi"
DocUpdaterClient = require "./helpers/DocUpdaterClient"
describe "Getting a document", ->
beforeEach ->
before (done) ->
@lines = ["one", "two", "three"]
@version = 42
setTimeout done, 200 # Give MockWebApi a chance to start
describe "when the document is not loaded", ->
before (done) ->

View file

@ -32,7 +32,7 @@ describe "DocumentManager.setDoc", ->
@afterLines = ["after", "lines"]
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @beforeLines, @version, true)
@DiffCodec.diffAsShareJsOp = sinon.stub().callsArgWith(2, null, @ops)
@UpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null)
@UpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null)
@DocumentManager.flushDocIfLoaded = sinon.stub().callsArg(2)
@DocumentManager.flushAndDeleteDoc = sinon.stub().callsArg(2)
@ -51,11 +51,11 @@ describe "DocumentManager.setDoc", ->
.should.equal true
it "should apply the diff as a ShareJS op", ->
@UpdateManager.applyUpdates
@UpdateManager.applyUpdate
.calledWith(
@project_id,
@doc_id,
[
{
doc: @doc_id,
v: @version,
op: @ops,
@ -64,7 +64,7 @@ describe "DocumentManager.setDoc", ->
source: @source
user_id: @user_id
}
]
}
)
.should.equal true

View file

@ -20,7 +20,7 @@ describe "ShareJsUpdateManager", ->
globals:
clearTimeout: @clearTimeout = sinon.stub()
describe "applyUpdates", ->
describe "applyUpdate", ->
beforeEach ->
@version = 34
@model =
@ -31,17 +31,14 @@ describe "ShareJsUpdateManager", ->
@ShareJsUpdateManager.getNewShareJsModel = sinon.stub().returns(@model)
@ShareJsUpdateManager._listenForOps = sinon.stub()
@ShareJsUpdateManager.removeDocFromCache = sinon.stub().callsArg(1)
@updates = [
{p: 4, t: "foo"}
{p: 6, t: "bar"}
]
@update = {p: 4, t: "foo"}
@updatedDocLines = ["one", "two"]
describe "successfully", ->
beforeEach (done) ->
@model.getSnapshot.callsArgWith(1, null, {snapshot: @updatedDocLines.join("\n"), v: @version})
@model.db.appliedOps["#{@project_id}:#{@doc_id}"] = @appliedOps = ["mock-ops"]
@ShareJsUpdateManager.applyUpdates @project_id, @doc_id, @updates, (err, docLines, version, appliedOps) =>
@ShareJsUpdateManager.applyUpdate @project_id, @doc_id, @update, (err, docLines, version, appliedOps) =>
@callback(err, docLines, version, appliedOps)
done()
@ -54,10 +51,10 @@ describe "ShareJsUpdateManager", ->
.calledWith(@model)
.should.equal true
it "should send each update to ShareJs", ->
for update in @updates
@model.applyOp
.calledWith("#{@project_id}:#{@doc_id}", update).should.equal true
it "should send the update to ShareJs", ->
@model.applyOp
.calledWith("#{@project_id}:#{@doc_id}", @update)
.should.equal true
it "should get the updated doc lines", ->
@model.getSnapshot
@ -72,7 +69,7 @@ describe "ShareJsUpdateManager", ->
@error = new Error("Something went wrong")
@ShareJsUpdateManager._sendError = sinon.stub()
@model.applyOp = sinon.stub().callsArgWith(2, @error)
@ShareJsUpdateManager.applyUpdates @project_id, @doc_id, @updates, (err, docLines, version) =>
@ShareJsUpdateManager.applyUpdate @project_id, @doc_id, @update, (err, docLines, version) =>
@callback(err, docLines, version)
done()
@ -89,7 +86,7 @@ describe "ShareJsUpdateManager", ->
@error = new Error("Something went wrong")
@ShareJsUpdateManager._sendError = sinon.stub()
@model.getSnapshot.callsArgWith(1, @error)
@ShareJsUpdateManager.applyUpdates @project_id, @doc_id, @updates, (err, docLines, version) =>
@ShareJsUpdateManager.applyUpdate @project_id, @doc_id, @update, (err, docLines, version) =>
@callback(err, docLines, version)
done()

View file

@ -121,16 +121,17 @@ describe "UpdateManager", ->
@updatedDocLines = ["updated", "lines"]
@version = 34
@WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
@UpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version)
@UpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version)
@UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback
it "should get the pending updates", ->
@WebRedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true
it "should apply the updates", ->
@UpdateManager.applyUpdates
.calledWith(@project_id, @doc_id, @updates)
.should.equal true
for update in @updates
@UpdateManager.applyUpdate
.calledWith(@project_id, @doc_id, update)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
@ -139,33 +140,33 @@ describe "UpdateManager", ->
beforeEach ->
@updates = []
@WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
@UpdateManager.applyUpdates = sinon.stub()
@UpdateManager.applyUpdate = sinon.stub()
@RedisManager.setDocument = sinon.stub()
@UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback
it "should not call applyUpdates", ->
@UpdateManager.applyUpdates.called.should.equal false
it "should not call applyUpdate", ->
@UpdateManager.applyUpdate.called.should.equal false
it "should call the callback", ->
@callback.called.should.equal true
describe "applyUpdates", ->
describe "applyUpdate", ->
beforeEach ->
@updates = [{op: [{p: 42, i: "foo"}]}]
@update = {op: [{p: 42, i: "foo"}]}
@updatedDocLines = ["updated", "lines"]
@version = 34
@appliedOps = ["mock-applied-ops"]
@ShareJsUpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version, @appliedOps)
@ShareJsUpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version, @appliedOps)
@RedisManager.updateDocument = sinon.stub().callsArg(4)
@TrackChangesManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3)
describe "normally", ->
beforeEach ->
@UpdateManager.applyUpdates @project_id, @doc_id, @updates, @callback
@UpdateManager.applyUpdate @project_id, @doc_id, @update, @callback
it "should apply the updates via ShareJS", ->
@ShareJsUpdateManager.applyUpdates
.calledWith(@project_id, @doc_id, @updates)
@ShareJsUpdateManager.applyUpdate
.calledWith(@project_id, @doc_id, @update)
.should.equal true
it "should save the document", ->
@ -183,14 +184,14 @@ describe "UpdateManager", ->
describe "with UTF-16 surrogate pairs in the update", ->
beforeEach ->
@updates = [{op: [{p: 42, i: "\uD835\uDC00"}]}]
@UpdateManager.applyUpdates @project_id, @doc_id, @updates, @callback
@update = {op: [{p: 42, i: "\uD835\uDC00"}]}
@UpdateManager.applyUpdate @project_id, @doc_id, @update, @callback
it "should apply the update but with surrogate pairs removed", ->
@ShareJsUpdateManager.applyUpdates
.calledWith(@project_id, @doc_id, @updates)
@ShareJsUpdateManager.applyUpdate
.calledWith(@project_id, @doc_id, @update)
.should.equal true
# \uFFFD is 'replacement character'
@updates[0].op[0].i.should.equal "\uFFFD\uFFFD"
@update.op[0].i.should.equal "\uFFFD\uFFFD"