diff --git a/services/document-updater/app/coffee/LockManager.coffee b/services/document-updater/app/coffee/LockManager.coffee index 37cbe49914..d918dc4332 100644 --- a/services/document-updater/app/coffee/LockManager.coffee +++ b/services/document-updater/app/coffee/LockManager.coffee @@ -7,6 +7,8 @@ logger = require "logger-sharelatex" os = require "os" crypto = require "crypto" +Profiler = require "./Profiler" + HOST = os.hostname() PID = process.pid RND = crypto.randomBytes(4).toString('hex') @@ -30,28 +32,34 @@ module.exports = LockManager = tryLock : (doc_id, callback = (err, isFree)->)-> lockValue = LockManager.randomLock() key = keys.blockingKey(doc_id:doc_id) + profile = new Profiler("tryLock", {doc_id, key, lockValue}) rclient.set key, lockValue, "EX", @LOCK_TTL, "NX", (err, gotLock)-> return callback(err) if err? if gotLock == "OK" metrics.inc "doc-not-blocking" + profile.log("got lock").end() callback err, true, lockValue else metrics.inc "doc-blocking" - logger.log {doc_id}, "doc is locked" + profile.log("doc is locked").end() callback err, false getLock: (doc_id, callback = (error, lockValue) ->) -> startTime = Date.now() testInterval = LockManager.LOCK_TEST_INTERVAL + profile = new Profiler("getLock", {doc_id}) do attempt = () -> if Date.now() - startTime > LockManager.MAX_LOCK_WAIT_TIME e = new Error("Timeout") e.doc_id = doc_id + profile.log("timeout").end() return callback(e) LockManager.tryLock doc_id, (error, gotLock, lockValue) -> return callback(error) if error? + profile.log("tryLock") if gotLock + profile.end() callback(null, lockValue) else setTimeout attempt, testInterval @@ -72,10 +80,14 @@ module.exports = LockManager = releaseLock: (doc_id, lockValue, callback)-> key = keys.blockingKey(doc_id:doc_id) + profile = new Profiler("releaseLock", {doc_id, key, lockValue}) rclient.eval LockManager.unlockScript, 1, key, lockValue, (err, result) -> if err? return callback(err) - if result? and result isnt 1 # successful unlock should release exactly one key - logger.error {doc_id:doc_id, lockValue:lockValue, redis_err:err, redis_result:result}, "unlocking error" + else if result? and result isnt 1 # successful unlock should release exactly one key + profile.log("unlockScript:expired-lock").end() + logger.error {doc_id:doc_id, key:key, lockValue:lockValue, redis_err:err, redis_result:result}, "unlocking error" return callback(new Error("tried to release timed out lock")) - callback(err,result) + else + profile.log("unlockScript:ok").end() + callback(err,result) diff --git a/services/document-updater/app/coffee/Profiler.coffee b/services/document-updater/app/coffee/Profiler.coffee new file mode 100644 index 0000000000..dc88345334 --- /dev/null +++ b/services/document-updater/app/coffee/Profiler.coffee @@ -0,0 +1,33 @@ +Settings = require('settings-sharelatex') +logger = require('logger-sharelatex') + +deltaMs = (ta, tb) -> + nanoSeconds = (ta[0]-tb[0])*1e9 + (ta[1]-tb[1]) + milliSeconds = Math.floor(nanoSeconds*1e-6) + return milliSeconds + +module.exports = class Profiler + LOG_CUTOFF_TIME: 1000 + + constructor: (@name, @args) -> + @t0 = @t = process.hrtime() + @start = new Date() + @updateTimes = [] + + log: (label) -> + t1 = process.hrtime() + dtMilliSec = deltaMs(t1, @t) + @t = t1 + @updateTimes.push [label, dtMilliSec] # timings in ms + return @ # make it chainable + + end: (message) -> + totalTime = deltaMs(@t, @t0) + return if totalTime < @LOG_CUTOFF_TIME # skip anything less than cutoff + args = {} + for k,v of @args + args[k] = v + args.updateTimes = @updateTimes + args.start = @start + args.end = new Date() + logger.log args, @name diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index 269b16ee67..70caaf1e03 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -10,6 +10,7 @@ Metrics = require "./Metrics" Errors = require "./Errors" DocumentManager = require "./DocumentManager" RangesManager = require "./RangesManager" +Profiler = require "./Profiler" module.exports = UpdateManager = processOutstandingUpdates: (project_id, doc_id, callback = (error) ->) -> @@ -20,13 +21,17 @@ module.exports = UpdateManager = callback() processOutstandingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> + profile = new Profiler("processOutstandingUpdatesWithLock", {project_id, doc_id}) LockManager.tryLock doc_id, (error, gotLock, lockValue) => return callback(error) if error? return callback() if !gotLock + profile.log("tryLock") UpdateManager.processOutstandingUpdates project_id, doc_id, (error) -> return UpdateManager._handleErrorInsideLock(doc_id, lockValue, error, callback) if error? + profile.log("processOutstandingUpdates") LockManager.releaseLock doc_id, lockValue, (error) => return callback(error) if error? + profile.log("releaseLock").end() UpdateManager.continueProcessingUpdatesWithLock project_id, doc_id, callback continueProcessingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> @@ -38,13 +43,20 @@ module.exports = UpdateManager = callback() fetchAndApplyUpdates: (project_id, doc_id, callback = (error) ->) -> + profile = new Profiler("fetchAndApplyUpdates", {project_id, doc_id}) RealTimeRedisManager.getPendingUpdatesForDoc doc_id, (error, updates) => return callback(error) if error? if updates.length == 0 return callback() - async.eachSeries updates, - (update, cb) -> UpdateManager.applyUpdate project_id, doc_id, update, cb - callback + profile.log("getPendingUpdatesForDoc") + doUpdate = (update, cb)-> + UpdateManager.applyUpdate project_id, doc_id, update, (err) -> + profile.log("applyUpdate") + cb(err) + finalCallback = (err) -> + profile.log("async done").end() + callback(err) + async.eachSeries updates, doUpdate, finalCallback applyUpdate: (project_id, doc_id, update, _callback = (error) ->) -> callback = (error) -> @@ -66,14 +78,19 @@ module.exports = UpdateManager = HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) -> + profile = new Profiler("lockUpdatesAndDo", {project_id, doc_id}) LockManager.getLock doc_id, (error, lockValue) -> + profile.log("getLock") return callback(error) if error? UpdateManager.processOutstandingUpdates project_id, doc_id, (error) -> return UpdateManager._handleErrorInsideLock(doc_id, lockValue, error, callback) if error? + profile.log("processOutStandingUpdates") method project_id, doc_id, args..., (error, response_args...) -> return UpdateManager._handleErrorInsideLock(doc_id, lockValue, error, callback) if error? + profile.log("method") LockManager.releaseLock doc_id, lockValue, (error) -> return callback(error) if error? + profile.log("releaseLock").end() callback null, response_args... # We held the lock for a while so updates might have queued up UpdateManager.continueProcessingUpdatesWithLock project_id, doc_id