From c2332439484dc2a82f30ebe0d3ba268fce19956e Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 10 Mar 2025 16:16:06 +0000 Subject: [PATCH] Merge pull request #24200 from overleaf/bg-backup-queue-pending-jobs fix backup worker and backup scheduler to handle pending projects GitOrigin-RevId: a97e011615666b3ae2b8fafe26a96d41b3609edd --- .../storage/lib/backup_store/index.js | 17 +- .../storage/scripts/backup_scheduler.mjs | 212 +++++++++++++++++- .../storage/scripts/backup_worker.mjs | 8 +- 3 files changed, 219 insertions(+), 18 deletions(-) diff --git a/services/history-v1/storage/lib/backup_store/index.js b/services/history-v1/storage/lib/backup_store/index.js index 37770c702f..1253afea06 100644 --- a/services/history-v1/storage/lib/backup_store/index.js +++ b/services/history-v1/storage/lib/backup_store/index.js @@ -3,8 +3,18 @@ const { projects, backedUpBlobs } = require('../mongodb') const OError = require('@overleaf/o-error') // List projects with pending backups older than the specified interval -function listPendingBackups(timeIntervalMs = 0) { +function listPendingBackups(timeIntervalMs = 0, limit = null) { const cutoffTime = new Date(Date.now() - timeIntervalMs) + const options = { + projection: { 'overleaf.backup.pendingChangeAt': 1 }, + sort: { 'overleaf.backup.pendingChangeAt': 1 }, + } + + // Apply limit if provided + if (limit) { + options.limit = limit + } + const cursor = projects.find( { 'overleaf.backup.pendingChangeAt': { @@ -12,10 +22,7 @@ function listPendingBackups(timeIntervalMs = 0) { $lt: cutoffTime, }, }, - { - projection: { 'overleaf.backup': 1, 'overleaf.history': 1 }, - sort: { 'overleaf.backup.pendingChangeAt': 1 }, - } + options ) return cursor } diff --git a/services/history-v1/storage/scripts/backup_scheduler.mjs b/services/history-v1/storage/scripts/backup_scheduler.mjs index 44c6b7c4ec..21acedd78b 100644 --- a/services/history-v1/storage/scripts/backup_scheduler.mjs +++ b/services/history-v1/storage/scripts/backup_scheduler.mjs @@ -2,6 +2,7 @@ import Queue from 'bull' import config from 'config' import commandLineArgs from 'command-line-args' import logger from '@overleaf/logger' +import { listPendingBackups } from '../lib/backup_store/index.js' logger.initialize('backup-queue') @@ -28,6 +29,42 @@ const optionDefinitions = [ description: 'Project IDs or date range in YYYY-MM-DD:YYYY-MM-DD format', }, { name: 'monitor', type: Boolean }, + { + name: 'queue-pending', + type: Number, + description: + 'Find projects with pending changes older than N minutes and add them to the queue', + }, + { + name: 'show-pending', + type: Number, + description: + 'Show count of pending projects older than N minutes without adding to queue', + }, + { + name: 'limit', + type: Number, + description: 'Limit the number of jobs to be added', + }, + { + name: 'backoff-delay', + type: Number, + description: + 'Backoff delay in milliseconds for failed jobs (default: 1000)', + defaultValue: 1000, + }, + { + name: 'attempts', + type: Number, + description: 'Number of retry attempts for failed jobs (default: 3)', + defaultValue: 3, + }, + { + name: 'verbose', + alias: 'v', + type: Boolean, + description: 'Show detailed information when used with --show-pending', + }, ] // Parse command line arguments @@ -38,6 +75,46 @@ function isValidDateFormat(dateStr) { return /^\d{4}-\d{2}-\d{2}$/.test(dateStr) } +// Helper to validate the pending time parameter +function validatePendingTime(option, value) { + if (typeof value !== 'number' || value <= 0) { + console.error( + `Error: --${option} requires a positive numeric TIME argument in minutes` + ) + console.error(`Example: --${option} 60`) + process.exit(1) + } + return value +} + +// Helper to calculate minutes since a given date +function minutesSince(date) { + const now = new Date() + const diffMs = now - date + return Math.floor(diffMs / (1000 * 60)) +} + +// Helper to format the pending time display +function formatPendingTime(timestamp) { + const minutes = minutesSince(timestamp) + return `${timestamp.toISOString()} (${minutes} minutes ago)` +} + +// Helper to add a job to the queue, checking for duplicates +async function addJobWithCheck(queue, data, options) { + const jobId = options.jobId + + // Check if the job already exists + const existingJob = await queue.getJob(jobId) + + if (existingJob) { + return { job: existingJob, added: false } + } else { + const job = await queue.add(data, options) + return { job, added: true } + } +} + // Setup queue event listeners function setupMonitoring() { console.log('Starting queue monitoring. Press Ctrl+C to exit.') @@ -99,15 +176,90 @@ async function addDateRangeJob(input) { ) return } - const job = await backupQueue.add( + + const jobId = `backup-${startDate}-to-${endDate}` + const { job, added } = await addJobWithCheck( + backupQueue, { startDate, endDate }, - { jobId: `backup-${startDate}-to-${endDate}` } + { jobId } ) + console.log( - `Added date range backup job: ${startDate} to ${endDate}, job ID: ${job.id}` + `${added ? 'Added' : 'Already exists'}: date range backup job: ${startDate} to ${endDate}, job ID: ${job.id}` ) } +// Process pending projects with changes older than the specified minutes +async function processPendingProjects( + minutes, + showOnly, + limit, + verbose, + jobOpts = {} +) { + const timeIntervalMs = minutes * 60 * 1000 + console.log( + `Finding projects with pending changes older than ${minutes} minutes${showOnly ? ' (count only)' : ''}` + ) + + let count = 0 + let addedCount = 0 + let existingCount = 0 + // Pass the limit directly to MongoDB query for better performance + const pendingCursor = listPendingBackups(timeIntervalMs, limit) + + for await (const project of pendingCursor) { + const projectId = project._id.toHexString() + const pendingAt = project.overleaf?.backup?.pendingChangeAt + + if (showOnly && verbose) { + console.log( + `Project: ${projectId} (pending since: ${formatPendingTime(pendingAt)})` + ) + } else if (!showOnly) { + const { job, added } = await addJobWithCheck( + backupQueue, + { projectId }, + { ...jobOpts, jobId: projectId } + ) + + if (added) { + if (verbose) { + console.log( + `Added job for project: ${projectId}, job ID: ${job.id} (pending since: ${formatPendingTime(pendingAt)})` + ) + } + addedCount++ + } else { + if (verbose) { + console.log( + `Job already exists for project: ${projectId}, job ID: ${job.id} (pending since: ${formatPendingTime(pendingAt)})` + ) + } + existingCount++ + } + } + + count++ + if (count % 1000 === 0) { + console.log( + `Processed ${count} projects`, + showOnly ? '' : `(${addedCount} added, ${existingCount} existing)` + ) + } + } + + if (showOnly) { + console.log( + `Found ${count} projects with pending changes (not added to queue)` + ) + } else { + console.log(`Found ${count} projects with pending changes:`) + console.log(` ${addedCount} jobs added to queue`) + console.log(` ${existingCount} jobs already existed in queue`) + } +} + // Main execution block async function run() { const optionCount = [ @@ -115,6 +267,8 @@ async function run() { options.status, options.add, options.monitor, + options['queue-pending'] !== undefined, + options['show-pending'] !== undefined, ].filter(Boolean).length if (optionCount > 1) { console.error('Only one option can be specified') @@ -141,24 +295,64 @@ async function run() { await addDateRangeJob(input) } else { // Handle project ID format - const job = await backupQueue.add( + const { job, added } = await addJobWithCheck( + backupQueue, { projectId: input }, { jobId: input } ) - console.log(`Added job for project: ${input}, job ID: ${job.id}`) + console.log( + `${added ? 'Added' : 'Already exists'}: job for project: ${input}, job ID: ${job.id}` + ) } } } else if (options.monitor) { setupMonitoring() + } else if (options['queue-pending'] !== undefined) { + const minutes = validatePendingTime( + 'queue-pending', + options['queue-pending'] + ) + await processPendingProjects( + minutes, + false, + options.limit, + options.verbose, + { + attempts: options.attempts, + backoff: { + type: 'exponential', + delay: options['backoff-delay'], + }, + } + ) + } else if (options['show-pending'] !== undefined) { + const minutes = validatePendingTime('show-pending', options['show-pending']) + await processPendingProjects(minutes, true, options.limit, options.verbose) } else { console.log('Usage:') - console.log(' --clean Clean up completed and failed jobs') - console.log(' --status Show current job counts') - console.log(' --add [projectId] Add a job for the specified projectId') + console.log(' --clean Clean up completed and failed jobs') + console.log(' --status Show current job counts') + console.log(' --add [projectId] Add a job for the specified projectId') console.log( ' --add [YYYY-MM-DD:YYYY-MM-DD] Add a job for the specified date range' ) - console.log(' --monitor Monitor queue events') + console.log(' --monitor Monitor queue events') + console.log( + ' --queue-pending TIME Find projects with changes older than TIME minutes and add them to the queue' + ) + console.log( + ' --show-pending TIME Show count of pending projects older than TIME minutes' + ) + console.log(' --limit N Limit the number of jobs to be added') + console.log( + ' --backoff-delay TIME Backoff delay in milliseconds for failed jobs (default: 1000)' + ) + console.log( + ' --attempts N Number of retry attempts for failed jobs (default: 3)' + ) + console.log( + ' --verbose, -v Show detailed information when used with --show-pending' + ) } } diff --git a/services/history-v1/storage/scripts/backup_worker.mjs b/services/history-v1/storage/scripts/backup_worker.mjs index f09381e3d3..0161669a5a 100644 --- a/services/history-v1/storage/scripts/backup_worker.mjs +++ b/services/history-v1/storage/scripts/backup_worker.mjs @@ -86,14 +86,14 @@ async function runBackup(projectId) { ) try { logger.info({ projectId }, 'processing backup for project') - const { errors, completed } = await backupProject(projectId, {}) - metrics.inc('backup_worker_project', completed - errors, { + await backupProject(projectId, {}) + metrics.inc('backup_worker_project', 1, { status: 'success', }) - metrics.inc('backup_worker_project', errors, { status: 'failed' }) timer.done() - return `backup completed ${projectId} (${errors} failed in ${completed} projects)` + return `backup completed ${projectId}` } catch (err) { + metrics.inc('backup_worker_project', 1, { status: 'failed' }) logger.error({ projectId, err }, 'backup failed') throw err // Re-throw to mark job as failed }