Ensure that updates are compressed in continuous incrementing order

This commit is contained in:
James Allen 2014-02-25 12:27:42 +00:00
parent de783bf5b0
commit 8ae9bcd60f
2 changed files with 63 additions and 23 deletions

View file

@ -10,9 +10,25 @@ module.exports = HistoryManager =
MongoManager.popLastCompressedUpdate doc_id, (error, lastCompressedUpdate) -> MongoManager.popLastCompressedUpdate doc_id, (error, lastCompressedUpdate) ->
return callback(error) if error? return callback(error) if error?
# Ensure that raw updates start where lastCompressedUpdate left off
if lastCompressedUpdate?
rawUpdates = rawUpdates.slice(0)
while rawUpdates[0]? and rawUpdates[0].v <= lastCompressedUpdate.v
rawUpdates.shift()
if rawUpdates[0]? and rawUpdates[0].v != lastCompressedUpdate.v + 1
return callback new Error("Tried to apply raw op at version #{rawUpdates[0].v} to last compressed update with version #{lastCompressedUpdate.v}")
compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates
MongoManager.insertCompressedUpdates doc_id, compressedUpdates, (error) -> MongoManager.insertCompressedUpdates doc_id, compressedUpdates, (error) ->
return callback(error) if error? return callback(error) if error?
logger.log doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates" logger.log doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates"
callback() callback()
processUncompressedUpdates: (doc_id, callback = (error) ->) ->
# Get lock - here or elsewhere?
# Get batch from Redis left hand side (oldest)
# pass batch to compressAndSaveRawUpdates
# Delete batch from redis
# release lock

View file

@ -29,8 +29,8 @@ describe "HistoryManager", ->
describe "when there is no compressed history to begin with", -> describe "when there is no compressed history to begin with", ->
beforeEach -> beforeEach ->
@rawUpdates = ["mock-raw-op-1", "mock-raw-op-2"] @rawUpdates = [{ v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }]
@compressedUpdates = ["mock-compressed-op"] @compressedUpdates = { v: 13, op: "compressed-op-12" }
@MongoManager.popLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null) @MongoManager.popLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null)
@MongoManager.insertCompressedUpdates = sinon.stub().callsArg(2) @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(2)
@ -57,31 +57,55 @@ describe "HistoryManager", ->
describe "when the raw ops need appending to existing history", -> describe "when the raw ops need appending to existing history", ->
beforeEach -> beforeEach ->
@rawUpdates = ["mock-raw-op-1", "mock-raw-op-2"] @lastCompressedUpdate = { v: 11, op: "compressed-op-11" }
@lastCompressedUpdate = "mock-last-compressed-op-0" @compressedUpdates = { v: 13, op: "compressed-op-12" }
@compressedUpdates = ["mock-compressed-op-1"]
@MongoManager.popLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate) @MongoManager.popLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate)
@MongoManager.insertCompressedUpdates = sinon.stub().callsArg(2) @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(2)
@UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates)
@HistoryManager.compressAndSaveRawUpdates @doc_id, @rawUpdates, @callback
it "should try to pop the last compressed op", -> describe "when the raw ops start where the existing history ends", ->
@MongoManager.popLastCompressedUpdate beforeEach ->
.calledWith(@doc_id) @rawUpdates = [{ v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }]
.should.equal true @HistoryManager.compressAndSaveRawUpdates @doc_id, @rawUpdates, @callback
it "should compress the last compressed op and the raw ops", -> it "should try to pop the last compressed op", ->
@UpdateCompressor.compressRawUpdates @MongoManager.popLastCompressedUpdate
.calledWith(@lastCompressedUpdate, @rawUpdates) .calledWith(@doc_id)
.should.equal true .should.equal true
it "should save the compressed ops", -> it "should compress the last compressed op and the raw ops", ->
@MongoManager.insertCompressedUpdates @UpdateCompressor.compressRawUpdates
.calledWith(@doc_id, @compressedUpdates) .calledWith(@lastCompressedUpdate, @rawUpdates)
.should.equal true .should.equal true
it "should save the compressed ops", ->
@MongoManager.insertCompressedUpdates
.calledWith(@doc_id, @compressedUpdates)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "when some raw ops are passed that have already been compressed", ->
beforeEach ->
@rawUpdates = [{ v: 10, op: "mock-op-10" }, { v: 11, op: "mock-op-11"}, { v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }]
@HistoryManager.compressAndSaveRawUpdates @doc_id, @rawUpdates, @callback
it "should only compress the more recent raw ops", ->
@UpdateCompressor.compressRawUpdates
.calledWith(@lastCompressedUpdate, @rawUpdates.slice(-2))
.should.equal true
describe "when the raw ops do not follow from the last compressed op version", ->
beforeEach ->
@rawUpdates = [{ v: 13, op: "mock-op-13" }]
@HistoryManager.compressAndSaveRawUpdates @doc_id, @rawUpdates, @callback
it "should call the callback with an error", ->
@callback
.calledWith(new Error("Tried to apply raw op at version 13 to last compressed update with version 11"))
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true