mirror of
https://github.com/overleaf/overleaf.git
synced 2025-04-20 12:23:47 +00:00
Add verification looper and handle shutdown signals
Shutdown signals become more relevant now that we are looping: on shutdown we want to stop processing records gracefully rather than carry on looping. GitOrigin-RevId: dbb499388c86d552d77954988f8fc27d140da3f1
This commit is contained in:
parent
9d72eeeeac
commit
78481e010e
3 changed files with 111 additions and 19 deletions
|
@ -17,6 +17,11 @@ import { mongodb } from './storage/index.js'
|
|||
import { expressify } from '@overleaf/promise-utils'
|
||||
import { Blob } from 'overleaf-editor-core'
|
||||
import { loadGlobalBlobs } from './storage/lib/blob_store/index.js'
|
||||
import { EventEmitter } from 'node:events'
|
||||
import {
|
||||
loopRandomProjects,
|
||||
setWriteMetrics,
|
||||
} from './backupVerifier/ProjectVerifier.mjs'
|
||||
|
||||
const app = express()
|
||||
|
||||
|
@ -49,6 +54,7 @@ app.get(
|
|||
)
|
||||
|
||||
app.get('/status', (req, res) => {
|
||||
logger.info({}, 'status check')
|
||||
res.send('history-v1-backup-verifier is up')
|
||||
})
|
||||
|
||||
|
@ -66,6 +72,23 @@ app.use((err, req, res, next) => {
|
|||
next(err)
|
||||
})
|
||||
|
||||
// Process-wide shutdown channel. Emitting 'shutdown' with an exit code runs the
// cleanup below; the same emitter is handed to the verification loop.
const shutdownEmitter = new EventEmitter()

// `once`: cleanup must run a single time even if several signals arrive.
shutdownEmitter.once('shutdown', async code => {
  logger.info({}, 'shutting down')
  let exitCode = code
  try {
    await mongodb.client.close()
    // Short pause before exiting (presumably to let pending log writes flush).
    await setTimeout(100)
  } catch (error) {
    // A rejection inside an async listener is otherwise unhandled; log it and
    // make sure a failed cleanup is not reported as a clean exit.
    logger.error({ error }, 'error during shutdown')
    exitCode = code || 1
  } finally {
    process.exit(exitCode)
  }
})

