diff --git a/services/track-changes/app/coffee/RedisManager.coffee b/services/track-changes/app/coffee/RedisManager.coffee index eec6ce5086..6cc0f206be 100644 --- a/services/track-changes/app/coffee/RedisManager.coffee +++ b/services/track-changes/app/coffee/RedisManager.coffee @@ -6,20 +6,24 @@ rawUpdatesKey = (doc_id) -> "UncompressedHistoryOps:#{doc_id}" docsWithHistoryOpsKey = (project_id) -> "DocsWithHistoryOps:#{project_id}" module.exports = RedisManager = - getOldestRawUpdates: (doc_id, batchSize, callback = (error, rawUpdates) ->) -> - key = rawUpdatesKey(doc_id) - rclient.lrange key, 0, batchSize - 1, (error, jsonUpdates) -> - try - rawUpdates = ( JSON.parse(update) for update in jsonUpdates or [] ) - catch e - return callback(e) - callback null, rawUpdates - deleteOldestRawUpdates: (project_id, doc_id, batchSize, callback = (error) ->) -> + getOldestDocUpdates: (doc_id, batchSize, callback = (error, jsonUpdates) ->) -> + key = rawUpdatesKey(doc_id) + rclient.lrange key, 0, batchSize - 1, callback + + expandDocUpdates: (jsonUpdates, callback = (error, rawUpdates) ->) -> + try + rawUpdates = ( JSON.parse(update) for update in jsonUpdates or [] ) + catch e + return callback(e) + callback null, rawUpdates + + deleteAppliedDocUpdates: (project_id, doc_id, docUpdates, callback = (error) ->) -> # It's ok to delete the doc_id from the set here. Even though the list # of updates may not be empty, we will continue to process it until it is. multi = rclient.multi() - multi.ltrim rawUpdatesKey(doc_id), batchSize, -1 + for update in docUpdates or [] + multi.lrem rawUpdatesKey(doc_id), 0, update multi.srem docsWithHistoryOpsKey(project_id), doc_id multi.exec (error, results) -> return callback(error) if error? diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index 4703ff6a1d..b46e4ccffa 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -59,23 +59,25 @@ module.exports = UpdatesManager = return callback(error) if error? MongoManager.backportProjectId project_id, doc_id, (error) -> return callback(error) if error? - RedisManager.getOldestRawUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, rawUpdates) -> + RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) -> return callback(error) if error? - length = rawUpdates.length - UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> + length = docUpdates.length + RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) -> return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" - RedisManager.deleteOldestRawUpdates project_id, doc_id, length, (error) -> + UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> return callback(error) if error? - if length == UpdatesManager.REDIS_READ_BATCH_SIZE - # There might be more updates - logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" - setTimeout () -> - UpdatesManager.processUncompressedUpdates project_id, doc_id, callback - , 0 - else - logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" - callback() + logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" + RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) -> + return callback(error) if error? + if length == UpdatesManager.REDIS_READ_BATCH_SIZE + # There might be more updates + logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates" + setTimeout () -> + UpdatesManager.processUncompressedUpdates project_id, doc_id, callback + , 0 + else + logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed" + callback() processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> LockManager.runWithLock( diff --git a/services/track-changes/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/track-changes/test/unit/coffee/RedisManager/RedisManagerTests.coffee index 7293892be0..12f78fd4aa 100644 --- a/services/track-changes/test/unit/coffee/RedisManager/RedisManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/RedisManager/RedisManagerTests.coffee @@ -20,40 +20,54 @@ describe "RedisManager", -> @batchSize = 100 @callback = sinon.stub() - describe "getOldestRawUpdates", -> + describe "getOldestDocUpdates", -> beforeEach -> @rawUpdates = [ {v: 42, op: "mock-op-42"}, { v: 45, op: "mock-op-45" }] @jsonUpdates = (JSON.stringify(update) for update in @rawUpdates) @rclient.lrange = sinon.stub().callsArgWith(3, null, @jsonUpdates) - @RedisManager.getOldestRawUpdates @doc_id, @batchSize, @callback + @RedisManager.getOldestDocUpdates @doc_id, @batchSize, @callback it "should read the updates from redis", -> @rclient.lrange .calledWith("UncompressedHistoryOps:#{@doc_id}", 0, @batchSize - 1) .should.equal true - it "should call the callback with the parsed ops", -> - @callback.calledWith(null, @rawUpdates).should.equal true + it "should call the callback with the unparsed ops", -> + @callback.calledWith(null, @jsonUpdates).should.equal true - describe "deleteOldestRawUpdates", -> - beforeEach -> - @rclient.ltrim = sinon.stub() - @rclient.srem = sinon.stub() - @rclient.exec = sinon.stub().callsArgWith(0) - @RedisManager.deleteOldestRawUpdates @project_id, @doc_id, @batchSize, @callback - it "should delete the updates from redis", -> - @rclient.ltrim - .calledWith("UncompressedHistoryOps:#{@doc_id}", @batchSize, -1) - .should.equal true + describe "expandDocUpdates", -> + beforeEach -> + @RedisManager.expandDocUpdates @jsonUpdates, @callback - it "should delete the doc from the set of docs with history ops", -> - @rclient.srem - .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) - .should.equal true + it "should call the callback with the parsed ops", -> + @callback.calledWith(null, @rawUpdates).should.equal true - it "should call the callback ", -> - @callback.called.should.equal true + + describe "deleteAppliedDocUpdates", -> + beforeEach -> + @rclient.lrem = sinon.stub() + @rclient.srem = sinon.stub() + @rclient.exec = sinon.stub().callsArgWith(0) + @RedisManager.deleteAppliedDocUpdates @project_id, @doc_id, @jsonUpdates, @callback + + it "should delete the first update from redis", -> + @rclient.lrem + .calledWith("UncompressedHistoryOps:#{@doc_id}", 0, @jsonUpdates[0]) + .should.equal true + + it "should delete the second update from redis", -> + @rclient.lrem + .calledWith("UncompressedHistoryOps:#{@doc_id}", 0, @jsonUpdates[1]) + .should.equal true + + it "should delete the doc from the set of docs with history ops", -> + @rclient.srem + .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) + .should.equal true + + it "should call the callback ", -> + @callback.called.should.equal true describe "getDocIdsWithHistoryOps", -> beforeEach -> @@ -67,4 +81,4 @@ describe "RedisManager", -> .should.equal true it "should call the callback with the doc_ids", -> - @callback.calledWith(null, @doc_ids).should.equal true \ No newline at end of file + @callback.calledWith(null, @doc_ids).should.equal true diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 04a2111893..f1bfc13288 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -158,14 +158,15 @@ describe "UpdatesManager", -> describe "processUncompressedUpdates", -> beforeEach -> @UpdatesManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(4) - @RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(3) + @RedisManager.deleteAppliedDocUpdates = sinon.stub().callsArg(3) @MongoManager.backportProjectId = sinon.stub().callsArg(2) @UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock") describe "when there is fewer than one batch to send", -> beforeEach -> @updates = ["mock-update"] - @RedisManager.getOldestRawUpdates = sinon.stub().callsArgWith(2, null, @updates) + @RedisManager.getOldestDocUpdates = sinon.stub().callsArgWith(2, null, @updates) + @RedisManager.expandDocUpdates = sinon.stub().callsArgWith(1, null, @updates) @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback it "should check if the updates are temporary", -> @@ -179,7 +180,7 @@ describe "UpdatesManager", -> .should.equal true it "should get the oldest updates", -> - @RedisManager.getOldestRawUpdates + @RedisManager.getOldestDocUpdates .calledWith(@doc_id, @UpdatesManager.REDIS_READ_BATCH_SIZE) .should.equal true @@ -189,8 +190,8 @@ describe "UpdatesManager", -> .should.equal true it "should delete the batch of uncompressed updates that was just processed", -> - @RedisManager.deleteOldestRawUpdates - .calledWith(@project_id, @doc_id, @updates.length) + @RedisManager.deleteAppliedDocUpdates + .calledWith(@project_id, @doc_id, @updates) .should.equal true it "should call the callback", -> @@ -201,17 +202,20 @@ describe "UpdatesManager", -> @UpdatesManager.REDIS_READ_BATCH_SIZE = 2 @updates = ["mock-update-0", "mock-update-1", "mock-update-2", "mock-update-3", "mock-update-4"] @redisArray = @updates.slice() - @RedisManager.getOldestRawUpdates = (doc_id, batchSize, callback = (error, updates) ->) => + @RedisManager.getOldestDocUpdates = (doc_id, batchSize, callback = (error, updates) ->) => updates = @redisArray.slice(0, batchSize) @redisArray = @redisArray.slice(batchSize) callback null, updates - sinon.spy @RedisManager, "getOldestRawUpdates" + sinon.spy @RedisManager, "getOldestDocUpdates" + @RedisManager.expandDocUpdates = (jsonUpdates, callback) => + callback null, jsonUpdates + sinon.spy @RedisManager, "expandDocUpdates" @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) => @callback(args...) done() it "should get the oldest updates in three batches ", -> - @RedisManager.getOldestRawUpdates.callCount.should.equal 3 + @RedisManager.getOldestDocUpdates.callCount.should.equal 3 it "should compress and save the updates in batches", -> @UpdatesManager.compressAndSaveRawUpdates @@ -225,7 +229,7 @@ describe "UpdatesManager", -> .should.equal true it "should delete the batches of uncompressed updates", -> - @RedisManager.deleteOldestRawUpdates.callCount.should.equal 3 + @RedisManager.deleteAppliedDocUpdates.callCount.should.equal 3 it "should call the callback", -> @callback.called.should.equal true