/*
 * overleaf/services/project-history/app/js/RedisManager.js
 * Eric Mc Sween 3ead64344f Merge pull request #11952 from overleaf/em-batch-redis-reads
 * Read updates from Redis in smaller batches
 *
 * GitOrigin-RevId: 06901e4a9e43976e446c014d5d46c2488691c205
 * 2023-02-28 09:04:23 +00:00
 *
 * 398 lines
 * 12 KiB
 * JavaScript
 */

import { callbackify, promisify } from 'node:util'
import { setTimeout } from 'node:timers/promises'
import logger from '@overleaf/logger'
import Settings from '@overleaf/settings'
import redis from '@overleaf/redis-wrapper'
import metrics from '@overleaf/metrics'
import OError from '@overleaf/o-error'
/**
 * Maximum size taken from the redis queue, to prevent project history
 * consuming unbounded amounts of memory. The size of a batch is measured as
 * the summed string length of its raw updates.
 */
export const RAW_UPDATE_SIZE_THRESHOLD = 4 * 1024 * 1024
/**
 * Batch size when reading updates from Redis (number of list entries
 * requested per LRANGE call)
 */
export const RAW_UPDATES_BATCH_SIZE = 50
/**
 * Maximum length of ops (insertion and deletions) to process in a single
 * iteration
 */
export const MAX_UPDATE_OP_LENGTH = 1024
/**
 * Warn if we exceed this raw update size, the final compressed updates we
 * send could be smaller than this
 */
const WARN_RAW_UPDATE_SIZE = 1024 * 1024
/**
 * Maximum number of new docs to process in a single iteration
 */
export const MAX_NEW_DOC_CONTENT_COUNT = 32
// Expiry, in seconds, applied when a history id is cached in redis
const CACHE_TTL_IN_SECONDS = 3600
// Key-building functions for the project-history redis keys, taken from settings
const Keys = Settings.redis.project_history.key_schema
// Redis client used by the functions below
const rclient = redis.createClient(Settings.redis.project_history)
/**
 * Count the updates currently waiting in a project's history queue.
 *
 * @param {string} projectId
 * @return {Promise<number>} length of the project's history ops list
 */
async function countUnprocessedUpdates(projectId) {
  const queueKey = Keys.projectHistoryOps({ project_id: projectId })
  return await rclient.llen(queueKey)
}
/**
 * Lazily iterate over the raw (serialized) updates queued for a project.
 *
 * The list is read page by page, RAW_UPDATES_BATCH_SIZE entries at a time,
 * and iteration ends as soon as a page comes back short.
 *
 * @param {string} projectId
 * @yields {string} the next raw update, in queue order
 */
async function* getRawUpdates(projectId) {
  const queueKey = Keys.projectHistoryOps({ project_id: projectId })
  for (let offset = 0; ; offset += RAW_UPDATES_BATCH_SIZE) {
    const lastIndex = offset + RAW_UPDATES_BATCH_SIZE - 1
    const page = await rclient.lrange(queueKey, offset, lastIndex)
    for (const rawUpdate of page) {
      yield rawUpdate
    }
    // a short page means the end of the list has been reached
    if (page.length < RAW_UPDATES_BATCH_SIZE) {
      return
    }
  }
}
/**
 * Collect one batch of raw updates from the head of a project's queue.
 *
 * Collection stops when either `batchSize` updates have been gathered or
 * adding the next update would push the running size past
 * RAW_UPDATE_SIZE_THRESHOLD. The first update is always accepted, whatever
 * its size, so a single oversized update cannot block the queue.
 *
 * @param {string} projectId
 * @param {number} batchSize - maximum number of updates to return
 * @return {Promise<{rawUpdates: string[], hasMore: boolean}>} `hasMore` is
 *   true when collection stopped because of a limit rather than because the
 *   queue ran out
 */
