mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-07 20:31:06 -05:00
only delete the applied ops from redis
This commit is contained in:
parent
992857d6a2
commit
e65549099c
4 changed files with 78 additions and 54 deletions
|
@ -6,20 +6,24 @@ rawUpdatesKey = (doc_id) -> "UncompressedHistoryOps:#{doc_id}"
|
|||
docsWithHistoryOpsKey = (project_id) -> "DocsWithHistoryOps:#{project_id}"
|
||||
|
||||
module.exports = RedisManager =
|
||||
getOldestRawUpdates: (doc_id, batchSize, callback = (error, rawUpdates) ->) ->
|
||||
key = rawUpdatesKey(doc_id)
|
||||
rclient.lrange key, 0, batchSize - 1, (error, jsonUpdates) ->
|
||||
try
|
||||
rawUpdates = ( JSON.parse(update) for update in jsonUpdates or [] )
|
||||
catch e
|
||||
return callback(e)
|
||||
callback null, rawUpdates
|
||||
|
||||
deleteOldestRawUpdates: (project_id, doc_id, batchSize, callback = (error) ->) ->
|
||||
getOldestDocUpdates: (doc_id, batchSize, callback = (error, jsonUpdates) ->) ->
|
||||
key = rawUpdatesKey(doc_id)
|
||||
rclient.lrange key, 0, batchSize - 1, callback
|
||||
|
||||
expandDocUpdates: (jsonUpdates, callback = (error, rawUpdates) ->) ->
|
||||
try
|
||||
rawUpdates = ( JSON.parse(update) for update in jsonUpdates or [] )
|
||||
catch e
|
||||
return callback(e)
|
||||
callback null, rawUpdates
|
||||
|
||||
deleteAppliedDocUpdates: (project_id, doc_id, docUpdates, callback = (error) ->) ->
|
||||
# It's ok to delete the doc_id from the set here. Even though the list
|
||||
# of updates may not be empty, we will continue to process it until it is.
|
||||
multi = rclient.multi()
|
||||
multi.ltrim rawUpdatesKey(doc_id), batchSize, -1
|
||||
for update in docUpdates or []
|
||||
multi.lrem rawUpdatesKey(doc_id), 0, update
|
||||
multi.srem docsWithHistoryOpsKey(project_id), doc_id
|
||||
multi.exec (error, results) ->
|
||||
return callback(error) if error?
|
||||
|
|
|
@ -59,23 +59,25 @@ module.exports = UpdatesManager =
|
|||
return callback(error) if error?
|
||||
MongoManager.backportProjectId project_id, doc_id, (error) ->
|
||||
return callback(error) if error?
|
||||
RedisManager.getOldestRawUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, rawUpdates) ->
|
||||
RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) ->
|
||||
return callback(error) if error?
|
||||
length = rawUpdates.length
|
||||
UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
|
||||
length = docUpdates.length
|
||||
RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) ->
|
||||
return callback(error) if error?
|
||||
logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
|
||||
RedisManager.deleteOldestRawUpdates project_id, doc_id, length, (error) ->
|
||||
UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
|
||||
return callback(error) if error?
|
||||
if length == UpdatesManager.REDIS_READ_BATCH_SIZE
|
||||
# There might be more updates
|
||||
logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
|
||||
setTimeout () ->
|
||||
UpdatesManager.processUncompressedUpdates project_id, doc_id, callback
|
||||
, 0
|
||||
else
|
||||
logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
|
||||
callback()
|
||||
logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
|
||||
RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) ->
|
||||
return callback(error) if error?
|
||||
if length == UpdatesManager.REDIS_READ_BATCH_SIZE
|
||||
# There might be more updates
|
||||
logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
|
||||
setTimeout () ->
|
||||
UpdatesManager.processUncompressedUpdates project_id, doc_id, callback
|
||||
, 0
|
||||
else
|
||||
logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
|
||||
callback()
|
||||
|
||||
processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
|
||||
LockManager.runWithLock(
|
||||
|
|
|
@ -20,40 +20,54 @@ describe "RedisManager", ->
|
|||
@batchSize = 100
|
||||
@callback = sinon.stub()
|
||||
|
||||
describe "getOldestRawUpdates", ->
|
||||
describe "getOldestDocUpdates", ->
|
||||
beforeEach ->
|
||||
@rawUpdates = [ {v: 42, op: "mock-op-42"}, { v: 45, op: "mock-op-45" }]
|
||||
@jsonUpdates = (JSON.stringify(update) for update in @rawUpdates)
|
||||
@rclient.lrange = sinon.stub().callsArgWith(3, null, @jsonUpdates)
|
||||
@RedisManager.getOldestRawUpdates @doc_id, @batchSize, @callback
|
||||
@RedisManager.getOldestDocUpdates @doc_id, @batchSize, @callback
|
||||
|
||||
it "should read the updates from redis", ->
|
||||
@rclient.lrange
|
||||
.calledWith("UncompressedHistoryOps:#{@doc_id}", 0, @batchSize - 1)
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback with the parsed ops", ->
|
||||
@callback.calledWith(null, @rawUpdates).should.equal true
|
||||
it "should call the callback with the unparsed ops", ->
|
||||
@callback.calledWith(null, @jsonUpdates).should.equal true
|
||||
|
||||
describe "deleteOldestRawUpdates", ->
|
||||
beforeEach ->
|
||||
@rclient.ltrim = sinon.stub()
|
||||
@rclient.srem = sinon.stub()
|
||||
@rclient.exec = sinon.stub().callsArgWith(0)
|
||||
@RedisManager.deleteOldestRawUpdates @project_id, @doc_id, @batchSize, @callback
|
||||
|
||||
it "should delete the updates from redis", ->
|
||||
@rclient.ltrim
|
||||
.calledWith("UncompressedHistoryOps:#{@doc_id}", @batchSize, -1)
|
||||
.should.equal true
|
||||
describe "expandDocUpdates", ->
|
||||
beforeEach ->
|
||||
@RedisManager.expandDocUpdates @jsonUpdates, @callback
|
||||
|
||||
it "should delete the doc from the set of docs with history ops", ->
|
||||
@rclient.srem
|
||||
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
|
||||
.should.equal true
|
||||
it "should call the callback with the parsed ops", ->
|
||||
@callback.calledWith(null, @rawUpdates).should.equal true
|
||||
|
||||
it "should call the callback ", ->
|
||||
@callback.called.should.equal true
|
||||
|
||||
describe "deleteAppliedDocUpdates", ->
|
||||
beforeEach ->
|
||||
@rclient.lrem = sinon.stub()
|
||||
@rclient.srem = sinon.stub()
|
||||
@rclient.exec = sinon.stub().callsArgWith(0)
|
||||
@RedisManager.deleteAppliedDocUpdates @project_id, @doc_id, @jsonUpdates, @callback
|
||||
|
||||
it "should delete the first update from redis", ->
|
||||
@rclient.lrem
|
||||
.calledWith("UncompressedHistoryOps:#{@doc_id}", 0, @jsonUpdates[0])
|
||||
.should.equal true
|
||||
|
||||
it "should delete the second update from redis", ->
|
||||
@rclient.lrem
|
||||
.calledWith("UncompressedHistoryOps:#{@doc_id}", 0, @jsonUpdates[1])
|
||||
.should.equal true
|
||||
|
||||
it "should delete the doc from the set of docs with history ops", ->
|
||||
@rclient.srem
|
||||
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback ", ->
|
||||
@callback.called.should.equal true
|
||||
|
||||
describe "getDocIdsWithHistoryOps", ->
|
||||
beforeEach ->
|
||||
|
@ -67,4 +81,4 @@ describe "RedisManager", ->
|
|||
.should.equal true
|
||||
|
||||
it "should call the callback with the doc_ids", ->
|
||||
@callback.calledWith(null, @doc_ids).should.equal true
|
||||
@callback.calledWith(null, @doc_ids).should.equal true
|
||||
|
|
|
@ -158,14 +158,15 @@ describe "UpdatesManager", ->
|
|||
describe "processUncompressedUpdates", ->
|
||||
beforeEach ->
|
||||
@UpdatesManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(4)
|
||||
@RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(3)
|
||||
@RedisManager.deleteAppliedDocUpdates = sinon.stub().callsArg(3)
|
||||
@MongoManager.backportProjectId = sinon.stub().callsArg(2)
|
||||
@UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock")
|
||||
|
||||
describe "when there is fewer than one batch to send", ->
|
||||
beforeEach ->
|
||||
@updates = ["mock-update"]
|
||||
@RedisManager.getOldestRawUpdates = sinon.stub().callsArgWith(2, null, @updates)
|
||||
@RedisManager.getOldestDocUpdates = sinon.stub().callsArgWith(2, null, @updates)
|
||||
@RedisManager.expandDocUpdates = sinon.stub().callsArgWith(1, null, @updates)
|
||||
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback
|
||||
|
||||
it "should check if the updates are temporary", ->
|
||||
|
@ -179,7 +180,7 @@ describe "UpdatesManager", ->
|
|||
.should.equal true
|
||||
|
||||
it "should get the oldest updates", ->
|
||||
@RedisManager.getOldestRawUpdates
|
||||
@RedisManager.getOldestDocUpdates
|
||||
.calledWith(@doc_id, @UpdatesManager.REDIS_READ_BATCH_SIZE)
|
||||
.should.equal true
|
||||
|
||||
|
@ -189,8 +190,8 @@ describe "UpdatesManager", ->
|
|||
.should.equal true
|
||||
|
||||
it "should delete the batch of uncompressed updates that was just processed", ->
|
||||
@RedisManager.deleteOldestRawUpdates
|
||||
.calledWith(@project_id, @doc_id, @updates.length)
|
||||
@RedisManager.deleteAppliedDocUpdates
|
||||
.calledWith(@project_id, @doc_id, @updates)
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback", ->
|
||||
|
@ -201,17 +202,20 @@ describe "UpdatesManager", ->
|
|||
@UpdatesManager.REDIS_READ_BATCH_SIZE = 2
|
||||
@updates = ["mock-update-0", "mock-update-1", "mock-update-2", "mock-update-3", "mock-update-4"]
|
||||
@redisArray = @updates.slice()
|
||||
@RedisManager.getOldestRawUpdates = (doc_id, batchSize, callback = (error, updates) ->) =>
|
||||
@RedisManager.getOldestDocUpdates = (doc_id, batchSize, callback = (error, updates) ->) =>
|
||||
updates = @redisArray.slice(0, batchSize)
|
||||
@redisArray = @redisArray.slice(batchSize)
|
||||
callback null, updates
|
||||
sinon.spy @RedisManager, "getOldestRawUpdates"
|
||||
sinon.spy @RedisManager, "getOldestDocUpdates"
|
||||
@RedisManager.expandDocUpdates = (jsonUpdates, callback) =>
|
||||
callback null, jsonUpdates
|
||||
sinon.spy @RedisManager, "expandDocUpdates"
|
||||
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) =>
|
||||
@callback(args...)
|
||||
done()
|
||||
|
||||
it "should get the oldest updates in three batches ", ->
|
||||
@RedisManager.getOldestRawUpdates.callCount.should.equal 3
|
||||
@RedisManager.getOldestDocUpdates.callCount.should.equal 3
|
||||
|
||||
it "should compress and save the updates in batches", ->
|
||||
@UpdatesManager.compressAndSaveRawUpdates
|
||||
|
@ -225,7 +229,7 @@ describe "UpdatesManager", ->
|
|||
.should.equal true
|
||||
|
||||
it "should delete the batches of uncompressed updates", ->
|
||||
@RedisManager.deleteOldestRawUpdates.callCount.should.equal 3
|
||||
@RedisManager.deleteAppliedDocUpdates.callCount.should.equal 3
|
||||
|
||||
it "should call the callback", ->
|
||||
@callback.called.should.equal true
|
||||
|
|
Loading…
Reference in a new issue