conditionally enqueue history updates for project

This commit is contained in:
Hayden Faulds 2017-09-29 12:44:36 +01:00
parent c27df0bfef
commit d003aef31c
7 changed files with 146 additions and 87 deletions

View file

@ -138,7 +138,7 @@ module.exports = DocumentManager =
return callback(new Errors.NotFoundError("document not found: #{doc_id}"))
RangesManager.acceptChanges change_ids, ranges, (error, new_ranges) ->
return callback(error) if error?
RedisManager.updateDocument doc_id, lines, version, [], new_ranges, (error) ->
RedisManager.updateDocument project_id, doc_id, lines, version, [], new_ranges, (error) ->
return callback(error) if error?
callback()
@ -154,7 +154,7 @@ module.exports = DocumentManager =
return callback(new Errors.NotFoundError("document not found: #{doc_id}"))
RangesManager.deleteComment comment_id, ranges, (error, new_ranges) ->
return callback(error) if error?
RedisManager.updateDocument doc_id, lines, version, [], new_ranges, (error) ->
RedisManager.updateDocument project_id, doc_id, lines, version, [], new_ranges, (error) ->
return callback(error) if error?
callback()

View file

@ -32,6 +32,7 @@ MAX_RANGES_SIZE = 3 * MEGABYTES
keys = Settings.redis.documentupdater.key_schema
historyKeys = Settings.redis.history.key_schema
projectHistoryKeys = Settings.redis.project_history.key_schema
module.exports = RedisManager =
rclient: rclient
@ -204,7 +205,7 @@ module.exports = RedisManager =
DOC_OPS_TTL: 60 * minutes
DOC_OPS_MAX_LENGTH: 100
updateDocument : (doc_id, docLines, newVersion, appliedOps = [], ranges, callback = (error) ->)->
updateDocument : (project_id, doc_id, docLines, newVersion, appliedOps = [], ranges, callback = (error) ->)->
RedisManager.getDocVersion doc_id, (error, currentVersion) ->
return callback(error) if error?
if currentVersion + appliedOps.length != newVersion
@ -262,9 +263,16 @@ module.exports = RedisManager =
writeHash = result?[0]
if logHashWriteErrors and writeHash? and writeHash isnt newHash
logger.error doc_id: doc_id, writeHash: writeHash, origHash: newHash, docLines:newDocLines, "hash mismatch on updateDocument"
# return length of uncompressedHistoryOps queue (index 7)
uncompressedHistoryOpsLength = result?[7]
return callback(null, uncompressedHistoryOpsLength)
# length of uncompressedHistoryOps queue (index 7)
docUpdateCount = result[7]
if jsonOps.length > 0 && Settings.apis?.project_history?.enabled
rclient.rpush projectHistoryKeys.projectHistoryOps({project_id}), jsonOps..., (error, projectUpdateCount) ->
callback null, docUpdateCount, projectUpdateCount
else
callback null, docUpdateCount
clearUnflushedTime: (doc_id, callback = (error) ->) ->
rclient.del keys.unflushedTime(doc_id:doc_id), callback
@ -280,7 +288,7 @@ module.exports = RedisManager =
# Most doc will have empty ranges so don't fill redis with lots of '{}' keys
jsonRanges = null
return callback null, jsonRanges
_deserializeRanges: (ranges) ->
if !ranges? or ranges == ""
return {}

View file

@ -80,7 +80,7 @@ module.exports = UpdateManager =
RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) ->
profile.log("RangesManager.applyUpdate")
return callback(error) if error?
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) ->
RedisManager.updateDocument project_id, doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) ->
profile.log("RedisManager.updateDocument")
return callback(error) if error?
HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, historyOpsLength, (error) ->

View file

@ -14,6 +14,9 @@ module.exports =
pass: "password"
trackchanges:
url: "http://localhost:3015"
project_history:
url: "http://localhost:3054"
enabled: true
redis:
realtime:
@ -65,6 +68,10 @@ module.exports =
key_schema:
uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}"
docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}"
project_history:
key_schema:
projectHistoryOps: ({project_id}) -> "ProjectHistory:Ops:#{project_id}"
# cluster: [{
# port: "7000"
# host: "localhost"

View file

@ -307,7 +307,7 @@ describe "DocumentManager", ->
it "should save the updated ranges", ->
@RedisManager.updateDocument
.calledWith(@doc_id, @lines, @version, [], @updated_ranges)
.calledWith(@project_id, @doc_id, @lines, @version, [], @updated_ranges)
.should.equal true
it "should call the callback", ->
@ -361,7 +361,7 @@ describe "DocumentManager", ->
it "should save the updated ranges", ->
@RedisManager.updateDocument
.calledWith(@doc_id, @lines, @version, [], @updated_ranges)
.calledWith(@project_id, @doc_id, @lines, @version, [], @updated_ranges)
.should.equal true
it "should call the callback", ->

