Continue to process raw updates until redis is empty

This commit is contained in:
James Allen 2014-02-26 11:18:26 +00:00
parent 8405a37c2c
commit 65360a9a2b
2 changed files with 65 additions and 21 deletions

View file

@ -32,11 +32,18 @@ module.exports = HistoryManager =
processUncompressedUpdates: (doc_id, callback = (error) ->) -> processUncompressedUpdates: (doc_id, callback = (error) ->) ->
RedisManager.getOldestRawUpdates doc_id, HistoryManager.REDIS_READ_BATCH_SIZE, (error, rawUpdates) -> RedisManager.getOldestRawUpdates doc_id, HistoryManager.REDIS_READ_BATCH_SIZE, (error, rawUpdates) ->
return callback(error) if error? return callback(error) if error?
length = rawUpdates.length
HistoryManager.compressAndSaveRawUpdates doc_id, rawUpdates, (error) -> HistoryManager.compressAndSaveRawUpdates doc_id, rawUpdates, (error) ->
return callback(error) if error? return callback(error) if error?
RedisManager.deleteOldestRawUpdates doc_id, HistoryManager.REDIS_READ_BATCH_SIZE, (error) -> RedisManager.deleteOldestRawUpdates doc_id, HistoryManager.REDIS_READ_BATCH_SIZE, (error) ->
return callback(error) if error? return callback(error) if error?
callback() if length == HistoryManager.REDIS_READ_BATCH_SIZE
# There might be more updates
setTimeout () ->
HistoryManager.processUncompressedUpdates doc_id, callback
, 0
else
callback()
processUncompressedUpdatesWithLock: (doc_id, callback = (error) ->) -> processUncompressedUpdatesWithLock: (doc_id, callback = (error) ->) ->
LockManager.runWithLock( LockManager.runWithLock(

View file

@ -112,30 +112,67 @@ describe "HistoryManager", ->
.should.equal true .should.equal true
describe "processUncompressedUpdates", -> describe "processUncompressedUpdates", ->
beforeEach -> describe "when there is fewer than one batch to send", ->
@updates = ["mock-update"] beforeEach ->
@RedisManager.getOldestRawUpdates = sinon.stub().callsArgWith(2, null, @updates) @updates = ["mock-update"]
@HistoryManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(2) @RedisManager.getOldestRawUpdates = sinon.stub().callsArgWith(2, null, @updates)
@RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(2) @HistoryManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(2)
@HistoryManager.processUncompressedUpdates @doc_id, @callback @RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(2)
@HistoryManager.processUncompressedUpdates @doc_id, @callback
it "should get the oldest updates", -> it "should get the oldest updates", ->
@RedisManager.getOldestRawUpdates @RedisManager.getOldestRawUpdates
.calledWith(@doc_id, @HistoryManager.REDIS_READ_BATCH_SIZE) .calledWith(@doc_id, @HistoryManager.REDIS_READ_BATCH_SIZE)
.should.equal true .should.equal true
it "should compress and save the updates", -> it "should compress and save the updates", ->
@HistoryManager.compressAndSaveRawUpdates @HistoryManager.compressAndSaveRawUpdates
.calledWith(@doc_id, @updates) .calledWith(@doc_id, @updates)
.should.equal true .should.equal true
it "should delete the batch of uncompressed updates that was just processed", -> it "should delete the batch of uncompressed updates that was just processed", ->
@RedisManager.deleteOldestRawUpdates @RedisManager.deleteOldestRawUpdates
.calledWith(@doc_id, @HistoryManager.REDIS_READ_BATCH_SIZE) .calledWith(@doc_id, @HistoryManager.REDIS_READ_BATCH_SIZE)
.should.equal true .should.equal true
it "should call the callback", -> it "should call the callback", ->
@callback.called.should.equal true @callback.called.should.equal true
describe "when there are multiple batches to send", ->
beforeEach (done) ->
@HistoryManager.REDIS_READ_BATCH_SIZE = 2
@updates = ["mock-update-0", "mock-update-1", "mock-update-2", "mock-update-3", "mock-update-4"]
@redisArray = @updates.slice()
@RedisManager.getOldestRawUpdates = (doc_id, batchSize, callback = (error, updates) ->) =>
updates = @redisArray.slice(0, batchSize)
@redisArray = @redisArray.slice(batchSize)
callback null, updates
sinon.spy @RedisManager, "getOldestRawUpdates"
@HistoryManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(2)
@RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(2)
@HistoryManager.processUncompressedUpdates @doc_id, (args...) =>
@callback(args...)
done()
it "should get the oldest updates in three batches ", ->
@RedisManager.getOldestRawUpdates.callCount.should.equal 3
it "should compress and save the updates in batches", ->
@HistoryManager.compressAndSaveRawUpdates
.calledWith(@doc_id, @updates.slice(0,2))
.should.equal true
@HistoryManager.compressAndSaveRawUpdates
.calledWith(@doc_id, @updates.slice(2,4))
.should.equal true
@HistoryManager.compressAndSaveRawUpdates
.calledWith(@doc_id, @updates.slice(4,5))
.should.equal true
it "should delete the batches of uncompressed updates", ->
@RedisManager.deleteOldestRawUpdates.callCount.should.equal 3
it "should call the callback", ->
@callback.called.should.equal true
describe "processCompressedUpdatesWithLock", -> describe "processCompressedUpdatesWithLock", ->
beforeEach -> beforeEach ->