mirror of https://github.com/overleaf/overleaf.git (synced 2025-01-24 02:31:19 +00:00)

move lock inside web http calls

commit 6795820933, parent ac5d59211d, 2 changed files with 70 additions and 38 deletions
@@ -70,51 +70,79 @@ module.exports = UpdatesManager =
 	REDIS_READ_BATCH_SIZE: 100
 
 	processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) ->
-		UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) ->
-			return callback(error) if error?
-			MongoManager.backportProjectId project_id, doc_id, (error) ->
-				return callback(error) if error?
-				# get the updates as strings from redis (so we can delete them after they are applied)
-				RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) ->
-					return callback(error) if error?
-					length = docUpdates.length
-					# parse the redis strings into ShareJs updates
-					RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) ->
-						return callback(error) if error?
-						logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis"
-						UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
-							return callback(error) if error?
-							logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
-							# delete the applied updates from redis
-							RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) ->
-								return callback(error) if error?
-								if length == UpdatesManager.REDIS_READ_BATCH_SIZE
-									# There might be more updates
-									logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
-									setTimeout () ->
-										UpdatesManager.processUncompressedUpdates project_id, doc_id, callback
-									, 0
-								else
-									logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
-									callback()
+		UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) ->
+			return callback(error) if error?
+			UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) ->
+				return callback(error) if error?
+				UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback
+
+	# Check whether the updates are temporary (per-project property)
+	_prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) ->
+		UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) ->
+			return callback(error) if error?
+			callback(null, temporary)
+
+	# Check for project id on document history (per-document property)
+	_prepareDocForUpdates: (project_id, doc_id, callback = (error) ->) ->
+		MongoManager.backportProjectId project_id, doc_id, (error) ->
+			return callback(error) if error?
+			callback(null)
+
+	# Apply updates for specific project/doc after preparing at project and doc level
+	_processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) ->
+		# get the updates as strings from redis (so we can delete them after they are applied)
+		RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) ->
+			return callback(error) if error?
+			length = docUpdates.length
+			# parse the redis strings into ShareJs updates
+			RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) ->
+				return callback(error) if error?
+				logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis"
+				UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
+					return callback(error) if error?
+					logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
+					# delete the applied updates from redis
+					RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) ->
+						return callback(error) if error?
+						if length == UpdatesManager.REDIS_READ_BATCH_SIZE
+							# There might be more updates
+							logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
+							setTimeout () ->
+								UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, callback
+							, 0
+						else
+							logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
+							callback()
 
 	# Process updates for a doc when we flush it individually
 	processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
-		LockManager.runWithLock(
-			"HistoryLock:#{doc_id}",
-			(releaseLock) ->
-				UpdatesManager.processUncompressedUpdates project_id, doc_id, releaseLock
-			callback
-		)
+		UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) ->
+			return callback(error) if error?
+			UpdatesManager._processUncompressedUpdatesForDoc project_id, doc_id, temporary, callback
+
+	# Process updates for a doc when the whole project is flushed (internal method)
+	_processUncompressedUpdatesForDoc: (project_id, doc_id, temporary, callback = (error) ->) ->
+		UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) ->
+			return callback(error) if error?
+			LockManager.runWithLock(
+				"HistoryLock:#{doc_id}",
+				(releaseLock) ->
+					UpdatesManager._processUncompressedUpdates project_id, doc_id, temporary, releaseLock
+				callback
+			)
 
 	# Process all updates for a project, only check project-level information once
 	processUncompressedUpdatesForProject: (project_id, callback = (error) ->) ->
 		RedisManager.getDocIdsWithHistoryOps project_id, (error, doc_ids) ->
 			return callback(error) if error?
-			jobs = []
-			for doc_id in doc_ids
-				do (doc_id) ->
-					jobs.push (callback) ->
-						UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, callback
-			async.parallelLimit jobs, 5, callback
+			UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) ->
+				jobs = []
+				for doc_id in doc_ids
+					do (doc_id) ->
+						jobs.push (cb) ->
+							UpdatesManager._processUncompressedUpdatesForDoc project_id, doc_id, temporary, cb
+				async.parallelLimit jobs, 5, callback
 
 	getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) ->
 		UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) ->
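Context for the hunk above: UpdateTrimmer.shouldTrimUpdates (the web HTTP call referenced in the commit message) and MongoManager.backportProjectId (a Mongo call) previously ran while HistoryLock:#{doc_id} was held. After the restructuring, the project-level check and the doc-level backport both complete before the lock is acquired, and only the Redis read/compress/delete cycle executes under it. The sketch below condenses that ordering; it is a minimal illustration, not the committed code: runWithLock is a logging stand-in for LockManager.runWithLock, and slowWebCall, slowMongoCall and processRedisQueue are hypothetical placeholders for the real collaborators.

	# A minimal sketch, not the committed code.
	runWithLock = (key, runner, callback = (error) ->) ->
		console.log "acquired #{key}"
		runner (error) ->
			console.log "released #{key}"
			callback(error)

	slowWebCall = (project_id, cb) -> setTimeout (-> cb(null, false)), 50
	slowMongoCall = (project_id, doc_id, cb) -> setTimeout (-> cb(null)), 50
	processRedisQueue = (project_id, doc_id, temporary, cb) -> setTimeout (-> cb(null)), 10

	# After this commit, both slow preparation calls finish before the lock
	# is taken, so HistoryLock:<doc_id> covers only the Redis processing.
	flushDocWithLock = (project_id, doc_id, callback) ->
		slowWebCall project_id, (error, temporary) ->
			return callback(error) if error?
			slowMongoCall project_id, doc_id, (error) ->
				return callback(error) if error?
				runWithLock(
					"HistoryLock:#{doc_id}",
					(releaseLock) ->
						processRedisQueue project_id, doc_id, temporary, releaseLock
					callback
				)

	flushDocWithLock "project-1", "doc-1", (error) -> console.log "flushed", error ? "ok"

The remaining hunks update the UpdatesManager unit tests to stub the new internal methods.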
@@ -251,7 +251,9 @@ describe "UpdatesManager", ->
 	describe "processCompressedUpdatesWithLock", ->
 		beforeEach ->
-			@UpdatesManager.processUncompressedUpdates = sinon.stub().callsArg(2)
+			@UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock")
+			@MongoManager.backportProjectId = sinon.stub().callsArg(2)
+			@UpdatesManager._processUncompressedUpdates = sinon.stub().callsArg(3)
 			@LockManager.runWithLock = sinon.stub().callsArg(2)
 			@UpdatesManager.processUncompressedUpdatesWithLock @project_id, @doc_id, @callback
@@ -313,7 +315,9 @@
 	describe "processUncompressedUpdatesForProject", ->
 		beforeEach (done) ->
 			@doc_ids = ["mock-id-1", "mock-id-2"]
-			@UpdatesManager.processUncompressedUpdatesWithLock = sinon.stub().callsArg(2)
+			@UpdateTrimmer.shouldTrimUpdates = sinon.stub().callsArgWith(1, null, @temporary = "temp mock")
+			@MongoManager.backportProjectId = sinon.stub().callsArg(2)
+			@UpdatesManager._processUncompressedUpdatesForDoc = sinon.stub().callsArg(3)
 			@RedisManager.getDocIdsWithHistoryOps = sinon.stub().callsArgWith(1, null, @doc_ids)
 			@UpdatesManager.processUncompressedUpdatesForProject @project_id, () =>
 				@callback()
@@ -326,8 +330,8 @@ describe "UpdatesManager", ->
 		it "should process the doc ops for the each doc_id", ->
 			for doc_id in @doc_ids
-				@UpdatesManager.processUncompressedUpdatesWithLock
-					.calledWith(@project_id, doc_id)
+				@UpdatesManager._processUncompressedUpdatesForDoc
+					.calledWith(@project_id, doc_id, @temporary)
 					.should.equal true
 
 		it "should call the callback", ->
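A note on the test changes above: sinon's callsArg(n) has a stub invoke its n-th (zero-based) argument as a callback, so the stubs move from callsArg(2) to callsArg(3) as the stubbed methods gain the temporary parameter ahead of their callback, and callsArgWith(1, null, @temporary) feeds shouldTrimUpdates' (error, temporary) signature. A small self-contained illustration (the argument values are arbitrary):

	sinon = require "sinon"

	# callsArg(3): invoke the 4th argument as the callback, matching
	# _processUncompressedUpdates(project_id, doc_id, temporary, callback)
	processStub = sinon.stub().callsArg(3)
	processStub "project-1", "doc-1", false, -> console.log "callback invoked"

	# callsArgWith(1, ...): invoke the 2nd argument, passing (error, temporary),
	# matching shouldTrimUpdates(project_id, callback)
	trimStub = sinon.stub().callsArgWith(1, null, "temp mock")
	trimStub "project-1", (error, temporary) ->
		console.log error, temporary    # prints: null 'temp mock'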