Revert "Revert "Merge pull request #23 from sharelatex/bg-move-lock""

This reverts commit 85bc45099f047aa8dfa6d189f02f4b4327c9d602.
This commit is contained in:
Brian Gough 2017-03-27 14:23:34 +01:00
parent 5288814bd6
commit 28d2ec93b8
3 changed files with 79 additions and 54 deletions

View file

@ -68,53 +68,74 @@ module.exports = UpdatesManager =
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result?
callback()
REDIS_READ_BATCH_SIZE: 100
processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) ->
# Check whether the updates are temporary (per-project property)
_prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) ->
UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) ->
return callback(error) if error?
MongoManager.backportProjectId project_id, doc_id, (error) ->
callback(null, temporary)
# Check for project id on document history (per-document property)
_prepareDocForUpdates: (project_id, doc_id, callback = (error) ->) ->
MongoManager.backportProjectId project_id, doc_id, (error) ->
return callback(error) if error?
callback(null)
# Apply updates for specific project/doc after preparing at project and doc level
REDIS_READ_BATCH_SIZE: 100
processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) ->
# get the updates as strings from redis (so we can delete them after they are applied)
RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) ->
return callback(error) if error?
length = docUpdates.length
# parse the redis strings into ShareJs updates
RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) ->
return callback(error) if error?
# get the updates as strings from redis (so we can delete them after they are applied)
RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) ->
logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis"
UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
return callback(error) if error?
length = docUpdates.length
# parse the redis strings into ShareJs updates
RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) ->
logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
# delete the applied updates from redis
RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) ->
return callback(error) if error?
logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis"
UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
return callback(error) if error?
logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
# delete the applied updates from redis
RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) ->
return callback(error) if error?
if length == UpdatesManager.REDIS_READ_BATCH_SIZE
# There might be more updates
logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
setTimeout () ->
UpdatesManager.processUncompressedUpdates project_id, doc_id, callback
, 0
else
logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
callback()
if length == UpdatesManager.REDIS_READ_BATCH_SIZE
# There might be more updates
logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
setTimeout () ->
UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, callback
, 0
else
logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
callback()
# Process updates for a doc when we flush it individually
processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
LockManager.runWithLock(
"HistoryLock:#{doc_id}",
(releaseLock) ->
UpdatesManager.processUncompressedUpdates project_id, doc_id, releaseLock
callback
)
UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) ->
return callback(error) if error?
UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, callback
# Process updates for a doc when the whole project is flushed (internal method)
_processUncompressedUpdatesForDocWithLock: (project_id, doc_id, temporary, callback = (error) ->) ->
UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) ->
return callback(error) if error?
LockManager.runWithLock(
"HistoryLock:#{doc_id}",
(releaseLock) ->
UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, releaseLock
callback
)
# Process all updates for a project, only check project-level information once
processUncompressedUpdatesForProject: (project_id, callback = (error) ->) ->
RedisManager.getDocIdsWithHistoryOps project_id, (error, doc_ids) ->
return callback(error) if error?
jobs = []
for doc_id in doc_ids
do (doc_id) ->
jobs.push (callback) ->
UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, callback
async.parallelLimit jobs, 5, callback
UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) ->
jobs = []
for doc_id in doc_ids
do (doc_id) ->
jobs.push (cb) ->
UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, cb
async.parallelLimit jobs, 5, callback
getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) ->

View file

@ -3,7 +3,7 @@ logger = require "logger-sharelatex"
Settings = require "settings-sharelatex"
# Don't let HTTP calls hang for a long time
MAX_HTTP_REQUEST_LENGTH = 15000 # 15 seconds
MAX_HTTP_REQUEST_LENGTH = 30000 # 30 seconds
# DEPRECATED! This method of getting user details via track-changes is deprecated
# in the way we lay out our services.

View file

@ -182,17 +182,7 @@ describe "UpdatesManager", ->
@updates = ["mock-update"]
@RedisManager.getOldestDocUpdates = sinon.stub().callsArgWith(2, null, @updates)
@RedisManager.expandDocUpdates = sinon.stub().callsArgWith(1, null, @updates)
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @callback
it "should check if the updates are temporary", ->
@UpdateTrimmer.shouldTrimUpdates
.calledWith(@project_id)
.should.equal true
it "should backport the project id", ->
@MongoManager.backportProjectId
.calledWith(@project_id, @doc_id)
.should.equal true
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @temporary, @callback
it "should get the oldest updates", ->
@RedisManager.getOldestDocUpdates
@ -225,7 +215,7 @@ describe "UpdatesManager", ->
@RedisManager.expandDocUpdates = (jsonUpdates, callback) =>
callback null, jsonUpdates
sinon.spy @RedisManager, "expandDocUpdates"
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, (args...) =>
@UpdatesManager.processUncompressedUpdates @project_id, @doc_id, @temporary, (args...) =>
@callback(args...)
done()
@ -251,10 +241,22 @@ describe "UpdatesManager", ->
describe "processCompressedUpdatesWithLock", ->
beforeEach ->
@UpdatesManager.processUncompressedUpdates = sinon.stub().callsArg(2)
@UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock")
@MongoManager.backportProjectId = sinon.stub().callsArg(2)
@UpdatesManager._processUncompressedUpdates = sinon.stub().callsArg(3)
@LockManager.runWithLock = sinon.stub().callsArg(2)
@UpdatesManager.processUncompressedUpdatesWithLock @project_id, @doc_id, @callback
it "should check if the updates are temporary", ->
@UpdateTrimmer.shouldTrimUpdates
.calledWith(@project_id)
.should.equal true
it "should backport the project id", ->
@MongoManager.backportProjectId
.calledWith(@project_id, @doc_id)
.should.equal true
it "should run processUncompressedUpdates with the lock", ->
@LockManager.runWithLock
.calledWith(
@ -313,7 +315,9 @@ describe "UpdatesManager", ->
describe "processUncompressedUpdatesForProject", ->
beforeEach (done) ->
@doc_ids = ["mock-id-1", "mock-id-2"]
@UpdatesManager.processUncompressedUpdatesWithLock = sinon.stub().callsArg(2)
@UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock")
@MongoManager.backportProjectId = sinon.stub().callsArg(2)
@UpdatesManager._processUncompressedUpdatesForDocWithLock = sinon.stub().callsArg(3)
@RedisManager.getDocIdsWithHistoryOps = sinon.stub().callsArgWith(1, null, @doc_ids)
@UpdatesManager.processUncompressedUpdatesForProject @project_id, () =>
@callback()
@ -326,8 +330,8 @@ describe "UpdatesManager", ->
it "should process the doc ops for the each doc_id", ->
for doc_id in @doc_ids
@UpdatesManager.processUncompressedUpdatesWithLock
.calledWith(@project_id, doc_id)
@UpdatesManager._processUncompressedUpdatesForDocWithLock
.calledWith(@project_id, doc_id, @temporary)
.should.equal true
it "should call the callback", ->
@ -798,4 +802,4 @@ describe "UpdatesManager", ->
user_ids: [@user_2.id]
start_ts: @now
end_ts: @now + 10
}]
}]