View file

@ -15,8 +15,10 @@ describe "RedisManager", ->
@RedisManager = SandboxedModule.require modulePath,
requires:
"logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() }
"settings-sharelatex": {
"settings-sharelatex": @settings = {
documentupdater: {logHashErrors: {write:true, read:true}}
apis:
project_history: {enabled: true}
redis:
documentupdater:
key_schema:
@ -36,6 +38,9 @@ describe "RedisManager", ->
key_schema:
uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}"
docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}"
project_history:
key_schema:
projectHistoryOps: ({project_id}) -> "ProjectHistory:Ops:#{project_id}"
}
"redis-sharelatex":
createClient: () => @rclient
@ -314,83 +319,117 @@ describe "RedisManager", ->
describe "updateDocument", ->
beforeEach ->
@lines = ["one", "two", "three", "これは"]
@ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 8 }] }]
@version = 42
@hash = crypto.createHash('sha1').update(JSON.stringify(@lines),'utf8').digest('hex')
@ranges = { comments: "mock", entries: "mock" }
@doc_update_list_length = sinon.stub()
@project_update_list_length = sinon.stub()
@RedisManager.getDocVersion = sinon.stub()
@multi.set = sinon.stub()
@multi.rpush = sinon.stub()
@multi.expire = sinon.stub()
@multi.ltrim = sinon.stub()
@multi.del = sinon.stub()
@multi.eval = sinon.stub()
@RedisManager.getDocVersion = sinon.stub()
@lines = ["one", "two", "three", "これは"]
@ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 8 }] }]
@version = 42
@hash = crypto.createHash('sha1').update(JSON.stringify(@lines),'utf8').digest('hex')
@ranges = { comments: "mock", entries: "mock" }
@multi.exec = sinon.stub().callsArg(0, null, [@hash])
@multi.exec = sinon.stub().callsArgWith(0, null,
[@hash, null, null, null, null, null, null, @doc_update_list_length]
)
@rclient.rpush = sinon.stub().callsArgWith(@ops.length + 1, null, @project_update_list_length)
describe "with a consistent version", ->
beforeEach ->
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length)
@RedisManager.updateDocument @doc_id, @lines, @version, @ops, @ranges, @callback
it "should get the current doc version to check for consistency", ->
@RedisManager.getDocVersion
.calledWith(@doc_id)
.should.equal true
describe "with project history enabled", ->
beforeEach ->
@settings.apis.project_history.enabled = true
@RedisManager.updateDocument @project_id, @doc_id, @lines, @version, @ops, @ranges, @callback
it "should set the doclines", ->
@multi.eval
.calledWith(sinon.match(/redis.call/), 1, "doclines:#{@doc_id}", JSON.stringify(@lines))
.should.equal true
it "should get the current doc version to check for consistency", ->
@RedisManager.getDocVersion
.calledWith(@doc_id)
.should.equal true
it "should set the version", ->
@multi.set
.calledWith("DocVersion:#{@doc_id}", @version)
.should.equal true
it "should set the doclines", ->
@multi.eval
.calledWith(sinon.match(/redis.call/), 1, "doclines:#{@doc_id}", JSON.stringify(@lines))
.should.equal true
it "should set the hash", ->
@multi.set
.calledWith("DocHash:#{@doc_id}", @hash)
.should.equal true
it "should set the version", ->
@multi.set
.calledWith("DocVersion:#{@doc_id}", @version)
.should.equal true
it "should set the ranges", ->
@multi.set
.calledWith("Ranges:#{@doc_id}", JSON.stringify(@ranges))
.should.equal true
it "should set the hash", ->
@multi.set
.calledWith("DocHash:#{@doc_id}", @hash)
.should.equal true
it "should set the unflushed time", ->
@multi.set
.calledWith("UnflushedTime:#{@doc_id}", Date.now(), "NX")
.should.equal true
it "should set the ranges", ->
@multi.set
.calledWith("Ranges:#{@doc_id}", JSON.stringify(@ranges))
.should.equal true
it "should push the doc op into the doc ops list", ->
@multi.rpush
.calledWith("DocOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1]))
.should.equal true
it "should set the unflushed time", ->
@multi.set
.calledWith("UnflushedTime:#{@doc_id}", Date.now(), "NX")
.should.equal true
it "should renew the expiry ttl on the doc ops array", ->
@multi.expire
.calledWith("DocOps:#{@doc_id}", @RedisManager.DOC_OPS_TTL)
.should.equal true
it "should push the doc op into the doc ops list", ->
@multi.rpush
.calledWith("DocOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1]))
.should.equal true
it "should truncate the list to 100 members", ->
@multi.ltrim
.calledWith("DocOps:#{@doc_id}", -@RedisManager.DOC_OPS_MAX_LENGTH, -1)
.should.equal true
it "should renew the expiry ttl on the doc ops array", ->
@multi.expire
.calledWith("DocOps:#{@doc_id}", @RedisManager.DOC_OPS_TTL)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
it "should truncate the list to 100 members", ->
@multi.ltrim
.calledWith("DocOps:#{@doc_id}", -@RedisManager.DOC_OPS_MAX_LENGTH, -1)
.should.equal true
it 'should not log any errors', ->
@logger.error.calledWith()
.should.equal false
it "should push the updates into the history ops list", ->
@multi.rpush
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1]))
.should.equal true
it "should push the updates into the project history ops list", ->
@rclient.rpush
.calledWith("ProjectHistory:Ops:#{@project_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1]))
.should.equal true
it "should call the callback", ->
@callback
.calledWith(null, @doc_update_list_length, @project_update_list_length)
.should.equal true
it 'should not log any errors', ->
@logger.error.calledWith()
.should.equal false
describe "with project history disabled", ->
beforeEach ->
@rclient.rpush = sinon.stub()
@settings.apis.project_history.enabled = false
@RedisManager.updateDocument @project_id, @doc_id, @lines, @version, @ops, @ranges, @callback
it "should not push the updates into the project history ops list", ->
@rclient.rpush.called.should.equal false
it "should call the callback", ->
@callback
.calledWith(null, @doc_update_list_length)
.should.equal true
describe "with an inconsistent version", ->
beforeEach ->
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length - 1)
@RedisManager.updateDocument @doc_id, @lines, @version, @ops, @ranges, @callback
@RedisManager.updateDocument @project_id, @doc_id, @lines, @version, @ops, @ranges, @callback
it "should not call multi.exec", ->
@multi.exec.called.should.equal false
@ -402,15 +441,20 @@ describe "RedisManager", ->
describe "with no updates", ->
beforeEach ->
@multi.rpush = sinon.stub().callsArg(1)
@rclient.rpush = sinon.stub().callsArgWith(1, null, @project_update_list_length)
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version)
@RedisManager.updateDocument @doc_id, @lines, @version, [], @ranges, @callback
@RedisManager.updateDocument @project_id, @doc_id, @lines, @version, [], @ranges, @callback
it "should not do an rpush", ->
it "should not try to enqueue doc updates", ->
@multi.rpush
.called
.should.equal false
it "should not try to enqueue project updates", ->
@rclient.rpush
.called
.should.equal false
it "should still set the doclines", ->
@multi.eval
.calledWith(sinon.match(/redis.call/), 1, "doclines:#{@doc_id}", JSON.stringify(@lines))
@ -419,7 +463,7 @@ describe "RedisManager", ->
describe "with empty ranges", ->
beforeEach ->
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length)
@RedisManager.updateDocument @doc_id, @lines, @version, @ops, {}, @callback
@RedisManager.updateDocument @project_id, @doc_id, @lines, @version, @ops, {}, @callback
it "should not set the ranges", ->
@multi.set
@ -436,7 +480,7 @@ describe "RedisManager", ->
@badHash = "INVALID-HASH-VALUE"
@multi.exec = sinon.stub().callsArgWith(0, null, [@badHash])
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length)
@RedisManager.updateDocument @doc_id, @lines, @version, @ops, @ranges, @callback
@RedisManager.updateDocument @project_id, @doc_id, @lines, @version, @ops, @ranges, @callback
it 'should log a hash error', ->
@logger.error.calledWith()
@ -450,7 +494,7 @@ describe "RedisManager", ->
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length)
@_stringify = JSON.stringify
@JSON.stringify = () -> return '["bad bytes! \u0000 <- here"]'
@RedisManager.updateDocument @doc_id, @lines, @version, @ops, @ranges, @callback
@RedisManager.updateDocument @project_id, @doc_id, @lines, @version, @ops, @ranges, @callback
afterEach ->
@JSON.stringify = @_stringify
@ -465,7 +509,7 @@ describe "RedisManager", ->
beforeEach ->
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length)
@RedisManager._serializeRanges = sinon.stub().yields(new Error("ranges are too large"))
@RedisManager.updateDocument @doc_id, @lines, @version, @ops, @ranges, @callback
@RedisManager.updateDocument @project_id, @doc_id, @lines, @version, @ops, @ranges, @callback
it 'should log an error', ->
@logger.error.called.should.equal true

