Mirror of https://github.com/overleaf/overleaf.git, synced 2024-11-07 20:31:06 -05:00

Merge pull request #8401 from overleaf/bg-use-redis-locker-in-web

extract web LockManager implementation into redis-wrapper

GitOrigin-RevId: 60144d1b1136bab90475cb4b4a6597e1b3f6af22

This commit is contained in:
  parent b19c56ccaf
  commit b6ab0792a9

4 changed files with 226 additions and 194 deletions
libraries/redis-wrapper/RedisWebLocker.js  (new file, 192 lines)
@@ -0,0 +1,192 @@
const metrics = require('@overleaf/metrics')
const logger = require('@overleaf/logger')
const os = require('os')
const crypto = require('crypto')
const async = require('async')

const HOST = os.hostname()
const PID = process.pid
const RND = crypto.randomBytes(4).toString('hex')
let COUNT = 0

const LOCK_QUEUES = new Map() // queue lock requests for each name/id so they get the lock on a first-come first-served basis

const UNLOCK_SCRIPT =
  'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'

module.exports = class RedisWebLocker {
  constructor({ rclient, getKey, options }) {
    this.rclient = rclient
    this.getKey = getKey

    // ms between each test of the lock
    this.LOCK_TEST_INTERVAL = options.lockTestInterval || 50
    // back off to ms between each test of the lock
    this.MAX_TEST_INTERVAL = options.maxTestInterval || 1000
    // ms maximum time to spend trying to get the lock
    this.MAX_LOCK_WAIT_TIME = options.maxLockWaitTime || 10000
    // seconds. Time until lock auto expires in redis
    this.REDIS_LOCK_EXPIRY = options.redisLockExpiry || 30
    // ms, if execution takes longer than this then log
    this.SLOW_EXECUTION_THRESHOLD = options.slowExecutionThreshold || 5000
    // read-only copy for unit tests
    this.unlockScript = UNLOCK_SCRIPT
  }

  // Use a signed lock value as described in
  // http://redis.io/topics/distlock#correct-implementation-with-a-single-instance
  // to prevent accidental unlocking by multiple processes
  randomLock() {
    const time = Date.now()
    return `locked:host=${HOST}:pid=${PID}:random=${RND}:time=${time}:count=${COUNT++}`
  }

  runWithLock(namespace, id, runner, callback) {
    // runner must be a function accepting a callback, e.g. runner = (cb) ->

    // This error is defined here so we get a useful stacktrace
    const slowExecutionError = new Error('slow execution during lock')

    const timer = new metrics.Timer(`lock.${namespace}`)
    const key = this.getKey(namespace, id)
    this._getLock(key, namespace, (error, lockValue) => {
      if (error != null) {
        return callback(error)
      }

      // The lock can expire in redis but the process can carry on. This setTimeout
      // call is designed to log if this happens.
      function countIfExceededLockTimeout() {
        metrics.inc(`lock.${namespace}.exceeded_lock_timeout`)
        logger.debug('exceeded lock timeout', {
          namespace,
          id,
          slowExecutionError,
        })
      }
      const exceededLockTimeout = setTimeout(
        countIfExceededLockTimeout,
        this.REDIS_LOCK_EXPIRY * 1000
      )

      runner((error1, ...values) =>
        this._releaseLock(key, lockValue, error2 => {
          clearTimeout(exceededLockTimeout)

          const timeTaken = new Date() - timer.start
          if (timeTaken > this.SLOW_EXECUTION_THRESHOLD) {
            logger.debug('slow execution during lock', {
              namespace,
              id,
              timeTaken,
              slowExecutionError,
            })
          }

          timer.done()
          error = error1 || error2
          if (error != null) {
            return callback(error)
          }
          callback(null, ...values)
        })
      )
    })
  }

  _tryLock(key, namespace, callback) {
    const lockValue = this.randomLock()
    this.rclient.set(
      key,
      lockValue,
      'EX',
      this.REDIS_LOCK_EXPIRY,
      'NX',
      (err, gotLock) => {
        if (err != null) {
          return callback(err)
        }
        if (gotLock === 'OK') {
          metrics.inc(`lock.${namespace}.try.success`)
          callback(err, true, lockValue)
        } else {
          metrics.inc(`lock.${namespace}.try.failed`)
          logger.debug({ key, redis_response: gotLock }, 'lock is locked')
          callback(err, false)
        }
      }
    )
  }

  // it's sufficient to serialize within a process because that is where the parallel operations occur
  _getLock(key, namespace, callback) {
    // this is what we need to do for each lock we want to request
    const task = next =>
      this._getLockByPolling(key, namespace, (error, lockValue) => {
        // tell the queue to start trying to get the next lock (if any)
        next()
        // we have got a lock result, so we can continue with our own execution
        callback(error, lockValue)
      })
    // create a queue for this key if needed
    const queueName = `${key}:${namespace}`
    let queue = LOCK_QUEUES.get(queueName)
    if (queue == null) {
      const handler = (fn, cb) => fn(cb)
      // set up a new queue for this key
      queue = async.queue(handler, 1)
      queue.push(task)
      // remove the queue object when queue is empty
      queue.drain = () => LOCK_QUEUES.delete(queueName)
      // store the queue in our global map
      LOCK_QUEUES.set(queueName, queue)
    } else {
      // queue the request to get the lock
      queue.push(task)
    }
  }

  _getLockByPolling(key, namespace, callback) {
    const startTime = Date.now()
    const testInterval = this.LOCK_TEST_INTERVAL
    let attempts = 0
    const attempt = () => {
      if (Date.now() - startTime > this.MAX_LOCK_WAIT_TIME) {
        metrics.inc(`lock.${namespace}.get.failed`)
        return callback(new Error('Timeout'))
      }

      attempts += 1
      this._tryLock(key, namespace, (error, gotLock, lockValue) => {
        if (error != null) {
          return callback(error)
        }
        if (gotLock) {
          metrics.gauge(`lock.${namespace}.get.success.tries`, attempts)
          callback(null, lockValue)
        } else {
          setTimeout(attempt, testInterval)
        }
      })
    }
    attempt()
  }

  _releaseLock(key, lockValue, callback) {
    this.rclient.eval(this.unlockScript, 1, key, lockValue, (err, result) => {
      if (err != null) {
        callback(err)
      } else if (result != null && result !== 1) {
        // successful unlock should release exactly one key
        logger.warn(
          { key, lockValue, redis_err: err, redis_result: result },
          'unlocking error'
        )
        metrics.inc('unlock-error')
        callback(new Error('tried to release timed out lock'))
      } else {
        callback(null, result)
      }
    })
  }
}
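
A minimal usage sketch (not part of the commit) may help when reading the class above. It assumes a plain ioredis client on localhost; the 'blobs' namespace and id are illustrative, while the getKey shape and option names come straight from the constructor and from the web LockManager further down.

const Redis = require('ioredis')
const RedisWebLocker = require('@overleaf/redis-wrapper/RedisWebLocker')

// assumes a redis server reachable on the default localhost:6379
const rclient = new Redis()

const locker = new RedisWebLocker({
  rclient,
  // key layout is the caller's choice; this mirrors the web service's keys
  getKey(namespace, id) {
    return `lock:web:${namespace}:${id}`
  },
  options: {}, // empty options fall back to the defaults in the constructor
})

// 'blobs' and 'id-123' are illustrative; the runner must accept a callback
locker.runWithLock(
  'blobs',
  'id-123',
  cb => {
    // critical section: at most one holder of lock:web:blobs:id-123 at a time
    cb(null, 'result')
  },
  (err, result) => {
    if (err) throw err
    console.log(result) // => 'result'
  }
)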
libraries/redis-wrapper/package.json
@@ -6,7 +6,8 @@
   "files": [
     "index.js",
     "Errors.js",
-    "RedisLocker.js"
+    "RedisLocker.js",
+    "RedisWebLocker.js"
   ],
   "author": "Overleaf (https://www.overleaf.com)",
   "repository": "github:overleaf/redis-wrapper",
@@ -25,7 +26,8 @@
     "@overleaf/o-error": "^3.4.0"
   },
   "dependencies": {
-    "ioredis": "~4.27.1"
+    "ioredis": "~4.27.1",
+    "async": "0.6.2"
   },
   "devDependencies": {
     "@overleaf/o-error": "^3.4.0",
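
One note on the exact "async": "0.6.2" pin (an observation, not part of the commit): RedisWebLocker assigns queue.drain as a property, which is the queue API of async 0.x through 2.x. In async 3.x, drain became a method, so the pin is load-bearing; under async 3.x the cleanup hook would instead look roughly like this:

// hypothetical async@3 equivalent of the `queue.drain = () => ...` line above
queue.drain(() => LOCK_QUEUES.delete(queueName))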
package-lock.json  (generated, 14 lines changed)
@@ -946,6 +946,7 @@
       "version": "2.1.0",
       "license": "ISC",
       "dependencies": {
+        "async": "0.6.2",
         "ioredis": "~4.27.1"
       },
       "devDependencies": {
@@ -962,6 +963,11 @@
         "@overleaf/o-error": "^3.4.0"
       }
     },
+    "libraries/redis-wrapper/node_modules/async": {
+      "version": "0.6.2",
+      "resolved": "https://registry.npmjs.org/async/-/async-0.6.2.tgz",
+      "integrity": "sha512-fWbn+CMBgn1KOL/UvYdsmH+gMN/fW+lzAoadt4VUFvB/t0pB4aY9RfRCCvhoA58jocHyYm5TGbeuZsPc9i1Cpg=="
+    },
     "libraries/settings": {
       "name": "@overleaf/settings",
       "version": "3.0.0"
@@ -42026,11 +42032,19 @@
       "requires": {
         "@overleaf/logger": "*",
         "@overleaf/o-error": "^3.4.0",
+        "async": "0.6.2",
         "chai": "^4.3.6",
         "ioredis": "~4.27.1",
         "mocha": "^8.4.0",
         "sandboxed-module": "^2.0.4",
         "sinon": "^9.2.4"
       },
+      "dependencies": {
+        "async": {
+          "version": "0.6.2",
+          "resolved": "https://registry.npmjs.org/async/-/async-0.6.2.tgz",
+          "integrity": "sha512-fWbn+CMBgn1KOL/UvYdsmH+gMN/fW+lzAoadt4VUFvB/t0pB4aY9RfRCCvhoA58jocHyYm5TGbeuZsPc9i1Cpg=="
+        }
+      }
     },
     "@overleaf/references": {
services/web/…/LockManager.js
@@ -1,204 +1,28 @@
+const { callbackify, promisify } = require('util')
-const metrics = require('@overleaf/metrics')
+const settings = require('@overleaf/settings')
 const RedisWrapper = require('./RedisWrapper')
 const rclient = RedisWrapper.client('lock')
-const logger = require('@overleaf/logger')
-const os = require('os')
-const crypto = require('crypto')
-const async = require('async')
-const settings = require('@overleaf/settings')
-const { callbackify, promisify } = require('util')
 
-const HOST = os.hostname()
-const PID = process.pid
-const RND = crypto.randomBytes(4).toString('hex')
-let COUNT = 0
+const RedisWebLocker = require('@overleaf/redis-wrapper/RedisWebLocker')
 
-const LOCK_QUEUES = new Map() // queue lock requests for each name/id so they get the lock on a first-come first-served basis
+const LockManager = new RedisWebLocker({
+  rclient,
+  getKey(namespace, id) {
+    return `lock:web:${namespace}:${id}`
+  },
+  options: settings.lockManager,
+})
 
-logger.debug(
-  { lockManagerSettings: settings.lockManager },
-  'LockManager initialising'
+// need to bind the promisified function when it is part of a class
+// see https://nodejs.org/dist/latest-v16.x/docs/api/util.html#utilpromisifyoriginal
+const promisifiedRunWithLock = promisify(LockManager.runWithLock).bind(
+  LockManager
 )
 
-const LockManager = {
-  // ms between each test of the lock
-  LOCK_TEST_INTERVAL: settings.lockManager.lockTestInterval || 50,
-  // back off to ms between each test of the lock
-  MAX_TEST_INTERVAL: settings.lockManager.maxTestInterval || 1000,
-  // ms maximum time to spend trying to get the lock
-  MAX_LOCK_WAIT_TIME: settings.lockManager.maxLockWaitTime || 10000,
-  // seconds. Time until lock auto expires in redis
-  REDIS_LOCK_EXPIRY: settings.lockManager.redisLockExpiry || 30,
-  // ms, if execution takes longer than this then log
-  SLOW_EXECUTION_THRESHOLD: settings.lockManager.slowExecutionThreshold || 5000,
-
-  // Use a signed lock value as described in
-  // http://redis.io/topics/distlock#correct-implementation-with-a-single-instance
-  // to prevent accidental unlocking by multiple processes
-  randomLock() {
-    const time = Date.now()
-    return `locked:host=${HOST}:pid=${PID}:random=${RND}:time=${time}:count=${COUNT++}`
-  },
-
-  unlockScript:
-    'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end',
-
-  runWithLock(namespace, id, runner, callback) {
-    // runner must be a function accepting a callback, e.g. runner = (cb) ->
-
-    // This error is defined here so we get a useful stacktrace
-    const slowExecutionError = new Error('slow execution during lock')
-
-    const timer = new metrics.Timer(`lock.${namespace}`)
-    const key = `lock:web:${namespace}:${id}`
-    LockManager._getLock(key, namespace, (error, lockValue) => {
-      if (error != null) {
-        return callback(error)
-      }
-
-      // The lock can expire in redis but the process can carry on. This setTimeout
-      // call is designed to log if this happens.
-      function countIfExceededLockTimeout() {
-        metrics.inc(`lock.${namespace}.exceeded_lock_timeout`)
-        logger.debug('exceeded lock timeout', {
-          namespace,
-          id,
-          slowExecutionError,
-        })
-      }
-      const exceededLockTimeout = setTimeout(
-        countIfExceededLockTimeout,
-        LockManager.REDIS_LOCK_EXPIRY * 1000
-      )
-
-      runner((error1, ...values) =>
-        LockManager._releaseLock(key, lockValue, error2 => {
-          clearTimeout(exceededLockTimeout)
-
-          const timeTaken = new Date() - timer.start
-          if (timeTaken > LockManager.SLOW_EXECUTION_THRESHOLD) {
-            logger.debug('slow execution during lock', {
-              namespace,
-              id,
-              timeTaken,
-              slowExecutionError,
-            })
-          }
-
-          timer.done()
-          error = error1 || error2
-          if (error != null) {
-            return callback(error)
-          }
-          callback(null, ...values)
-        })
-      )
-    })
-  },
-
-  _tryLock(key, namespace, callback) {
-    const lockValue = LockManager.randomLock()
-    rclient.set(
-      key,
-      lockValue,
-      'EX',
-      LockManager.REDIS_LOCK_EXPIRY,
-      'NX',
-      (err, gotLock) => {
-        if (err != null) {
-          return callback(err)
-        }
-        if (gotLock === 'OK') {
-          metrics.inc(`lock.${namespace}.try.success`)
-          callback(err, true, lockValue)
-        } else {
-          metrics.inc(`lock.${namespace}.try.failed`)
-          logger.debug({ key, redis_response: gotLock }, 'lock is locked')
-          callback(err, false)
-        }
-      }
-    )
-  },
-
-  // it's sufficient to serialize within a process because that is where the parallel operations occur
-  _getLock(key, namespace, callback) {
-    // this is what we need to do for each lock we want to request
-    const task = next =>
-      LockManager._getLockByPolling(key, namespace, (error, lockValue) => {
-        // tell the queue to start trying to get the next lock (if any)
-        next()
-        // we have got a lock result, so we can continue with our own execution
-        callback(error, lockValue)
-      })
-    // create a queue for this key if needed
-    const queueName = `${key}:${namespace}`
-    let queue = LOCK_QUEUES.get(queueName)
-    if (queue == null) {
-      const handler = (fn, cb) => fn(cb)
-      // set up a new queue for this key
-      queue = async.queue(handler, 1)
-      queue.push(task)
-      // remove the queue object when queue is empty
-      queue.drain = () => LOCK_QUEUES.delete(queueName)
-      // store the queue in our global map
-      LOCK_QUEUES.set(queueName, queue)
-    } else {
-      // queue the request to get the lock
-      queue.push(task)
-    }
-  },
-
-  _getLockByPolling(key, namespace, callback) {
-    const startTime = Date.now()
-    const testInterval = LockManager.LOCK_TEST_INTERVAL
-    let attempts = 0
-    function attempt() {
-      if (Date.now() - startTime > LockManager.MAX_LOCK_WAIT_TIME) {
-        metrics.inc(`lock.${namespace}.get.failed`)
-        return callback(new Error('Timeout'))
-      }
-
-      attempts += 1
-      LockManager._tryLock(key, namespace, (error, gotLock, lockValue) => {
-        if (error != null) {
-          return callback(error)
-        }
-        if (gotLock) {
-          metrics.gauge(`lock.${namespace}.get.success.tries`, attempts)
-          callback(null, lockValue)
-        } else {
-          setTimeout(attempt, testInterval)
-        }
-      })
-    }
-    attempt()
-  },
-
-  _releaseLock(key, lockValue, callback) {
-    rclient.eval(LockManager.unlockScript, 1, key, lockValue, (err, result) => {
-      if (err != null) {
-        callback(err)
-      } else if (result != null && result !== 1) {
-        // successful unlock should release exactly one key
-        logger.warn(
-          { key, lockValue, redis_err: err, redis_result: result },
-          'unlocking error'
-        )
-        metrics.inc('unlock-error')
-        callback(new Error('tried to release timed out lock'))
-      } else {
-        callback(null, result)
-      }
-    })
-  },
-}
-
-module.exports = LockManager
-
-const promisifiedRunWithLock = promisify(LockManager.runWithLock)
 LockManager.promises = {
   runWithLock(namespace, id, runner) {
     const cbRunner = callbackify(runner)
     return promisifiedRunWithLock(namespace, id, cbRunner)
   },
 }
+
+module.exports = LockManager
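
To make the promises wrapper above concrete, a short usage sketch (not part of the commit; the 'docs' namespace and id are illustrative):

async function example() {
  // callbackify turns the async runner into the callback-style runner
  // that RedisWebLocker#runWithLock expects
  const result = await LockManager.promises.runWithLock(
    'docs', // illustrative namespace
    'doc-id-123', // illustrative id
    async () => {
      // critical section runs while lock:web:docs:doc-id-123 is held
      return 'done'
    }
  )
  console.log(result) // => 'done'
}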