Merge pull request #19273 from overleaf/em-history-migration-concurrency

Add concurrency option to history ranges support migration script

GitOrigin-RevId: 8707abc9b76116090332b6abb11030adb17ceb4e
This commit is contained in:
Eric Mc Sween 2024-07-05 08:16:31 -04:00 committed by Copybot
parent 934151106e
commit fbdf245517
2 changed files with 82 additions and 37 deletions

View file

@ -26,6 +26,7 @@ const {
* @param {boolean} [opts.force] * @param {boolean} [opts.force]
* @param {boolean} [opts.stopOnError] * @param {boolean} [opts.stopOnError]
* @param {boolean} [opts.quickOnly] * @param {boolean} [opts.quickOnly]
* @param {number} [opts.concurrency]
*/ */
async function migrateProjects(opts = {}) { async function migrateProjects(opts = {}) {
const { const {
@ -38,6 +39,7 @@ async function migrateProjects(opts = {}) {
force = false, force = false,
stopOnError = false, stopOnError = false,
quickOnly = false, quickOnly = false,
concurrency = 1,
} = opts } = opts
const clauses = [] const clauses = []
@ -70,11 +72,31 @@ async function migrateProjects(opts = {}) {
}) })
.sort({ _id: -1 }) .sort({ _id: -1 })
let terminating = false
const handleSignal = signal => {
logger.info({ signal }, 'History ranges support migration received signal')
terminating = true
}
process.on('SIGINT', handleSignal)
process.on('SIGTERM', handleSignal)
let projectsProcessed = 0 let projectsProcessed = 0
const jobsByProjectId = new Map()
let errors = 0
for await (const project of projects) { for await (const project of projects) {
if (projectsProcessed >= maxCount) { if (projectsProcessed >= maxCount) {
break break
} }
if (errors > 0 && stopOnError) {
break
}
if (terminating) {
break
}
const projectId = project._id.toString() const projectId = project._id.toString()
if (!force) { if (!force) {
@ -89,45 +111,63 @@ async function migrateProjects(opts = {}) {
} }
} }
const startTimeMs = Date.now() if (jobsByProjectId.size >= concurrency) {
let quickMigrationSuccess // Wait until the next job finishes
try { await Promise.race(jobsByProjectId.values())
quickMigrationSuccess = await quickMigration(projectId, direction)
if (!quickMigrationSuccess) {
if (quickOnly) {
logger.info(
{ projectId, direction },
'Quick migration failed, skipping project'
)
} else {
await migrateProject(projectId, direction)
}
}
} catch (err) {
logger.error(
{ err, projectId, direction, projectsProcessed },
'Failed to migrate history ranges support'
)
projectsProcessed += 1
if (stopOnError) {
break
} else {
continue
}
} }
const elapsedMs = Date.now() - startTimeMs
projectsProcessed += 1 const job = processProject(projectId, direction, quickOnly)
logger.info( .then(info => {
{ jobsByProjectId.delete(projectId)
projectId, projectsProcessed += 1
direction, logger.info(
projectsProcessed, {
elapsedMs, projectId,
quick: quickMigrationSuccess, direction,
}, projectsProcessed,
'Migrated history ranges support' errors,
) ...info,
},
'History ranges support migration'
)
})
.catch(err => {
jobsByProjectId.delete(projectId)
errors += 1
logger.error(
{ err, projectId, direction, projectsProcessed, errors },
'Failed to migrate history ranges support'
)
})
jobsByProjectId.set(projectId, job)
} }
// Let the last jobs finish
await Promise.all(jobsByProjectId.values())
}
/**
* Migrate a single project
*
* @param {string} projectId
* @param {"forwards" | "backwards"} direction
* @param {boolean} quickOnly
*/
async function processProject(projectId, direction, quickOnly) {
const startTimeMs = Date.now()
const quickMigrationSuccess = await quickMigration(projectId, direction)
let migrationType
if (quickMigrationSuccess) {
migrationType = 'quick'
} else if (quickOnly) {
migrationType = 'skipped'
} else {
await migrateProject(projectId, direction)
migrationType = 'resync'
}
const elapsedMs = Date.now() - startTimeMs
return { migrationType, elapsedMs }
} }
/** /**

View file

@ -14,6 +14,7 @@ async function main() {
force, force,
stopOnError, stopOnError,
quickOnly, quickOnly,
concurrency,
} = parseArgs() } = parseArgs()
await HistoryRangesSupportMigration.promises.migrateProjects({ await HistoryRangesSupportMigration.promises.migrateProjects({
projectIds, projectIds,
@ -25,6 +26,7 @@ async function main() {
force, force,
stopOnError, stopOnError,
quickOnly, quickOnly,
concurrency,
}) })
} }
@ -44,6 +46,7 @@ Options:
--force Migrate projects even if they were already migrated --force Migrate projects even if they were already migrated
--stop-on-error Stop after first migration error --stop-on-error Stop after first migration error
--quick-only Do not try a resync migration if quick migration fails --quick-only Do not try a resync migration if quick migration fails
--concurrency How many jobs to run in parallel
`) `)
} }
@ -67,6 +70,7 @@ function parseArgs() {
const force = args.force const force = args.force
const stopOnError = args['stop-on-error'] const stopOnError = args['stop-on-error']
const quickOnly = args['quick-only'] const quickOnly = args['quick-only']
const concurrency = args.concurrency ?? 1
const all = args.all const all = args.all
if ( if (
@ -94,6 +98,7 @@ function parseArgs() {
force, force,
stopOnError, stopOnError,
quickOnly, quickOnly,
concurrency,
} }
} }