overleaf/services/history-v1/storage/tasks/backfill_start_version.js

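// Task script: backfill the start_version column on the chunks table.
// For each chunk, the raw history is loaded from the history store and
// start_version is computed as end_version minus the number of changes.
//
// Usage:
//   node storage/tasks/backfill_start_version.js \
//     [--min-id N] [--max-id N] [--batch-size N] [--concurrency N]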

const commandLineArgs = require('command-line-args')
const BPromise = require('bluebird')
const timersPromises = require('node:timers/promises')
const { knex, historyStore } = require('..')

const MAX_POSTGRES_INTEGER = 2147483647
const DEFAULT_BATCH_SIZE = 1000
const DEFAULT_CONCURRENCY = 1
const MAX_RETRIES = 10
const RETRY_DELAY_MS = 5000
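
// Walk the chunks table in ascending id order, backfilling one batch at a
// time until no rows remain in the requested range.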
async function main() {
  const options = parseOptions()
  let batchStart = options.minId
  while (batchStart <= options.maxId) {
    const chunks = await getChunks(batchStart, options.maxId, options.batchSize)
    if (chunks.length === 0) {
      // No results. We're done.
      break
    }
    const batchEnd = chunks[chunks.length - 1].id
    await processBatch(chunks, options)
    console.log(`Processed chunks ${batchStart} to ${batchEnd}`)
    batchStart = batchEnd + 1
  }
}
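
// Parse command-line options, defaulting to a range that covers every
// possible chunk id.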
function parseOptions() {
  const args = commandLineArgs([
    { name: 'min-id', type: Number, defaultValue: 1 },
    {
      name: 'max-id',
      type: Number,
      defaultValue: MAX_POSTGRES_INTEGER,
    },
    { name: 'batch-size', type: Number, defaultValue: DEFAULT_BATCH_SIZE },
    { name: 'concurrency', type: Number, defaultValue: DEFAULT_CONCURRENCY },
  ])
  return {
    minId: args['min-id'],
    maxId: args['max-id'],
    batchSize: args['batch-size'],
    concurrency: args.concurrency,
  }
}
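
// Fetch the next batch of chunk rows with ids in [minId, maxId], ordered by
// id so the caller can advance past the last row returned.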
async function getChunks(minId, maxId, batchSize) {
  const chunks = await knex('chunks')
    .where('id', '>=', minId)
    .andWhere('id', '<=', maxId)
    .orderBy('id')
    .limit(batchSize)
  return chunks
}
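
// Process one batch of chunks with bounded concurrency. Failed chunks are
// retried after a delay, up to MAX_RETRIES attempts, before giving up.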
async function processBatch(chunks, options) {
  let retries = 0
  while (true) {
    const results = await BPromise.map(chunks, processChunk, {
      concurrency: options.concurrency,
    })
    const failedChunks = results
      .filter(result => !result.success)
      .map(result => result.chunk)
    if (failedChunks.length === 0) {
      // All chunks processed. Carry on.
      break
    }
    // Some chunks failed. Retry.
    retries += 1
    if (retries > MAX_RETRIES) {
      console.log('Too many retries processing chunks. Giving up.')
      process.exit(1)
    }
    console.log(
      `Retrying chunks: ${failedChunks.map(chunk => chunk.id).join(', ')}`
    )
    await timersPromises.setTimeout(RETRY_DELAY_MS)
    chunks = failedChunks
  }
}
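
// Compute and persist a single chunk's start_version: load the chunk's raw
// history and subtract its change count from the recorded end_version.
// Errors are logged and reported as a failure so the batch can retry.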
async function processChunk(chunk) {
  try {
    const rawHistory = await historyStore.loadRaw(
      chunk.doc_id.toString(),
      chunk.id
    )
    const startVersion = chunk.end_version - rawHistory.changes.length
    await knex('chunks')
      .where('id', chunk.id)
      .update({ start_version: startVersion })
    return { chunk, success: true }
  } catch (err) {
    console.error(`Failed to process chunk ${chunk.id}:`, err.stack)
    return { chunk, success: false }
  }
}
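
// Run the script. Exit explicitly on completion; otherwise open handles
// (e.g. the knex connection pool) could keep the process alive.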
main()
  .then(() => {
    process.exit()
  })
  .catch(err => {
    console.error(err)
    process.exit(1)
  })