Merge pull request #21441 from overleaf/jpa-batched-update-types

[web] add types to batchedUpdate

GitOrigin-RevId: a4ab8951bb43fbae6d90ac7a7afbaa781accdc39
This commit is contained in:
Jakob Ackermann 2024-10-31 13:17:06 +01:00 committed by Copybot
parent a7517eefcb
commit 0e4c87d131

View file

@ -1,3 +1,4 @@
// @ts-check
import mongodb from 'mongodb-legacy' import mongodb from 'mongodb-legacy'
import { import {
db, db,
@ -16,11 +17,33 @@ let BATCH_RANGE_START
let BATCH_RANGE_END let BATCH_RANGE_END
let BATCH_MAX_TIME_SPAN_IN_MS let BATCH_MAX_TIME_SPAN_IN_MS
/**
* @typedef {import("mongodb").Collection} Collection
* @typedef {import("mongodb").Document} Document
* @typedef {import("mongodb").FindOptions} FindOptions
* @typedef {import("mongodb").UpdateFilter<Document>} UpdateDocument
* @typedef {import("mongodb").ObjectId} ObjectId
*/
/**
* @typedef {Object} BatchedUpdateOptions
* @property {string} [BATCH_DESCENDING]
* @property {string} [BATCH_LAST_ID]
* @property {string} [BATCH_MAX_TIME_SPAN_IN_MS]
* @property {string} [BATCH_RANGE_END]
* @property {string} [BATCH_RANGE_START]
* @property {string} [BATCH_SIZE]
* @property {string} [VERBOSE_LOGGING]
*/
/**
* @param {BatchedUpdateOptions} options
*/
function refreshGlobalOptionsForBatchedUpdate(options = {}) { function refreshGlobalOptionsForBatchedUpdate(options = {}) {
options = Object.assign({}, options, process.env) options = Object.assign({}, options, process.env)
BATCH_DESCENDING = options.BATCH_DESCENDING === 'true' BATCH_DESCENDING = options.BATCH_DESCENDING === 'true'
BATCH_SIZE = parseInt(options.BATCH_SIZE, 10) || 1000 BATCH_SIZE = parseInt(options.BATCH_SIZE || '1000', 10) || 1000
VERBOSE_LOGGING = options.VERBOSE_LOGGING === 'true' VERBOSE_LOGGING = options.VERBOSE_LOGGING === 'true'
if (options.BATCH_LAST_ID) { if (options.BATCH_LAST_ID) {
BATCH_RANGE_START = new ObjectId(options.BATCH_LAST_ID) BATCH_RANGE_START = new ObjectId(options.BATCH_LAST_ID)
@ -33,8 +56,10 @@ function refreshGlobalOptionsForBatchedUpdate(options = {}) {
BATCH_RANGE_START = ID_EDGE_PAST BATCH_RANGE_START = ID_EDGE_PAST
} }
} }
BATCH_MAX_TIME_SPAN_IN_MS = BATCH_MAX_TIME_SPAN_IN_MS = parseInt(
parseInt(options.BATCH_MAX_TIME_SPAN_IN_MS, 10) || ONE_MONTH_IN_MS options.BATCH_MAX_TIME_SPAN_IN_MS || ONE_MONTH_IN_MS.toString(),
10
)
if (options.BATCH_RANGE_END) { if (options.BATCH_RANGE_END) {
BATCH_RANGE_END = new ObjectId(options.BATCH_RANGE_END) BATCH_RANGE_END = new ObjectId(options.BATCH_RANGE_END)
} else { } else {
@ -46,14 +71,23 @@ function refreshGlobalOptionsForBatchedUpdate(options = {}) {
} }
} }
async function getNextBatch({ /**
* @param {Collection} collection
* @param {Document} query
* @param {ObjectId} start
* @param {ObjectId} end
* @param {Document} projection
* @param {FindOptions} findOptions
* @return {Promise<Array<Document>>}
*/
async function getNextBatch(
collection, collection,
query, query,
start, start,
end, end,
projection, projection,
findOptions, findOptions
}) { ) {
if (BATCH_DESCENDING) { if (BATCH_DESCENDING) {
query._id = { query._id = {
$gt: end, $gt: end,
@ -73,21 +107,39 @@ async function getNextBatch({
.toArray() .toArray()
} }
/**
* @param {Collection} collection
* @param {Array<Document>} nextBatch
* @param {UpdateDocument} update
* @return {Promise<void>}
*/
async function performUpdate(collection, nextBatch, update) { async function performUpdate(collection, nextBatch, update) {
return collection.updateMany( await collection.updateMany(
{ _id: { $in: nextBatch.map(entry => entry._id) } }, { _id: { $in: nextBatch.map(entry => entry._id) } },
update update
) )
} }
/**
* @param {number} ms
* @return {ObjectId}
*/
function objectIdFromMs(ms) { function objectIdFromMs(ms) {
return ObjectId.createFromTime(ms / 1000) return ObjectId.createFromTime(ms / 1000)
} }
/**
* @param {ObjectId} id
* @return {number}
*/
function getMsFromObjectId(id) { function getMsFromObjectId(id) {
return id.getTimestamp().getTime() return id.getTimestamp().getTime()
} }
/**
* @param {ObjectId} start
* @return {ObjectId}
*/
function getNextEnd(start) { function getNextEnd(start) {
let end let end
if (BATCH_DESCENDING) { if (BATCH_DESCENDING) {
@ -104,6 +156,10 @@ function getNextEnd(start) {
return end return end
} }
/**
* @param {Collection} collection
* @return {Promise<ObjectId|null>}
*/
async function getIdEdgePast(collection) { async function getIdEdgePast(collection) {
const [first] = await collection const [first] = await collection
.find({}) .find({})
@ -112,11 +168,19 @@ async function getIdEdgePast(collection) {
.limit(1) .limit(1)
.toArray() .toArray()
if (!first) return null if (!first) return null
// Go 1s further into the past in order to include the first entry via // Go one second further into the past in order to include the first entry via
// first._id > ID_EDGE_PAST // first._id > ID_EDGE_PAST
return objectIdFromMs(Math.max(0, getMsFromObjectId(first._id) - 1000)) return objectIdFromMs(Math.max(0, getMsFromObjectId(first._id) - 1000))
} }
/**
* @param {string} collectionName
* @param {Document} query
* @param {UpdateDocument | ((batch: Array<Document>) => Promise<void>)} update
* @param {Document} [projection]
* @param {FindOptions} [findOptions]
* @param {BatchedUpdateOptions} [batchedUpdateOptions]
*/
async function batchedUpdate( async function batchedUpdate(
collectionName, collectionName,
query, query,
@ -143,14 +207,14 @@ async function batchedUpdate(
while (start !== BATCH_RANGE_END) { while (start !== BATCH_RANGE_END) {
let end = getNextEnd(start) let end = getNextEnd(start)
nextBatch = await getNextBatch({ nextBatch = await getNextBatch(
collection, collection,
query, query,
start, start,
end, end,
projection, projection,
findOptions, findOptions
}) )
if (nextBatch.length > 0) { if (nextBatch.length > 0) {
end = nextBatch[nextBatch.length - 1]._id end = nextBatch[nextBatch.length - 1]._id
updated += nextBatch.length updated += nextBatch.length
@ -177,14 +241,30 @@ async function batchedUpdate(
return updated return updated
} }
/**
* @param {string} collectionName
* @param {Document} query
* @param {UpdateDocument | ((batch: Array<Object>) => Promise<void>)} update
* @param {Document} [projection]
* @param {FindOptions} [findOptions]
* @param {BatchedUpdateOptions} [batchedUpdateOptions]
*/
function batchedUpdateWithResultHandling( function batchedUpdateWithResultHandling(
collection, collectionName,
query, query,
update, update,
projection, projection,
options findOptions,
batchedUpdateOptions
) { ) {
batchedUpdate(collection, query, update, projection, options) batchedUpdate(
collectionName,
query,
update,
projection,
findOptions,
batchedUpdateOptions
)
.then(processed => { .then(processed => {
console.error({ processed }) console.error({ processed })
process.exit(0) process.exit(0)