overleaf/services/web/scripts/helpers/batchedUpdate.mjs
Jakob Ackermann bf2ad1952c Merge pull request #21506 from overleaf/jpa-fix-batched-update-edge
[web] fix edge behavior of batchedUpdate

GitOrigin-RevId: 5d1c0b884aa02c52682d114fa66b2d9b201e724e
2024-11-04 09:04:55 +00:00

281 lines
6.6 KiB
JavaScript

// @ts-check
import mongodb from 'mongodb-legacy'
import {
db,
READ_PREFERENCE_SECONDARY,
} from '../../app/src/infrastructure/mongodb.js'
const { ObjectId } = mongodb
// 31 days in milliseconds; used as the default for BATCH_MAX_TIME_SPAN_IN_MS.
const ONE_MONTH_IN_MS = 1000 * 60 * 60 * 24 * 31
// Lower _id edge of the collection; assigned by batchedUpdate() from
// getIdEdgePast() before the options are refreshed.
let ID_EDGE_PAST
// Upper _id edge: an ObjectId for one second after this module was evaluated.
const ID_EDGE_FUTURE = objectIdFromMs(Date.now() + 1000)
// The settings below are (re)assigned by refreshGlobalOptionsForBatchedUpdate().
// true -> iterate from newer to older _id values.
let BATCH_DESCENDING
// Maximum number of documents fetched per batch.
let BATCH_SIZE
// true -> log every _id of a batch instead of only the batch's last _id.
let VERBOSE_LOGGING
// _id at which iteration starts.
let BATCH_RANGE_START
// _id at which iteration stops.
let BATCH_RANGE_END
// Maximum span of _id timestamps covered by a single query window.
let BATCH_MAX_TIME_SPAN_IN_MS
/**
* @typedef {import("mongodb").Collection} Collection
* @typedef {import("mongodb").Document} Document
* @typedef {import("mongodb").FindOptions} FindOptions
* @typedef {import("mongodb").UpdateFilter<Document>} UpdateDocument
* @typedef {import("mongodb").ObjectId} ObjectId
*/
/**
* @typedef {Object} BatchedUpdateOptions
* @property {string} [BATCH_DESCENDING]
* @property {string} [BATCH_LAST_ID]
* @property {string} [BATCH_MAX_TIME_SPAN_IN_MS]
* @property {string} [BATCH_RANGE_END]
* @property {string} [BATCH_RANGE_START]
* @property {string} [BATCH_SIZE]
* @property {string} [VERBOSE_LOGGING]
*/
/**
 * Recompute the module-level batching settings (BATCH_DESCENDING, BATCH_SIZE,
 * VERBOSE_LOGGING, BATCH_RANGE_START, BATCH_RANGE_END and
 * BATCH_MAX_TIME_SPAN_IN_MS) from the given options and the environment.
 *
 * Entries of `process.env` take precedence over entries of `options` with the
 * same name. All values are expected as strings.
 *
 * - BATCH_LAST_ID wins over BATCH_RANGE_START for the start of the range.
 * - Without an explicit start/end, the collection edges are used, swapped
 *   according to the iteration direction.
 * - BATCH_SIZE falls back to 1000 when it does not parse to a non-zero number.
 * - BATCH_MAX_TIME_SPAN_IN_MS falls back to one month when it does not parse
 *   to a positive number.
 *
 * @param {BatchedUpdateOptions} options
 */
