overleaf/services/history-v1/storage/scripts/recover_doc_versions.js
Eric Mc Sween a2d5e030c6 Merge pull request #18041 from overleaf/em-jpa-recovery-script
[history-v1] add doc version recovery script

GitOrigin-RevId: 3f240f313465ce5fa9c53f72a992807f9396ebb4
2024-04-23 08:05:40 +00:00

243 lines
6.3 KiB
JavaScript

const fsPromises = require('fs/promises')
const { ObjectId } = require('mongodb')
const BPromise = require('bluebird')
const logger = require('@overleaf/logger')
const mongodb = require('../lib/mongodb')
const { chunkStore } = require('..')
const Events = require('events')
// Silence warning.
Events.setMaxListeners(20)
const BATCH_SIZE = 1000
const OPTIONS = {
concurrency: parseInt(process.env.DOC_VERSION_RECOVERY_CONCURRENCY, 10) || 20,
force: process.env.DOC_VERSION_RECOVERY_FORCE === 'true',
'skip-history-failures':
process.env.DOC_VERSION_RECOVERY_SKIP_HISTORY_FAILURES === 'true',
'resyncs-needed-file': process.env.DOC_VERSION_RECOVERY_RESYNCS_NEEDED_FILE,
}
const db = {
deletedProjects: mongodb.db.collection('deletedProjects'),
docs: mongodb.db.collection('docs'),
migrations: mongodb.db.collection('migrations'),
projects: mongodb.db.collection('projects'),
}
const BAD_MIGRATION_NAME =
'20231219081700_move_doc_versions_from_docops_to_docs'
let loggingChain = Promise.resolve()
const projectIdsThatNeedResyncing = []
async function flushLogQueue() {
const logPath = OPTIONS['resyncs-needed-file']
loggingChain = loggingChain.then(async () => {
const batch = projectIdsThatNeedResyncing.splice(0)
if (batch.length === 0) return
try {
await fsPromises.appendFile(logPath, batch.join('\n') + '\n')
} catch (err) {
projectIdsThatNeedResyncing.push(...batch)
logger.err({ err, logPath, batch }, 'Failed to write to log file')
}
})
await loggingChain
}
async function recordProjectNeedsResync(projectId) {
if (OPTIONS['resyncs-needed-file']) {
projectIdsThatNeedResyncing.push(projectId)
await flushLogQueue()
} else {
console.log(`Project ${projectId} needs a hard resync.`)
}
}
async function main() {
const badMigration = await db.migrations.findOne({ name: BAD_MIGRATION_NAME })
if (OPTIONS.force || badMigration != null) {
console.warn('Need to recover doc versions. This will take a while.')
await runRecovery()
}
await db.migrations.deleteOne({ name: BAD_MIGRATION_NAME })
console.log('Done.')
}
async function runRecovery() {
let batch = []
const summary = {
updated: 0,
ignored: 0,
skipped: 0,
deletedUpdated: 0,
deletedIgnored: 0,
}
const processBatchAndLogProgress = async () => {
try {
await BPromise.map(batch, project => processProject(project, summary), {
concurrency: OPTIONS.concurrency,
})
} finally {
console.log(`${summary.updated} projects updated`)
console.log(`${summary.ignored} projects had good versions`)
console.log(`${summary.deletedUpdated} deleted projects updated`)
console.log(
`${summary.deletedIgnored} deleted projects had good versions`
)
console.log(`${summary.skipped} projects skipped`)
}
batch = []
}
await printDBStats()
await touchResyncsNeededFile()
for await (const project of getProjects()) {
batch.push(project)
if (batch.length >= BATCH_SIZE) {
await processBatchAndLogProgress()
}
}
for await (const deletedProject of getDeletedProjects()) {
const project = deletedProject.project
project.isDeleted = true
batch.push(project)
if (batch.length >= BATCH_SIZE) {
await processBatchAndLogProgress()
}
}
if (batch.length > 0) {
await processBatchAndLogProgress()
}
await backfillMissingVersions()
}
async function printDBStats() {
const projects = await db.projects.estimatedDocumentCount()
const docs = await db.docs.estimatedDocumentCount()
console.log(
`Need to check ${projects} projects with a total of ${docs} docs.`
)
}
async function touchResyncsNeededFile() {
if (OPTIONS['resyncs-needed-file']) {
await fsPromises.appendFile(OPTIONS['resyncs-needed-file'], '')
}
}
function getProjects() {
return db.projects.find({}, { projection: { _id: 1, overleaf: 1 } })
}
function getDeletedProjects() {
return db.deletedProjects.find(
{ project: { $ne: null } },
{ projection: { 'project._id': 1, 'project.overleaf': 1 } }
)
}
async function processProject(project, summary) {
const projectId = project._id.toString()
let updated = false
try {
const historyDocVersions = await getHistoryDocVersions(project)
for (const { docId, version } of historyDocVersions) {
const update = await fixMongoDocVersion(docId, version)
if (update != null) {
updated = true
}
}
if (project.isDeleted) {
if (updated) {
summary.deletedUpdated += 1
} else {
summary.deletedIgnored += 1
}
} else {
await recordProjectNeedsResync(projectId)
if (updated) {
summary.updated += 1
} else {
summary.ignored += 1
}
}
} catch (err) {
logger.error({ err, projectId }, 'Failed to process project')
if (OPTIONS['skip-history-failures']) {
summary.skipped += 1
} else {
throw err
}
}
}
async function getHistoryDocVersions(project) {
const historyId = project.overleaf.history.id
const chunk = await chunkStore.loadLatest(historyId)
if (chunk == null) {
return []
}
const snapshot = chunk.getSnapshot()
const changes = chunk.getChanges()
snapshot.applyAll(changes)
const v2DocVersions = snapshot.getV2DocVersions()
if (v2DocVersions == null) {
return []
}
return Object.entries(v2DocVersions.data).map(([docId, versionInfo]) => ({
docId,
version: versionInfo.v,
}))
}
async function fixMongoDocVersion(docId, historyVersion) {
const docBeforeUpdate = await db.docs.findOneAndUpdate(
{
_id: new ObjectId(docId),
$or: [
{ version: { $lte: historyVersion } },
{ version: { $exists: false } },
],
},
{ $set: { version: historyVersion + 1 } }
)
if (docBeforeUpdate != null) {
return {
previousVersion: docBeforeUpdate.version,
newVersion: historyVersion + 1,
}
} else {
return null
}
}
/**
* Set all remaining versions to 0
*/
async function backfillMissingVersions() {
console.log('Defaulting version to 0 for remaining docs.')
await db.docs.updateMany(
{ version: { $exists: false } },
{ $set: { version: 0 } }
)
}
main()
.finally(async () => {
console.log('Flushing log queue.')
await flushLogQueue()
})
.then(() => {
process.exit(0)
})
.catch(err => {
console.error(err)
process.exit(1)
})