View file

@ -60,14 +60,14 @@ describe "UpdateManager", ->
it "should process the outstanding updates", ->
@UpdateManager.processOutstandingUpdates.calledWith(@project_id, @doc_id).should.equal true
it "should do everything with the lock acquired", ->
@UpdateManager.processOutstandingUpdates.calledAfter(@LockManager.tryLock).should.equal true
@UpdateManager.processOutstandingUpdates.calledBefore(@LockManager.releaseLock).should.equal true
it "should continue processing new updates that may have come in", ->
@UpdateManager.continueProcessingUpdatesWithLock.calledWith(@project_id, @doc_id).should.equal true
it "should return the callback", ->
@callback.called.should.equal true
@ -78,7 +78,7 @@ describe "UpdateManager", ->
it "should free the lock", ->
@LockManager.releaseLock.calledWith(@doc_id, @lockValue).should.equal true
it "should return the error in the callback", ->
@callback.calledWith(@error).should.equal true
@ -93,7 +93,7 @@ describe "UpdateManager", ->
it "should not process the updates", ->
@UpdateManager.processOutstandingUpdates.called.should.equal false
describe "continueProcessingUpdatesWithLock", ->
describe "when there are outstanding updates", ->
beforeEach ->
@ -137,7 +137,7 @@ describe "UpdateManager", ->
@UpdateManager.applyUpdate
.calledWith(@project_id, @doc_id, update)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
@ -154,7 +154,7 @@ describe "UpdateManager", ->
it "should call the callback", ->
@callback.called.should.equal true
describe "applyUpdate", ->
beforeEach ->
@update = {op: [{p: 42, i: "foo"}]}
@ -170,16 +170,16 @@ describe "UpdateManager", ->
@RedisManager.updateDocument = sinon.stub().yields()
@RealTimeRedisManager.sendData = sinon.stub()
@HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(4)
describe "normally", ->
beforeEach ->
@UpdateManager.applyUpdate @project_id, @doc_id, @update, @callback
it "should apply the updates via ShareJS", ->
@ShareJsUpdateManager.applyUpdate
.calledWith(@project_id, @doc_id, @update, @lines, @version)
.should.equal true
it "should update the ranges", ->
@RangesManager.applyUpdate
.calledWith(@project_id, @doc_id, @ranges, @appliedOps, @updatedDocLines)
@ -187,9 +187,9 @@ describe "UpdateManager", ->
it "should save the document", ->
@RedisManager.updateDocument
.calledWith(@doc_id, @updatedDocLines, @version, @appliedOps, @updated_ranges)
.calledWith(@project_id, @doc_id, @updatedDocLines, @version, @appliedOps, @updated_ranges)
.should.equal true
it "should push the applied ops into the history queue", ->
@HistoryManager.recordAndFlushHistoryOps
.calledWith(@project_id, @doc_id, @appliedOps)
@ -207,16 +207,16 @@ describe "UpdateManager", ->
@ShareJsUpdateManager.applyUpdate
.calledWith(@project_id, @doc_id, @update)
.should.equal true
# \uFFFD is 'replacement character'
@update.op[0].i.should.equal "\uFFFD\uFFFD"
describe "with an error", ->
beforeEach ->
@error = new Error("something went wrong")
@ShareJsUpdateManager.applyUpdate = sinon.stub().yields(@error)
@UpdateManager.applyUpdate @project_id, @doc_id, @update, @callback
it "should call RealTimeRedisManager.sendData with the error", ->
@RealTimeRedisManager.sendData
.calledWith({
@ -228,7 +228,7 @@ describe "UpdateManager", ->
it "should call the callback with the error", ->
@callback.calledWith(@error).should.equal true
describe "lockUpdatesAndDo", ->
beforeEach ->
@ -283,7 +283,7 @@ describe "UpdateManager", ->
it "should free the lock", ->
@LockManager.releaseLock.calledWith(@doc_id, @lockValue).should.equal true
it "should return the error in the callback", ->
@callback.calledWith(@error).should.equal true
@ -295,7 +295,7 @@ describe "UpdateManager", ->
it "should free the lock", ->
@LockManager.releaseLock.calledWith(@doc_id, @lockValue).should.equal true
it "should return the error in the callback", ->
@callback.calledWith(@error).should.equal true