flush track-changes and project-history in HistoryManager

This commit is contained in:
Hayden Faulds 2017-10-05 13:45:29 +01:00
parent d003aef31c
commit 962520fca8
6 changed files with 205 additions and 114 deletions

View file

@ -116,11 +116,8 @@ module.exports = DocumentManager =
DocumentManager.flushDocIfLoaded project_id, doc_id, (error) ->
return callback(error) if error?
# Flush in the background since it requires and http request
# to track changes
HistoryManager.flushDocChanges project_id, doc_id, (err) ->
if err?
logger.err {err, project_id, doc_id}, "error flushing to track changes"
# Flush in the background since it requires an HTTP request
HistoryManager.flushChangesAsync project_id, doc_id
RedisManager.removeDocFromMemory project_id, doc_id, (error) ->
return callback(error) if error?

View file

@ -1,45 +1,73 @@
settings = require "settings-sharelatex"
Settings = require "settings-sharelatex"
request = require "request"
logger = require "logger-sharelatex"
async = require "async"
HistoryRedisManager = require "./HistoryRedisManager"
module.exports = HistoryManager =
flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
if !settings.apis?.trackchanges?
logger.warn doc_id: doc_id, "track changes API is not configured, so not flushing"
return callback()
# Fire-and-forget flush of a doc's history.
# Always triggers the per-doc flush; additionally triggers the project-level
# flush when Settings.apis.project_history.enabled is truthy.
# Takes no callback: the helpers it calls report failures through the logger.
flushChangesAsync: (project_id, doc_id) ->
  HistoryManager._flushDocChangesAsync project_id, doc_id
  HistoryManager._flushProjectChangesAsync project_id if Settings.apis?.project_history?.enabled
url = "#{settings.apis.trackchanges.url}/project/#{project_id}/doc/#{doc_id}/flush"
logger.log project_id: project_id, doc_id: doc_id, url: url, "flushing doc in track changes api"
# POST a flush request for one doc to the track changes API without waiting
# for the result.
#
# project_id, doc_id - identify the doc; both are interpolated into the URL.
#
# There is no callback: a request error or a non-2xx response is only logged.
# If Settings.apis.trackchanges is not configured, a warning is logged and no
# request is made.
_flushDocChangesAsync: (project_id, doc_id) ->
  if !Settings.apis?.trackchanges?
    logger.warn { doc_id }, "track changes API is not configured, so not flushing"
    return

  url = "#{Settings.apis.trackchanges.url}/project/#{project_id}/doc/#{doc_id}/flush"
  logger.log { project_id, doc_id, url }, "flushing doc in track changes api"
  request.post url, (error, res, body)->
    if error?
      logger.error(
        { error, doc_id, project_id },
        "error flushing doc to track changes api"
      )
    # A failure is any status outside 2xx. This must be `or`: no status code
    # can be both below 200 and at least 300.
    else if res.statusCode < 200 or res.statusCode >= 300
      logger.error(
        { doc_id, project_id },
        "track changes api returned a failure status code: #{res.statusCode}"
      )
# POST a flush request for a whole project to the project history API without
# waiting for the result.
#
# project_id - identifies the project; interpolated into the URL.
#
# There is no callback: a request error or a non-2xx response is only logged.
# Silently does nothing when Settings.apis.project_history is not configured.
_flushProjectChangesAsync: (project_id) ->
  return if !Settings.apis?.project_history?

  url = "#{Settings.apis.project_history.url}/project/#{project_id}/flush"
  logger.log { project_id, url }, "flushing doc in project history api"
  request.post url, (error, res, body)->
    if error?
      logger.error { error, project_id }, "error flushing project to project history api"
    # A failure is any status outside 2xx. This must be `or`: no status code
    # can be both below 200 and at least 300.
    else if res.statusCode < 200 or res.statusCode >= 300
      logger.error { project_id }, "project history api returned a failure status code: #{res.statusCode}"
