metrics = require('metrics-sharelatex')
Settings = require('settings-sharelatex')
RedisWrapper = require("./RedisWrapper")
rclient = RedisWrapper.client("lock")
logger = require "logger-sharelatex"
os = require "os"
crypto = require "crypto"
async = require "async"

# Components of the per-process lock signature built by randomLock()
HOST = os.hostname()
PID = process.pid
RND = crypto.randomBytes(4).toString('hex')
COUNT = 0

LOCK_QUEUES = new Map() # queue lock requests for each name/id so they get the lock on a first-come first-served basis

# Redis-backed lock manager. Locks are stored under "lock:web:<namespace>:<id>"
# with an expiry, and requests for the same key made from this process are
# served one at a time through an in-memory queue.
module.exports = LockManager =
  LOCK_TEST_INTERVAL: 50 # 50ms between each test of the lock
  MAX_TEST_INTERVAL: 1000 # back off to 1s between each test of the lock
  MAX_LOCK_WAIT_TIME: 10000 # 10s maximum time to spend trying to get the lock
  REDIS_LOCK_EXPIRY: 30 # seconds. Time until lock auto expires in redis
  SLOW_EXECUTION_THRESHOLD: 5000 # 5s, if execution takes longer than this then log

  # Use a signed lock value as described in
  # http://redis.io/topics/distlock#correct-implementation-with-a-single-instance
  # to prevent accidental unlocking by multiple processes
  #
  # Returns a string combining host, pid, a per-process random token, the
  # current time and a counter that is incremented on every call.
  randomLock : () ->
    time = Date.now()
    return "locked:host=#{HOST}:pid=#{PID}:random=#{RND}:time=#{time}:count=#{COUNT++}"

  # Lua script run by _releaseLock: deletes the key only if it still holds the
  # given lock value, otherwise returns 0 and leaves the key alone.
  unlockScript: 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'

  # Acquire the lock for namespace/id, call `runner`, and release the lock
  # once the runner invokes the function it was given.
  #
  #   namespace, id - combined into the redis key "lock:web:<namespace>:<id>"
  #   runner        - called with a releaseLock(error, values...) function
  #   callback      - called with the acquisition error if the lock could not
  #                   be obtained; otherwise with the runner's error or the
  #                   release error (runner's error takes precedence), or with
  #                   (null, values...) on success
  runWithLock: (namespace, id, runner = ( (releaseLock = (error) ->) -> ), callback = ( (error) -> )) ->
    # This error is defined here so we get a useful stacktrace
    slowExecutionError = new Error "slow execution during lock"

    timer = new metrics.Timer("lock.#{namespace}")
    key = "lock:web:#{namespace}:#{id}"
    LockManager._getLock key, namespace, (error, lockValue) ->
      return callback(error) if error?

      # The lock can expire in redis but the process carry on. This setTimeout call
      # is designed to log if this happens.
      countIfExceededLockTimeout = () ->
        metrics.inc "lock.#{namespace}.exceeded_lock_timeout"
        logger.log "exceeded lock timeout", { namespace, id, slowExecutionError }
      exceededLockTimeout = setTimeout countIfExceededLockTimeout, LockManager.REDIS_LOCK_EXPIRY * 1000

      runner (error1, values...) ->
        LockManager._releaseLock key, lockValue, (error2) ->
          clearTimeout exceededLockTimeout

          timeTaken = new Date - timer.start
          if timeTaken > LockManager.SLOW_EXECUTION_THRESHOLD
            logger.log "slow execution during lock", { namespace, id, timeTaken, slowExecutionError }

          timer.done()
          error = error1 or error2
          return callback(error) if error?
          callback null, values...

  # Make a single attempt to take the lock with SET key value EX <expiry> NX.
  # Calls back with (err), or (null, true, lockValue) when the lock was taken,
  # or (null, false) when redis did not answer "OK" (lock already held).
  _tryLock : (key, namespace, callback = (err, isFree, lockValue)->)->
    lockValue = LockManager.randomLock()
    rclient.set key, lockValue, "EX", LockManager.REDIS_LOCK_EXPIRY, "NX", (err, gotLock)->
      return callback(err) if err?
      if gotLock == "OK"
        metrics.inc "lock.#{namespace}.try.success"
        callback err, true, lockValue
      else
        metrics.inc "lock.#{namespace}.try.failed"
        logger.log key: key, redis_response: gotLock, "lock is locked"
        callback err, false

  # Queue a request for the lock so that requests for the same key made from
  # this process poll redis one after another, in arrival order.
  # it's sufficient to serialize within a process because that is where the parallel operations occur
  _getLock: (key, namespace, callback = (error, lockValue) ->) ->
    # this is what we need to do for each lock we want to request
    task = (next) ->
      LockManager._getLockByPolling key, namespace, (error, lockValue) ->
        # tell the queue to start trying to get the next lock (if any)
        next()
        # we have got a lock result, so we can continue with our own execution
        callback(error, lockValue)
    # create a queue for this key if needed
    queueName = "#{key}:#{namespace}"
    queue = LOCK_QUEUES.get queueName
    if !queue?
      handler = (fn, cb) ->
        fn(cb) # execute any function as our task
      # set up a new queue for this key (concurrency of 1)
      queue = async.queue handler, 1
      queue.push task
      # remove the queue object when queue is empty
      queue.drain = () ->
        LOCK_QUEUES.delete queueName
      # store the queue in our global map
      LOCK_QUEUES.set queueName, queue
    else
      # queue the request to get the lock
      queue.push task

  # Repeatedly call _tryLock until the lock is obtained, an error occurs, or
  # MAX_LOCK_WAIT_TIME has elapsed (in which case the callback receives
  # Error("Timeout")). The delay between attempts starts at LOCK_TEST_INTERVAL
  # and doubles after every failed attempt, capped at MAX_TEST_INTERVAL.
  _getLockByPolling: (key, namespace, callback = (error, lockValue) ->) ->
    startTime = Date.now()
    testInterval = LockManager.LOCK_TEST_INTERVAL
    attempts = 0
    do attempt = () ->
      if Date.now() - startTime > LockManager.MAX_LOCK_WAIT_TIME
        metrics.inc "lock.#{namespace}.get.failed"
        return callback(new Error("Timeout"))

      attempts += 1
      LockManager._tryLock key, namespace, (error, gotLock, lockValue) ->
        return callback(error) if error?
        if gotLock
          metrics.gauge "lock.#{namespace}.get.success.tries", attempts
          callback(null, lockValue)
        else
          setTimeout attempt, testInterval
          # back off while the lock is held by someone else, so a contended
          # lock is not tested every LOCK_TEST_INTERVAL for the whole wait
          testInterval = Math.min(testInterval * 2, LockManager.MAX_TEST_INTERVAL)

  # Release the lock by running unlockScript against `key` with `lockValue`.
  # Calls back with the redis error if the eval failed, with
  # Error("tried to release timed out lock") when the script reports anything
  # other than exactly one deleted key, and with (null, result) otherwise.
  _releaseLock: (key, lockValue, callback)->
    rclient.eval LockManager.unlockScript, 1, key, lockValue, (err, result) ->
      if err?
        return callback(err)
      else if result? and result isnt 1 # successful unlock should release exactly one key
        logger.error {key:key, lockValue:lockValue, redis_err:err, redis_result:result}, "unlocking error"
        metrics.inc "unlock-error"
        return callback(new Error("tried to release timed out lock"))
      else
        callback(null,result)