diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index 6c47c24c1c..c6aa7797cf 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -22,11 +22,12 @@ module.exports = HistoryManager = return callback(error) FLUSH_EVERY_N_OPS: 50 - pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> + pushUncompressedHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) -> if ops.length == 0 return callback() - HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error, length) -> + HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) -> return callback(error) if error? + return callback() if not length? # don't flush unless we know the length # We want to flush every 50 ops, i.e. 50, 100, 150, etc # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these # ops. If we've changed, then we've gone over a multiple of 50 and should flush. diff --git a/services/document-updater/app/coffee/HistoryRedisManager.coffee b/services/document-updater/app/coffee/HistoryRedisManager.coffee index 2329b6f433..0ac8723359 100644 --- a/services/document-updater/app/coffee/HistoryRedisManager.coffee +++ b/services/document-updater/app/coffee/HistoryRedisManager.coffee @@ -5,12 +5,10 @@ async = require "async" logger = require('logger-sharelatex') module.exports = HistoryRedisManager = - recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> + recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> if ops.length == 0 return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush logger.log project_id: project_id, doc_id: doc_id, "marking doc in project for history ops" rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, (error) -> return callback(error) if error? - rclient.llen Keys.uncompressedHistoryOps({doc_id}), (error, length) -> - return callback(error) if error? - callback(null, length) + callback() diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 1942b86dc3..4aad7ec109 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -177,25 +177,28 @@ module.exports = RedisManager = logger.error {err: error, doc_id}, error.message return callback(error) multi = rclient.multi() - multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines - multi.set keys.docVersion(doc_id:doc_id), newVersion - multi.set keys.docHash(doc_id:doc_id), newHash - if jsonOps.length > 0 - multi.rpush keys.docOps(doc_id: doc_id), jsonOps... - multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... - multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL - multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 + multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines # index 0 + multi.set keys.docVersion(doc_id:doc_id), newVersion # index 1 + multi.set keys.docHash(doc_id:doc_id), newHash # index 2 + multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 3 + multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 # index 4 if ranges? - multi.set keys.ranges(doc_id:doc_id), ranges + multi.set keys.ranges(doc_id:doc_id), ranges # index 5 else - multi.del keys.ranges(doc_id:doc_id) + multi.del keys.ranges(doc_id:doc_id) # also index 5 + # push the ops last so we can get the lengths at fixed index positions 6 and 7 + if jsonOps.length > 0 + multi.rpush keys.docOps(doc_id: doc_id), jsonOps... # index 6 + multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7 multi.exec (error, result) -> return callback(error) if error? # check the hash computed on the redis server writeHash = result?[0] if logHashWriteErrors and writeHash? and writeHash isnt newHash logger.error doc_id: doc_id, writeHash: writeHash, origHash: newHash, docLines:newDocLines, "hash mismatch on updateDocument" - return callback() + # return length of uncompressedHistoryOps queue (index 7) + uncompressedHistoryOpsLength = result?[7] + return callback(null, uncompressedHistoryOpsLength) getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index 5022f6bb38..b903f6615f 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -61,9 +61,9 @@ module.exports = UpdateManager = return callback(error) if error? RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) -> return callback(error) if error? - RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error) -> + RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) -> return callback(error) if error? - HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback + HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) -> LockManager.getLock doc_id, (error, lockValue) -> diff --git a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee index bff896cdd8..b41c9b9f7a 100644 --- a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee @@ -47,8 +47,8 @@ describe "HistoryManager", -> describe "pushing the op", -> beforeEach -> - @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 1, @callback it "should push the ops into redis", -> @HistoryRedisManager.recordDocHasHistoryOps @@ -64,8 +64,8 @@ describe "HistoryManager", -> describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + sinon.stub().callsArgWith(3, null) + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback it "should tell the track changes api to flush", -> @HistoryManager.flushDocChanges @@ -76,8 +76,8 @@ describe "HistoryManager", -> beforeEach -> @ops = ["op1", "op2", "op3"] @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + sinon.stub().callsArgWith(3, null) + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback it "should tell the track changes api to flush", -> @HistoryManager.flushDocChanges @@ -87,9 +87,9 @@ describe "HistoryManager", -> describe "when HistoryManager errors", -> beforeEach -> @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) + sinon.stub().callsArgWith(3, null) @HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback it "should log out the error", -> @logger.error @@ -103,8 +103,8 @@ describe "HistoryManager", -> describe "with no ops", -> beforeEach -> - @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], @callback + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], 1, @callback it "should not call HistoryRedisManager.recordDocHasHistoryOps", -> @HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false diff --git a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee index f14e6da27f..ca3937d4c5 100644 --- a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee @@ -27,7 +27,6 @@ describe "HistoryRedisManager", -> describe "recordDocHasHistoryOps", -> beforeEach -> @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }] - @rclient.llen = sinon.stub().yields(null, @length = 42) @rclient.sadd = sinon.stub().yields() describe "with ops", -> @@ -41,9 +40,6 @@ describe "HistoryRedisManager", -> .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) .should.equal true - it "should call the callback with the length", -> - @callback.calledWith(null, @length).should.equal true - describe "with no ops", -> beforeEach (done) -> @HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, [], (args...) => diff --git a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee index 3e659ee078..57bd9166d1 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee @@ -166,7 +166,7 @@ describe "UpdateManager", -> @ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps) @RedisManager.updateDocument = sinon.stub().yields() @RealTimeRedisManager.sendData = sinon.stub() - @HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3) + @HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(4) describe "normally", -> beforeEach ->