async function getRawUpdatesBatch(projectId, batchSize) {
  const rawUpdates = []
  let totalRawUpdatesSize = 0
  let hasMore = false
  for await (const candidate of getRawUpdates(projectId)) {
    // the running total includes the candidate even if it ends up rejected
    totalRawUpdatesSize += candidate.length
    const exceedsSizeBudget =
      rawUpdates.length > 0 && totalRawUpdatesSize > RAW_UPDATE_SIZE_THRESHOLD
    if (!exceedsSizeBudget) {
      rawUpdates.push(candidate)
    }
    if (exceedsSizeBudget || rawUpdates.length >= batchSize) {
      hasMore = true
      break
    }
  }
  metrics.timing('redis.incoming.bytes', totalRawUpdatesSize, 1)
  if (totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) {
    logger.warn(
      {
        projectId,
        totalRawUpdatesSize,
        rawUpdateSizes: rawUpdates.map(({ length }) => length),
      },
      'large raw update size'
    )
  }
  return { rawUpdates, hasMore }
}
/**
 * Deserialize a list of JSON-encoded updates.
 *
 * @param {string[]} jsonUpdates - serialized updates
 * @return {Array} the parsed updates, in the same order
 * @throws {SyntaxError} if any entry is not valid JSON
 */
export function parseDocUpdates(jsonUpdates) {
  const parseOne = serialized => JSON.parse(serialized)
  return jsonUpdates.map(parseOne)
}
/**
 * Drain a project's history queue in bounded batches.
 *
 * Each iteration reads a batch of raw updates from redis, parses them, hands
 * the parsed updates to `runner`, and then removes exactly the updates that
 * were handed over from the queue. A batch is cut short when the accumulated
 * op length exceeds MAX_UPDATE_OP_LENGTH or the number of resyncDocContent
 * updates exceeds MAX_NEW_DOC_CONTENT_COUNT; the first update of a batch is
 * always accepted so that progress is guaranteed.
 *
 * @param {string} projectId
 * @param {number} batchSize - maximum number of updates per batch; a value
 *   of 1 means "single step": only one batch is processed
 * @param {function(Array): Promise} runner - processes one batch of parsed
 *   updates; the batch is only deleted from redis after it resolves
 * @return {Promise<void>}
 * @throws an error tagged 'failed to parse update' (carrying the project id
 *   and the offending raw update) when a queue entry is not valid JSON;
 *   errors from `runner` and redis propagate unchanged
 */
async function getUpdatesInBatches(projectId, batchSize, runner) {
  let moreBatches = true
  while (moreBatches) {
    const redisBatch = await getRawUpdatesBatch(projectId, batchSize)
    if (redisBatch.rawUpdates.length === 0) {
      break
    }
    moreBatches = redisBatch.hasMore

    const rawUpdates = []
    const updates = []
    let totalOpLength = 0
    let totalDocContentCount = 0

    for (const rawUpdate of redisBatch.rawUpdates) {
      let update
      try {
        update = JSON.parse(rawUpdate)
      } catch (error) {
        // `update` is still unassigned when parsing fails, so attach the raw
        // string that could not be parsed instead
        throw OError.tag(error, 'failed to parse update', {
          projectId,
          update: rawUpdate,
        })
      }

      // updates without an op array still count as one unit of work
      totalOpLength += update?.op?.length || 1
      if (update.resyncDocContent) {
        totalDocContentCount += 1
      }

      if (
        updates.length > 0 &&
        (totalOpLength > MAX_UPDATE_OP_LENGTH ||
          totalDocContentCount > MAX_NEW_DOC_CONTENT_COUNT)
      ) {
        // leave this update (and the rest) in the queue for the next round
        moreBatches = true
        break
      }

      rawUpdates.push(rawUpdate)
      updates.push(update)
    }

    await runner(updates)
    // only remove what was actually handed to the runner
    await deleteAppliedDocUpdates(projectId, rawUpdates)

    if (batchSize === 1) {
      // Special case for single stepping, don't process more batches
      break
    }
  }
}
/**
 * Remove updates that have been applied from a project's history queue.
 *
 * All removals are queued on a single redis MULTI. When at least one update
 * is removed, the project's first-op timestamp key is deleted in the same
 * transaction.
 *
 * @param {string} projectId
 * @param {string[]} updates - raw updates exactly as they were read from the
 *   queue (they are matched by value)
 * @return {Promise<void>}
 */
