Merge pull request #11658 from overleaf/bg-add-concurrency-to-migration

add concurrency to the history migration script

GitOrigin-RevId: 059a1a8b402627b03cb6dab79b5da22189f32704
This commit is contained in:
Miguel Serrano 2023-02-07 10:14:50 +01:00 committed by Copybot
parent f8bc3b3357
commit 832a0facba

View file

@ -11,6 +11,7 @@ const { waitForDb } = require('../../app/src/infrastructure/mongodb')
const minimist = require('minimist')
const fs = require('fs')
const util = require('util')
const pLimit = require('p-limit')
const logger = require('@overleaf/logger')
logger.initialize('history-migration')
// disable logging to stdout from internal modules
@ -31,14 +32,16 @@ const argv = minimist(process.argv.slice(2), {
string: ['output'],
alias: {
verbose: 'v',
output: 'o',
'dry-run': 'd',
concurrency: 'j',
'use-query-hint': 'q',
'retry-failed': 'r',
'archive-on-failure': 'a',
},
default: {
output: DEFAULT_OUTPUT_FILE,
'write-concurrency': 10,
concurrency: 1,
'batch-size': 100,
'max-upgrades-to-attempt': false,
'max-failures': 50,
@ -122,6 +125,9 @@ async function migrateProjects(projectsToMigrate) {
let projectsFailed = 0
console.log('Starting migration...')
if (argv.concurrency > 1) {
console.log(`Using ${argv.concurrency} concurrent migrations`)
}
// send log output for each migration to a file
const output = fs.createWriteStream(argv.output, { flags: 'a' })
console.log(`Writing log output to ${argv.output}`)
@ -129,25 +135,43 @@ async function migrateProjects(projectsToMigrate) {
function logJson(obj) {
logger.log(JSON.stringify(obj))
}
// limit the number of concurrent migrations
const limit = pLimit(argv.concurrency)
const jobs = []
// throttle progress reporting to 2x per second
const progressBar = createProgressBar()
let i = 0
const N = projectsToMigrate.length
const progressBarTimer = setInterval(() => {
if (INTERRUPT) {
return // don't update the progress bar if we're shutting down
}
progressBar(
i,
N,
`Migrated: ${projectsMigrated}, Failed: ${projectsFailed}`
)
}, 500)
for (const project of projectsToMigrate) {
async function _migrateProject(project) {
if (INTERRUPT) {
return // don't start any new jobs if we're shutting down
}
const startTime = new Date()
try {
if (INTERRUPT) {
break
}
const result = await upgradeProject(project._id)
i++
if (INTERRUPT && limit.activeCount > 1) {
// an interrupt was requested while this job was running
// report that we're waiting for the remaining jobs to finish
console.log(
`Waiting for remaining ${
limit.activeCount - 1
} active jobs to finish\r`
)
}
if (result.error) {
// failed to migrate this project
logJson({
project_id: project._id,
result,
@ -157,6 +181,7 @@ async function migrateProjects(projectsToMigrate) {
})
projectsFailed++
} else {
// successfully migrated this project
logJson({
project_id: project._id,
result,
@ -166,6 +191,7 @@ async function migrateProjects(projectsToMigrate) {
projectsMigrated++
}
} catch (err) {
// unexpected error from the migration
projectsFailed++
logJson({
project_id: project._id,
@ -174,8 +200,13 @@ async function migrateProjects(projectsToMigrate) {
endTime: new Date(),
})
}
i++
}
for (const project of projectsToMigrate) {
jobs.push(limit(_migrateProject, project))
}
// wait for all the queued jobs to complete
await Promise.all(jobs)
clearInterval(progressBarTimer)
progressBar(i, N, `Migrated: ${projectsMigrated}, Failed: ${projectsFailed}`)
process.stdout.write('\n')
@ -210,7 +241,9 @@ async function main() {
// then history could get into a broken state
// Instead, skip any unprocessed projects and exit() at end of the batch.
process.on('SIGINT', function () {
console.log('\nCaught SIGINT, waiting for in process upgrades to complete')
console.log(
'\nCaught SIGINT, waiting for all in-progess upgrades to complete'
)
INTERRUPT = true
})