mirror of
https://github.com/overleaf/overleaf.git
synced 2025-04-19 12:15:19 +00:00
Merge pull request #24200 from overleaf/bg-backup-queue-pending-jobs
Fix backup worker and backup scheduler to handle pending projects.
GitOrigin-RevId: a97e011615666b3ae2b8fafe26a96d41b3609edd
This commit is contained in:
parent
f045361b49
commit
c233243948
3 changed files with 219 additions and 18 deletions
|
@ -3,8 +3,18 @@ const { projects, backedUpBlobs } = require('../mongodb')
|
|||
const OError = require('@overleaf/o-error')
|
||||
|
||||
// List projects with pending backups older than the specified interval
|
||||
function listPendingBackups(timeIntervalMs = 0) {
|
||||
function listPendingBackups(timeIntervalMs = 0, limit = null) {
|
||||
const cutoffTime = new Date(Date.now() - timeIntervalMs)
|
||||
const options = {
|
||||
projection: { 'overleaf.backup.pendingChangeAt': 1 },
|
||||
sort: { 'overleaf.backup.pendingChangeAt': 1 },
|
||||
}
|
||||
|
||||
// Apply limit if provided
|
||||
if (limit) {
|
||||
options.limit = limit
|
||||
}
|
||||
|
||||
const cursor = projects.find(
|
||||
{
|
||||
'overleaf.backup.pendingChangeAt': {
|
||||
|
@ -12,10 +22,7 @@ function listPendingBackups(timeIntervalMs = 0) {
|
|||
$lt: cutoffTime,
|
||||
},
|
||||
},
|
||||
{
|
||||
projection: { 'overleaf.backup': 1, 'overleaf.history': 1 },
|
||||
sort: { 'overleaf.backup.pendingChangeAt': 1 },
|
||||
}
|
||||
options
|
||||
)
|
||||
return cursor
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ import Queue from 'bull'
|
|||
import config from 'config'
|
||||
import commandLineArgs from 'command-line-args'
|
||||
import logger from '@overleaf/logger'
|
||||
import { listPendingBackups } from '../lib/backup_store/index.js'
|
||||
|
||||
logger.initialize('backup-queue')
|
||||
|
||||
|
@ -28,6 +29,42 @@ const optionDefinitions = [
|
|||
description: 'Project IDs or date range in YYYY-MM-DD:YYYY-MM-DD format',
|
||||
},
|
||||
{ name: 'monitor', type: Boolean },
|
||||
{
|
||||
name: 'queue-pending',
|
||||
type: Number,
|
||||
description:
|
||||
'Find projects with pending changes older than N minutes and add them to the queue',
|
||||
},
|
||||
{
|
||||
name: 'show-pending',
|
||||
type: Number,
|
||||
description:
|
||||
'Show count of pending projects older than N minutes without adding to queue',
|
||||
},
|
||||
{
|
||||
name: 'limit',
|
||||
type: Number,
|
||||
description: 'Limit the number of jobs to be added',
|
||||
},
|
||||
{
|
||||
name: 'backoff-delay',
|
||||
type: Number,
|
||||
description:
|
||||
'Backoff delay in milliseconds for failed jobs (default: 1000)',
|
||||
defaultValue: 1000,
|
||||
},
|
||||
{
|
||||
name: 'attempts',
|
||||
type: Number,
|
||||
description: 'Number of retry attempts for failed jobs (default: 3)',
|
||||
defaultValue: 3,
|
||||
},
|
||||
{
|
||||
name: 'verbose',
|
||||
alias: 'v',
|
||||
type: Boolean,
|
||||
description: 'Show detailed information when used with --show-pending',
|
||||
},
|
||||
]
|
||||
|
||||
// Parse command line arguments
|
||||
|
@ -38,6 +75,46 @@ function isValidDateFormat(dateStr) {
|
|||
return /^\d{4}-\d{2}-\d{2}$/.test(dateStr)
|
||||
}
|
||||
|
||||
/**
 * Validate the TIME argument (in minutes) of a pending-projects option.
 *
 * On an invalid value this prints two lines to stderr and terminates the
 * process with exit code 1.
 *
 * @param {string} option - option name without the leading dashes; only used
 *   in the error messages
 * @param {*} value - parsed option value
 * @returns {number} the value unchanged, when it is a finite number > 0
 */
function validatePendingTime(option, value) {
  // Number.isFinite is false for every non-number and also for NaN and
  // +/-Infinity. A plain `typeof value === 'number'` test lets NaN through
  // (e.g. Number('abc')), and NaN <= 0 is false, so it would be accepted.
  if (!Number.isFinite(value) || value <= 0) {
    console.error(
      `Error: --${option} requires a positive numeric TIME argument in minutes`
    )
    console.error(`Example: --${option} 60`)
    process.exit(1)
  }
  return value
}
|
||||
|
||||
/**
 * Whole minutes elapsed between `date` and the current time.
 *
 * The difference is rounded down with Math.floor, so a date in the future
 * yields a negative number.
 *
 * @param {Date} date - the earlier point in time
 * @returns {number} elapsed minutes, floored
 */
function minutesSince(date) {
  const MS_PER_MINUTE = 60 * 1000
  const elapsedMs = Date.now() - date
  return Math.floor(elapsedMs / MS_PER_MINUTE)
}
|
||||
|
||||
/**
 * Render a pending-change timestamp for log output as
 * "<ISO-8601 timestamp> (<N> minutes ago)".
 *
 * @param {Date} timestamp - must provide toISOString()
 * @returns {string} the formatted description
 */
function formatPendingTime(timestamp) {
  const elapsedMinutes = minutesSince(timestamp)
  const isoStamp = timestamp.toISOString()
  return `${isoStamp} (${elapsedMinutes} minutes ago)`
}
|
||||
|
||||
/**
 * Add a job to the queue unless a job with the same ID is already there.
 *
 * The queue is asked for `options.jobId` first. When that lookup returns a
 * job, it is handed back untouched; otherwise `queue.add(data, options)` is
 * called.
 *
 * @param {object} queue - object providing async getJob(id) and add(data, options)
 * @param {object} data - job payload passed to queue.add
 * @param {object} options - job options; `jobId` is used for the lookup
 * @returns {Promise<{job: object, added: boolean}>} the existing or newly
 *   created job, and whether this call created it
 */
async function addJobWithCheck(queue, data, options) {
  const { jobId } = options

  const alreadyQueued = await queue.getJob(jobId)
  if (alreadyQueued) {
    return { job: alreadyQueued, added: false }
  }

  const created = await queue.add(data, options)
  return { job: created, added: true }
}
|
||||
|
||||
// Setup queue event listeners
|
||||
function setupMonitoring() {
|
||||
console.log('Starting queue monitoring. Press Ctrl+C to exit.')
|
||||
|
@ -99,15 +176,90 @@ async function addDateRangeJob(input) {
|
|||
)
|
||||
return
|
||||
}
|
||||
const job = await backupQueue.add(
|
||||
|
||||
const jobId = `backup-${startDate}-to-${endDate}`
|
||||
const { job, added } = await addJobWithCheck(
|
||||
backupQueue,
|
||||
{ startDate, endDate },
|
||||
{ jobId: `backup-${startDate}-to-${endDate}` }
|
||||
{ jobId }
|
||||
)
|
||||
|
||||
console.log(
|
||||
`Added date range backup job: ${startDate} to ${endDate}, job ID: ${job.id}`
|
||||
`${added ? 'Added' : 'Already exists'}: date range backup job: ${startDate} to ${endDate}, job ID: ${job.id}`
|
||||
)
|
||||
}
|
||||
|
||||
/**
 * Walk the projects whose pending change is older than `minutes` and either
 * report them or enqueue a backup job for each one.
 *
 * @param {number} minutes - age threshold; converted to milliseconds for
 *   listPendingBackups
 * @param {boolean} showOnly - when truthy nothing is queued; projects are only
 *   counted (and listed individually if `verbose` is also set)
 * @param {number} [limit] - forwarded unchanged to listPendingBackups
 * @param {boolean} [verbose] - log one line per project
 * @param {object} [jobOpts] - extra job options; `jobId` is always overridden
 *   with the project ID
 * @returns {Promise<void>}
 */
async function processPendingProjects(
  minutes,
  showOnly,
  limit,
  verbose,
  jobOpts = {}
) {
  const modeNote = showOnly ? ' (count only)' : ''
  console.log(
    `Finding projects with pending changes older than ${minutes} minutes${modeNote}`
  )

  const tally = { seen: 0, added: 0, existing: 0 }
  const olderThanMs = minutes * 60 * 1000
  // The limit is handed to the listing query instead of being enforced here.
  const cursor = listPendingBackups(olderThanMs, limit)

  for await (const project of cursor) {
    const projectId = project._id.toHexString()
    const pendingAt = project.overleaf?.backup?.pendingChangeAt

    if (!showOnly) {
      const { job, added } = await addJobWithCheck(
        backupQueue,
        { projectId },
        { ...jobOpts, jobId: projectId }
      )
      if (verbose) {
        const outcome = added ? 'Added job' : 'Job already exists'
        console.log(
          `${outcome} for project: ${projectId}, job ID: ${job.id} (pending since: ${formatPendingTime(pendingAt)})`
        )
      }
      if (added) {
        tally.added += 1
      } else {
        tally.existing += 1
      }
    } else if (verbose) {
      console.log(
        `Project: ${projectId} (pending since: ${formatPendingTime(pendingAt)})`
      )
    }

    tally.seen += 1
    // Progress line every 1000 projects.
    if (tally.seen % 1000 === 0) {
      console.log(
        `Processed ${tally.seen} projects`,
        showOnly ? '' : `(${tally.added} added, ${tally.existing} existing)`
      )
    }
  }

  if (showOnly) {
    console.log(
      `Found ${tally.seen} projects with pending changes (not added to queue)`
    )
    return
  }

  console.log(`Found ${tally.seen} projects with pending changes:`)
  console.log(` ${tally.added} jobs added to queue`)
  console.log(` ${tally.existing} jobs already existed in queue`)
}
|
||||
|
||||
// Main execution block
|
||||
async function run() {
|
||||
const optionCount = [
|
||||
|
@ -115,6 +267,8 @@ async function run() {
|
|||
options.status,
|
||||
options.add,
|
||||
options.monitor,
|
||||
options['queue-pending'] !== undefined,
|
||||
options['show-pending'] !== undefined,
|
||||
].filter(Boolean).length
|
||||
if (optionCount > 1) {
|
||||
console.error('Only one option can be specified')
|
||||
|
@ -141,24 +295,64 @@ async function run() {
|
|||
await addDateRangeJob(input)
|
||||
} else {
|
||||
// Handle project ID format
|
||||
const job = await backupQueue.add(
|
||||
const { job, added } = await addJobWithCheck(
|
||||
backupQueue,
|
||||
{ projectId: input },
|
||||
{ jobId: input }
|
||||
)
|
||||
console.log(`Added job for project: ${input}, job ID: ${job.id}`)
|
||||
console.log(
|
||||
`${added ? 'Added' : 'Already exists'}: job for project: ${input}, job ID: ${job.id}`
|
||||
)
|
||||
}
|
||||
}
|
||||
} else if (options.monitor) {
|
||||
setupMonitoring()
|
||||
} else if (options['queue-pending'] !== undefined) {
|
||||
const minutes = validatePendingTime(
|
||||
'queue-pending',
|
||||
options['queue-pending']
|
||||
)
|
||||
await processPendingProjects(
|
||||
minutes,
|
||||
false,
|
||||
options.limit,
|
||||
options.verbose,
|
||||
{
|
||||
attempts: options.attempts,
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: options['backoff-delay'],
|
||||
},
|
||||
}
|
||||
)
|
||||
} else if (options['show-pending'] !== undefined) {
|
||||
const minutes = validatePendingTime('show-pending', options['show-pending'])
|
||||
await processPendingProjects(minutes, true, options.limit, options.verbose)
|
||||
} else {
|
||||
console.log('Usage:')
|
||||
console.log(' --clean Clean up completed and failed jobs')
|
||||
console.log(' --status Show current job counts')
|
||||
console.log(' --add [projectId] Add a job for the specified projectId')
|
||||
console.log(' --clean Clean up completed and failed jobs')
|
||||
console.log(' --status Show current job counts')
|
||||
console.log(' --add [projectId] Add a job for the specified projectId')
|
||||
console.log(
|
||||
' --add [YYYY-MM-DD:YYYY-MM-DD] Add a job for the specified date range'
|
||||
)
|
||||
console.log(' --monitor Monitor queue events')
|
||||
console.log(' --monitor Monitor queue events')
|
||||
console.log(
|
||||
' --queue-pending TIME Find projects with changes older than TIME minutes and add them to the queue'
|
||||
)
|
||||
console.log(
|
||||
' --show-pending TIME Show count of pending projects older than TIME minutes'
|
||||
)
|
||||
console.log(' --limit N Limit the number of jobs to be added')
|
||||
console.log(
|
||||
' --backoff-delay TIME Backoff delay in milliseconds for failed jobs (default: 1000)'
|
||||
)
|
||||
console.log(
|
||||
' --attempts N Number of retry attempts for failed jobs (default: 3)'
|
||||
)
|
||||
console.log(
|
||||
' --verbose, -v Show detailed information when used with --show-pending'
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -86,14 +86,14 @@ async function runBackup(projectId) {
|
|||
)
|
||||
try {
|
||||
logger.info({ projectId }, 'processing backup for project')
|
||||
const { errors, completed } = await backupProject(projectId, {})
|
||||
metrics.inc('backup_worker_project', completed - errors, {
|
||||
await backupProject(projectId, {})
|
||||
metrics.inc('backup_worker_project', 1, {
|
||||
status: 'success',
|
||||
})
|
||||
metrics.inc('backup_worker_project', errors, { status: 'failed' })
|
||||
timer.done()
|
||||
return `backup completed ${projectId} (${errors} failed in ${completed} projects)`
|
||||
return `backup completed ${projectId}`
|
||||
} catch (err) {
|
||||
metrics.inc('backup_worker_project', 1, { status: 'failed' })
|
||||
logger.error({ projectId, err }, 'backup failed')
|
||||
throw err // Re-throw to mark job as failed
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue