Merge pull request #18 from sharelatex/msw-flush-project-ops

Flush project structure changes every 500
This commit is contained in:
James Allen 2018-02-16 10:30:00 +00:00 committed by GitHub
commit 3d5740fd7d
8 changed files with 141 additions and 31 deletions

View file

@ -7,7 +7,7 @@ module.exports = HistoryManager =
flushChangesAsync: (project_id, doc_id) -> flushChangesAsync: (project_id, doc_id) ->
HistoryManager._flushDocChangesAsync project_id, doc_id HistoryManager._flushDocChangesAsync project_id, doc_id
if Settings.apis?.project_history?.enabled if Settings.apis?.project_history?.enabled
HistoryManager._flushProjectChangesAsync project_id HistoryManager.flushProjectChangesAsync project_id
_flushDocChangesAsync: (project_id, doc_id) -> _flushDocChangesAsync: (project_id, doc_id) ->
if !Settings.apis?.trackchanges? if !Settings.apis?.trackchanges?
@ -22,7 +22,7 @@ module.exports = HistoryManager =
else if res.statusCode < 200 and res.statusCode >= 300 else if res.statusCode < 200 and res.statusCode >= 300
logger.error { doc_id, project_id }, "track changes api returned a failure status code: #{res.statusCode}" logger.error { doc_id, project_id }, "track changes api returned a failure status code: #{res.statusCode}"
_flushProjectChangesAsync: (project_id) -> flushProjectChangesAsync: (project_id) ->
return if !Settings.apis?.project_history? return if !Settings.apis?.project_history?
url = "#{Settings.apis.project_history.url}/project/#{project_id}/flush" url = "#{Settings.apis.project_history.url}/project/#{project_id}/flush"
@ -41,28 +41,28 @@ module.exports = HistoryManager =
return callback() return callback()
if Settings.apis?.project_history?.enabled if Settings.apis?.project_history?.enabled
if HistoryManager._shouldFlushHistoryOps(project_ops_length, ops, HistoryManager.FLUSH_PROJECT_EVERY_N_OPS) if HistoryManager.shouldFlushHistoryOps(project_ops_length, ops.length, HistoryManager.FLUSH_PROJECT_EVERY_N_OPS)
# Do this in the background since it uses HTTP and so may be too # Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update. # slow to wait for when processing a doc update.
logger.log { project_ops_length, project_id }, "flushing project history api" logger.log { project_ops_length, project_id }, "flushing project history api"
HistoryManager._flushProjectChangesAsync project_id HistoryManager.flushProjectChangesAsync project_id
HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) -> HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) ->
return callback(error) if error? return callback(error) if error?
if HistoryManager._shouldFlushHistoryOps(doc_ops_length, ops, HistoryManager.FLUSH_DOC_EVERY_N_OPS) if HistoryManager.shouldFlushHistoryOps(doc_ops_length, ops.length, HistoryManager.FLUSH_DOC_EVERY_N_OPS)
# Do this in the background since it uses HTTP and so may be too # Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update. # slow to wait for when processing a doc update.
logger.log { doc_ops_length, doc_id, project_id }, "flushing track changes api" logger.log { doc_ops_length, doc_id, project_id }, "flushing track changes api"
HistoryManager._flushDocChangesAsync project_id, doc_id HistoryManager._flushDocChangesAsync project_id, doc_id
callback() callback()
_shouldFlushHistoryOps: (length, ops, threshold) -> shouldFlushHistoryOps: (length, ops_length, threshold) ->
return false if !length # don't flush unless we know the length return false if !length # don't flush unless we know the length
# We want to flush every 100 ops, i.e. 100, 200, 300, etc # We want to flush every 100 ops, i.e. 100, 200, 300, etc
# Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these # Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these
# ops. If we've changed, then we've gone over a multiple of 100 and should flush. # ops. If we've changed, then we've gone over a multiple of 100 and should flush.
# (Most of the time, we will only hit 100 and then flushing will put us back to 0) # (Most of the time, we will only hit 100 and then flushing will put us back to 0)
previousLength = length - ops.length previousLength = length - ops_length
prevBlock = Math.floor(previousLength / threshold) prevBlock = Math.floor(previousLength / threshold)
newBlock = Math.floor(length / threshold) newBlock = Math.floor(length / threshold)
return newBlock != prevBlock return newBlock != prevBlock

View file

