2023-01-13 12:42:29 +00:00
|
|
|
// TODO: This file was created by bulk-decaffeinate.
|
|
|
|
// Fix any style issues and re-enable lint.
|
|
|
|
/*
|
|
|
|
* decaffeinate suggestions:
|
|
|
|
* DS101: Remove unnecessary use of Array.from
|
|
|
|
* DS102: Remove unnecessary code created because of implicit returns
|
|
|
|
* DS207: Consider shorter variations of null checks
|
|
|
|
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
|
|
|
*/
|
|
|
|
import async from 'async'
|
|
|
|
import logger from '@overleaf/logger'
|
|
|
|
import OError from '@overleaf/o-error'
|
|
|
|
import _ from 'lodash'
|
|
|
|
import * as RedisManager from './RedisManager.js'
|
|
|
|
import * as UpdatesProcessor from './UpdatesProcessor.js'
|
|
|
|
import * as ErrorRecorder from './ErrorRecorder.js'
|
|
|
|
|
2023-03-20 13:41:11 +00:00
|
|
|
export function flushIfOld(projectId, cutoffTime, callback) {
|
2023-01-13 12:42:29 +00:00
|
|
|
if (callback == null) {
|
|
|
|
callback = function () {}
|
|
|
|
}
|
|
|
|
return RedisManager.getFirstOpTimestamp(
|
2023-03-20 13:41:11 +00:00
|
|
|
projectId,
|
2023-01-13 12:42:29 +00:00
|
|
|
function (err, firstOpTimestamp) {
|
|
|
|
if (err != null) {
|
|
|
|
return callback(OError.tag(err))
|
|
|
|
}
|
2023-03-01 08:47:14 +00:00
|
|
|
// In the normal case, the flush marker will be set with the
|
|
|
|
// timestamp of the oldest operation in the queue by docupdater.
|
|
|
|
// If the marker is not set for any reason, we flush it anyway
|
|
|
|
// for safety.
|
|
|
|
if (!firstOpTimestamp || firstOpTimestamp < cutoffTime) {
|
|
|
|
logger.debug(
|
2023-03-20 13:41:11 +00:00
|
|
|
{ projectId, firstOpTimestamp, cutoffTime },
|
2023-03-01 08:47:14 +00:00
|
|
|
'flushing old project'
|
|
|
|
)
|
2023-03-20 13:41:11 +00:00
|
|
|
return UpdatesProcessor.processUpdatesForProject(projectId, callback)
|
2023-01-13 12:42:29 +00:00
|
|
|
} else {
|
2023-03-01 08:47:14 +00:00
|
|
|
return callback()
|
2023-01-13 12:42:29 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
export function flushOldOps(options, callback) {
|
|
|
|
if (callback == null) {
|
|
|
|
callback = function () {}
|
|
|
|
}
|
|
|
|
logger.debug({ options }, 'starting flush of old ops')
|
|
|
|
// allow running flush in background for cron jobs
|
|
|
|
if (options.background) {
|
|
|
|
// return immediate response to client, then discard callback
|
|
|
|
callback(null, { message: 'running flush in background' })
|
|
|
|
callback = function () {}
|
|
|
|
}
|
|
|
|
return RedisManager.getProjectIdsWithHistoryOps(
|
|
|
|
null,
|
|
|
|
function (error, projectIds) {
|
|
|
|
if (error != null) {
|
|
|
|
return callback(OError.tag(error))
|
|
|
|
}
|
2024-03-25 10:51:40 +00:00
|
|
|
return ErrorRecorder.getFailedProjects(
|
|
|
|
function (error, projectHistoryFailures) {
|
|
|
|
if (error != null) {
|
|
|
|
return callback(OError.tag(error))
|
|
|
|
}
|
|
|
|
// exclude failed projects already in projectHistoryFailures
|
|
|
|
const failedProjects = new Set()
|
|
|
|
for (const entry of Array.from(projectHistoryFailures)) {
|
|
|
|
failedProjects.add(entry.project_id)
|
|
|
|
}
|
|
|
|
// randomise order so we get different projects if there is a limit
|
|
|
|
projectIds = _.shuffle(projectIds)
|
|
|
|
const maxAge = options.maxAge || 6 * 3600 // default to 6 hours
|
|
|
|
const cutoffTime = new Date(Date.now() - maxAge * 1000)
|
|
|
|
const startTime = new Date()
|
|
|
|
let count = 0
|
|
|
|
const jobs = projectIds.map(
|
|
|
|
projectId =>
|
|
|
|
function (cb) {
|
|
|
|
const timeTaken = new Date() - startTime
|
|
|
|
count++
|
|
|
|
if (
|
|
|
|
(options != null ? options.timeout : undefined) &&
|
|
|
|
timeTaken > options.timeout
|
|
|
|
) {
|
|
|
|
// finish early due to timeout, return an error to bail out of the async iteration
|
|
|
|
logger.debug('background retries timed out')
|
|
|
|
return cb(new OError('retries timed out'))
|
|
|
|
}
|
|
|
|
if (
|
|
|
|
(options != null ? options.limit : undefined) &&
|
|
|
|
count > options.limit
|
|
|
|
) {
|
|
|
|
// finish early due to reaching limit, return an error to bail out of the async iteration
|
|
|
|
logger.debug({ count }, 'background retries hit limit')
|
|
|
|
return cb(new OError('hit limit'))
|
|
|
|
}
|
|
|
|
if (failedProjects.has(projectId)) {
|
|
|
|
// skip failed projects
|
|
|
|
return setTimeout(cb, options.queueDelay || 100) // pause between flushes
|
|
|
|
}
|
|
|
|
return flushIfOld(projectId, cutoffTime, function (err) {
|
|
|
|
if (err != null) {
|
|
|
|
logger.warn(
|
|
|
|
{ projectId, flushErr: err },
|
|
|
|
'error flushing old project'
|
|
|
|
)
|
|
|
|
}
|
|
|
|
return setTimeout(cb, options.queueDelay || 100)
|
|
|
|
})
|
2023-01-13 12:42:29 +00:00
|
|
|
}
|
2024-03-25 10:51:40 +00:00
|
|
|
) // pause between flushes
|
|
|
|
return async.series(
|
|
|
|
async.reflectAll(jobs),
|
|
|
|
function (error, results) {
|
|
|
|
const success = []
|
|
|
|
const failure = []
|
|
|
|
results.forEach((result, i) => {
|
|
|
|
if (
|
|
|
|
result.error != null &&
|
|
|
|
!['retries timed out', 'hit limit'].includes(
|
|
|
|
result?.error?.message
|
2023-01-13 12:42:29 +00:00
|
|
|
)
|
2024-03-25 10:51:40 +00:00
|
|
|
) {
|
|
|
|
// ignore expected errors
|
|
|
|
return failure.push(projectIds[i])
|
|
|
|
} else {
|
|
|
|
return success.push(projectIds[i])
|
2023-01-13 12:42:29 +00:00
|
|
|
}
|
|
|
|
})
|
2024-03-25 10:51:40 +00:00
|
|
|
return callback(error, { success, failure, failedProjects })
|
2023-01-13 12:42:29 +00:00
|
|
|
}
|
2024-03-25 10:51:40 +00:00
|
|
|
)
|
|
|
|
}
|
|
|
|
)
|
2023-01-13 12:42:29 +00:00
|
|
|
}
|
|
|
|
)
|
|
|
|
}
|