mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-29 03:53:40 -05:00
3ead64344f
Read updates from Redis in smaller batches GitOrigin-RevId: 06901e4a9e43976e446c014d5d46c2488691c205
398 lines
12 KiB
JavaScript
398 lines
12 KiB
JavaScript
import { callbackify, promisify } from 'node:util'
|
|
import { setTimeout } from 'node:timers/promises'
|
|
import logger from '@overleaf/logger'
|
|
import Settings from '@overleaf/settings'
|
|
import redis from '@overleaf/redis-wrapper'
|
|
import metrics from '@overleaf/metrics'
|
|
import OError from '@overleaf/o-error'
|
|
|
|
/**
|
|
* Maximum size taken from the redis queue, to prevent project history
|
|
* consuming unbounded amounts of memory
|
|
*/
|
|
export const RAW_UPDATE_SIZE_THRESHOLD = 4 * 1024 * 1024
|
|
|
|
/**
|
|
* Batch size when reading updates from Redis
|
|
*/
|
|
export const RAW_UPDATES_BATCH_SIZE = 50
|
|
|
|
/**
|
|
* Maximum length of ops (insertion and deletions) to process in a single
|
|
* iteration
|
|
*/
|
|
export const MAX_UPDATE_OP_LENGTH = 1024
|
|
|
|
/**
|
|
* Warn if we exceed this raw update size, the final compressed updates we
|
|
* send could be smaller than this
|
|
*/
|
|
const WARN_RAW_UPDATE_SIZE = 1024 * 1024
|
|
|
|
/**
|
|
* Maximum number of new docs to process in a single iteration
|
|
*/
|
|
export const MAX_NEW_DOC_CONTENT_COUNT = 32
|
|
|
|
const CACHE_TTL_IN_SECONDS = 3600
|
|
|
|
const Keys = Settings.redis.project_history.key_schema
|
|
const rclient = redis.createClient(Settings.redis.project_history)
|
|
|
|
async function countUnprocessedUpdates(projectId) {
|
|
const key = Keys.projectHistoryOps({ project_id: projectId })
|
|
const updates = await rclient.llen(key)
|
|
return updates
|
|
}
|
|
|
|
async function* getRawUpdates(projectId) {
|
|
const key = Keys.projectHistoryOps({ project_id: projectId })
|
|
let start = 0
|
|
while (true) {
|
|
const stop = start + RAW_UPDATES_BATCH_SIZE - 1
|
|
const updates = await rclient.lrange(key, start, stop)
|
|
for (const update of updates) {
|
|
yield update
|
|
}
|
|
if (updates.length < RAW_UPDATES_BATCH_SIZE) {
|
|
return
|
|
}
|
|
start += RAW_UPDATES_BATCH_SIZE
|
|
}
|
|
}
|
|
|
|
async function getRawUpdatesBatch(projectId, batchSize) {
|
|
const rawUpdates = []
|
|
let totalRawUpdatesSize = 0
|
|
let hasMore = false
|
|
for await (const rawUpdate of getRawUpdates(projectId)) {
|
|
totalRawUpdatesSize += rawUpdate.length
|
|
if (
|
|
rawUpdates.length > 0 &&
|
|
totalRawUpdatesSize > RAW_UPDATE_SIZE_THRESHOLD
|
|
) {
|
|
hasMore = true
|
|
break
|
|
}
|
|
rawUpdates.push(rawUpdate)
|
|
if (rawUpdates.length >= batchSize) {
|
|
hasMore = true
|
|
break
|
|
}
|
|
}
|
|
metrics.timing('redis.incoming.bytes', totalRawUpdatesSize, 1)
|
|
if (totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) {
|
|
const rawUpdateSizes = rawUpdates.map(rawUpdate => rawUpdate.length)
|
|
logger.warn(
|
|
{
|
|
projectId,
|
|
totalRawUpdatesSize,
|
|
rawUpdateSizes,
|
|
},
|
|
'large raw update size'
|
|
)
|
|
}
|
|
return { rawUpdates, hasMore }
|
|
}
|
|
|
|
export function parseDocUpdates(jsonUpdates) {
|
|
return jsonUpdates.map(update => JSON.parse(update))
|
|
}
|
|
|
|
async function getUpdatesInBatches(projectId, batchSize, runner) {
|
|
let moreBatches = true
|
|
while (moreBatches) {
|
|
const redisBatch = await getRawUpdatesBatch(projectId, batchSize)
|
|
if (redisBatch.rawUpdates.length === 0) {
|
|
break
|
|
}
|
|
moreBatches = redisBatch.hasMore
|
|
|
|
const rawUpdates = []
|
|
const updates = []
|
|
let totalOpLength = 0
|
|
let totalDocContentCount = 0
|
|
for (const rawUpdate of redisBatch.rawUpdates) {
|
|
let update
|
|
try {
|
|
update = JSON.parse(rawUpdate)
|
|
} catch (error) {
|
|
throw OError.tag(error, 'failed to parse update', {
|
|
projectId,
|
|
update,
|
|
})
|
|
}
|
|
|
|
totalOpLength += update?.op?.length || 1
|
|
if (update.resyncDocContent) {
|
|
totalDocContentCount += 1
|
|
}
|
|
|
|
if (
|
|
updates.length > 0 &&
|
|
(totalOpLength > MAX_UPDATE_OP_LENGTH ||
|
|
totalDocContentCount > MAX_NEW_DOC_CONTENT_COUNT)
|
|
) {
|
|
moreBatches = true
|
|
break
|
|
}
|
|
|
|
rawUpdates.push(rawUpdate)
|
|
updates.push(update)
|
|
}
|
|
|
|
await runner(updates)
|
|
await deleteAppliedDocUpdates(projectId, rawUpdates)
|
|
|
|
if (batchSize === 1) {
|
|
// Special case for single stepping, don't process more batches
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
async function deleteAppliedDocUpdates(projectId, updates) {
|
|
const multi = rclient.multi()
|
|
// Delete all the updates which have been applied (exact match)
|
|
for (const update of updates) {
|
|
// Delete the first occurrence of the update with LREM KEY COUNT
|
|
// VALUE by setting COUNT to 1 which 'removes COUNT elements equal to
|
|
// value moving from head to tail.'
|
|
//
|
|
// If COUNT is 0 the entire list would be searched which would block
|
|
// redis snce it would be an O(N) operation where N is the length of
|
|
// the queue, in a multi of the batch size.
|
|
metrics.summary('redis.projectHistoryOps', update.length, {
|
|
status: 'lrem',
|
|
})
|
|
multi.lrem(Keys.projectHistoryOps({ project_id: projectId }), 1, update)
|
|
}
|
|
if (updates.length > 0) {
|
|
multi.del(Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }))
|
|
}
|
|
await multi.exec()
|
|
}
|
|
|
|
/**
|
|
* Deletes the entire queue - use with caution
|
|
*/
|
|
async function destroyDocUpdatesQueue(projectId) {
|
|
await rclient.del(
|
|
Keys.projectHistoryOps({ project_id: projectId }),
|
|
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Iterate over keys asynchronously using redis scan (non-blocking)
|
|
*
|
|
* handle all the cluster nodes or single redis server
|
|
*/
|
|
async function _getKeys(pattern, limit) {
|
|
const nodes = rclient.nodes?.('master') || [rclient]
|
|
const keysByNode = []
|
|
for (const node of nodes) {
|
|
const keys = await _getKeysFromNode(node, pattern, limit)
|
|
keysByNode.push(keys)
|
|
}
|
|
return [].concat(...keysByNode)
|
|
}
|
|
|
|
async function _getKeysFromNode(node, pattern, limit) {
|
|
let cursor = 0 // redis iterator
|
|
const keySet = new Set() // avoid duplicate results
|
|
const batchSize = limit != null ? Math.min(limit, 1000) : 1000
|
|
|
|
// scan over all keys looking for pattern
|
|
while (true) {
|
|
const reply = await node.scan(cursor, 'MATCH', pattern, 'COUNT', batchSize)
|
|
const [newCursor, keys] = reply
|
|
cursor = newCursor
|
|
|
|
for (const key of keys) {
|
|
keySet.add(key)
|
|
}
|
|
|
|
const noResults = cursor === '0' // redis returns string results not numeric
|
|
const limitReached = limit != null && keySet.size >= limit
|
|
if (noResults || limitReached) {
|
|
return Array.from(keySet)
|
|
}
|
|
|
|
// avoid hitting redis too hard
|
|
await setTimeout(10)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
|
|
* or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
|
|
*/
|
|
function _extractIds(keyList) {
|
|
return keyList.map(key => {
|
|
const m = key.match(/:\{?([0-9a-f]{24})\}?/) // extract object id
|
|
return m[1]
|
|
})
|
|
}
|
|
|
|
async function getProjectIdsWithHistoryOps(limit) {
|
|
const projectKeys = await _getKeys(
|
|
Keys.projectHistoryOps({ project_id: '*' }),
|
|
limit
|
|
)
|
|
const projectIds = _extractIds(projectKeys)
|
|
return projectIds
|
|
}
|
|
|
|
async function getProjectIdsWithHistoryOpsCount() {
|
|
const projectIds = await getProjectIdsWithHistoryOps()
|
|
const queuedProjectsCount = projectIds.length
|
|
metrics.globalGauge('queued-projects', queuedProjectsCount)
|
|
return queuedProjectsCount
|
|
}
|
|
|
|
async function setFirstOpTimestamp(projectId) {
|
|
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
// store current time as an integer (string)
|
|
await rclient.setnx(key, Date.now())
|
|
}
|
|
|
|
async function getFirstOpTimestamp(projectId) {
|
|
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
const result = await rclient.get(key)
|
|
|
|
// convert stored time back to a numeric timestamp
|
|
const timestamp = parseInt(result, 10)
|
|
|
|
// check for invalid timestamp
|
|
if (isNaN(timestamp)) {
|
|
return null
|
|
}
|
|
|
|
// convert numeric timestamp to a date object
|
|
const firstOpTimestamp = new Date(timestamp)
|
|
|
|
return firstOpTimestamp
|
|
}
|
|
|
|
async function clearFirstOpTimestamp(projectId) {
|
|
const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
await rclient.del(key)
|
|
}
|
|
|
|
async function getProjectIdsWithFirstOpTimestamps(limit) {
|
|
const projectKeys = await _getKeys(
|
|
Keys.projectHistoryFirstOpTimestamp({ project_id: '*' }),
|
|
limit
|
|
)
|
|
const projectIds = _extractIds(projectKeys)
|
|
return projectIds
|
|
}
|
|
|
|
async function clearDanglingFirstOpTimestamp(projectId) {
|
|
const count = await rclient.exists(
|
|
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }),
|
|
Keys.projectHistoryOps({ project_id: projectId })
|
|
)
|
|
if (count === 2 || count === 0) {
|
|
// both (or neither) keys are present, so don't delete the timestamp
|
|
return 0
|
|
}
|
|
// only one key is present, which makes this a dangling record,
|
|
// so delete the timestamp
|
|
const cleared = await rclient.del(
|
|
Keys.projectHistoryFirstOpTimestamp({ project_id: projectId })
|
|
)
|
|
return cleared
|
|
}
|
|
|
|
async function getCachedHistoryId(projectId) {
|
|
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
|
|
const historyId = await rclient.get(key)
|
|
return historyId
|
|
}
|
|
|
|
async function setCachedHistoryId(projectId, historyId) {
|
|
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
|
|
await rclient.setex(key, CACHE_TTL_IN_SECONDS, historyId)
|
|
}
|
|
|
|
async function clearCachedHistoryId(projectId) {
|
|
const key = Keys.projectHistoryCachedHistoryId({ project_id: projectId })
|
|
await rclient.del(key)
|
|
}
|
|
|
|
// EXPORTS
|
|
|
|
const countUnprocessedUpdatesCb = callbackify(countUnprocessedUpdates)
|
|
const getRawUpdatesBatchCb = callbackify(getRawUpdatesBatch)
|
|
const deleteAppliedDocUpdatesCb = callbackify(deleteAppliedDocUpdates)
|
|
const destroyDocUpdatesQueueCb = callbackify(destroyDocUpdatesQueue)
|
|
const getProjectIdsWithHistoryOpsCb = callbackify(getProjectIdsWithHistoryOps)
|
|
const getProjectIdsWithHistoryOpsCountCb = callbackify(
|
|
getProjectIdsWithHistoryOpsCount
|
|
)
|
|
const setFirstOpTimestampCb = callbackify(setFirstOpTimestamp)
|
|
const getFirstOpTimestampCb = callbackify(getFirstOpTimestamp)
|
|
const clearFirstOpTimestampCb = callbackify(clearFirstOpTimestamp)
|
|
const getProjectIdsWithFirstOpTimestampsCb = callbackify(
|
|
getProjectIdsWithFirstOpTimestamps
|
|
)
|
|
const clearDanglingFirstOpTimestampCb = callbackify(
|
|
clearDanglingFirstOpTimestamp
|
|
)
|
|
const getCachedHistoryIdCb = callbackify(getCachedHistoryId)
|
|
const setCachedHistoryIdCb = callbackify(setCachedHistoryId)
|
|
const clearCachedHistoryIdCb = callbackify(clearCachedHistoryId)
|
|
|
|
const getUpdatesInBatchesCb = function (
|
|
projectId,
|
|
batchSize,
|
|
runner,
|
|
callback
|
|
) {
|
|
const runnerPromises = promisify(runner)
|
|
getUpdatesInBatches(projectId, batchSize, runnerPromises)
|
|
.then(result => {
|
|
callback(null, result)
|
|
})
|
|
.catch(err => {
|
|
callback(err)
|
|
})
|
|
}
|
|
|
|
export {
|
|
countUnprocessedUpdatesCb as countUnprocessedUpdates,
|
|
getRawUpdatesBatchCb as getRawUpdatesBatch,
|
|
deleteAppliedDocUpdatesCb as deleteAppliedDocUpdates,
|
|
destroyDocUpdatesQueueCb as destroyDocUpdatesQueue,
|
|
getUpdatesInBatchesCb as getUpdatesInBatches,
|
|
getProjectIdsWithHistoryOpsCb as getProjectIdsWithHistoryOps,
|
|
getProjectIdsWithHistoryOpsCountCb as getProjectIdsWithHistoryOpsCount,
|
|
setFirstOpTimestampCb as setFirstOpTimestamp,
|
|
getFirstOpTimestampCb as getFirstOpTimestamp,
|
|
clearFirstOpTimestampCb as clearFirstOpTimestamp,
|
|
getProjectIdsWithFirstOpTimestampsCb as getProjectIdsWithFirstOpTimestamps,
|
|
clearDanglingFirstOpTimestampCb as clearDanglingFirstOpTimestamp,
|
|
getCachedHistoryIdCb as getCachedHistoryId,
|
|
setCachedHistoryIdCb as setCachedHistoryId,
|
|
clearCachedHistoryIdCb as clearCachedHistoryId,
|
|
}
|
|
|
|
export const promises = {
|
|
countUnprocessedUpdates,
|
|
getRawUpdatesBatch,
|
|
deleteAppliedDocUpdates,
|
|
destroyDocUpdatesQueue,
|
|
getUpdatesInBatches,
|
|
getProjectIdsWithHistoryOps,
|
|
getProjectIdsWithHistoryOpsCount,
|
|
setFirstOpTimestamp,
|
|
getFirstOpTimestamp,
|
|
clearFirstOpTimestamp,
|
|
getProjectIdsWithFirstOpTimestamps,
|
|
clearDanglingFirstOpTimestamp,
|
|
getCachedHistoryId,
|
|
setCachedHistoryId,
|
|
clearCachedHistoryId,
|
|
}
|