async function deleteAppliedDocUpdates(projectId, updates) {
  const transaction = rclient.multi()
  for (const appliedUpdate of updates) {
    metrics.summary('redis.projectHistoryOps', appliedUpdate.length, {
      status: 'lrem',
    })
    // LREM with COUNT = 1 removes only the first element equal to the value,
    // scanning from head to tail. A COUNT of 0 would search the entire list:
    // an O(N) operation in the length of the queue, repeated for every
    // update in the batch, which would block redis.
    transaction.lrem(
      Keys.projectHistoryOps({ project_id: projectId }),
      1,
      appliedUpdate
    )
  }
  const removedSomething = updates.length > 0
  if (removedSomething) {
    transaction.del(
      Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
    )
  }
  await transaction.exec()
}
/**
 * Deletes the entire queue - use with caution
 *
 * Both the project's history ops list and its first-op timestamp are removed
 * with a single DEL.
 *
 * @param {string} projectId
 * @return {Promise<void>}
 */
async function destroyDocUpdatesQueue(projectId) {
  const queueKey = Keys.projectHistoryOps({ project_id: projectId })
  const timestampKey = Keys.projectHistoryFirstOpTimestamp({
    project_id: projectId,
  })
  await rclient.del(queueKey, timestampKey)
}
/**
 * Iterate over keys asynchronously using redis scan (non-blocking)
 *
 * Works against a redis cluster (every master node is scanned, one after the
 * other) as well as a single redis server.
 *
 * @param {string} pattern - SCAN match pattern
 * @param {number} [limit] - passed on to each node scan; it is applied per
 *   node, not to the combined result
 * @return {Promise<string[]>} the keys found on all nodes, concatenated
 */
async function _getKeys(pattern, limit) {
  // a cluster client exposes nodes(); a plain client is scanned directly
  const scanTargets = rclient.nodes?.('master') || [rclient]
  const perNodeKeys = []
  for (const target of scanTargets) {
    perNodeKeys.push(await _getKeysFromNode(target, pattern, limit))
  }
  return perNodeKeys.flat()
}
/**
 * Collect the keys matching a pattern from one redis node using SCAN.
 *
 * Scanning continues until redis reports the end of the iteration or, when a
 * limit is given, until at least `limit` distinct keys have been seen. The
 * whole set collected so far is returned, so the result may contain more
 * than `limit` keys.
 *
 * @param {object} node - redis client (or cluster node) exposing `scan`
 * @param {string} pattern - SCAN match pattern
 * @param {number} [limit] - stop once this many distinct keys were collected
 * @return {Promise<string[]>} distinct matching keys
 */
async function _getKeysFromNode(node, pattern, limit) {
  const hasLimit = limit != null
  const scanCount = hasLimit ? Math.min(limit, 1000) : 1000
  const uniqueKeys = new Set() // SCAN may report the same key more than once
  let cursor = 0
  for (;;) {
    const [nextCursor, page] = await node.scan(
      cursor,
      'MATCH',
      pattern,
      'COUNT',
      scanCount
    )
    cursor = nextCursor
    for (const key of page) {
      uniqueKeys.add(key)
    }
    // redis hands the cursor back as a string; '0' marks a completed scan
    const scanComplete = cursor === '0'
    const enoughKeys = hasLimit && uniqueKeys.size >= limit
    if (scanComplete || enoughKeys) {
      return [...uniqueKeys]
    }
    // pause between pages to avoid hitting redis too hard
    await setTimeout(10)
  }
}
/**
 * Extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
 * or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
 *
 * Keys that do not contain a 24 character lowercase hex id after a colon are
 * skipped, so a single unexpected key cannot abort the whole listing with a
 * TypeError.
 *
 * @param {string[]} keyList - redis keys
 * @return {string[]} the ids found, in key order (one per matching key)
 */
function _extractIds(keyList) {
  const ids = []
  for (const key of keyList) {
    const match = key.match(/:\{?([0-9a-f]{24})\}?/) // extract object id
    if (match) {
      ids.push(match[1])
    }
  }
  return ids
}
/**
 * List the ids of projects that currently have a history ops queue in redis.
 *
 * @param {number} [limit] - forwarded to the key scan
 * @return {Promise<string[]>} project ids extracted from the matching keys
 */
