mirror of
https://github.com/overleaf/overleaf.git
synced 2025-01-16 07:52:58 +00:00
f6bfc14a79
Revert batched Redis reads GitOrigin-RevId: 4f71dcb7a7e7ae92046ab7edef0930c0358da945
413 lines
13 KiB
JavaScript
413 lines
13 KiB
JavaScript
import { callbackify, promisify } from 'node:util'
|
|
import { setTimeout } from 'node:timers/promises'
|
|
import logger from '@overleaf/logger'
|
|
import Settings from '@overleaf/settings'
|
|
import redis from '@overleaf/redis-wrapper'
|
|
import metrics from '@overleaf/metrics'
|
|
import OError from '@overleaf/o-error'
|
|
|
|
// maximum size taken from the redis queue, to prevent project history
|
|
// consuming unbounded amounts of memory
|
|
let RAW_UPDATE_SIZE_THRESHOLD = 4 * 1024 * 1024
|
|
|
|
// maximum length of ops (insertion and deletions) to process in a single
|
|
// iteration
|
|
let MAX_UPDATE_OP_LENGTH = 1024
|
|
|
|
// warn if we exceed this raw update size, the final compressed updates we send
|
|
// could be smaller than this
|
|
const WARN_RAW_UPDATE_SIZE = 1024 * 1024
|
|
|
|
// maximum number of new docs to process in a single iteration
|
|
let MAX_NEW_DOC_CONTENT_COUNT = 32
|
|
|
|
const CACHE_TTL_IN_SECONDS = 3600
|
|
|
|
const Keys = Settings.redis.project_history.key_schema
|
|
const rclient = redis.createClient(Settings.redis.project_history)
|
|
|
|
async function countUnprocessedUpdates(projectId) {
|
|
const key = Keys.projectHistoryOps({ project_id: projectId })
|
|
const updates = await rclient.llen(key)
|
|
return updates
|
|
}
|
|
|
|
async function getOldestDocUpdates(projectId, batchSize) {
|
|
const key = Keys.projectHistoryOps({ project_id: projectId })
|
|
const updates = await rclient.lrange(key, 0, batchSize - 1)
|
|
return updates
|
|
}
|
|
|
|
export function parseDocUpdates(jsonUpdates) {
|
|
return jsonUpdates.map(update => JSON.parse(update))
|
|
}
|
|
|
|
async function getUpdatesInBatches(projectId, batchSize, runner) {
|
|
let moreBatches = true
|
|
|
|
while (moreBatches) {
|
|
let rawUpdates = await getOldestDocUpdates(projectId, batchSize)
|
|
|
|
moreBatches = rawUpdates.length === batchSize
|
|
|
|
if (rawUpdates.length === 0) {
|
|
return
|
|
}
|
|
|
|
// don't process any more batches if we are single stepping
|
|
if (batchSize === 1) {
|
|
moreBatches = false
|
|
}
|
|
|
|
// consume the updates up to a maximum total number of bytes
|
|
// ensuring that at least one update will be processed (we may
|
|
// exceed RAW_UPDATE_SIZE_THRESHOLD is the first update is bigger
|
|
// than that).
|
|
let totalRawUpdatesSize = 0
|
|
const updatesToProcess = []
|
|
for (const rawUpdate of rawUpdates) {
|
|
const nextTotalSize = totalRawUpdatesSize + rawUpdate.length
|
|
if (
|
|
updatesToProcess.length > 0 &&
|
|
nextTotalSize > RAW_UPDATE_SIZE_THRESHOLD
|
|
) {
|
|
// stop consuming updates if we have at least one and the
|
|
// next update would exceed the size threshold
|
|
break
|
|
} else {
|
|
updatesToProcess.push(rawUpdate)
|
|
totalRawUpdatesSize += rawUpdate.length
|
|
}
|
|
}
|
|
|
|
// if we hit the size limit above, only process the updates up to that point
|
|
if (updatesToProcess.length < rawUpdates.length) {
|
|
moreBatches = true // process remaining raw updates in the next iteration
|
|
rawUpdates = updatesToProcess
|
|
}
|
|
|
|
metrics.timing('redis.incoming.bytes', totalRawUpdatesSize, 1)
|
|
if (totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) {
|
|
const rawUpdateSizes = rawUpdates.map(rawUpdate => rawUpdate.length)
|
|
logger.warn(
|
|
{ projectId, totalRawUpdatesSize, rawUpdateSizes },
|
|
'large raw update size'
|
|
)
|
|
}
|
|
|
|
let updates
|
|
try {
|
|
updates = parseDocUpdates(rawUpdates)
|
|
} catch (error) {
|
|
throw OError.tag(error, 'failed to parse updates', {
|
|
projectId,
|
|
updates,
|
|
})
|
|
}
|
|
|
|
// consume the updates up to a maximum number of ops (insertions and deletions)
|
|
let totalOpLength = 0
|
|
let updatesToProcessCount = 0
|
|
let totalDocContentCount = 0
|
|
for (const parsedUpdate of updates) {
|
|
if (parsedUpdate.resyncDocContent) {
|
|
totalDocContentCount++
|
|
}
|
|
if (totalDocContentCount > MAX_NEW_DOC_CONTENT_COUNT) {
|
|
break
|
|
}
|
|
const nextTotalOpLength = totalOpLength + (parsedUpdate?.op?.length || 1)
|
|
if (
|
|
updatesToProcessCount > 0 &&
|
|
nextTotalOpLength > MAX_UPDATE_OP_LENGTH
|
|
) {
|
|
break
|
|
} else {
|
|
totalOpLength = nextTotalOpLength
|
|
updatesToProcessCount++
|
|
}
|
|
}
|
|
|
|
// if we hit the op limit above, only process the updates up to that point
|
|
if (updatesToProcessCount < updates.length) {
|
|
logger.debug(
|
|
{
|
|
projectId,
|
|
updatesToProcessCount,
|
|
updates_count: updates.length,
|
|
totalOpLength,
|
|
},
|
|
'restricting number of ops to be processed'
|
|
)
|
|
moreBatches = true
|
|
// there is a 1:1 mapping between rawUpdates and updates
|
|
// which we need to preserve here to ensure we only
|
|
// delete the updates that are actually processed
|
|
rawUpdates = rawUpdates.slice(0, updatesToProcessCount)
|
|
updates = updates.slice(0, updatesToProcessCount)
|
|
}
|
|
|
|
logger.debug({ projectId }, 'retrieved raw updates from redis')
|
|
await runner(updates)
|
|
await deleteAppliedDocUpdates(projectId, rawUpdates)
|
|
}
|
|
}
|
|
|
|
async function deleteAppliedDocUpdates(projectId, updates) {
|
|
const multi = rclient.multi()
|
|
// Delete all the updates which have been applied (exact match)
|
|
for (const update of updates) {
|
|
// Delete the first occurrence of the update with LREM KEY COUNT
|
|
// VALUE by setting COUNT to 1 which 'removes COUNT elements equal to
|
|
// value moving from head to tail.'
|
|
//
|
|
// If COUNT is 0 the entire list would be searched which would block
|
|
// redis snce it would be an O(N) operation where N is the length of
|
|
// the queue, in a multi of the batch size.
|
|
metrics.summary('redis.projectHistoryOps', update.length, {
|
|
status: 'lrem',
|
|
})
|
|
multi.lrem(Keys.projectHistoryOps({ project_id: projectId }), 1, update)
|
|
}
|
|
if (updates.length > 0) {
|
|
multi.del(Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }))
|
|
}
|
|
await multi.exec()
|
|
}
|
|
|
|
/**
|
|
* Deletes the entire queue - use with caution
|
|
*/
|
|
async function destroyDocUpdatesQueue(projectId) {
|
|
await rclient.del(
|
|
Keys.projectHistoryOps({ project_id: projectId }),
|
|
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Iterate over keys asynchronously using redis scan (non-blocking)
|
|
*
|
|
* handle all the cluster nodes or single redis server
|
|
*/
|
|
async function _getKeys(pattern, limit) {
|
|
const nodes = rclient.nodes?.('master') || [rclient]
|
|
const keysByNode = []
|
|
for (const node of nodes) {
|
|
const keys = await _getKeysFromNode(node, pattern, limit)
|
|
keysByNode.push(keys)
|
|
}
|
|
return [].concat(...keysByNode)
|
|
}
|
|
|
|
async function _getKeysFromNode(node, pattern, limit) {
|
|
let cursor = 0 // redis iterator
|
|
const keySet = new Set() // avoid duplicate results
|
|
const batchSize = limit != null ? Math.min(limit, 1000) : 1000
|
|
|
|
// scan over all keys looking for pattern
|
|
while (true) {
|
|
const reply = await node.scan(cursor, 'MATCH', pattern, 'COUNT', batchSize)
|
|
const [newCursor, keys] = reply
|
|
cursor = newCursor
|
|
|
|
for (const key of keys) {
|
|
keySet.add(key)
|
|
}
|
|
|
|
const noResults = cursor === '0' // redis returns string results not numeric
|
|
const limitReached = limit != null && keySet.size >= limit
|
|
if (noResults || limitReached) {
|
|
return Array.from(keySet)
|
|
}
|
|
|
|
// avoid hitting redis too hard
|
|
await setTimeout(10)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
|
|
* or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
|
|
*/
|
|
function _extractIds(keyList) {
|
|
return keyList.map(key => {
|
|
const m = key.match(/:\{?([0-9a-f]{24})\}?/) // extract object id
|
|
return m[1]
|
|
})
|
|
}
|
|
|
|
async function getProjectIdsWithHistoryOps(limit) {
|
|
const projectKeys = await _getKeys(
|
|
Keys.projectHistoryOps({ project_id: '*' }),
|
|
limit
|
|
)
|
|
const projectIds = _extractIds(projectKeys)
|
|
return projectIds
|
|
}
|
|
|
|
async function getProjectIdsWithHistoryOpsCount() {
|
|
const projectIds = await getProjectIdsWithHistoryOps()
|
|
const queuedProjectsCount = projectIds.length
|
|
metrics.globalGauge('queued-projects', queuedProjectsCount)
|
|
return queuedProjectsCount
|
|
}
|
|
|
|
async function setFirstOpTimestamp(projectId) {
|
|
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
// store current time as an integer (string)
|
|
await rclient.setnx(key, Date.now())
|
|
}
|
|
|
|
async function getFirstOpTimestamp(projectId) {
|
|
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
const result = await rclient.get(key)
|
|
|
|
// convert stored time back to a numeric timestamp
|
|
const timestamp = parseInt(result, 10)
|
|
|
|
// check for invalid timestamp
|
|
if (isNaN(timestamp)) {
|
|
return null
|
|
}
|
|
|
|
// convert numeric timestamp to a date object
|
|
const firstOpTimestamp = new Date(timestamp)
|
|
|
|
return firstOpTimestamp
|
|
}
|
|
|
|
async function clearFirstOpTimestamp(projectId) {
|
|
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
await rclient.del(key)
|
|
}
|
|
|
|
async function getProjectIdsWithFirstOpTimestamps(limit) {
|
|
const projectKeys = await _getKeys(
|
|
Keys.projectHistoryFirstOpTimestamp({ project_id: '*' }),
|
|
limit
|
|
)
|
|
const projectIds = _extractIds(projectKeys)
|
|
return projectIds
|
|
}
|
|
|
|
async function clearDanglingFirstOpTimestamp(projectId) {
|
|
const count = await rclient.exists(
|
|
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }),
|
|
Keys.projectHistoryOps({ project_id: projectId })
|
|
)
|
|
if (count === 2 || count === 0) {
|
|
// both (or neither) keys are present, so don't delete the timestamp
|
|
return 0
|
|
}
|
|
// only one key is present, which makes this a dangling record,
|
|
// so delete the timestamp
|
|
const cleared = await rclient.del(
|
|
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
)
|
|
return cleared
|
|
}
|
|
|
|
async function getCachedHistoryId(projectId) {
|
|
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
|
|
const historyId = await rclient.get(key)
|
|
return historyId
|
|
}
|
|
|
|
async function setCachedHistoryId(projectId, historyId) {
|
|
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
|
|
await rclient.setex(key, CACHE_TTL_IN_SECONDS, historyId)
|
|
}
|
|
|
|
async function clearCachedHistoryId(projectId) {
|
|
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
|
|
await rclient.del(key)
|
|
}
|
|
|
|
// for tests
|
|
export function setMaxUpdateOpLength(value) {
|
|
MAX_UPDATE_OP_LENGTH = value
|
|
}
|
|
|
|
export function setRawUpdateSizeThreshold(value) {
|
|
RAW_UPDATE_SIZE_THRESHOLD = value
|
|
}
|
|
|
|
export function setMaxNewDocContentCount(value) {
|
|
MAX_NEW_DOC_CONTENT_COUNT = value
|
|
}
|
|
|
|
// EXPORTS
|
|
|
|
const countUnprocessedUpdatesCb = callbackify(countUnprocessedUpdates)
|
|
const getOldestDocUpdatesCb = callbackify(getOldestDocUpdates)
|
|
const deleteAppliedDocUpdatesCb = callbackify(deleteAppliedDocUpdates)
|
|
const destroyDocUpdatesQueueCb = callbackify(destroyDocUpdatesQueue)
|
|
const getProjectIdsWithHistoryOpsCb = callbackify(getProjectIdsWithHistoryOps)
|
|
const getProjectIdsWithHistoryOpsCountCb = callbackify(
|
|
getProjectIdsWithHistoryOpsCount
|
|
)
|
|
const setFirstOpTimestampCb = callbackify(setFirstOpTimestamp)
|
|
const getFirstOpTimestampCb = callbackify(getFirstOpTimestamp)
|
|
const clearFirstOpTimestampCb = callbackify(clearFirstOpTimestamp)
|
|
const getProjectIdsWithFirstOpTimestampsCb = callbackify(
|
|
getProjectIdsWithFirstOpTimestamps
|
|
)
|
|
const clearDanglingFirstOpTimestampCb = callbackify(
|
|
clearDanglingFirstOpTimestamp
|
|
)
|
|
const getCachedHistoryIdCb = callbackify(getCachedHistoryId)
|
|
const setCachedHistoryIdCb = callbackify(setCachedHistoryId)
|
|
const clearCachedHistoryIdCb = callbackify(clearCachedHistoryId)
|
|
|
|
const getUpdatesInBatchesCb = function (
|
|
projectId,
|
|
batchSize,
|
|
runner,
|
|
callback
|
|
) {
|
|
const runnerPromises = promisify(runner)
|
|
getUpdatesInBatches(projectId, batchSize, runnerPromises)
|
|
.then(result => {
|
|
callback(null, result)
|
|
})
|
|
.catch(err => {
|
|
callback(err)
|
|
})
|
|
}
|
|
|
|
export {
|
|
countUnprocessedUpdatesCb as countUnprocessedUpdates,
|
|
getOldestDocUpdatesCb as getOldestDocUpdates,
|
|
deleteAppliedDocUpdatesCb as deleteAppliedDocUpdates,
|
|
destroyDocUpdatesQueueCb as destroyDocUpdatesQueue,
|
|
getUpdatesInBatchesCb as getUpdatesInBatches,
|
|
getProjectIdsWithHistoryOpsCb as getProjectIdsWithHistoryOps,
|
|
getProjectIdsWithHistoryOpsCountCb as getProjectIdsWithHistoryOpsCount,
|
|
setFirstOpTimestampCb as setFirstOpTimestamp,
|
|
getFirstOpTimestampCb as getFirstOpTimestamp,
|
|
clearFirstOpTimestampCb as clearFirstOpTimestamp,
|
|
getProjectIdsWithFirstOpTimestampsCb as getProjectIdsWithFirstOpTimestamps,
|
|
clearDanglingFirstOpTimestampCb as clearDanglingFirstOpTimestamp,
|
|
getCachedHistoryIdCb as getCachedHistoryId,
|
|
setCachedHistoryIdCb as setCachedHistoryId,
|
|
clearCachedHistoryIdCb as clearCachedHistoryId,
|
|
}
|
|
|
|
export const promises = {
|
|
countUnprocessedUpdates,
|
|
getOldestDocUpdates,
|
|
deleteAppliedDocUpdates,
|
|
destroyDocUpdatesQueue,
|
|
getUpdatesInBatches,
|
|
getProjectIdsWithHistoryOps,
|
|
getProjectIdsWithHistoryOpsCount,
|
|
setFirstOpTimestamp,
|
|
getFirstOpTimestamp,
|
|
clearFirstOpTimestamp,
|
|
getProjectIdsWithFirstOpTimestamps,
|
|
clearDanglingFirstOpTimestamp,
|
|
getCachedHistoryId,
|
|
setCachedHistoryId,
|
|
clearCachedHistoryId,
|
|
}
|