@ -1,5 +1,6 @@
RedisManager = require "./RedisManager" RedisManager = require "./RedisManager"
DocumentManager = require "./DocumentManager" DocumentManager = require "./DocumentManager"
HistoryManager = require "./HistoryManager"
async = require "async" async = require "async"
logger = require "logger-sharelatex" logger = require "logger-sharelatex"
Metrics = require "./Metrics" Metrics = require "./Metrics"
@ -105,20 +106,34 @@ module.exports = ProjectManager =
timer.done() timer.done()
_callback(args...) _callback(args...)
project_ops_length = 0
handleDocUpdate = (update, cb) -> handleDocUpdate = (update, cb) ->
doc_id = update.id doc_id = update.id
if update.docLines? if update.docLines?
RedisManager.addEntity project_id, 'doc', doc_id, user_id, update, cb RedisManager.addEntity project_id, 'doc', doc_id, user_id, update, (error, count) =>
project_ops_length = count
cb(error)
else else
DocumentManager.renameDocWithLock project_id, doc_id, user_id, update, cb DocumentManager.renameDocWithLock project_id, doc_id, user_id, update, (error, count) =>
project_ops_length = count
cb(error)
handleFileUpdate = (update, cb) -> handleFileUpdate = (update, cb) ->
file_id = update.id file_id = update.id
if update.url? if update.url?
RedisManager.addEntity project_id, 'file', file_id, user_id, update, cb RedisManager.addEntity project_id, 'file', file_id, user_id, update, (error, count) =>
project_ops_length = count
cb(error)
else else
RedisManager.renameFile project_id, file_id, user_id, update, cb RedisManager.renameFile project_id, file_id, user_id, update, (error, count) =>
project_ops_length = count
cb(error)
async.each docUpdates, handleDocUpdate, (error) -> async.each docUpdates, handleDocUpdate, (error) ->
return callback(error) if error? return callback(error) if error?
async.each fileUpdates, handleFileUpdate, callback async.each fileUpdates, handleFileUpdate, (error) ->
return callback(error) if error?
if HistoryManager.shouldFlushHistoryOps(project_ops_length, docUpdates.length + fileUpdates.length, HistoryManager.FLUSH_PROJECT_EVERY_N_OPS)
HistoryManager.flushProjectChangesAsync project_id
callback()

View file

@ -5,6 +5,7 @@ Settings = require('settings-sharelatex')
rclient_history = require("redis-sharelatex").createClient(Settings.redis.history) rclient_history = require("redis-sharelatex").createClient(Settings.redis.history)
ProjectHistoryKeys = Settings.redis.project_history.key_schema ProjectHistoryKeys = Settings.redis.project_history.key_schema
MockProjectHistoryApi = require "./helpers/MockProjectHistoryApi"
MockWebApi = require "./helpers/MockWebApi" MockWebApi = require "./helpers/MockWebApi"
DocUpdaterClient = require "./helpers/DocUpdaterClient" DocUpdaterClient = require "./helpers/DocUpdaterClient"
DocUpdaterApp = require "./helpers/DocUpdaterApp" DocUpdaterApp = require "./helpers/DocUpdaterApp"
@ -150,3 +151,60 @@ describe "Applying updates to a project's structure", ->
done() done()
describe "with enough updates to flush to the history service", ->
  # Acceptance test: pushing 600 structure updates for one project (in two
  # requests) must trigger at least one flush to the project history API.
  before (done) ->
    @project_id = DocUpdaterClient.randomId()
    @user_id = DocUpdaterClient.randomId()

    # 600 "add doc" updates, more than the 500 ops after which a flush is expected
    updates = []
    for v in [0..599]
      updates.push
        id: DocUpdaterClient.randomId()
        pathname: '/file-' + v
        docLines: 'a\nb'

    sinon.spy MockProjectHistoryApi, "flushProject"

    # Send updates in chunks to cause multiple flushes
    # Copy the ids into locals because `@` is not the test context inside
    # the plain (->) callbacks below.
    projectId = @project_id
    userId = @user_id
    DocUpdaterClient.sendProjectUpdate projectId, userId, updates.slice(0, 250), [], (error) ->
      # Report failures through mocha instead of throwing inside an async callback
      return done(error) if error?
      DocUpdaterClient.sendProjectUpdate projectId, userId, updates.slice(250), [], (error) ->
        return done(error) if error?
        # The flush is fired in the background, so give it time to reach the mock API
        setTimeout done, 2000

  after ->
    MockProjectHistoryApi.flushProject.restore()

  it "should flush project history", ->
    MockProjectHistoryApi.flushProject.calledWith(@project_id).should.equal true
describe "with too few updates to flush to the history service", ->
  # Acceptance test: pushing only 43 structure updates for one project (in two
  # requests) must NOT trigger a flush to the project history API.
  before (done) ->
    @project_id = DocUpdaterClient.randomId()
    @user_id = DocUpdaterClient.randomId()

    # 43 "add doc" updates, well below the 500 ops needed to trigger a flush
    updates = []
    for v in [0..42]
      updates.push
        id: DocUpdaterClient.randomId()
        pathname: '/file-' + v
        docLines: 'a\nb'

    sinon.spy MockProjectHistoryApi, "flushProject"

    # Send updates in chunks
    # Copy the ids into locals because `@` is not the test context inside
    # the plain (->) callbacks below.
    projectId = @project_id
    userId = @user_id
    DocUpdaterClient.sendProjectUpdate projectId, userId, updates.slice(0, 10), [], (error) ->
      # Report failures through mocha instead of throwing inside an async callback
      return done(error) if error?
      DocUpdaterClient.sendProjectUpdate projectId, userId, updates.slice(10), [], (error) ->
        return done(error) if error?
        # Wait long enough that any (unexpected) background flush would have arrived
        setTimeout done, 2000

  after ->
    MockProjectHistoryApi.flushProject.restore()

  it "should not flush project history", ->
    MockProjectHistoryApi.flushProject.calledWith(@project_id).should.equal false

View file