async function getProjectIdsWithHistoryOps(limit) {
  const pattern = Keys.projectHistoryOps({ project_id: '*' })
  return _extractIds(await _getKeys(pattern, limit))
}
/**
 * Count the projects that have queued history ops and publish the count as
 * the 'queued-projects' global gauge.
 *
 * @return {Promise<number>} number of projects with queued history ops
 */
async function getProjectIdsWithHistoryOpsCount() {
  const { length: queuedProjectsCount } = await getProjectIdsWithHistoryOps()
  metrics.globalGauge('queued-projects', queuedProjectsCount)
  return queuedProjectsCount
}
/**
 * Record the current time as the project's first-op timestamp, unless one is
 * already stored (SETNX only writes when the key does not exist).
 *
 * @param {string} projectId
 * @return {Promise<void>}
 */
async function setFirstOpTimestamp(projectId) {
  const timestampKey = Keys.projectHistoryFirstOpTimestamp({
    project_id: projectId,
  })
  // milliseconds since the epoch; redis stores the integer as a string
  const now = Date.now()
  await rclient.setnx(timestampKey, now)
}
/**
 * Read the project's first-op timestamp.
 *
 * @param {string} projectId
 * @return {Promise<Date|null>} the stored time as a Date, or null when no
 *   value is stored or the stored value does not parse as an integer
 */
async function getFirstOpTimestamp(projectId) {
  const stored = await rclient.get(
    Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
  )
  // the value is stored as a string of epoch milliseconds
  const epochMillis = parseInt(stored, 10)
  return Number.isNaN(epochMillis) ? null : new Date(epochMillis)
}
/**
 * Delete the project's first-op timestamp key.
 *
 * @param {string} projectId
 * @return {Promise<void>}
 */
async function clearFirstOpTimestamp(projectId) {
  await rclient.del(
    Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
  )
}
/**
 * List the ids of projects that have a first-op timestamp key in redis.
 *
 * @param {number} [limit] - forwarded to the key scan
 * @return {Promise<string[]>} project ids extracted from the matching keys
 */
async function getProjectIdsWithFirstOpTimestamps(limit) {
  const pattern = Keys.projectHistoryFirstOpTimestamp({ project_id: '*' })
  return _extractIds(await _getKeys(pattern, limit))
}
/**
 * Delete a project's first-op timestamp when it has become a dangling record.
 *
 * EXISTS is asked about both the timestamp key and the ops queue key. When it
 * reports both keys or neither, nothing is deleted. Otherwise only one of the
 * two keys is present and the timestamp key is deleted.
 *
 * @param {string} projectId
 * @return {Promise<number>} 0 when nothing was attempted, otherwise the
 *   number of keys redis reports as deleted
 */
async function clearDanglingFirstOpTimestamp(projectId) {
  const presentCount = await rclient.exists(
    Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }),
    Keys.projectHistoryOps({ project_id: projectId })
  )
  // both (or neither) keys are present, so don't delete the timestamp
  const isConsistent = [0, 2].includes(presentCount)
  if (isConsistent) {
    return 0
  }
  // only one key is present, which makes this a dangling record,
  // so delete the timestamp
  return await rclient.del(
    Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
  )
}
/**
 * Look up the history id cached for a project.
 *
 * @param {string} projectId
 * @return {Promise<*>} whatever redis GET returns for the cache key
 */
async function getCachedHistoryId(projectId) {
  const cacheKey = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
  return await rclient.get(cacheKey)
}
/**
 * Cache a project's history id for CACHE_TTL_IN_SECONDS seconds.
 *
 * @param {string} projectId
 * @param {*} historyId - value to cache
 * @return {Promise<void>}
 */
async function setCachedHistoryId(projectId, historyId) {
  await rclient.setex(
    Keys.projectHistoryCachedHistoryId({ project_id: projectId }),
    CACHE_TTL_IN_SECONDS,
    historyId
  )
}
/**
 * Remove the cached history id of a project.
 *
 * @param {string} projectId
 * @return {Promise<void>}
 */