FLUSH_EVERY_N_OPS: 100
# Record that a doc has pending history ops and, when a flush threshold has
# been crossed, trigger background flushes.
#
# project_id, doc_id  - identify the doc the ops belong to.
# ops                 - the ops that were just applied (defaults to []).
# doc_ops_length      - op count used for the track changes flush decision.
# project_ops_length  - op count used for the project history flush decision.
# callback(error)     - called with the error from
#                       HistoryRedisManager.recordDocHasHistoryOps, or with no
#                       arguments on success. Flush failures are never passed
#                       to the callback; the flush helpers run in the
#                       background.
#
# With no ops, the callback is invoked immediately and nothing is recorded or
# flushed.
recordAndFlushHistoryOps: (project_id, doc_id, ops = [], doc_ops_length, project_ops_length, callback = (error) ->) ->
  if ops.length == 0
    return callback()

  # The project-level flush is only considered when project history is enabled.
  if Settings.apis?.project_history?.enabled
    if HistoryManager._shouldFlushHistoryOps(project_ops_length, ops, HistoryManager.FLUSH_EVERY_N_OPS)
      # Do this in the background since it uses HTTP and so may be too
      # slow to wait for when processing a doc update.
      logger.log { project_ops_length, project_id }, "flushing project history api"
      HistoryManager._flushProjectChangesAsync project_id

  # Record the ops exactly once, then decide on the per-doc flush.
  HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) ->
    return callback(error) if error?
    if HistoryManager._shouldFlushHistoryOps(doc_ops_length, ops, HistoryManager.FLUSH_EVERY_N_OPS)
      # Do this in the background since it uses HTTP and so may be too
      # slow to wait for when processing a doc update.
      logger.log { doc_ops_length, doc_id, project_id }, "flushing track changes api"
      HistoryManager._flushDocChangesAsync project_id, doc_id
    callback()
# Decide whether pushing `ops` moved the op count across a multiple of
# `threshold`.
#
# length    - op count after the push; a falsy value (unknown, or 0) means
#             "don't flush".
# ops       - the ops that were just pushed; only its length is used.
# threshold - block size, e.g. 100 to flush at 100, 200, 300, ...
#
# Returns true when the count before the push and the count after it fall in
# different blocks of `threshold` ops (0..threshold-1, threshold..2*threshold-1,
# and so on), false otherwise.
_shouldFlushHistoryOps: (length, ops, threshold) ->
  # don't flush unless we know the length
  return false unless length

  blockBefore = Math.floor((length - ops.length) / threshold)
  blockAfter = Math.floor(length / threshold)
  blockAfter != blockBefore

View file

@ -80,13 +80,12 @@ module.exports = UpdateManager =
RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) ->
profile.log("RangesManager.applyUpdate")
return callback(error) if error?
RedisManager.updateDocument project_id, doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) ->
RedisManager.updateDocument project_id, doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, doc_ops_length, project_ops_length) ->
profile.log("RedisManager.updateDocument")
return callback(error) if error?
HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, historyOpsLength, (error) ->
HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, doc_ops_length, project_ops_length, (error) ->
profile.log("recordAndFlushHistoryOps")
return callback(error) if error?
callback()
callback(error)
lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) ->
profile = new Profiler("lockUpdatesAndDo", {project_id, doc_id})

View file

@ -35,7 +35,7 @@ describe "DocumentManager", ->
beforeEach ->
@RedisManager.removeDocFromMemory = sinon.stub().callsArg(2)
@DocumentManager.flushDocIfLoaded = sinon.stub().callsArgWith(2)
@HistoryManager.flushDocChanges = sinon.stub().callsArg(2)
@HistoryManager.flushChangesAsync = sinon.stub()
@DocumentManager.flushAndDeleteDoc @project_id, @doc_id, @callback
it "should flush the doc", ->
@ -55,7 +55,7 @@ describe "DocumentManager", ->
@Metrics.Timer::done.called.should.equal true
it "should flush to the history api", ->
@HistoryManager.flushDocChanges
@HistoryManager.flushChangesAsync
.calledWith(@project_id, @doc_id)
.should.equal true

View file