// Both termination signals request a clean (exit code 0) shutdown.
for (const signalName of ['SIGTERM', 'SIGINT']) {
  process.on(signalName, () => {
    shutdownEmitter.emit('shutdown', 0)
  })
}
|
||||
|
||||
/**
|
||||
* @param {number} port
|
||||
* @return {Promise<http.Server>}
|
||||
|
@ -76,18 +99,19 @@ export async function startApp(port) {
|
|||
await healthCheck()
|
||||
const server = http.createServer(app)
|
||||
await promisify(server.listen.bind(server, port))()
|
||||
loopRandomProjects(shutdownEmitter)
|
||||
return server
|
||||
}
|
||||
|
||||
setWriteMetrics(true)
|
||||
|
||||
// Run this if we're called directly
|
||||
if (process.argv[1] === fileURLToPath(import.meta.url)) {
|
||||
const PORT = parseInt(process.env.PORT || '3102', 10)
|
||||
try {
|
||||
await startApp(PORT)
|
||||
} catch (error) {
|
||||
shutdownEmitter.emit('shutdown', 1)
|
||||
logger.error({ error }, 'error starting app')
|
||||
await mongodb.client.close()
|
||||
await setTimeout(100)
|
||||
process.exit(1)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import {
|
|||
getProjectsUpdatedInDateRangeCursor,
|
||||
} from './ProjectSampler.mjs'
|
||||
import OError from '@overleaf/o-error'
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
|
||||
const MS_PER_30_DAYS = 30 * 24 * 60 * 60 * 1000
|
||||
|
||||
|
@ -20,6 +21,10 @@ const METRICS = {
|
|||
|
||||
let WRITE_METRICS = false
|
||||
|
||||
/**
|
||||
* @typedef {import('node:events').EventEmitter} EventEmitter
|
||||
*/
|
||||
|
||||
/**
|
||||
* Allows writing metrics to be enabled or disabled.
|
||||
* @param {Boolean} writeMetrics
|
||||
|
@ -65,14 +70,24 @@ function splitJobs(startDate, endDate, interval) {
|
|||
|
||||
/**
|
||||
*
|
||||
* @param historyIdCursor
|
||||
* @param {AsyncGenerator<string>} historyIdCursor
|
||||
* @param {EventEmitter} [eventEmitter]
|
||||
* @return {Promise<{verified: number, total: number, errorTypes: *[], hasFailure: boolean}>}
|
||||
*/
|
||||
async function verifyProjectsFromCursor(historyIdCursor) {
|
||||
async function verifyProjectsFromCursor(historyIdCursor, eventEmitter) {
|
||||
const errorTypes = []
|
||||
let verified = 0
|
||||
let total = 0
|
||||
let receivedShutdownSignal = false
|
||||
if (eventEmitter) {
|
||||
eventEmitter.once('shutdown', () => {
|
||||
receivedShutdownSignal = true
|
||||
})
|
||||
}
|
||||
for await (const historyId of historyIdCursor) {
|
||||
if (receivedShutdownSignal) {
|
||||
break
|
||||
}
|
||||
total++
|
||||
try {
|
||||
await verifyProjectWithErrorContext(historyId)
|
||||
|
@ -81,7 +96,8 @@ async function verifyProjectsFromCursor(historyIdCursor) {
|
|||
metrics.inc(METRICS.backup_project_verification_succeeded)
|
||||
verified++
|
||||
} catch (error) {
|
||||
errorTypes.push(handleVerificationError(error, historyId))
|
||||
const errorType = handleVerificationError(error, historyId)
|
||||
errorTypes.push(errorType)
|
||||
}
|
||||
}
|
||||
return {
|
||||
|
@ -95,11 +111,12 @@ async function verifyProjectsFromCursor(historyIdCursor) {
|
|||
/**
|
||||
*
|
||||
* @param {number} nProjectsToSample
|
||||
* @param {EventEmitter} [signal]
|
||||
* @return {Promise<VerificationJobStatus>}
|
||||
*/
|
||||
export async function verifyRandomProjectSample(nProjectsToSample) {
|
||||
export async function verifyRandomProjectSample(nProjectsToSample, signal) {
|
||||
const historyIds = await getSampleProjectsCursor(nProjectsToSample)
|
||||
return await verifyProjectsFromCursor(historyIds)
|
||||
return await verifyProjectsFromCursor(historyIds, signal)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -108,13 +125,15 @@ export async function verifyRandomProjectSample(nProjectsToSample) {
|
|||
* @param {Date} startDate
|
||||
* @param {Date} endDate
|
||||
* @param {number} projectsPerRange
|
||||
* @param {EventEmitter} [signal]
|
||||
* @return {Promise<VerificationJobStatus>}
|
||||
*/
|
||||
async function verifyRange(startDate, endDate, projectsPerRange) {
|
||||
async function verifyRange(startDate, endDate, projectsPerRange, signal) {
|
||||
logger.info({ startDate, endDate }, 'verifying range')
|
||||
|
||||
const results = await verifyProjectsFromCursor(
|
||||
getProjectsCreatedInDateRangeCursor(startDate, endDate, projectsPerRange)
|
||||
getProjectsCreatedInDateRangeCursor(startDate, endDate, projectsPerRange),
|
||||
signal
|
||||
)
|
||||
|
||||
if (results.total === 0) {
|
||||
|
@ -154,6 +173,7 @@ async function verifyRange(startDate, endDate, projectsPerRange) {
|
|||
* @property {number} [interval]
|
||||
* @property {number} [projectsPerRange]
|
||||
* @property {number} [concurrency]
|
||||
* @property {EventEmitter} [signal]
|
||||
*/
|
||||
|
||||
/**
|
||||
|
@ -167,6 +187,7 @@ export async function verifyProjectsCreatedInDateRange({
|
|||
startDate,
|
||||
endDate,
|
||||
interval = MS_PER_30_DAYS,
|
||||
signal,
|
||||
}) {
|
||||
const jobs = splitJobs(startDate, endDate, interval)
|
||||
if (jobs.length === 0) {
|
||||
|
@ -180,7 +201,7 @@ export async function verifyProjectsCreatedInDateRange({
|
|||
concurrency,
|
||||
jobs,
|
||||
({ startDate, endDate }) =>
|
||||
verifyRange(startDate, endDate, projectsPerRange)
|
||||
verifyRange(startDate, endDate, projectsPerRange, signal)
|
||||
)
|
||||
return settlements.reduce(
|
||||
/**
|
||||
|
@ -220,19 +241,22 @@ export async function verifyProjectsCreatedInDateRange({
|
|||
* @param {Date} startDate
|
||||
* @param {Date} endDate
|
||||
* @param {number} nProjects
|
||||
* @param {EventEmitter} [signal]
|
||||
* @return {Promise<VerificationJobStatus>}
|
||||
*/
|
||||
export async function verifyProjectsUpdatedInDateRange(
|
||||
startDate,
|
||||
endDate,
|
||||
nProjects
|
||||
nProjects,
|
||||
signal
|
||||
) {
|
||||
logger.debug(
|
||||
{ startDate, endDate, nProjects },
|
||||
'Sampling projects updated in date range'
|
||||
)
|
||||
const results = await verifyProjectsFromCursor(
|
||||
getProjectsUpdatedInDateRangeCursor(startDate, endDate, nProjects)
|
||||
getProjectsUpdatedInDateRangeCursor(startDate, endDate, nProjects),
|
||||
signal
|
||||
)
|
||||
|
||||
if (results.total === 0) {
|
||||
|
@ -254,3 +278,29 @@ export async function verifyProjectsUpdatedInDateRange(
|
|||
)
|
||||
return jobStatus
|
||||
}
|
||||
|
||||
/**
 * Starts a detached background loop that verifies a random sample of 100
 * projects, pauses for five minutes, and repeats until `signal` emits
 * 'shutdown'.
 *
 * The function returns immediately; the loop keeps running on its own.
 * Errors from a verification round are logged and do not stop the loop.
 * A shutdown request cancels the pause between rounds, so the loop ends
 * promptly instead of waiting out the remaining delay.
 *
 * NOTE(review): `signal` is passed down on every round; if the callee
 * registers a new 'shutdown' listener per call without removing it,
 * listeners will accumulate on the emitter over time - confirm.
 *
 * @param {EventEmitter} signal - emitter that fires 'shutdown' to stop the loop
 * @return {void}
 */
export function loopRandomProjects(signal) {
  // Aborting this controller both marks the loop as stopped and wakes up a
  // pending pause.
  const stopController = new AbortController()
  signal.once('shutdown', function () {
    stopController.abort()
  })

  async function loop() {
    while (!stopController.signal.aborted) {
      try {
        const result = await verifyRandomProjectSample(100, signal)
        logger.debug({ result }, 'verified random project sample')
      } catch (error) {
        logger.error({ error }, 'error verifying random project sample')
      }

      // Shutdown may have arrived while the round was running: skip the pause.
      if (stopController.signal.aborted) {
        break
      }

      try {
        await setTimeout(300_000, undefined, { signal: stopController.signal })
      } catch (error) {
        // The timer rejects with an AbortError when shutdown interrupts it.
        if (error.name !== 'AbortError') {
          throw error
        }
      }
    }
  }

  // The loop is intentionally not awaited; make sure an unexpected failure is
  // reported rather than becoming an unhandled rejection.
  loop().catch(error => {
    logger.error({ error }, 'random project verification loop failed')
  })
}
|
||||
|
|
|
@ -12,6 +12,8 @@ import { setTimeout } from 'node:timers/promises'
|
|||
import logger from '@overleaf/logger'
|
||||
import { loadGlobalBlobs } from '../lib/blob_store/index.js'
|
||||
import { getDatesBeforeRPO } from '../../backupVerifier/utils.mjs'
|
||||
import { EventEmitter } from 'node:events'
|
||||
import { mongodb } from '../index.js'
|
||||
|
||||
logger.logger.level('fatal')
|
||||
|
||||
|
@ -39,7 +41,7 @@ const STATS = {
|
|||
|
||||
/**
|
||||
* @typedef {Object} CLIOptions
|
||||
* @property {() => Promise<VerificationJobStatus>} projectVerifier
|
||||
* @property {(signal: EventEmitter) => Promise<VerificationJobStatus>} projectVerifier
|
||||
* @property {boolean} verbose
|
||||
*/
|
||||
|
||||
|
@ -88,17 +90,18 @@ function getOptions() {
|
|||
console.log('Verifying random projects')
|
||||
return {
|
||||
verbose,
|
||||
projectVerifier: () => verifyRandomProjectSample(nProjects),
|
||||
projectVerifier: signal => verifyRandomProjectSample(nProjects, signal),
|
||||
}
|
||||
case 'recent':
|
||||
return {
|
||||
verbose,
|
||||
projectVerifier: async () => {
|
||||
projectVerifier: async signal => {
|
||||
const { startDate, endDate } = getDatesBeforeRPO(3 * 3600)
|
||||
return await verifyProjectsUpdatedInDateRange(
|
||||
startDate,
|
||||
endDate,
|
||||
nProjects
|
||||
nProjects,
|
||||
signal
|
||||
)
|
||||
},
|
||||
}
|
||||
|
@ -122,12 +125,13 @@ function getOptions() {
|
|||
}
|
||||
STATS.ranges = 0
|
||||
return {
|
||||
projectVerifier: () =>
|
||||
projectVerifier: signal =>
|
||||
verifyProjectsCreatedInDateRange({
|
||||
startDate: new Date(start),
|
||||
endDate: new Date(end),
|
||||
projectsPerRange: nProjects,
|
||||
concurrency,
|
||||
signal,
|
||||
}),
|
||||
verbose,
|
||||
}
|
||||
|
@ -181,10 +185,24 @@ function displayStats(stats) {
|
|||
}
|
||||
}
|
||||
|
||||
// Shutdown channel for the CLI run: termination signals are translated into a
// 'shutdown' event, which triggers the graceful shutdown routine and is also
// what the project verifier receives to stop early.
const shutdownEmitter = new EventEmitter()

async function onShutdownRequested() {
  await gracefulShutdown()
}

shutdownEmitter.on('shutdown', onShutdownRequested)

// Registered in the same order as before: SIGTERM first, then SIGINT.
for (const signalName of ['SIGTERM', 'SIGINT']) {
  process.on(signalName, () => {
    shutdownEmitter.emit('shutdown')
  })
}
|
||||
|
||||
await loadGlobalBlobs()
|
||||
|
||||
try {
|
||||
const stats = await projectVerifier()
|
||||
const stats = await projectVerifier(shutdownEmitter)
|
||||
displayStats(stats)
|
||||
console.log(`completed`)
|
||||
} catch (error) {
|
||||
|
|
Loading…
Add table
Reference in a new issue