Merge pull request #11790 from overleaf/em-promisify-redis-manager

Clean up and promisify RedisManager in project-history

GitOrigin-RevId: 8bd8bb7d51a0a68f7b1a97ffa310a674086714ba

This commit is contained in: parent 3c9ace481d, commit a7c9e3b20f

4 changed files with 566 additions and 931 deletions
@@ -1,20 +1,7 @@
/* eslint-disable
camelcase,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS205: Consider reworking code to avoid use of IIFEs
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
import { promisify } from 'util'
import { callbackify, promisify } from 'node:util'
import { setTimeout } from 'node:timers/promises'
import logger from '@overleaf/logger'
import Settings from '@overleaf/settings'
import async from 'async'
import redis from '@overleaf/redis-wrapper'
import metrics from '@overleaf/metrics'
import OError from '@overleaf/o-error'
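Editor's note: the hunks below all follow the same refactor pattern, so here is a minimal, self-contained sketch of it (illustrative only, not code from the diff; the function names and the redisClient parameter are assumptions): implement the operation with async/await internally, then expose a callback-style wrapper via node:util callbackify alongside a promises namespace.

// Hypothetical sketch of the dual-interface pattern applied in this commit
import { callbackify } from 'node:util'

async function exampleLlen(redisClient, key) {
  // async/await internals: failures surface as rejected promises
  return await redisClient.llen(key)
}

// callback-style wrapper kept so existing callers keep working
const exampleLlenCb = callbackify(exampleLlen)

export { exampleLlenCb as exampleLlen }
export const promises = { exampleLlen }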
@@ -39,190 +26,137 @@ const CACHE_TTL_IN_SECONDS = 3600
const Keys = Settings.redis.project_history.key_schema
const rclient = redis.createClient(Settings.redis.project_history)

/**
* Container for functions that need to be mocked in tests
*
* TODO: Rewrite tests in terms of exported functions only
*/
export const _mocks = {}

export function countUnprocessedUpdates(project_id, callback) {
const key = Keys.projectHistoryOps({ project_id })
return rclient.llen(key, callback)
async function countUnprocessedUpdates(projectId) {
const key = Keys.projectHistoryOps({ project_id: projectId })
const updates = await rclient.llen(key)
return updates
}

_mocks.getOldestDocUpdates = (project_id, batch_size, callback) => {
if (callback == null) {
callback = function () {}
}
const key = Keys.projectHistoryOps({ project_id })
rclient.lrange(key, 0, batch_size - 1, callback)
async function getOldestDocUpdates(projectId, batchSize) {
const key = Keys.projectHistoryOps({ project_id: projectId })
const updates = await rclient.lrange(key, 0, batchSize - 1)
return updates
}

export function getOldestDocUpdates(...args) {
_mocks.getOldestDocUpdates(...args)
export function parseDocUpdates(jsonUpdates) {
return jsonUpdates.map(update => JSON.parse(update))
}

_mocks.parseDocUpdates = (json_updates, callback) => {
let parsed_updates
if (callback == null) {
callback = function () {}
}
try {
parsed_updates = Array.from(json_updates || []).map(update =>
JSON.parse(update)
)
} catch (e) {
return callback(e)
}
callback(null, parsed_updates)
}

export function parseDocUpdates(...args) {
_mocks.parseDocUpdates(...args)
}
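Usage note (illustrative, not part of the diff): the new parseDocUpdates is synchronous and throws on malformed JSON instead of reporting through a callback, so callers wrap it in try/catch. The helper name and sample inputs below are assumptions for the sketch.

// Hypothetical caller of the new synchronous parse step
function parseDocUpdatesExample(jsonUpdates) {
  return jsonUpdates.map(update => JSON.parse(update))
}

let parsedUpdates
try {
  parsedUpdates = parseDocUpdatesExample(['{"op":[]}', '{"resyncDocContent":{}}'])
} catch (err) {
  // malformed JSON surfaces here rather than via a callback argument
  console.error('failed to parse updates', err)
}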
export function getUpdatesInBatches(project_id, batch_size, runner, callback) {
async function getUpdatesInBatches(projectId, batchSize, runner) {
let moreBatches = true
let lastResults = []

const processBatch = cb =>
getOldestDocUpdates(project_id, batch_size, function (error, raw_updates) {
let raw_update
if (error != null) {
return cb(OError.tag(error))
}
moreBatches = raw_updates.length === batch_size
if (raw_updates.length === 0) {
return cb()
}
// don't process any more batches if we are single stepping
if (batch_size === 1) {
moreBatches = false
while (moreBatches) {
let rawUpdates = await getOldestDocUpdates(projectId, batchSize)

moreBatches = rawUpdates.length === batchSize

if (rawUpdates.length === 0) {
return
}

// don't process any more batches if we are single stepping
if (batchSize === 1) {
moreBatches = false
}

// consume the updates up to a maximum total number of bytes
// ensuring that at least one update will be processed (we may
// exceed RAW_UPDATE_SIZE_THRESHOLD is the first update is bigger
// than that).
let totalRawUpdatesSize = 0
const updatesToProcess = []
for (const rawUpdate of rawUpdates) {
const nextTotalSize = totalRawUpdatesSize + rawUpdate.length
if (
updatesToProcess.length > 0 &&
nextTotalSize > RAW_UPDATE_SIZE_THRESHOLD
) {
// stop consuming updates if we have at least one and the
// next update would exceed the size threshold
break
} else {
updatesToProcess.push(rawUpdate)
totalRawUpdatesSize += rawUpdate.length
}
}

// consume the updates up to a maximum total number of bytes
// ensuring that at least one update will be processed (we may
// exceed RAW_UPDATE_SIZE_THRESHOLD is the first update is bigger
// than that).
let total_raw_updates_size = 0
const updates_to_process = []
for (raw_update of Array.from(raw_updates)) {
const next_total_size = total_raw_updates_size + raw_update.length
if (
updates_to_process.length > 0 &&
next_total_size > RAW_UPDATE_SIZE_THRESHOLD
) {
// stop consuming updates if we have at least one and the
// next update would exceed the size threshold
break
} else {
updates_to_process.push(raw_update)
total_raw_updates_size += raw_update.length
}
}
// if we hit the size limit above, only process the updates up to that point
if (updatesToProcess.length < rawUpdates.length) {
moreBatches = true // process remaining raw updates in the next iteration
rawUpdates = updatesToProcess
}

// if we hit the size limit above, only process the updates up to that point
if (updates_to_process.length < raw_updates.length) {
moreBatches = true // process remaining raw updates in the next iteration
raw_updates = updates_to_process
}
metrics.timing('redis.incoming.bytes', totalRawUpdatesSize, 1)
if (totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) {
const rawUpdateSizes = rawUpdates.map(rawUpdate => rawUpdate.length)
logger.warn(
{ projectId, totalRawUpdatesSize, rawUpdateSizes },
'large raw update size'
)
}

metrics.timing('redis.incoming.bytes', total_raw_updates_size, 1)
if (total_raw_updates_size > WARN_RAW_UPDATE_SIZE) {
const raw_update_sizes = (() => {
const result = []
for (raw_update of Array.from(raw_updates)) {
result.push(raw_update.length)
}
return result
})()
logger.warn(
{ project_id, total_raw_updates_size, raw_update_sizes },
'large raw update size'
)
}

return parseDocUpdates(raw_updates, function (error, updates) {
if (error != null) {
OError.tag(error, 'failed to parse updates', {
project_id,
updates,
})
return cb(error)
}

// consume the updates up to a maximum number of ops (insertions and deletions)
let total_op_length = 0
let updates_to_process_count = 0
let total_doc_content_count = 0
for (const parsed_update of Array.from(updates)) {
if (parsed_update.resyncDocContent) {
total_doc_content_count++
}
if (total_doc_content_count > MAX_NEW_DOC_CONTENT_COUNT) {
break
}
const next_total_op_length =
total_op_length + (parsed_update?.op?.length || 1)
if (
updates_to_process_count > 0 &&
next_total_op_length > MAX_UPDATE_OP_LENGTH
) {
break
} else {
total_op_length = next_total_op_length
updates_to_process_count++
}
}

// if we hit the op limit above, only process the updates up to that point
if (updates_to_process_count < updates.length) {
logger.debug(
{
project_id,
updates_to_process_count,
updates_count: updates.length,
total_op_length,
},
'restricting number of ops to be processed'
)
moreBatches = true
// there is a 1:1 mapping between raw_updates and updates
// which we need to preserve here to ensure we only
// delete the updates that are actually processed
raw_updates = raw_updates.slice(0, updates_to_process_count)
updates = updates.slice(0, updates_to_process_count)
}

logger.debug({ project_id }, 'retrieved raw updates from redis')
return runner(updates, function (error, ...args) {
lastResults = args
if (error != null) {
return cb(OError.tag(error))
}
return deleteAppliedDocUpdates(project_id, raw_updates, cb)
})
let updates
try {
updates = parseDocUpdates(rawUpdates)
} catch (error) {
throw OError.tag(error, 'failed to parse updates', {
projectId,
updates,
})
})
}

const hasMoreBatches = (...args) => {
const cb = args[args.length - 1]
return cb(null, moreBatches)
// consume the updates up to a maximum number of ops (insertions and deletions)
let totalOpLength = 0
let updatesToProcessCount = 0
let totalDocContentCount = 0
for (const parsedUpdate of updates) {
if (parsedUpdate.resyncDocContent) {
totalDocContentCount++
}
if (totalDocContentCount > MAX_NEW_DOC_CONTENT_COUNT) {
break
}
const nextTotalOpLength = totalOpLength + (parsedUpdate?.op?.length || 1)
if (
updatesToProcessCount > 0 &&
nextTotalOpLength > MAX_UPDATE_OP_LENGTH
) {
break
} else {
totalOpLength = nextTotalOpLength
updatesToProcessCount++
}
}

// if we hit the op limit above, only process the updates up to that point
if (updatesToProcessCount < updates.length) {
logger.debug(
{
projectId,
updatesToProcessCount,
updates_count: updates.length,
totalOpLength,
},
'restricting number of ops to be processed'
)
moreBatches = true
// there is a 1:1 mapping between rawUpdates and updates
// which we need to preserve here to ensure we only
// delete the updates that are actually processed
rawUpdates = rawUpdates.slice(0, updatesToProcessCount)
updates = updates.slice(0, updatesToProcessCount)
}

logger.debug({ projectId }, 'retrieved raw updates from redis')
await runner(updates)
await deleteAppliedDocUpdates(projectId, rawUpdates)
}

return async.doWhilst(processBatch, hasMoreBatches, error =>
callback(error, ...Array.from(lastResults))
)
}
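The batching logic above caps each batch by total raw size while guaranteeing that at least one update is processed. A standalone sketch of that size-capping step follows (the helper name and the threshold constant are illustrative, not from the diff):

// Hypothetical helper mirroring the size-capping loop in getUpdatesInBatches
const EXAMPLE_SIZE_THRESHOLD = 1024 * 1024 // assumed 1 MB cap for this sketch

function takeUpdatesUpToSize(rawUpdates, threshold = EXAMPLE_SIZE_THRESHOLD) {
  let totalSize = 0
  const batch = []
  for (const rawUpdate of rawUpdates) {
    const nextSize = totalSize + rawUpdate.length
    if (batch.length > 0 && nextSize > threshold) {
      break // keep at least one update, but stop before exceeding the cap
    }
    batch.push(rawUpdate)
    totalSize = nextSize
  }
  return { batch, totalSize }
}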
_mocks.deleteAppliedDocUpdates = (project_id, updates, callback) => {
if (callback == null) {
callback = function () {}
}
async function deleteAppliedDocUpdates(projectId, updates) {
const multi = rclient.multi()
// Delete all the updates which have been applied (exact match)
for (const update of Array.from(updates || [])) {
for (const update of updates) {
// Delete the first occurrence of the update with LREM KEY COUNT
// VALUE by setting COUNT to 1 which 'removes COUNT elements equal to
// value moving from head to tail.'

@@ -233,209 +167,161 @@ _mocks.deleteAppliedDocUpdates = (project_id, updates, callback) => {
metrics.summary('redis.projectHistoryOps', update.length, {
status: 'lrem',
})
multi.lrem(Keys.projectHistoryOps({ project_id }), 1, update)
multi.del(Keys.projectHistoryFirstOpTimestamp({ project_id }))
multi.lrem(Keys.projectHistoryOps({ project_id: projectId }), 1, update)
}
multi.exec(callback)
if (updates.length > 0) {
multi.del(Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }))
}
await multi.exec()
}
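For reference, a minimal sketch of this applied-update cleanup in isolation, assuming an ioredis-style client where multi().exec() returns a promise (function and parameter names are illustrative):

// Hypothetical sketch of deleting applied updates with LREM inside a MULTI
async function deleteAppliedUpdatesExample(redisClient, opsKey, timestampKey, updates) {
  const multi = redisClient.multi()
  for (const update of updates) {
    // LREM key 1 value: remove the first exact occurrence, scanning head to tail
    multi.lrem(opsKey, 1, update)
  }
  if (updates.length > 0) {
    multi.del(timestampKey)
  }
  await multi.exec()
}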
export function deleteAppliedDocUpdates(...args) {
_mocks.deleteAppliedDocUpdates(...args)
}

export function destroyDocUpdatesQueue(project_id, callback) {
// deletes the entire queue - use with caution
if (callback == null) {
callback = function () {}
}
return rclient.del(
Keys.projectHistoryOps({ project_id }),
Keys.projectHistoryFirstOpTimestamp({ project_id }),
callback
/**
* Deletes the entire queue - use with caution
*/
async function destroyDocUpdatesQueue(projectId) {
await rclient.del(
Keys.projectHistoryOps({ project_id: projectId }),
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
)
}

// iterate over keys asynchronously using redis scan (non-blocking)
// handle all the cluster nodes or single redis server
function _getKeys(pattern, limit, callback) {
const nodes = (typeof rclient.nodes === 'function'
? rclient.nodes('master')
: undefined) || [rclient]
const doKeyLookupForNode = (node, cb) =>
_getKeysFromNode(node, pattern, limit, cb)
return async.concatSeries(nodes, doKeyLookupForNode, callback)
/**
* Iterate over keys asynchronously using redis scan (non-blocking)
*
* handle all the cluster nodes or single redis server
*/
async function _getKeys(pattern, limit) {
const nodes = rclient.nodes?.('master') || [rclient]
const keysByNode = []
for (const node of nodes) {
const keys = await _getKeysFromNode(node, pattern, limit)
keysByNode.push(keys)
}
return [].concat(...keysByNode)
}
function _getKeysFromNode(node, pattern, limit, callback) {
async function _getKeysFromNode(node, pattern, limit) {
let cursor = 0 // redis iterator
const keySet = {} // use hash to avoid duplicate results
const keySet = new Set() // avoid duplicate results
const batchSize = limit != null ? Math.min(limit, 1000) : 1000

// scan over all keys looking for pattern
const doIteration = (
cb // avoid hitting redis too hard
) =>
node.scan(
cursor,
'MATCH',
pattern,
'COUNT',
batchSize,
function (error, reply) {
let keys
if (error != null) {
return callback(OError.tag(error))
}
;[cursor, keys] = Array.from(reply)
for (const key of Array.from(keys)) {
keySet[key] = true
}
keys = Object.keys(keySet)
const noResults = cursor === '0' // redis returns string results not numeric
const limitReached = limit != null && keys.length >= limit
if (noResults || limitReached) {
return callback(null, keys)
} else {
return setTimeout(doIteration, 10)
}
}
)
return doIteration()
while (true) {
const reply = await node.scan(cursor, 'MATCH', pattern, 'COUNT', batchSize)
const [newCursor, keys] = reply
cursor = newCursor

for (const key of keys) {
keySet.add(key)
}

const noResults = cursor === '0' // redis returns string results not numeric
const limitReached = limit != null && keySet.size >= limit
if (noResults || limitReached) {
return Array.from(keySet)
}

// avoid hitting redis too hard
await setTimeout(10)
}
}
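The new implementation above replaces a recursive callback scan with a plain loop over the SCAN cursor. A generic sketch of that loop, assuming an ioredis-style node.scan() that resolves to a [cursor, keys] pair (the helper name is illustrative):

// Hypothetical generic SCAN loop over one redis node
import { setTimeout } from 'node:timers/promises'

async function scanKeysExample(node, pattern, limit) {
  let cursor = '0'
  const found = new Set() // dedupe keys seen across iterations
  do {
    const [nextCursor, keys] = await node.scan(cursor, 'MATCH', pattern, 'COUNT', 1000)
    cursor = nextCursor
    for (const key of keys) found.add(key)
    if (limit != null && found.size >= limit) break
    await setTimeout(10) // back off so we do not hammer redis
  } while (cursor !== '0') // redis signals completion with cursor '0'
  return Array.from(found)
}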
// extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
// or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
/**
* Extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
* or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
*/
function _extractIds(keyList) {
const ids = (() => {
const result = []
for (const key of Array.from(keyList)) {
const m = key.match(/:\{?([0-9a-f]{24})\}?/) // extract object id
result.push(m[1])
}
return result
})()
return ids
return keyList.map(key => {
const m = key.match(/:\{?([0-9a-f]{24})\}?/) // extract object id
return m[1]
})
}
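A quick check of the id-extraction regex against both key formats mentioned in the comment above (sample values only):

// Illustrative check of the object-id regex on both key forms
const exampleKeys = [
  'DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b',
  'DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b}', // redis cluster form
]
const exampleIds = exampleKeys.map(key => key.match(/:\{?([0-9a-f]{24})\}?/)[1])
// exampleIds: ['57fd0b1f53a8396d22b2c24b', '57fd0b1f53a8396d22b2c24b']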
export function getProjectIdsWithHistoryOps(limit, callback) {
if (callback == null) {
callback = function () {}
}
return _getKeys(
async function getProjectIdsWithHistoryOps(limit) {
const projectKeys = await _getKeys(
Keys.projectHistoryOps({ project_id: '*' }),
limit,
function (error, project_keys) {
if (error != null) {
return callback(OError.tag(error))
}
const project_ids = _extractIds(project_keys)
return callback(error, project_ids)
}
limit
)
const projectIds = _extractIds(projectKeys)
return projectIds
}

export function getProjectIdsWithHistoryOpsCount(callback) {
if (callback == null) {
callback = function () {}
}
return getProjectIdsWithHistoryOps(null, function (error, projectIds) {
if (error != null) {
return callback(OError.tag(error))
}
const queuedProjectsCount = projectIds.length
metrics.globalGauge('queued-projects', queuedProjectsCount)
return callback(null, queuedProjectsCount)
})
async function getProjectIdsWithHistoryOpsCount() {
const projectIds = await getProjectIdsWithHistoryOps()
const queuedProjectsCount = projectIds.length
metrics.globalGauge('queued-projects', queuedProjectsCount)
return queuedProjectsCount
}

export function setFirstOpTimestamp(project_id, callback) {
if (callback == null) {
callback = function () {}
}
const key = Keys.projectHistoryFirstOpTimestamp({ project_id })
async function setFirstOpTimestamp(projectId) {
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
// store current time as an integer (string)
return rclient.setnx(key, Date.now(), callback)
await rclient.setnx(key, Date.now())
}

export function getFirstOpTimestamp(project_id, callback) {
if (callback == null) {
callback = function () {}
async function getFirstOpTimestamp(projectId) {
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
const result = await rclient.get(key)

// convert stored time back to a numeric timestamp
const timestamp = parseInt(result, 10)

// check for invalid timestamp
if (isNaN(timestamp)) {
return null
}
const key = Keys.projectHistoryFirstOpTimestamp({ project_id })
return rclient.get(key, function (err, result) {
if (err != null) {
return callback(OError.tag(err))
}
// convert stored time back to a numeric timestamp
const timestamp = parseInt(result, 10)
// check for invalid timestamp
if (isNaN(timestamp)) {
return callback()
}
// convert numeric timestamp to a date object
const firstOpTimestamp = new Date(timestamp)
return callback(null, firstOpTimestamp)
})

// convert numeric timestamp to a date object
const firstOpTimestamp = new Date(timestamp)

return firstOpTimestamp
}
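The first-op timestamp pair above boils down to a SETNX write and a GET that is parsed back into a Date. A compact sketch of that round trip, assuming an ioredis-style client (the function and parameter names are illustrative):

// Hypothetical round trip for the first-op timestamp
async function firstOpTimestampExample(redisClient, key) {
  // SETNX only records the timestamp if none is set yet
  await redisClient.setnx(key, Date.now())
  const stored = await redisClient.get(key) // comes back as a string
  const timestamp = parseInt(stored, 10)
  return isNaN(timestamp) ? null : new Date(timestamp)
}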
export function clearFirstOpTimestamp(project_id, callback) {
if (callback == null) {
callback = function () {}
}
const key = Keys.projectHistoryFirstOpTimestamp({ project_id })
return rclient.del(key, callback)
async function clearFirstOpTimestamp(projectId) {
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
await rclient.del(key)
}

export function getProjectIdsWithFirstOpTimestamps(limit, callback) {
return _getKeys(
async function getProjectIdsWithFirstOpTimestamps(limit) {
const projectKeys = await _getKeys(
Keys.projectHistoryFirstOpTimestamp({ project_id: '*' }),
limit,
function (error, project_keys) {
if (error != null) {
return callback(OError.tag(error))
}
const project_ids = _extractIds(project_keys)
return callback(error, project_ids)
}
limit
)
const projectIds = _extractIds(projectKeys)
return projectIds
}

export function clearDanglingFirstOpTimestamp(project_id, callback) {
rclient.exists(
Keys.projectHistoryFirstOpTimestamp({ project_id }),
Keys.projectHistoryOps({ project_id }),
function (error, count) {
if (error) {
return callback(error)
}
if (count === 2 || count === 0) {
// both (or neither) keys are present, so don't delete the timestamp
return callback(null, 0)
}
// only one key is present, which makes this a dangling record,
// so delete the timestamp
rclient.del(Keys.projectHistoryFirstOpTimestamp({ project_id }), callback)
}
async function clearDanglingFirstOpTimestamp(projectId) {
const count = await rclient.exists(
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }),
Keys.projectHistoryOps({ project_id: projectId })
)
if (count === 2 || count === 0) {
// both (or neither) keys are present, so don't delete the timestamp
return 0
}
// only one key is present, which makes this a dangling record,
// so delete the timestamp
const cleared = await rclient.del(
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
)
return cleared
}

export function getCachedHistoryId(project_id, callback) {
const key = Keys.projectHistoryCachedHistoryId({ project_id })
rclient.get(key, function (err, historyId) {
if (err) {
return callback(OError.tag(err))
}
callback(null, historyId)
})
async function getCachedHistoryId(projectId) {
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
const historyId = await rclient.get(key)
return historyId
}

export function setCachedHistoryId(project_id, historyId, callback) {
const key = Keys.projectHistoryCachedHistoryId({ project_id })
rclient.setex(key, CACHE_TTL_IN_SECONDS, historyId, callback)
async function setCachedHistoryId(projectId, historyId) {
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
await rclient.setex(key, CACHE_TTL_IN_SECONDS, historyId)
}

export function clearCachedHistoryId(project_id, callback) {
const key = Keys.projectHistoryCachedHistoryId({ project_id })
rclient.del(key, callback)
async function clearCachedHistoryId(projectId) {
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
await rclient.del(key)
}

// for tests
@@ -451,10 +337,77 @@ export function setMaxNewDocContentCount(value) {
MAX_NEW_DOC_CONTENT_COUNT = value
}

export const promises = {
countUnprocessedUpdates: promisify(countUnprocessedUpdates),
getProjectIdsWithFirstOpTimestamps: promisify(
getProjectIdsWithFirstOpTimestamps
),
clearDanglingFirstOpTimestamp: promisify(clearDanglingFirstOpTimestamp),
// EXPORTS

const countUnprocessedUpdatesCb = callbackify(countUnprocessedUpdates)
const getOldestDocUpdatesCb = callbackify(getOldestDocUpdates)
const deleteAppliedDocUpdatesCb = callbackify(deleteAppliedDocUpdates)
const destroyDocUpdatesQueueCb = callbackify(destroyDocUpdatesQueue)
const getProjectIdsWithHistoryOpsCb = callbackify(getProjectIdsWithHistoryOps)
const getProjectIdsWithHistoryOpsCountCb = callbackify(
getProjectIdsWithHistoryOpsCount
)
const setFirstOpTimestampCb = callbackify(setFirstOpTimestamp)
const getFirstOpTimestampCb = callbackify(getFirstOpTimestamp)
const clearFirstOpTimestampCb = callbackify(clearFirstOpTimestamp)
const getProjectIdsWithFirstOpTimestampsCb = callbackify(
getProjectIdsWithFirstOpTimestamps
)
const clearDanglingFirstOpTimestampCb = callbackify(
clearDanglingFirstOpTimestamp
)
const getCachedHistoryIdCb = callbackify(getCachedHistoryId)
const setCachedHistoryIdCb = callbackify(setCachedHistoryId)
const clearCachedHistoryIdCb = callbackify(clearCachedHistoryId)

const getUpdatesInBatchesCb = function (
projectId,
batchSize,
runner,
callback
) {
const runnerPromises = promisify(runner)
getUpdatesInBatches(projectId, batchSize, runnerPromises)
.then(result => {
callback(null, result)
})
.catch(err => {
callback(err)
})
}
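getUpdatesInBatchesCb cannot use plain callbackify because its runner argument is itself callback-style, so it promisifies the runner before handing it to the async core. An isolated sketch of that bridging step (the runner body is an illustrative placeholder):

// Illustrative view of adapting a callback-style runner for the async core
import { promisify } from 'node:util'

const callbackRunner = (updates, cb) => {
  // pre-existing callers still provide a (updates, callback) runner
  cb(null, updates.length)
}

const promisifiedRunner = promisify(callbackRunner)
// inside the async getUpdatesInBatches the runner can then simply be awaited:
// const result = await promisifiedRunner(updates)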
export {
countUnprocessedUpdatesCb as countUnprocessedUpdates,
getOldestDocUpdatesCb as getOldestDocUpdates,
deleteAppliedDocUpdatesCb as deleteAppliedDocUpdates,
destroyDocUpdatesQueueCb as destroyDocUpdatesQueue,
getUpdatesInBatchesCb as getUpdatesInBatches,
getProjectIdsWithHistoryOpsCb as getProjectIdsWithHistoryOps,
getProjectIdsWithHistoryOpsCountCb as getProjectIdsWithHistoryOpsCount,
setFirstOpTimestampCb as setFirstOpTimestamp,
getFirstOpTimestampCb as getFirstOpTimestamp,
clearFirstOpTimestampCb as clearFirstOpTimestamp,
getProjectIdsWithFirstOpTimestampsCb as getProjectIdsWithFirstOpTimestamps,
clearDanglingFirstOpTimestampCb as clearDanglingFirstOpTimestamp,
getCachedHistoryIdCb as getCachedHistoryId,
setCachedHistoryIdCb as setCachedHistoryId,
clearCachedHistoryIdCb as clearCachedHistoryId,
}

export const promises = {
countUnprocessedUpdates,
getOldestDocUpdates,
deleteAppliedDocUpdates,
destroyDocUpdatesQueue,
getUpdatesInBatches,
getProjectIdsWithHistoryOps,
getProjectIdsWithHistoryOpsCount,
setFirstOpTimestamp,
getFirstOpTimestamp,
clearFirstOpTimestamp,
getProjectIdsWithFirstOpTimestamps,
clearDanglingFirstOpTimestamp,
getCachedHistoryId,
setCachedHistoryId,
clearCachedHistoryId,
}
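With the export block above, callers can pick either interface. A hedged usage sketch (the import path is an assumption; calls are shown as comments because they need a Redis-backed environment to run):

// Hypothetical caller code showing the two interfaces exposed by this module
import * as RedisManager from './RedisManager.js' // assumed path

// promise interface:
// const count = await RedisManager.promises.countUnprocessedUpdates(projectId)

// callback interface, unchanged for existing callers:
// RedisManager.countUnprocessedUpdates(projectId, (err, count) => { /* ... */ })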
@@ -19,11 +19,6 @@ import { Profiler } from './Profiler.js'
const keys = Settings.redis.lock.key_schema

const PROJECT_HISTORY = {
ENABLED: 'enabled',
NOT_ENABLED: 'not-enabled',
}

export const REDIS_READ_BATCH_SIZE = 500

/**

@@ -41,25 +36,28 @@ export function getRawUpdates(projectId, batchSize, callback) {
if (error != null) {
return callback(OError.tag(error))
}
RedisManager.parseDocUpdates(rawUpdates, (error, updates) => {

let updates
try {
updates = RedisManager.parseDocUpdates(rawUpdates)
} catch (error) {
return callback(OError.tag(error))
}

_getHistoryId(projectId, updates, (error, historyId) => {
if (error != null) {
return callback(OError.tag(error))
}
_getHistoryId(projectId, updates, (error, historyId) => {
if (error != null) {
return callback(OError.tag(error))
}
HistoryStoreManager.getMostRecentChunk(
projectId,
historyId,
(error, chunk) => {
if (error != null) {
return callback(OError.tag(error))
}
callback(null, { project_id: projectId, chunk, updates })
HistoryStoreManager.getMostRecentChunk(
projectId,
historyId,
(error, chunk) => {
if (error != null) {
return callback(OError.tag(error))
}
)
})
callback(null, { project_id: projectId, chunk, updates })
}
)
})
}
)
@@ -175,16 +173,11 @@ _mocks._countAndProcessUpdates = (
(updates, cb) => {
_processUpdatesBatch(projectId, updates, extendLock, cb)
},
(error, isProjectHistoryEnabled) => {
// We can error before it is known whether project history is enabled
// for the project, so this key has 3 values.
const enabled = isProjectHistoryEnabled || 'unknown'
// This metrics key tries to convet that processing is not atomic.
// Some updates may have been processed even if there was an error.
const success = error != null ? 'with-error' : 'without-error'
metrics.gauge(`updates.${enabled}.${success}`, queueSize)
metrics.count(`updates.${enabled}.${success}`, queueSize)
callback(error, queueSize)
error => {
if (error) {
return callback(error)
}
callback(null, queueSize)
}
)
} else {

@@ -210,14 +203,14 @@ function _processUpdatesBatch(projectId, updates, extendLock, callback) {
{ projectId },
'discarding updates as project does not use history'
)
return callback(null, PROJECT_HISTORY.NOT_ENABLED)
return callback()
}

_processUpdates(projectId, historyId, updates, extendLock, error => {
if (error != null) {
return callback(OError.tag(error), PROJECT_HISTORY.ENABLED)
return callback(OError.tag(error))
}
callback(null, PROJECT_HISTORY.ENABLED)
callback()
})
})
}
@@ -1,6 +1,8 @@
import chai from 'chai'
import sinonChai from 'sinon-chai'
import chaiAsPromised from 'chai-as-promised'

// Chai configuration
chai.should()
chai.use(sinonChai)
chai.use(chaiAsPromised)
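Registering chai-as-promised above lets the promisified tests assert directly on promises. A hedged sketch of the assertion style it enables (not taken from the test suite; the resolving function is illustrative, and the assertions are shown as comments since they belong inside a test runner):

// Illustrative chai-as-promised assertions enabled by the setup above
import { expect } from 'chai'

async function resolvesToTwo() {
  return 2
}

// inside a test, after chai.use(chaiAsPromised):
// await expect(resolvesToTwo()).to.eventually.equal(2)
// await expect(Promise.reject(new Error('boom'))).to.be.rejectedWith('boom')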
File diff suppressed because it is too large