@ -7,106 +7,171 @@ describe "HistoryManager", ->
beforeEach ->
@HistoryManager = SandboxedModule.require modulePath, requires:
"request": @request = {}
"settings-sharelatex": @Settings = {}
"settings-sharelatex": @Settings = {
apis:
project_history:
enabled: true
url: "http://project_history.example.com"
trackchanges:
url: "http://trackchanges.example.com"
}
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./HistoryRedisManager": @HistoryRedisManager = {}
@project_id = "mock-project-id"
@doc_id = "mock-doc-id"
@callback = sinon.stub()
describe "flushDocChanges", ->
describe "flushChangesAsync", ->
beforeEach ->
@Settings.apis =
trackchanges: url: "http://trackchanges.example.com"
@HistoryManager._flushDocChangesAsync = sinon.stub()
@HistoryManager._flushProjectChangesAsync = sinon.stub()
describe "successfully", ->
beforeEach ->
@request.post = sinon.stub().callsArgWith(1, null, statusCode: 204)
@HistoryManager.flushDocChanges @project_id, @doc_id, @callback
@HistoryManager.flushChangesAsync(@project_id, @doc_id)
it "should send a request to the track changes api", ->
@request.post
.calledWith("#{@Settings.apis.trackchanges.url}/project/#{@project_id}/doc/#{@doc_id}/flush")
.should.equal true
it "flushes doc changes", ->
@HistoryManager._flushDocChangesAsync
.calledWith(@project_id, @doc_id)
.should.equal true
it "should return the callback", ->
@callback.calledWith(null).should.equal true
it "flushes project changes", ->
@HistoryManager._flushProjectChangesAsync
.calledWith(@project_id)
.should.equal true
describe "when the track changes api returns an error", ->
beforeEach ->
@request.post = sinon.stub().callsArgWith(1, null, statusCode: 500)
@HistoryManager.flushDocChanges @project_id, @doc_id, @callback
describe "_flushDocChangesAsync", ->
beforeEach ->
@request.post = sinon.stub().callsArgWith(1, null, statusCode: 204)
it "should return the callback with an error", ->
@callback.calledWith(new Error("track changes api return non-success code: 500")).should.equal true
@HistoryManager._flushDocChangesAsync @project_id, @doc_id
it "should send a request to the track changes api", ->
@request.post
.calledWith("#{@Settings.apis.trackchanges.url}/project/#{@project_id}/doc/#{@doc_id}/flush")
.should.equal true
describe "_flushProjectChangesAsync", ->
beforeEach ->
@request.post = sinon.stub().callsArgWith(1, null, statusCode: 204)
@HistoryManager._flushProjectChangesAsync @project_id
it "should send a request to the project history api", ->
@request.post
.calledWith("#{@Settings.apis.project_history.url}/project/#{@project_id}/flush")
.should.equal true
describe "recordAndFlushHistoryOps", ->
beforeEach ->
@ops = ["mock-ops"]
@HistoryManager.flushDocChanges = sinon.stub().callsArg(2)
@ops = [ 'mock-ops' ]
@project_ops_length = 10
@doc_ops_length = 5
describe "pushing the op", ->
@HistoryManager._flushProjectChangesAsync = sinon.stub()
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArg(3)
@HistoryManager._flushDocChangesAsync = sinon.stub()
describe "with no ops", ->
beforeEach ->
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null)
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 1, @callback
@HistoryManager.recordAndFlushHistoryOps(
@project_id, @doc_id, [], @doc_ops_length, @project_ops_length, @callback
)
it "should push the ops into redis", ->
it "should not flush project changes", ->
@HistoryManager._flushProjectChangesAsync.called.should.equal false
it "should not record doc has history ops", ->
@HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false
it "should not flush doc changes", ->
@HistoryManager._flushDocChangesAsync.called.should.equal false
it "should call the callback", ->
@callback.called.should.equal true
describe "with enough ops to flush project changes", ->
  beforeEach ->
    # Only the project-level threshold check reports that a flush is due.
    @HistoryManager._shouldFlushHistoryOps = sinon.stub()
    @HistoryManager._shouldFlushHistoryOps.withArgs(@project_ops_length).returns(true)
    @HistoryManager._shouldFlushHistoryOps.withArgs(@doc_ops_length).returns(false)

    @HistoryManager.recordAndFlushHistoryOps(
      @project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback
    )

  it "should flush project changes", ->
    @HistoryManager._flushProjectChangesAsync
      .calledWith(@project_id)
      .should.equal true

  it "should record doc has history ops", ->
    # calledWith only returns a boolean, so it has to be asserted on
    @HistoryRedisManager.recordDocHasHistoryOps
      .calledWith(@project_id, @doc_id, @ops)
      .should.equal true

  it "should not flush doc changes", ->
    @HistoryManager._flushDocChangesAsync.called.should.equal false

  it "should call the callback", ->
    @callback.called.should.equal true
