remove old processUncompressedUpdates method

replace with new per doc method
This commit is contained in:
Brian Gough 2017-03-22 16:59:52 +00:00
parent b1c0ebbaae
commit bc7815f7fc
2 changed files with 16 additions and 23 deletions

View file

@ -68,14 +68,6 @@ module.exports = UpdatesManager =
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result? logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result?
callback() callback()
REDIS_READ_BATCH_SIZE: 100
processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) ->
UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) ->
return callback(error) if error?
UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) ->
return callback(error) if error?
UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback
# Check whether the updates are temporary (per-project property) # Check whether the updates are temporary (per-project property)
_prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) -> _prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) ->
UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) -> UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) ->
@ -89,7 +81,8 @@ module.exports = UpdatesManager =
callback(null) callback(null)
# Apply updates for specific project/doc after preparing at project and doc level # Apply updates for specific project/doc after preparing at project and doc level
_processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) -> REDIS_READ_BATCH_SIZE: 100
processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) ->
# get the updates as strings from redis (so we can delete them after they are applied) # get the updates as strings from redis (so we can delete them after they are applied)
RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) ->
return callback(error) if error? return callback(error) if error?
@ -108,7 +101,7 @@ module.exports = UpdatesManager =
# There might be more updates # There might be more updates
logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
setTimeout () -> setTimeout () ->
UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, callback
, 0 , 0
else else
logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
@ -128,7 +121,7 @@ module.exports = UpdatesManager =
LockManager.runWithLock( LockManager.runWithLock(
"HistoryLock:#{doc_id}", "HistoryLock:#{doc_id}",
(releaseLock) -> (releaseLock) ->
UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, releaseLock UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, releaseLock
callback callback
) )

View file

@ -182,17 +182,7 @@ describe "UpdatesManager", ->
@updates = ["mock-update"] @updates = ["mock-update"]
@RedisManager.getOldestDocUpdates = sinon.stub().callsArgWith(2, null, @updates) @RedisManager.getOldestDocUpdates = sinon.stub().callsArgWith(2, null, @updates)
@RedisManager.expandDocUpdates = sinon.stub().callsArgWith(1, null, @updates) @RedisManager.expandDocUpdates = sinon.stub().callsArgWith(1, null, @updates)
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @temporary, @callback
it "should check if the updates are temporary", ->
@UpdateTrimmer.shouldTrimUpdates
.calledWith(@project_id)
.should.equal true
it "should backport the project id", ->
@MongoManager.backportProjectId
.calledWith(@project_id, @doc_id)
.should.equal true
it "should get the oldest updates", -> it "should get the oldest updates", ->
@RedisManager.getOldestDocUpdates @RedisManager.getOldestDocUpdates
@ -225,7 +215,7 @@ describe "UpdatesManager", ->
@RedisManager.expandDocUpdates = (jsonUpdates, callback) => @RedisManager.expandDocUpdates = (jsonUpdates, callback) =>
callback null, jsonUpdates callback null, jsonUpdates
sinon.spy @RedisManager, "expandDocUpdates" sinon.spy @RedisManager, "expandDocUpdates"
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) => @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @temporary, (args...) =>
@callback(args...) @callback(args...)
done() done()
@ -257,6 +247,16 @@ describe "UpdatesManager", ->
@LockManager.runWithLock = sinon.stub().callsArg(2) @LockManager.runWithLock = sinon.stub().callsArg(2)
@UpdatesManager.processUncompressedUpdatesWithLock @project_id, @doc_id, @callback @UpdatesManager.processUncompressedUpdatesWithLock @project_id, @doc_id, @callback
it "should check if the updates are temporary", ->
@UpdateTrimmer.shouldTrimUpdates
.calledWith(@project_id)
.should.equal true
it "should backport the project id", ->
@MongoManager.backportProjectId
.calledWith(@project_id, @doc_id)
.should.equal true
it "should run processUncompressedUpdates with the lock", -> it "should run processUncompressedUpdates with the lock", ->
@LockManager.runWithLock @LockManager.runWithLock
.calledWith( .calledWith(