diff --git a/services/track-changes/app/coffee/RedisManager.coffee b/services/track-changes/app/coffee/RedisManager.coffee index 81a1375bf6..98f310a9f8 100644 --- a/services/track-changes/app/coffee/RedisManager.coffee +++ b/services/track-changes/app/coffee/RedisManager.coffee @@ -4,11 +4,12 @@ redisConf = Settings.redis?.web or {host: "localhost", port: 6379} rclient = redis.createClient(redisConf.port, redisConf.host) rclient.auth(redisConf.password) -buildRawUpdatesKey = (doc_id) -> "UncompressedHistoryOps:#{doc_id}" +rawUpdatesKey = (doc_id) -> "UncompressedHistoryOps:#{doc_id}" +docsWithHistoryOpsKey = (project_id) -> "DocsWithHistoryOps:#{project_id}" module.exports = RedisManager = getOldestRawUpdates: (doc_id, batchSize, callback = (error, rawUpdates) ->) -> - key = buildRawUpdatesKey(doc_id) + key = rawUpdatesKey(doc_id) rclient.lrange key, 0, batchSize - 1, (error, jsonUpdates) -> try rawUpdates = ( JSON.parse(update) for update in jsonUpdates or [] ) @@ -16,6 +17,12 @@ module.exports = RedisManager = return callback(e) callback null, rawUpdates - deleteOldestRawUpdates: (doc_id, batchSize, callback = (error, rawUpdates) ->) -> - key = buildRawUpdatesKey(doc_id) - rclient.ltrim key, batchSize, -1, callback + deleteOldestRawUpdates: (project_id, doc_id, batchSize, callback = (error) ->) -> + # It's ok to delete the doc_id from the set here. Even though the list + # of updates may not be empty, we will continue to process it until it is. + multi = rclient.multi() + multi.ltrim rawUpdatesKey(doc_id), batchSize, -1 + multi.srem docsWithHistoryOpsKey(project_id), doc_id + multi.exec (error, results) -> + return callback(error) if error? + callback null diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index 839c74fe6d..e582108e29 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -44,7 +44,7 @@ module.exports = UpdatesManager = UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, (error) -> return callback(error) if error? logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates" - RedisManager.deleteOldestRawUpdates doc_id, length, (error) -> + RedisManager.deleteOldestRawUpdates project_id, doc_id, length, (error) -> return callback(error) if error? if length == UpdatesManager.REDIS_READ_BATCH_SIZE # There might be more updates diff --git a/services/track-changes/test/acceptance/coffee/AppendingUpdatesTests.coffee b/services/track-changes/test/acceptance/coffee/AppendingUpdatesTests.coffee index 1fa4166f60..7a8d465504 100644 --- a/services/track-changes/test/acceptance/coffee/AppendingUpdatesTests.coffee +++ b/services/track-changes/test/acceptance/coffee/AppendingUpdatesTests.coffee @@ -16,7 +16,7 @@ describe "Appending doc ops to the history", -> @project_id = ObjectId().toString() @doc_id = ObjectId().toString() @user_id = ObjectId().toString() - TrackChangesClient.pushRawUpdates @doc_id, [{ + TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{ op: [{ i: "f", p: 3 }] meta: { ts: Date.now(), user_id: @user_id } v: 3 @@ -48,12 +48,17 @@ describe "Appending doc ops to the history", -> it "should store the project id", -> expect(@updates[0].project_id.toString()).to.equal @project_id + it "should clear the doc from the DocsWithHistoryOps set", (done) -> + rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, member) -> + member.should.equal 0 + done() + describe "when the history has already been started", -> beforeEach (done) -> @project_id = ObjectId().toString() @doc_id = ObjectId().toString() @user_id = ObjectId().toString() - TrackChangesClient.pushRawUpdates @doc_id, [{ + TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{ op: [{ i: "f", p: 3 }] meta: { ts: Date.now(), user_id: @user_id } v: 3 @@ -73,7 +78,7 @@ describe "Appending doc ops to the history", -> describe "when the updates are recent and from the same user", -> beforeEach (done) -> - TrackChangesClient.pushRawUpdates @doc_id, [{ + TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{ op: [{ i: "b", p: 6 }] meta: { ts: Date.now(), user_id: @user_id } v: 6 @@ -103,7 +108,7 @@ describe "Appending doc ops to the history", -> describe "when the updates are far apart", -> beforeEach (done) -> oneDay = 24 * 60 * 60 * 1000 - TrackChangesClient.pushRawUpdates @doc_id, [{ + TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{ op: [{ i: "b", p: 6 }] meta: { ts: Date.now() + oneDay, user_id: @user_id } v: 6 @@ -144,7 +149,7 @@ describe "Appending doc ops to the history", -> } @expectedOp[0].i = "a" + @expectedOp[0].i - TrackChangesClient.pushRawUpdates @doc_id, updates, (error) => + TrackChangesClient.pushRawUpdates @project_id, @doc_id, updates, (error) => throw error if error? TrackChangesClient.flushAndGetCompressedUpdates @project_id, @doc_id, (error, @updates) => throw error if error? @@ -163,7 +168,7 @@ describe "Appending doc ops to the history", -> @doc_id = ObjectId().toString() @user_id = ObjectId().toString() oneDay = 24 * 60 * 60 * 1000 - TrackChangesClient.pushRawUpdates @doc_id, [{ + TrackChangesClient.pushRawUpdates @project_id, @doc_id, [{ op: [{ i: "f", p: 3 }, { i: "o", p: 4 }, { i: "o", p: 5 }] meta: { ts: Date.now(), user_id: @user_id } v: 3 diff --git a/services/track-changes/test/acceptance/coffee/GettingADiffTests.coffee b/services/track-changes/test/acceptance/coffee/GettingADiffTests.coffee index 6fe573c19e..7561b68673 100644 --- a/services/track-changes/test/acceptance/coffee/GettingADiffTests.coffee +++ b/services/track-changes/test/acceptance/coffee/GettingADiffTests.coffee @@ -58,7 +58,7 @@ describe "Getting a diff", -> lines: @lines version: 7 - TrackChangesClient.pushRawUpdates @doc_id, @updates, (error) => + TrackChangesClient.pushRawUpdates @project_id, @doc_id, @updates, (error) => throw error if error? TrackChangesClient.getDiff @project_id, @doc_id, @fromVersion, @toVersion, (error, diff) => throw error if error? diff --git a/services/track-changes/test/acceptance/coffee/GettingUpdatesTests.coffee b/services/track-changes/test/acceptance/coffee/GettingUpdatesTests.coffee index 13e70162da..8319c80456 100644 --- a/services/track-changes/test/acceptance/coffee/GettingUpdatesTests.coffee +++ b/services/track-changes/test/acceptance/coffee/GettingUpdatesTests.coffee @@ -41,7 +41,7 @@ describe "Getting updates", -> v: 2 * i + 2 } - TrackChangesClient.pushRawUpdates @doc_id, @updates, (error) => + TrackChangesClient.pushRawUpdates @project_id, @doc_id, @updates, (error) => throw error if error? TrackChangesClient.flushUpdates @project_id, @doc_id, (error) => throw error if error? diff --git a/services/track-changes/test/acceptance/coffee/RestoringVersions.coffee b/services/track-changes/test/acceptance/coffee/RestoringVersions.coffee index 03f4c8b8f7..7e3c29a0e7 100644 --- a/services/track-changes/test/acceptance/coffee/RestoringVersions.coffee +++ b/services/track-changes/test/acceptance/coffee/RestoringVersions.coffee @@ -52,7 +52,7 @@ describe "Restoring a version", -> lines: @lines version: 7 - TrackChangesClient.pushRawUpdates @doc_id, @updates, (error) => + TrackChangesClient.pushRawUpdates @project_id, @doc_id, @updates, (error) => throw error if error? TrackChangesClient.restoreDoc @project_id, @doc_id, @beforeVersion, @user_id, (error) => throw error if error? diff --git a/services/track-changes/test/acceptance/coffee/helpers/TrackChangesClient.coffee b/services/track-changes/test/acceptance/coffee/helpers/TrackChangesClient.coffee index f9e3a42e2f..dea3a885d5 100644 --- a/services/track-changes/test/acceptance/coffee/helpers/TrackChangesClient.coffee +++ b/services/track-changes/test/acceptance/coffee/helpers/TrackChangesClient.coffee @@ -21,8 +21,10 @@ module.exports = TrackChangesClient = .sort("meta.end_ts": 1) .toArray callback - pushRawUpdates: (doc_id, updates, callback = (error) ->) -> - rclient.rpush "UncompressedHistoryOps:#{doc_id}", (JSON.stringify(u) for u in updates)..., callback + pushRawUpdates: (project_id, doc_id, updates, callback = (error) ->) -> + rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, (error) -> + return callback(error) if error? + rclient.rpush "UncompressedHistoryOps:#{doc_id}", (JSON.stringify(u) for u in updates)..., callback getDiff: (project_id, doc_id, from, to, callback = (error, diff) ->) -> request.get { diff --git a/services/track-changes/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/track-changes/test/unit/coffee/RedisManager/RedisManagerTests.coffee index 669e64103d..27c829a948 100644 --- a/services/track-changes/test/unit/coffee/RedisManager/RedisManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/RedisManager/RedisManagerTests.coffee @@ -11,8 +11,10 @@ describe "RedisManager", -> "redis" : createClient: () => @rclient = auth: sinon.stub() + multi: () => @rclient "settings-sharelatex": {} @doc_id = "doc-id-123" + @project_id = "project-id-123" @batchSize = 100 @callback = sinon.stub() @@ -33,13 +35,20 @@ describe "RedisManager", -> describe "deleteOldestRawUpdates", -> beforeEach -> - @rclient.ltrim = sinon.stub().callsArg(3) - @RedisManager.deleteOldestRawUpdates @doc_id, @batchSize, @callback + @rclient.ltrim = sinon.stub() + @rclient.srem = sinon.stub() + @rclient.exec = sinon.stub().callsArgWith(0) + @RedisManager.deleteOldestRawUpdates @project_id, @doc_id, @batchSize, @callback it "should delete the updates from redis", -> @rclient.ltrim .calledWith("UncompressedHistoryOps:#{@doc_id}", @batchSize, -1) .should.equal true - it "should call the callback", -> + it "should delete the doc from the set of docs with history ops", -> + @rclient.srem + .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) + .should.equal true + + it "should call the callback ", -> @callback.called.should.equal true diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 59e1293ac4..c6abb6a7b6 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -125,7 +125,7 @@ describe "UpdatesManager", -> @updates = ["mock-update"] @RedisManager.getOldestRawUpdates = sinon.stub().callsArgWith(2, null, @updates) @UpdatesManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(3) - @RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(2) + @RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(3) @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback it "should get the oldest updates", -> @@ -140,7 +140,7 @@ describe "UpdatesManager", -> it "should delete the batch of uncompressed updates that was just processed", -> @RedisManager.deleteOldestRawUpdates - .calledWith(@doc_id, @updates.length) + .calledWith(@project_id, @doc_id, @updates.length) .should.equal true it "should call the callback", -> @@ -157,7 +157,7 @@ describe "UpdatesManager", -> callback null, updates sinon.spy @RedisManager, "getOldestRawUpdates" @UpdatesManager.compressAndSaveRawUpdates = sinon.stub().callsArgWith(3) - @RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(2) + @RedisManager.deleteOldestRawUpdates = sinon.stub().callsArg(3) @UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) => @callback(args...) done()