function refreshGlobalOptionsForBatchedUpdate(options = {}) {
  options = Object.assign({}, options, process.env)

  BATCH_DESCENDING = options.BATCH_DESCENDING === 'true'
  BATCH_SIZE = parseInt(options.BATCH_SIZE || '1000', 10) || 1000
  VERBOSE_LOGGING = options.VERBOSE_LOGGING === 'true'

  if (options.BATCH_LAST_ID) {
    BATCH_RANGE_START = new ObjectId(options.BATCH_LAST_ID)
  } else if (options.BATCH_RANGE_START) {
    BATCH_RANGE_START = new ObjectId(options.BATCH_RANGE_START)
  } else {
    BATCH_RANGE_START = BATCH_DESCENDING ? ID_EDGE_FUTURE : ID_EDGE_PAST
  }

  // A window that is NaN, zero or negative never advances towards
  // BATCH_RANGE_END, so the iteration could not terminate. Use the default
  // instead, in line with the fallback applied to BATCH_SIZE.
  const parsedTimeSpan = parseInt(
    options.BATCH_MAX_TIME_SPAN_IN_MS || ONE_MONTH_IN_MS.toString(),
    10
  )
  BATCH_MAX_TIME_SPAN_IN_MS =
    parsedTimeSpan > 0 ? parsedTimeSpan : ONE_MONTH_IN_MS

  if (options.BATCH_RANGE_END) {
    BATCH_RANGE_END = new ObjectId(options.BATCH_RANGE_END)
  } else {
    BATCH_RANGE_END = BATCH_DESCENDING ? ID_EDGE_PAST : ID_EDGE_FUTURE
  }
}
/**
 * Fetch the next batch of documents matching `query` whose _id lies in the
 * window between `start` and `end`, sorted in iteration order and limited to
 * BATCH_SIZE entries.
 *
 * In both directions `start` is exclusive and `end` is inclusive:
 * - ascending:  start <  _id <= end
 * - descending: end   <= _id <  start
 *
 * The caller advances by using the _id of the last returned document as the
 * next `start`. Keeping `start` exclusive ensures that document is not
 * returned a second time (which, in descending mode, would otherwise repeat
 * forever once it is the only match left in the window).
 *
 * The given `query` object is not modified; the _id bounds are applied to a
 * shallow copy and replace any `_id` condition present in `query`.
 *
 * @param {Collection} collection
 * @param {Document} query
 * @param {ObjectId} start
 * @param {ObjectId} end
 * @param {Document} projection
 * @param {FindOptions} findOptions
 * @return {Promise<Array<Document>>}
 */
async function getNextBatch(
  collection,
  query,
  start,
  end,
  projection,
  findOptions
) {
  const idWindow = BATCH_DESCENDING
    ? { $gte: end, $lt: start }
    : { $gt: start, $lte: end }
  const boundedQuery = Object.assign({}, query, { _id: idWindow })
  return await collection
    .find(boundedQuery, findOptions)
    .project(projection)
    .sort({ _id: BATCH_DESCENDING ? -1 : 1 })
    .limit(BATCH_SIZE)
    .toArray()
}
/**
 * Apply `update` to every document of the batch with a single updateMany
 * call that selects the documents by their _id.
 *
 * @param {Collection} collection
 * @param {Array<Document>} nextBatch
 * @param {UpdateDocument} update
 * @return {Promise<void>}
 */
async function performUpdate(collection, nextBatch, update) {
  const batchIds = []
  for (const entry of nextBatch) {
    batchIds.push(entry._id)
  }
  const selector = { _id: { $in: batchIds } }
  await collection.updateMany(selector, update)
}
/**
 * Build an ObjectId for a point in time given in milliseconds.
 *
 * @param {number} ms timestamp in milliseconds
 * @return {ObjectId}
 */
function objectIdFromMs(ms) {
  // The value is converted from milliseconds to seconds for createFromTime.
  const seconds = ms / 1000
  return ObjectId.createFromTime(seconds)
}
/**
 * Read the timestamp of an ObjectId as milliseconds.
 *
 * @param {ObjectId} id
 * @return {number} result of getTime() on the id's timestamp
 */
function getMsFromObjectId(id) {
  const timestamp = id.getTimestamp()
  return timestamp.getTime()
}
/**
 * Compute the far edge of the next query window: `start` moved by
 * BATCH_MAX_TIME_SPAN_IN_MS in iteration direction, capped at BATCH_RANGE_END.
 *
 * When the cap applies, the BATCH_RANGE_END object itself is returned.
 *
 * @param {ObjectId} start
 * @return {ObjectId}
 */
function getNextEnd(start) {
  const offset = BATCH_DESCENDING
    ? -BATCH_MAX_TIME_SPAN_IN_MS
    : BATCH_MAX_TIME_SPAN_IN_MS
  const candidate = objectIdFromMs(getMsFromObjectId(start) + offset)

  // Compare by timestamp: has the candidate reached or passed the range end?
  const candidateMs = getMsFromObjectId(candidate)
  const rangeEndMs = getMsFromObjectId(BATCH_RANGE_END)
  const reachedRangeEnd = BATCH_DESCENDING
    ? candidateMs <= rangeEndMs
    : candidateMs >= rangeEndMs

  return reachedRangeEnd ? BATCH_RANGE_END : candidate
}
/**
 * Determine the lower _id edge of a collection: an ObjectId one second older
 * than the document with the smallest _id (never before timestamp 0).
 *
 * @param {Collection} collection
 * @return {Promise<ObjectId|null>} null when the collection has no documents
 */