@ -24,7 +24,7 @@ describe "HistoryManager", ->
describe "flushChangesAsync", -> describe "flushChangesAsync", ->
beforeEach -> beforeEach ->
@HistoryManager._flushDocChangesAsync = sinon.stub() @HistoryManager._flushDocChangesAsync = sinon.stub()
@HistoryManager._flushProjectChangesAsync = sinon.stub() @HistoryManager.flushProjectChangesAsync = sinon.stub()
@HistoryManager.flushChangesAsync(@project_id, @doc_id) @HistoryManager.flushChangesAsync(@project_id, @doc_id)
@ -34,7 +34,7 @@ describe "HistoryManager", ->
.should.equal true .should.equal true
it "flushes project changes", -> it "flushes project changes", ->
@HistoryManager._flushProjectChangesAsync @HistoryManager.flushProjectChangesAsync
.calledWith(@project_id) .calledWith(@project_id)
.should.equal true .should.equal true
@ -49,11 +49,11 @@ describe "HistoryManager", ->
.calledWith("#{@Settings.apis.trackchanges.url}/project/#{@project_id}/doc/#{@doc_id}/flush") .calledWith("#{@Settings.apis.trackchanges.url}/project/#{@project_id}/doc/#{@doc_id}/flush")
.should.equal true .should.equal true
describe "_flushProjectChangesAsync", -> describe "flushProjectChangesAsync", ->
beforeEach -> beforeEach ->
@request.post = sinon.stub().callsArgWith(1, null, statusCode: 204) @request.post = sinon.stub().callsArgWith(1, null, statusCode: 204)
@HistoryManager._flushProjectChangesAsync @project_id @HistoryManager.flushProjectChangesAsync @project_id
it "should send a request to the project history api", -> it "should send a request to the project history api", ->
@request.post @request.post
@ -66,7 +66,7 @@ describe "HistoryManager", ->
@project_ops_length = 10 @project_ops_length = 10
@doc_ops_length = 5 @doc_ops_length = 5
@HistoryManager._flushProjectChangesAsync = sinon.stub() @HistoryManager.flushProjectChangesAsync = sinon.stub()
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArg(3) @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArg(3)
@HistoryManager._flushDocChangesAsync = sinon.stub() @HistoryManager._flushDocChangesAsync = sinon.stub()
@ -77,7 +77,7 @@ describe "HistoryManager", ->
) )
it "should not flush project changes", -> it "should not flush project changes", ->
@HistoryManager._flushProjectChangesAsync.called.should.equal false @HistoryManager.flushProjectChangesAsync.called.should.equal false
it "should not record doc has history ops", -> it "should not record doc has history ops", ->
@HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false @HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false
@ -90,16 +90,16 @@ describe "HistoryManager", ->
describe "with enough ops to flush project changes", -> describe "with enough ops to flush project changes", ->
beforeEach -> beforeEach ->
@HistoryManager._shouldFlushHistoryOps = sinon.stub() @HistoryManager.shouldFlushHistoryOps = sinon.stub()
@HistoryManager._shouldFlushHistoryOps.withArgs(@project_ops_length).returns(true) @HistoryManager.shouldFlushHistoryOps.withArgs(@project_ops_length).returns(true)
@HistoryManager._shouldFlushHistoryOps.withArgs(@doc_ops_length).returns(false) @HistoryManager.shouldFlushHistoryOps.withArgs(@doc_ops_length).returns(false)
@HistoryManager.recordAndFlushHistoryOps( @HistoryManager.recordAndFlushHistoryOps(
@project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback @project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback
) )
it "should flush project changes", -> it "should flush project changes", ->
@HistoryManager._flushProjectChangesAsync @HistoryManager.flushProjectChangesAsync
.calledWith(@project_id) .calledWith(@project_id)
.should.equal true .should.equal true
@ -115,16 +115,16 @@ describe "HistoryManager", ->
describe "with enough ops to flush doc changes", -> describe "with enough ops to flush doc changes", ->
beforeEach -> beforeEach ->
@HistoryManager._shouldFlushHistoryOps = sinon.stub() @HistoryManager.shouldFlushHistoryOps = sinon.stub()
@HistoryManager._shouldFlushHistoryOps.withArgs(@project_ops_length).returns(false) @HistoryManager.shouldFlushHistoryOps.withArgs(@project_ops_length).returns(false)
@HistoryManager._shouldFlushHistoryOps.withArgs(@doc_ops_length).returns(true) @HistoryManager.shouldFlushHistoryOps.withArgs(@doc_ops_length).returns(true)
@HistoryManager.recordAndFlushHistoryOps( @HistoryManager.recordAndFlushHistoryOps(
@project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback @project_id, @doc_id, @ops, @doc_ops_length, @project_ops_length, @callback
) )
it "should not flush project changes", -> it "should not flush project changes", ->
@HistoryManager._flushProjectChangesAsync.called.should.equal false @HistoryManager.flushProjectChangesAsync.called.should.equal false
it "should record doc has history ops", -> it "should record doc has history ops", ->
@HistoryRedisManager.recordDocHasHistoryOps @HistoryRedisManager.recordDocHasHistoryOps
@ -154,24 +154,24 @@ describe "HistoryManager", ->
it "should call the callback with the error", -> it "should call the callback with the error", ->
@callback.calledWith(@error).should.equal true @callback.calledWith(@error).should.equal true
describe "_shouldFlushHistoryOps", -> describe "shouldFlushHistoryOps", ->
it "should return false if the number of ops is not known", -> it "should return false if the number of ops is not known", ->
@HistoryManager._shouldFlushHistoryOps(null, ['a', 'b', 'c'], 1).should.equal false @HistoryManager.shouldFlushHistoryOps(null, ['a', 'b', 'c'].length, 1).should.equal false
it "should return false if the updates didn't take us past the threshold", -> it "should return false if the updates didn't take us past the threshold", ->
# Currently there are 14 ops # Currently there are 14 ops
# Previously we were on 11 ops # Previously we were on 11 ops
# We didn't pass over a multiple of 5 # We didn't pass over a multiple of 5
@HistoryManager._shouldFlushHistoryOps(14, ['a', 'b', 'c'], 5).should.equal false @HistoryManager.shouldFlushHistoryOps(14, ['a', 'b', 'c'].length, 5).should.equal false
it "should return true if the updates took to the threshold", -> it "should return true if the updates took to the threshold", ->
# Currently there are 15 ops # Currently there are 15 ops
# Previously we were on 12 ops # Previously we were on 12 ops
# We've reached a new multiple of 5 # We've reached a new multiple of 5
@HistoryManager._shouldFlushHistoryOps(15, ['a', 'b', 'c'], 5).should.equal true @HistoryManager.shouldFlushHistoryOps(15, ['a', 'b', 'c'].length, 5).should.equal true
it "should return true if the updates took past the threshold", -> it "should return true if the updates took past the threshold", ->
# Currently there are 17 ops # Currently there are 17 ops
# Previously we were on 14 ops # Previously we were on 14 ops
# We passed over a multiple of 5 (15) # We passed over a multiple of 5 (15)
@HistoryManager._shouldFlushHistoryOps(17, ['a', 'b', 'c'], 5).should.equal true @HistoryManager.shouldFlushHistoryOps(17, ['a', 'b', 'c'].length, 5).should.equal true

View file

@ -10,6 +10,7 @@ describe "ProjectManager - flushAndDeleteProject", ->
"./RedisManager": @RedisManager = {} "./RedisManager": @RedisManager = {}
"./DocumentManager": @DocumentManager = {} "./DocumentManager": @DocumentManager = {}
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./HistoryManager": @HistoryManager = {}
"./Metrics": @Metrics = "./Metrics": @Metrics =
Timer: class Timer Timer: class Timer
done: sinon.stub() done: sinon.stub()

