Only flush project updates when crossing the threshold

This commit is contained in:
Michael Walker 2018-01-24 11:37:28 +00:00
parent 24c74db0dc
commit 3028fb9c3d
4 changed files with 23 additions and 27 deletions

View file

@ -7,7 +7,7 @@ module.exports = HistoryManager =
flushChangesAsync: (project_id, doc_id) ->
HistoryManager._flushDocChangesAsync project_id, doc_id
if Settings.apis?.project_history?.enabled
HistoryManager._flushProjectChangesAsync project_id
HistoryManager.flushProjectChangesAsync project_id
_flushDocChangesAsync: (project_id, doc_id) ->
if !Settings.apis?.trackchanges?
@ -22,7 +22,7 @@ module.exports = HistoryManager =
else if res.statusCode < 200 and res.statusCode >= 300
logger.error { doc_id, project_id }, "track changes api returned a failure status code: #{res.statusCode}"
_flushProjectChangesAsync: (project_id) ->
flushProjectChangesAsync: (project_id) ->
return if !Settings.apis?.project_history?
url = "#{Settings.apis.project_history.url}/project/#{project_id}/flush"
@ -42,7 +42,10 @@ module.exports = HistoryManager =
if Settings.apis?.project_history?.enabled
if HistoryManager._shouldFlushHistoryOps(project_ops_length, ops, HistoryManager.FLUSH_PROJECT_EVERY_N_OPS)
HistoryManager.flushProjectChanges project_id, project_ops_length
# Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update.
logger.log { project_ops_length, project_id }, "flushing project history api"
HistoryManager.flushProjectChangesAsync project_id
HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) ->
return callback(error) if error?
@ -53,19 +56,16 @@ module.exports = HistoryManager =
HistoryManager._flushDocChangesAsync project_id, doc_id
callback()
flushProjectChanges: (project_id, project_ops_length) ->
# Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update.
logger.log { project_ops_length, project_id }, "flushing project history api"
HistoryManager._flushProjectChangesAsync project_id
_shouldFlushHistoryOps: (length, ops, threshold) ->
return HistoryManager.shouldFlushHistoryOps(length, ops.length, threshold)
shouldFlushHistoryOps: (length, ops_length, threshold) ->
return false if !length # don't flush unless we know the length
# We want to flush every 100 ops, i.e. 100, 200, 300, etc
# Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these
# ops. If we've changed, then we've gone over a multiple of 100 and should flush.
# (Most of the time, we will only hit 100 and then flushing will put us back to 0)
previousLength = length - ops.length
previousLength = length - ops_length
prevBlock = Math.floor(previousLength / threshold)
newBlock = Math.floor(length / threshold)
return newBlock != prevBlock

View file

@ -122,10 +122,9 @@ module.exports = ProjectManager =
async.each docUpdates, handleDocUpdate, (error) ->
return callback(error) if error?
async.each fileUpdates, handleFileUpdate, (error) ->
async.each fileUpdates, handleFileUpdate, (error, project_ops_length) ->
return callback(error) if error?
RedisManager.numQueuedProjectUpdates project_id, (error, length) ->
return callback(error) if error?
if length >= HistoryManager.FLUSH_PROJECT_EVERY_N_OPS
HistoryManager.flushProjectChanges project_id, length
callback()
if HistoryManager.shouldFlushHistoryOps(project_ops_length, docUpdates.length + fileUpdates.length, HistoryManager.FLUSH_PROJECT_EVERY_N_OPS)
logger.log { project_ops_length, project_id }, "flushing project history api"
HistoryManager.flushProjectChangesAsync project_id
callback()

View file

@ -315,9 +315,6 @@ module.exports = RedisManager =
rclient.rpush projectHistoryKeys.projectHistoryOps({project_id}), jsonUpdate, callback
numQueuedProjectUpdates: (project_id, callback = (error, length) ->) ->
rclient.llen projectHistoryKeys.projectHistoryOps({project_id}), callback
clearUnflushedTime: (doc_id, callback = (error) ->) ->
rclient.del keys.unflushedTime(doc_id:doc_id), callback

View file

@ -24,7 +24,7 @@ describe "HistoryManager", ->
describe "flushChangesAsync", ->
beforeEach ->
@HistoryManager._flushDocChangesAsync = sinon.stub()
@HistoryManager._flushProjectChangesAsync = sinon.stub()
@HistoryManager.flushProjectChangesAsync = sinon.stub()
@HistoryManager.flushChangesAsync(@project_id, @doc_id)
@ -34,7 +34,7 @@ describe "HistoryManager", ->
.should.equal true
it "flushes project changes", ->
@HistoryManager._flushProjectChangesAsync
@HistoryManager.flushProjectChangesAsync
.calledWith(@project_id)
.should.equal true
@ -49,11 +49,11 @@ describe "HistoryManager", ->
.calledWith("#{@Settings.apis.trackchanges.url}/project/#{@project_id}/doc/#{@doc_id}/flush")
.should.equal true
describe "_flushProjectChangesAsync", ->
describe "flushProjectChangesAsync", ->
beforeEach ->
@request.post = sinon.stub().callsArgWith(1, null, statusCode: 204)
@HistoryManager._flushProjectChangesAsync @project_id
@HistoryManager.flushProjectChangesAsync @project_id
it "should send a request to the project history api", ->
@request.post
@ -66,7 +66,7 @@ describe "HistoryManager", ->
@project_ops_length = 10
@doc_ops_length = 5
@HistoryManager._flushProjectChangesAsync = sinon.stub()
@HistoryManager.flushProjectChangesAsync = sinon.stub()
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArg(3)
@HistoryManager._flushDocChangesAsync = sinon.stub()
@ -77,7 +77,7 @@ describe "HistoryManager", ->
)
it "should not flush project changes", ->
@HistoryManager._flushProjectChangesAsync.called.should.equal false
@HistoryManager.flushProjectChangesAsync.called.should.equal false
it "should not record doc has history ops", ->
@HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false
@ -99,7 +99,7 @@ describe "HistoryManager", ->
)
it "should flush project changes", ->
@HistoryManager._flushProjectChangesAsync
@HistoryManager.flushProjectChangesAsync
.calledWith(@project_id)
.should.equal true
@ -124,7 +124,7 @@ describe "HistoryManager", ->
)
it "should not flush project changes", ->
@HistoryManager._flushProjectChangesAsync.called.should.equal false
@HistoryManager.flushProjectChangesAsync.called.should.equal false
it "should record doc has history ops", ->
@HistoryRedisManager.recordDocHasHistoryOps