describe "with enough ops to flush doc changes", ->
  beforeEach ->
    # Only the doc-level threshold check reports that a flush is due.
    @HistoryManager._shouldFlushHistoryOps = sinon.stub()
    @HistoryManager._shouldFlushHistoryOps.withArgs(@project_ops_length).returns(false)
    @HistoryManager._shouldFlushHistoryOps.withArgs(@doc_ops_length).returns(true)

    @HistoryManager.recordAndFlushHistoryOps(
      @project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback
    )

  it "should not flush project changes", ->
    @HistoryManager._flushProjectChangesAsync.called.should.equal false

  it "should record doc has history ops", ->
    # calledWith only returns a boolean, so it has to be asserted on
    @HistoryRedisManager.recordDocHasHistoryOps
      .calledWith(@project_id, @doc_id, @ops)
      .should.equal true

  it "should flush doc changes", ->
    @HistoryManager._flushDocChangesAsync
      .calledWith(@project_id, @doc_id)
      .should.equal true

  it "should call the callback", ->
    @callback.called.should.equal true
it "should not try to flush the op", ->
@HistoryManager.flushDocChanges.called.should.equal false
describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", ->
describe "when recording doc has history ops errors", ->
beforeEach ->
@error = new Error("error")
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null)
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback
sinon.stub().callsArgWith(3, @error)
it "should tell the track changes api to flush", ->
@HistoryManager.flushDocChanges
.calledWith(@project_id, @doc_id)
.should.equal true
@HistoryManager.recordAndFlushHistoryOps(
@project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback
)
describe "when we go over a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@ops = ["op1", "op2", "op3"]
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null)
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback
it "should not flush doc changes", ->
@HistoryManager._flushDocChangesAsync.called.should.equal false
it "should tell the track changes api to flush", ->
@HistoryManager.flushDocChanges
.calledWith(@project_id, @doc_id)
.should.equal true
it "should call the callback with the error", ->
@callback.calledWith(@error).should.equal true
describe "when HistoryManager errors", ->
beforeEach ->
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null)
@HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback
describe "_shouldFlushHistoryOps", ->
it "should return false if the number of ops is not known", ->
@HistoryManager._shouldFlushHistoryOps(null, ['a', 'b', 'c'], 1).should.equal false
it "should log out the error", ->
@logger.error
.calledWith(
err: @error
doc_id: @doc_id
project_id: @project_id
"error flushing doc to track changes api"
)
.should.equal true
describe "with no ops", ->
beforeEach ->
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null)
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, [], 1, @callback
it "should not call HistoryRedisManager.recordDocHasHistoryOps", ->
@HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false
it "should return false if the updates didn't take us past the threshold", ->
# Currently there are 14 ops
# Previously we were on 11 ops
# We didn't pass over a multiple of 5
@HistoryManager._shouldFlushHistoryOps(14, ['a', 'b', 'c'], 5).should.equal false
it "should return true if the updates took to the threshold", ->
# Currently there are 15 ops
# Previously we were on 12 ops
# We've reached a new multiple of 5
@HistoryManager._shouldFlushHistoryOps(15, ['a', 'b', 'c'], 5).should.equal true
it "should return true if the updates took past the threshold", ->
# Currently there are 17 ops
# Previously we were on 14 ops
# We passed over a multiple of 5 (15)
@HistoryManager._shouldFlushHistoryOps(17, ['a', 'b', 'c'], 5).should.equal true

View file

@ -19,7 +19,7 @@ describe "UpdateManager", ->
"./Metrics": @Metrics =
Timer: class Timer
done: sinon.stub()
"settings-sharelatex": Settings = {}
"settings-sharelatex": @Settings = {}
"./DocumentManager": @DocumentManager = {}
"./RangesManager": @RangesManager = {}
"./Profiler": class Profiler
@ -164,12 +164,14 @@ describe "UpdateManager", ->
@ranges = { entries: "mock", comments: "mock" }
@updated_ranges = { entries: "updated", comments: "updated" }
@appliedOps = ["mock-applied-ops"]
@doc_ops_length = sinon.stub()
@project_ops_length = sinon.stub()
@DocumentManager.getDoc = sinon.stub().yields(null, @lines, @version, @ranges)
@RangesManager.applyUpdate = sinon.stub().yields(null, @updated_ranges)
@ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps)
@RedisManager.updateDocument = sinon.stub().yields()
@RedisManager.updateDocument = sinon.stub().yields(null, @doc_ops_length, @project_ops_length)
@RealTimeRedisManager.sendData = sinon.stub()
@HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(4)
@HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(5)
describe "normally", ->
beforeEach ->
@ -192,7 +194,7 @@ describe "UpdateManager", ->
it "should push the applied ops into the history queue", ->
@HistoryManager.recordAndFlushHistoryOps
.calledWith(@project_id, @doc_id, @appliedOps)
.calledWith(@project_id, @doc_id, @appliedOps, @doc_ops_length, @project_ops_length)
.should.equal true
it "should call the callback", ->