View file

@ -10,6 +10,7 @@ describe "ProjectManager - flushProject", ->
"./RedisManager": @RedisManager = {} "./RedisManager": @RedisManager = {}
"./DocumentManager": @DocumentManager = {} "./DocumentManager": @DocumentManager = {}
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./HistoryManager": @HistoryManager = {}
"./Metrics": @Metrics = "./Metrics": @Metrics =
Timer: class Timer Timer: class Timer
done: sinon.stub() done: sinon.stub()

View file

@ -11,6 +11,7 @@ describe "ProjectManager - getProjectDocsAndFlushIfOld", ->
"./RedisManager": @RedisManager = {} "./RedisManager": @RedisManager = {}
"./DocumentManager": @DocumentManager = {} "./DocumentManager": @DocumentManager = {}
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./HistoryManager": @HistoryManager = {}
"./Metrics": @Metrics = "./Metrics": @Metrics =
Timer: class Timer Timer: class Timer
done: sinon.stub() done: sinon.stub()

View file

@ -10,17 +10,21 @@ describe "ProjectManager", ->
"./RedisManager": @RedisManager = {} "./RedisManager": @RedisManager = {}
"./DocumentManager": @DocumentManager = {} "./DocumentManager": @DocumentManager = {}
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./HistoryManager": @HistoryManager = {}
"./Metrics": @Metrics = "./Metrics": @Metrics =
Timer: class Timer Timer: class Timer
done: sinon.stub() done: sinon.stub()
@project_id = "project-id-123" @project_id = "project-id-123"
@user_id = "user-id-123" @user_id = "user-id-123"
@HistoryManager.shouldFlushHistoryOps = sinon.stub().returns(false)
@HistoryManager.flushProjectChangesAsync = sinon.stub()
@callback = sinon.stub() @callback = sinon.stub()
describe "updateProjectWithLocks", -> describe "updateProjectWithLocks", ->
describe "rename operations", -> describe "rename operations", ->
beforeEach -> beforeEach ->
@firstDocUpdate = @firstDocUpdate =
id: 1 id: 1
pathname: 'foo' pathname: 'foo'
@ -55,6 +59,11 @@ describe "ProjectManager", ->
.calledWith(@project_id, @firstFileUpdate.id, @user_id, @firstFileUpdate) .calledWith(@project_id, @firstFileUpdate.id, @user_id, @firstFileUpdate)
.should.equal true .should.equal true
it "should not flush the history", ->
@HistoryManager.flushProjectChangesAsync
.calledWith(@project_id)
.should.equal false
it "should call the callback", -> it "should call the callback", ->
@callback.called.should.equal true @callback.called.should.equal true
@ -76,6 +85,16 @@ describe "ProjectManager", ->
it "should call the callback with the error", -> it "should call the callback with the error", ->
@callback.calledWith(@error).should.equal true @callback.calledWith(@error).should.equal true
describe "with enough ops to flush", ->
beforeEach ->
@HistoryManager.shouldFlushHistoryOps = sinon.stub().returns(true)
@ProjectManager.updateProjectWithLocks @project_id, @user_id, @docUpdates, @fileUpdates, @callback
it "should flush the history", ->
@HistoryManager.flushProjectChangesAsync
.calledWith(@project_id)
.should.equal true
describe "add operations", -> describe "add operations", ->
beforeEach -> beforeEach ->
@firstDocUpdate = @firstDocUpdate =
@ -108,6 +127,11 @@ describe "ProjectManager", ->
.calledWith(@project_id, 'file', @firstFileUpdate.id, @user_id, @firstFileUpdate) .calledWith(@project_id, 'file', @firstFileUpdate.id, @user_id, @firstFileUpdate)
.should.equal true .should.equal true
it "should not flush the history", ->
@HistoryManager.flushProjectChangesAsync
.calledWith(@project_id)
.should.equal false
it "should call the callback", -> it "should call the callback", ->
@callback.called.should.equal true @callback.called.should.equal true
@ -129,3 +153,13 @@ describe "ProjectManager", ->
it "should call the callback with the error", -> it "should call the callback with the error", ->
@callback.calledWith(@error).should.equal true @callback.calledWith(@error).should.equal true
describe "with enough ops to flush", ->
beforeEach ->
@HistoryManager.shouldFlushHistoryOps = sinon.stub().returns(true)
@ProjectManager.updateProjectWithLocks @project_id, @user_id, @docUpdates, @fileUpdates, @callback
it "should flush the history", ->
@HistoryManager.flushProjectChangesAsync
.calledWith(@project_id)
.should.equal true