mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
Delete doc_id from DocsWithHistoryOps set when processing
This commit is contained in:
parent
6aaa7ba8d5
commit
cd9cb51027
9 changed files with 46 additions and 23 deletions
|
@ -4,11 +4,12 @@ redisConf = Settings.redis?.web or {host: "localhost", port: 6379}
|
||||||
rclient = redis.createClient(redisConf.port, redisConf.host)
|
rclient = redis.createClient(redisConf.port, redisConf.host)
|
||||||
rclient.auth(redisConf.password)
|
rclient.auth(redisConf.password)
|
||||||
|
|
||||||
buildRawUpdatesKey = (doc_id) -> "UncompressedHistoryOps:#{doc_id}"
|
rawUpdatesKey = (doc_id) -> "UncompressedHistoryOps:#{doc_id}"
|
||||||
|
docsWithHistoryOpsKey = (project_id) -> "DocsWithHistoryOps:#{project_id}"
|
||||||
|
|
||||||
module.exports = RedisManager =
|
module.exports = RedisManager =
|
||||||
getOldestRawUpdates: (doc_id, batchSize, callback = (error, rawUpdates) ->) ->
|
getOldestRawUpdates: (doc_id, batchSize, callback = (error, rawUpdates) ->) ->
|
||||||
key = buildRawUpdatesKey(doc_id)
|
key = rawUpdatesKey(doc_id)
|
||||||
rclient.lrange key, 0, batchSize - 1, (error, jsonUpdates) ->
|
rclient.lrange key, 0, batchSize - 1, (error, jsonUpdates) ->
|
||||||
try
|
try
|
||||||
rawUpdates = ( JSON.parse(update) for update in jsonUpdates or [] )
|
rawUpdates = ( JSON.parse(update) for update in jsonUpdates or [] )
|
||||||
|
@ -16,6 +17,12 @@ module.exports = RedisManager =
|
||||||
return callback(e)
|
return callback(e)
|
||||||
callback null, rawUpdates
|
callback null, rawUpdates
|
||||||
|
|
||||||
deleteOldestRawUpdates: (doc_id, batchSize, callback = (error, rawUpdates) ->) ->
|
deleteOldestRawUpdates: (project_id, doc_id, batchSize, callback = (error) ->) ->
|
||||||
key = buildRawUpdatesKey(doc_id)
|
# It's ok to delete the doc_id from the set here. Even though the list
|
||||||
rclient.ltrim key, batchSize, -1, callback
|
# of updates may not be empty, we will continue to process it until it is.
|
||||||
|
multi = rclient.multi()
|
||||||
|
multi.ltrim rawUpdatesKey(doc_id), batchSize, -1
|
||||||
|
multi.srem docsWithHistoryOpsKey(project_id), doc_id
|
||||||
|
multi.exec (error, results) ->
|
||||||
|
return callback(error) if error?
|
||||||
|
callback null
|
||||||
|
|
|
@ -44,7 +44,7 @@ module.exports = UpdatesManager =
|
||||||
UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, (error) ->
|
UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, (error) ->
|
||||||
return callback(error) if error?
|
return callback(error) if error?
|
||||||
logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
|
logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
|
||||||
RedisManager.deleteOldestRawUpdates doc_id, length, (error) ->
|
RedisManager.deleteOldestRawUpdates project_id, doc_id, length, (error) ->
|
||||||
return callback(error) if error?
|
return callback(error) if error?
|
||||||
if length == UpdatesManager.REDIS_READ_BATCH_SIZE
|
if length == UpdatesManager.REDIS_READ_BATCH_SIZE
|
||||||
# There might be more updates
|
# There might be more updates
|
||||||
|
|
|
@ -16,7 +16,7 @@ describe "Appending doc ops to the history", ->
|
||||||
@project_id = ObjectId().toString()
|
@project_id = ObjectId().toString()
|
||||||
@doc_id = ObjectId().toString()
|
@doc_id = ObjectId().toString()
|
||||||
@user_id = ObjectId().toString()
|
@user_id = ObjectId().toString()
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, [{
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{
|
||||||
op: [{ i: "f", p: 3 }]
|
op: [{ i: "f", p: 3 }]
|
||||||
meta: { ts: Date.now(), user_id: @user_id }
|
meta: { ts: Date.now(), user_id: @user_id }
|
||||||
v: 3
|
v: 3
|
||||||
|
@ -48,12 +48,17 @@ describe "Appending doc ops to the history", ->
|
||||||
it "should store the project id", ->
|
it "should store the project id", ->
|
||||||
expect(@updates[0].project_id.toString()).to.equal @project_id
|
expect(@updates[0].project_id.toString()).to.equal @project_id
|
||||||
|
|
||||||
|
it "should clear the doc from the DocsWithHistoryOps set", (done) ->
|
||||||
|
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, member) ->
|
||||||
|
member.should.equal 0
|
||||||
|
done()
|
||||||
|
|
||||||
describe "when the history has already been started", ->
|
describe "when the history has already been started", ->
|
||||||
beforeEach (done) ->
|
beforeEach (done) ->
|
||||||
@project_id = ObjectId().toString()
|
@project_id = ObjectId().toString()
|
||||||
@doc_id = ObjectId().toString()
|
@doc_id = ObjectId().toString()
|
||||||
@user_id = ObjectId().toString()
|
@user_id = ObjectId().toString()
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, [{
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{
|
||||||
op: [{ i: "f", p: 3 }]
|
op: [{ i: "f", p: 3 }]
|
||||||
meta: { ts: Date.now(), user_id: @user_id }
|
meta: { ts: Date.now(), user_id: @user_id }
|
||||||
v: 3
|
v: 3
|
||||||
|
@ -73,7 +78,7 @@ describe "Appending doc ops to the history", ->
|
||||||
|
|
||||||
describe "when the updates are recent and from the same user", ->
|
describe "when the updates are recent and from the same user", ->
|
||||||
beforeEach (done) ->
|
beforeEach (done) ->
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, [{
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{
|
||||||
op: [{ i: "b", p: 6 }]
|
op: [{ i: "b", p: 6 }]
|
||||||
meta: { ts: Date.now(), user_id: @user_id }
|
meta: { ts: Date.now(), user_id: @user_id }
|
||||||
v: 6
|
v: 6
|
||||||
|
@ -103,7 +108,7 @@ describe "Appending doc ops to the history", ->
|
||||||
describe "when the updates are far apart", ->
|
describe "when the updates are far apart", ->
|
||||||
beforeEach (done) ->
|
beforeEach (done) ->
|
||||||
oneDay = 24 * 60 * 60 * 1000
|
oneDay = 24 * 60 * 60 * 1000
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, [{
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{
|
||||||
op: [{ i: "b", p: 6 }]
|
op: [{ i: "b", p: 6 }]
|
||||||
meta: { ts: Date.now() + oneDay, user_id: @user_id }
|
meta: { ts: Date.now() + oneDay, user_id: @user_id }
|
||||||
v: 6
|
v: 6
|
||||||
|
@ -144,7 +149,7 @@ describe "Appending doc ops to the history", ->
|
||||||
}
|
}
|
||||||
@expectedOp[0].i = "a" + @expectedOp[0].i
|
@expectedOp[0].i = "a" + @expectedOp[0].i
|
||||||
|
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, updates, (error) =>
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, updates, (error) =>
|
||||||
throw error if error?
|
throw error if error?
|
||||||
TrackChangesClient.flushAndGetCompressedUpdates @project_id, @doc_id, (error, @updates) =>
|
TrackChangesClient.flushAndGetCompressedUpdates @project_id, @doc_id, (error, @updates) =>
|
||||||
throw error if error?
|
throw error if error?
|
||||||
|
@ -163,7 +168,7 @@ describe "Appending doc ops to the history", ->
|
||||||
@doc_id = ObjectId().toString()
|
@doc_id = ObjectId().toString()
|
||||||
@user_id = ObjectId().toString()
|
@user_id = ObjectId().toString()
|
||||||
oneDay = 24 * 60 * 60 * 1000
|
oneDay = 24 * 60 * 60 * 1000
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, [{
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{
|
||||||
op: [{ i: "f", p: 3 }, { i: "o", p: 4 }, { i: "o", p: 5 }]
|
op: [{ i: "f", p: 3 }, { i: "o", p: 4 }, { i: "o", p: 5 }]
|
||||||
meta: { ts: Date.now(), user_id: @user_id }
|
meta: { ts: Date.now(), user_id: @user_id }
|
||||||
v: 3
|
v: 3
|
||||||
|
|
|
@ -58,7 +58,7 @@ describe "Getting a diff", ->
|
||||||
lines: @lines
|
lines: @lines
|
||||||
version: 7
|
version: 7
|
||||||
|
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, @updates, (error) =>
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, @updates, (error) =>
|
||||||
throw error if error?
|
throw error if error?
|
||||||
TrackChangesClient.getDiff @project_id, @doc_id, @fromVersion, @toVersion, (error, diff) =>
|
TrackChangesClient.getDiff @project_id, @doc_id, @fromVersion, @toVersion, (error, diff) =>
|
||||||
throw error if error?
|
throw error if error?
|
||||||
|
|
|
@ -41,7 +41,7 @@ describe "Getting updates", ->
|
||||||
v: 2 * i + 2
|
v: 2 * i + 2
|
||||||
}
|
}
|
||||||
|
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, @updates, (error) =>
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, @updates, (error) =>
|
||||||
throw error if error?
|
throw error if error?
|
||||||
TrackChangesClient.flushUpdates @project_id, @doc_id, (error) =>
|
TrackChangesClient.flushUpdates @project_id, @doc_id, (error) =>
|
||||||
throw error if error?
|
throw error if error?
|
||||||
|
|
|
@ -52,7 +52,7 @@ describe "Restoring a version", ->
|
||||||
lines: @lines
|
lines: @lines
|
||||||
version: 7
|
version: 7
|
||||||
|
|
||||||
TrackChangesClient.pushRawUpdates @doc_id, @updates, (error) =>
|
TrackChangesClient.pushRawUpdates @project_id, @doc_id, @updates, (error) =>
|
||||||
throw error if error?
|
throw error if error?
|
||||||
TrackChangesClient.restoreDoc @project_id, @doc_id, @beforeVersion, @user_id, (error) =>
|
TrackChangesClient.restoreDoc @project_id, @doc_id, @beforeVersion, @user_id, (error) =>
|
||||||
throw error if error?
|
throw error if error?
|
||||||
|
|
|
@ -21,7 +21,9 @@ module.exports = TrackChangesClient =
|
||||||
.sort("meta.end_ts": 1)
|
.sort("meta.end_ts": 1)
|
||||||
.toArray callback
|
.toArray callback
|
||||||
|
|
||||||
pushRawUpdates: (doc_id, updates, callback = (error) ->) ->
|
pushRawUpdates: (project_id, doc_id, updates, callback = (error) ->) ->
|
||||||
|
rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, (error) ->
|
||||||
|
return callback(error) if error?
|
||||||
rclient.rpush "UncompressedHistoryOps:#{doc_id}", (JSON.stringify(u) for u in updates)..., callback
|
rclient.rpush "UncompressedHistoryOps:#{doc_id}", (JSON.stringify(u) for u in updates)..., callback
|
||||||
|
|
||||||
getDiff: (project_id, doc_id, from, to, callback = (error, diff) ->) ->
|
getDiff: (project_id, doc_id, from, to, callback = (error, diff) ->) ->
|
||||||
|
|
|
@ -11,8 +11,10 @@ describe "RedisManager", ->
|
||||||
"redis" :
|
"redis" :
|
||||||
createClient: () => @rclient =
|
createClient: () => @rclient =
|
||||||
auth: sinon.stub()
|
auth: sinon.stub()
|
||||||
|
multi: () => @rclient
|
||||||
"settings-sharelatex": {}
|
"settings-sharelatex": {}
|
||||||
@doc_id = "doc-id-123"
|
@doc_id = "doc-id-123"
|
||||||
|
@project_id = "project-id-123"
|
||||||
@batchSize = 100
|
@batchSize = 100
|
||||||
@callback = sinon.stub()
|
@callback = sinon.stub()
|
||||||
|
|
||||||
|
@ -33,13 +35,20 @@ describe "RedisManager", ->
|
||||||
|
|
||||||
describe "deleteOldestRawUpdates", ->
|
describe "deleteOldestRawUpdates", ->
|
||||||
beforeEach ->
|
beforeEach ->
|
||||||
@rclient.ltrim = sinon.stub().callsArg(3)
|
@rclient.ltrim = sinon.stub()
|
||||||
@RedisManager.deleteOldestRawUpdates @doc_id, @batchSize, @callback
|
@rclient.srem = sinon.stub()
|
||||||
|
@rclient.exec = sinon.stub().callsArgWith(0)
|
||||||
|
@RedisManager.deleteOldestRawUpdates @project_id, @doc_id, @batchSize, @callback
|
||||||
|
|
||||||
it "should delete the updates from redis", ->
|
it "should delete the updates from redis", ->
|
||||||
@rclient.ltrim
|
@rclient.ltrim
|
||||||
.calledWith("UncompressedHistoryOps:#{@doc_id}", @batchSize, -1)
|
.calledWith("UncompressedHistoryOps:#{@doc_id}", @batchSize, -1)
|
||||||
.should.equal true
|
.should.equal true
|
||||||
|
|
||||||
|
it "should delete the doc from the set of docs with history ops", ->
|
||||||
|
@rclient.srem
|
||||||
|
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
|
||||||
|
.should.equal true
|
||||||
|
|
||||||
it "should call the callback ", ->
|
it "should call the callback ", ->
|
||||||
@callback.called.should.equal true
|
@callback.called.should.equal true
|
||||||
|
|
|
@ -125,7 +125,7 @@ describe "UpdatesManager", ->
|
||||||
@updates = ["mock-update"]
|
@updates = ["mock-update"]
|
||||||
@RedisManager.getOldestRawUpdates = sinon.stub().callsArgWith(2, null, @updates)
|
@RedisManager.getOldestRawUpdates = sinon.stub().callsArgWith(2, null, @updates)
|
||||||
@UpdatesManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(3)
|
@UpdatesManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(3)
|
||||||
@RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(2)
|
@RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(3)
|
||||||
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback
|
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback
|
||||||
|
|
||||||
it "should get the oldest updates", ->
|
it "should get the oldest updates", ->
|
||||||
|
@ -140,7 +140,7 @@ describe "UpdatesManager", ->
|
||||||
|
|
||||||
it "should delete the batch of uncompressed updates that was just processed", ->
|
it "should delete the batch of uncompressed updates that was just processed", ->
|
||||||
@RedisManager.deleteOldestRawUpdates
|
@RedisManager.deleteOldestRawUpdates
|
||||||
.calledWith(@doc_id, @updates.length)
|
.calledWith(@project_id, @doc_id, @updates.length)
|
||||||
.should.equal true
|
.should.equal true
|
||||||
|
|
||||||
it "should call the callback", ->
|
it "should call the callback", ->
|
||||||
|
@ -157,7 +157,7 @@ describe "UpdatesManager", ->
|
||||||
callback null, updates
|
callback null, updates
|
||||||
sinon.spy @RedisManager, "getOldestRawUpdates"
|
sinon.spy @RedisManager, "getOldestRawUpdates"
|
||||||
@UpdatesManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(3)
|
@UpdatesManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(3)
|
||||||
@RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(2)
|
@RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(3)
|
||||||
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) =>
|
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) =>
|
||||||
@callback(args...)
|
@callback(args...)
|
||||||
done()
|
done()
|
||||||
|
|
Loading…
Reference in a new issue