get history ops length directly from redis update

This commit is contained in:
Brian Gough 2017-05-09 10:34:31 +01:00
parent 2d158b03d7
commit fdf5e8e0b8
7 changed files with 32 additions and 34 deletions

View file

@ -22,11 +22,12 @@ module.exports = HistoryManager =
return callback(error)
FLUSH_EVERY_N_OPS: 50
pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) ->
pushUncompressedHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) ->
if ops.length == 0
return callback()
HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error, length) ->
HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) ->
return callback(error) if error?
return callback() if not length? # don't flush unless we know the length
# We want to flush every 50 ops, i.e. 50, 100, 150, etc
# Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these
# ops. If we've changed, then we've gone over a multiple of 50 and should flush.

View file

@ -5,12 +5,10 @@ async = require "async"
logger = require('logger-sharelatex')
module.exports = HistoryRedisManager =
recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) ->
recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) ->
if ops.length == 0
return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush
logger.log project_id: project_id, doc_id: doc_id, "marking doc in project for history ops"
rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, (error) ->
return callback(error) if error?
rclient.llen Keys.uncompressedHistoryOps({doc_id}), (error, length) ->
return callback(error) if error?
callback(null, length)
callback()

View file

@ -177,25 +177,28 @@ module.exports = RedisManager =
logger.error {err: error, doc_id}, error.message
return callback(error)
multi = rclient.multi()
multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines
multi.set keys.docVersion(doc_id:doc_id), newVersion
multi.set keys.docHash(doc_id:doc_id), newHash
if jsonOps.length > 0
multi.rpush keys.docOps(doc_id: doc_id), jsonOps...
multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps...
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL
multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1
multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines # index 0
multi.set keys.docVersion(doc_id:doc_id), newVersion # index 1
multi.set keys.docHash(doc_id:doc_id), newHash # index 2
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 3
multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 # index 4
if ranges?
multi.set keys.ranges(doc_id:doc_id), ranges
multi.set keys.ranges(doc_id:doc_id), ranges # index 5
else
multi.del keys.ranges(doc_id:doc_id)
multi.del keys.ranges(doc_id:doc_id) # also index 5
# push the ops last so we can get the lengths at fixed index positions 6 and 7
if jsonOps.length > 0
multi.rpush keys.docOps(doc_id: doc_id), jsonOps... # index 6
multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7
multi.exec (error, result) ->
return callback(error) if error?
# check the hash computed on the redis server
writeHash = result?[0]
if logHashWriteErrors and writeHash? and writeHash isnt newHash
logger.error doc_id: doc_id, writeHash: writeHash, origHash: newHash, docLines:newDocLines, "hash mismatch on updateDocument"
return callback()
# return length of uncompressedHistoryOps queue (index 7)
uncompressedHistoryOpsLength = result?[7]
return callback(null, uncompressedHistoryOpsLength)
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
rclient.smembers keys.docsInProject(project_id: project_id), callback

View file

@ -61,9 +61,9 @@ module.exports = UpdateManager =
return callback(error) if error?
RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) ->
return callback(error) if error?
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error) ->
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) ->
return callback(error) if error?
HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback
HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback
lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) ->
LockManager.getLock doc_id, (error, lockValue) ->

View file

@ -47,8 +47,8 @@ describe "HistoryManager", ->
describe "pushing the op", ->
beforeEach ->
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 1)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 1, @callback
it "should push the ops into redis", ->
@HistoryRedisManager.recordDocHasHistoryOps
@ -64,8 +64,8 @@ describe "HistoryManager", ->
describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
sinon.stub().callsArgWith(3, null)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback
it "should tell the track changes api to flush", ->
@HistoryManager.flushDocChanges
@ -76,8 +76,8 @@ describe "HistoryManager", ->
beforeEach ->
@ops = ["op1", "op2", "op3"]
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
sinon.stub().callsArgWith(3, null)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback
it "should tell the track changes api to flush", ->
@HistoryManager.flushDocChanges
@ -87,9 +87,9 @@ describe "HistoryManager", ->
describe "when HistoryManager errors", ->
beforeEach ->
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS)
sinon.stub().callsArgWith(3, null)
@HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback
it "should log out the error", ->
@logger.error
@ -103,8 +103,8 @@ describe "HistoryManager", ->
describe "with no ops", ->
beforeEach ->
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 1)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], @callback
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], 1, @callback
it "should not call HistoryRedisManager.recordDocHasHistoryOps", ->
@HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false

View file

@ -27,7 +27,6 @@ describe "HistoryRedisManager", ->
describe "recordDocHasHistoryOps", ->
beforeEach ->
@ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }]
@rclient.llen = sinon.stub().yields(null, @length = 42)
@rclient.sadd = sinon.stub().yields()
describe "with ops", ->
@ -41,9 +40,6 @@ describe "HistoryRedisManager", ->
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
.should.equal true
it "should call the callback with the length", ->
@callback.calledWith(null, @length).should.equal true
describe "with no ops", ->
beforeEach (done) ->
@HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, [], (args...) =>

View file

@ -166,7 +166,7 @@ describe "UpdateManager", ->
@ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps)
@RedisManager.updateDocument = sinon.stub().yields()
@RealTimeRedisManager.sendData = sinon.stub()
@HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3)
@HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(4)
describe "normally", ->
beforeEach ->