async function getIdEdgePast(collection) {
  const oldestCursor = collection
    .find({})
    .project({ _id: 1 })
    .sort({ _id: 1 })
    .limit(1)
  const docs = await oldestCursor.toArray()
  const oldest = docs[0]
  if (!oldest) {
    return null
  }
  // Step one second back so that the oldest entry itself still satisfies
  // oldest._id > ID_EDGE_PAST.
  const oldestMs = getMsFromObjectId(oldest._id)
  return objectIdFromMs(Math.max(0, oldestMs - 1000))
}
/**
 * Walk over a collection in _id order and process all documents matching
 * `query` in batches.
 *
 * The _id range between BATCH_RANGE_START and BATCH_RANGE_END is scanned in
 * windows produced by getNextEnd(). Each non-empty batch is either passed to
 * `update` (when it is a function) or updated in place via performUpdate().
 * Progress is logged to stderr; with VERBOSE_LOGGING the ids of every batch
 * are logged to stdout instead of the "Running update" stderr line.
 *
 * Reads always use READ_PREFERENCE_SECONDARY, overriding any readPreference in
 * `findOptions`. The caller's `findOptions` object is not modified.
 *
 * Note: the batching settings live in module-level variables that are
 * reassigned on every call.
 *
 * @param {string} collectionName key of the collection in `db`
 * @param {Document} query
 * @param {UpdateDocument | ((batch: Array<Document>) => Promise<void>)} update
 * @param {Document} [projection] defaults to `{ _id: 1 }`
 * @param {FindOptions} [findOptions]
 * @param {BatchedUpdateOptions} [batchedUpdateOptions]
 * @return {Promise<number>} number of documents handed to `update`; 0 when the
 *   collection is empty
 */
async function batchedUpdate(
  collectionName,
  query,
  update,
  projection,
  findOptions,
  batchedUpdateOptions
) {
  const collection = db[collectionName]
  ID_EDGE_PAST = await getIdEdgePast(collection)
  if (!ID_EDGE_PAST) {
    console.warn(`The collection ${collectionName} appears to be empty.`)
    return 0
  }
  refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)

  // Work on a copy so that forcing the read preference does not leak into the
  // options object owned by the caller.
  const effectiveFindOptions = Object.assign({}, findOptions, {
    readPreference: READ_PREFERENCE_SECONDARY,
  })
  const effectiveProjection = projection || { _id: 1 }

  let updated = 0
  let start = BATCH_RANGE_START
  // getNextEnd() hands back the BATCH_RANGE_END object itself once the range
  // end is reached, so an identity comparison is enough to detect completion.
  while (start !== BATCH_RANGE_END) {
    let end = getNextEnd(start)
    const nextBatch = await getNextBatch(
      collection,
      query,
      start,
      end,
      effectiveProjection,
      effectiveFindOptions
    )
    if (nextBatch.length > 0) {
      // A batch may be cut short by the size limit: continue right after the
      // last document that was actually returned.
      end = nextBatch[nextBatch.length - 1]._id
      updated += nextBatch.length
      if (VERBOSE_LOGGING) {
        console.log(
          `Running update on batch with ids ${JSON.stringify(
            nextBatch.map(entry => entry._id)
          )}`
        )
      } else {
        console.error(`Running update on batch ending ${end}`)
      }
      if (typeof update === 'function') {
        await update(nextBatch)
      } else {
        await performUpdate(collection, nextBatch, update)
      }
    }
    console.error(`Completed batch ending ${end}`)
    start = end
  }
  return updated
}
/**
 * Run batchedUpdate() as the body of a script: log the outcome to stderr and
 * terminate the process with exit code 0 on success or 1 on failure.
 *
 * The function itself returns immediately; the work continues in the
 * background until the process exits.
 *
 * @param {string} collectionName
 * @param {Document} query
 * @param {UpdateDocument | ((batch: Array<Object>) => Promise<void>)} update
 * @param {Document} [projection]
 * @param {FindOptions} [findOptions]
 * @param {BatchedUpdateOptions} [batchedUpdateOptions]
 */
function batchedUpdateWithResultHandling(
  collectionName,
  query,
  update,
  projection,
  findOptions,
  batchedUpdateOptions
) {
  const runAndExit = async () => {
    try {
      const processed = await batchedUpdate(
        collectionName,
        query,
        update,
        projection,
        findOptions,
        batchedUpdateOptions
      )
      console.error({ processed })
      process.exit(0)
    } catch (error) {
      console.error({ error })
      process.exit(1)
    }
  }
  runAndExit()
}
// Default export: the two entry points of this helper module.
export default {
batchedUpdate,
batchedUpdateWithResultHandling,
}