mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-07 20:31:06 -05:00
aabe2d18b9
- promisify - merge health check for single node and cluster - replace the first multi with simple SET in health check - reworked health check with o-error context/stack-traces for failures - drop console.error on health check timeout, consumer logs the error - cleanup unwrapping of ioredis multi result - Promise support for multi.exec This has been squashed from das7pad s fork. REF: b3dd8c5cf4cc6482fd450e6bb67013508844f93f
167 lines
4.6 KiB
JavaScript
167 lines
4.6 KiB
JavaScript
const crypto = require('crypto')
|
|
const os = require('os')
|
|
const { promisify } = require('util')
|
|
|
|
const Redis = require('ioredis')
|
|
|
|
const {
|
|
RedisHealthCheckTimedOut,
|
|
RedisHealthCheckWriteError,
|
|
RedisHealthCheckVerifyError,
|
|
} = require('./Errors')
|
|
|
|
const HEARTBEAT_TIMEOUT = 2000
|
|
|
|
// generate unique values for health check
|
|
const HOST = os.hostname()
|
|
const PID = process.pid
|
|
const RND = crypto.randomBytes(4).toString('hex')
|
|
let COUNT = 0
|
|
|
|
function createClient(opts) {
|
|
const standardOpts = Object.assign({}, opts)
|
|
delete standardOpts.key_schema
|
|
|
|
if (standardOpts.retry_max_delay == null) {
|
|
standardOpts.retry_max_delay = 5000 // ms
|
|
}
|
|
|
|
let client
|
|
if (opts.cluster) {
|
|
delete standardOpts.cluster
|
|
client = new Redis.Cluster(opts.cluster, standardOpts)
|
|
} else {
|
|
client = new Redis(standardOpts)
|
|
}
|
|
monkeyPatchIoRedisExec(client)
|
|
client.healthCheck = (callback) => {
|
|
if (callback) {
|
|
// callback based invocation
|
|
healthCheck(client).then(callback).catch(callback)
|
|
} else {
|
|
// Promise based invocation
|
|
return healthCheck(client)
|
|
}
|
|
}
|
|
return client
|
|
}
|
|
|
|
async function healthCheck(client) {
|
|
// check the redis connection by storing and retrieving a unique key/value pair
|
|
const uniqueToken = `host=${HOST}:pid=${PID}:random=${RND}:time=${Date.now()}:count=${COUNT++}`
|
|
|
|
// o-error context
|
|
const context = {
|
|
uniqueToken,
|
|
stage: 'add context for a timeout',
|
|
}
|
|
|
|
await runWithTimeout({
|
|
runner: runCheck(client, uniqueToken, context),
|
|
timeout: HEARTBEAT_TIMEOUT,
|
|
context,
|
|
})
|
|
}
|
|
|
|
async function runCheck(client, uniqueToken, context) {
|
|
const healthCheckKey = `_redis-wrapper:healthCheckKey:{${uniqueToken}}`
|
|
const healthCheckValue = `_redis-wrapper:healthCheckValue:{${uniqueToken}}`
|
|
|
|
// set the unique key/value pair
|
|
context.stage = 'write'
|
|
const writeAck = await client
|
|
.set(healthCheckKey, healthCheckValue, 'EX', 60)
|
|
.catch((err) => {
|
|
throw new RedisHealthCheckWriteError('write errored', context, err)
|
|
})
|
|
if (writeAck !== 'OK') {
|
|
context.writeAck = writeAck
|
|
throw new RedisHealthCheckWriteError('write failed', context)
|
|
}
|
|
|
|
// check that we can retrieve the unique key/value pair
|
|
context.stage = 'verify'
|
|
const [roundTrippedHealthCheckValue, deleteAck] = await client
|
|
.multi()
|
|
.get(healthCheckKey)
|
|
.del(healthCheckKey)
|
|
.exec()
|
|
.catch((err) => {
|
|
throw new RedisHealthCheckVerifyError(
|
|
'read/delete errored',
|
|
context,
|
|
err
|
|
)
|
|
})
|
|
if (roundTrippedHealthCheckValue !== healthCheckValue) {
|
|
context.roundTrippedHealthCheckValue = roundTrippedHealthCheckValue
|
|
throw new RedisHealthCheckVerifyError('read failed', context)
|
|
}
|
|
if (deleteAck !== 1) {
|
|
context.deleteAck = deleteAck
|
|
throw new RedisHealthCheckVerifyError('delete failed', context)
|
|
}
|
|
}
|
|
|
|
function unwrapMultiResult(result, callback) {
|
|
// ioredis exec returns a results like:
|
|
// [ [null, 42], [null, "foo"] ]
|
|
// where the first entries in each 2-tuple are
|
|
// presumably errors for each individual command,
|
|
// and the second entry is the result. We need to transform
|
|
// this into the same result as the old redis driver:
|
|
// [ 42, "foo" ]
|
|
//
|
|
// Basically reverse:
|
|
// https://github.com/luin/ioredis/blob/v4.17.3/lib/utils/index.ts#L75-L92
|
|
const filteredResult = []
|
|
for (const [err, value] of result || []) {
|
|
if (err) {
|
|
return callback(err)
|
|
} else {
|
|
filteredResult.push(value)
|
|
}
|
|
}
|
|
callback(null, filteredResult)
|
|
}
|
|
const unwrapMultiResultPromisified = promisify(unwrapMultiResult)
|
|
|
|
function monkeyPatchIoRedisExec(client) {
|
|
const _multi = client.multi
|
|
client.multi = function () {
|
|
const multi = _multi.apply(client, arguments)
|
|
const _exec = multi.exec
|
|
multi.exec = (callback) => {
|
|
if (callback) {
|
|
// callback based invocation
|
|
_exec.call(multi, (error, result) => {
|
|
// The command can fail all-together due to syntax errors
|
|
if (error) return callback(error)
|
|
unwrapMultiResult(result, callback)
|
|
})
|
|
} else {
|
|
// Promise based invocation
|
|
return _exec.call(multi).then(unwrapMultiResultPromisified)
|
|
}
|
|
}
|
|
return multi
|
|
}
|
|
}
|
|
|
|
async function runWithTimeout({ runner, timeout, context }) {
|
|
let healthCheckDeadline
|
|
await Promise.race([
|
|
new Promise((resolve, reject) => {
|
|
healthCheckDeadline = setTimeout(() => {
|
|
// attach the timeout when hitting the timeout only
|
|
context.timeout = timeout
|
|
reject(new RedisHealthCheckTimedOut('timeout', context))
|
|
}, timeout)
|
|
}),
|
|
runner.finally(() => clearTimeout(healthCheckDeadline)),
|
|
])
|
|
}
|
|
|
|
module.exports = {
|
|
createClient,
|
|
}
|