async function clearCachedHistoryId(projectId) {
  await rclient.del(
    Keys.projectHistoryCachedHistoryId({ project_id: projectId })
  )
}
// EXPORTS
//
// Callback-style variants of the async functions above, built with
// util.callbackify: each takes the same arguments followed by a Node-style
// (err, result) callback. They are exported further down under the
// un-suffixed function names.
const countUnprocessedUpdatesCb = callbackify(countUnprocessedUpdates)
const getRawUpdatesBatchCb = callbackify(getRawUpdatesBatch)
const deleteAppliedDocUpdatesCb = callbackify(deleteAppliedDocUpdates)
const destroyDocUpdatesQueueCb = callbackify(destroyDocUpdatesQueue)
const getProjectIdsWithHistoryOpsCb = callbackify(getProjectIdsWithHistoryOps)
const getProjectIdsWithHistoryOpsCountCb = callbackify(
getProjectIdsWithHistoryOpsCount
)
const setFirstOpTimestampCb = callbackify(setFirstOpTimestamp)
const getFirstOpTimestampCb = callbackify(getFirstOpTimestamp)
const clearFirstOpTimestampCb = callbackify(clearFirstOpTimestamp)
const getProjectIdsWithFirstOpTimestampsCb = callbackify(
getProjectIdsWithFirstOpTimestamps
)
const clearDanglingFirstOpTimestampCb = callbackify(
clearDanglingFirstOpTimestamp
)
const getCachedHistoryIdCb = callbackify(getCachedHistoryId)
const setCachedHistoryIdCb = callbackify(setCachedHistoryId)
const clearCachedHistoryIdCb = callbackify(clearCachedHistoryId)
/**
 * Callback-style variant of getUpdatesInBatches.
 *
 * Unlike the other wrappers it cannot be a plain callbackify() of the async
 * function, because `runner` is itself callback-style and has to be
 * promisified before being handed over.
 *
 * The final callback is dispatched through util.callbackify, so it is invoked
 * exactly once. If the callback itself throws, the error surfaces as an
 * uncaught exception instead of causing the callback to be called a second
 * time with its own error.
 *
 * @param {string} projectId
 * @param {number} batchSize
 * @param {function(Array, function(Error=))} runner - callback-style batch
 *   processor
 * @param {function(Error=, *=)} callback
 */
const getUpdatesInBatchesCb = function (
  projectId,
  batchSize,
  runner,
  callback
) {
  const runnerPromises = promisify(runner)
  callbackify(getUpdatesInBatches)(
    projectId,
    batchSize,
    runnerPromises,
    callback
  )
}
// Callback API: every *Cb wrapper is exported under the plain function name,
// so importing a function by name from this module yields the callback-style
// variant.
export {
countUnprocessedUpdatesCb as countUnprocessedUpdates,
getRawUpdatesBatchCb as getRawUpdatesBatch,
deleteAppliedDocUpdatesCb as deleteAppliedDocUpdates,
destroyDocUpdatesQueueCb as destroyDocUpdatesQueue,
getUpdatesInBatchesCb as getUpdatesInBatches,
getProjectIdsWithHistoryOpsCb as getProjectIdsWithHistoryOps,
getProjectIdsWithHistoryOpsCountCb as getProjectIdsWithHistoryOpsCount,
setFirstOpTimestampCb as setFirstOpTimestamp,
getFirstOpTimestampCb as getFirstOpTimestamp,
clearFirstOpTimestampCb as clearFirstOpTimestamp,
getProjectIdsWithFirstOpTimestampsCb as getProjectIdsWithFirstOpTimestamps,
clearDanglingFirstOpTimestampCb as clearDanglingFirstOpTimestamp,
getCachedHistoryIdCb as getCachedHistoryId,
setCachedHistoryIdCb as setCachedHistoryId,
clearCachedHistoryIdCb as clearCachedHistoryId,
}
// Promise API: the async functions themselves, grouped under `promises` and
// keyed by the same names as the callback-style exports.
export const promises = {
countUnprocessedUpdates,
getRawUpdatesBatch,
deleteAppliedDocUpdates,
destroyDocUpdatesQueue,
getUpdatesInBatches,
getProjectIdsWithHistoryOps,
getProjectIdsWithHistoryOpsCount,
setFirstOpTimestamp,
getFirstOpTimestamp,
clearFirstOpTimestamp,
getProjectIdsWithFirstOpTimestamps,
clearDanglingFirstOpTimestamp,
getCachedHistoryId,
setCachedHistoryId,
clearCachedHistoryId,
}