diff --git a/services/web/scripts/helpers/batchedUpdate.mjs b/services/web/scripts/helpers/batchedUpdate.mjs index fb746a7b55..7f89959314 100644 --- a/services/web/scripts/helpers/batchedUpdate.mjs +++ b/services/web/scripts/helpers/batchedUpdate.mjs @@ -1,3 +1,4 @@ +// @ts-check import mongodb from 'mongodb-legacy' import { db, @@ -16,11 +17,33 @@ let BATCH_RANGE_START let BATCH_RANGE_END let BATCH_MAX_TIME_SPAN_IN_MS +/** + * @typedef {import("mongodb").Collection} Collection + * @typedef {import("mongodb").Document} Document + * @typedef {import("mongodb").FindOptions} FindOptions + * @typedef {import("mongodb").UpdateFilter} UpdateDocument + * @typedef {import("mongodb").ObjectId} ObjectId + */ + +/** + * @typedef {Object} BatchedUpdateOptions + * @property {string} [BATCH_DESCENDING] + * @property {string} [BATCH_LAST_ID] + * @property {string} [BATCH_MAX_TIME_SPAN_IN_MS] + * @property {string} [BATCH_RANGE_END] + * @property {string} [BATCH_RANGE_START] + * @property {string} [BATCH_SIZE] + * @property {string} [VERBOSE_LOGGING] + */ + +/** + * @param {BatchedUpdateOptions} options + */ function refreshGlobalOptionsForBatchedUpdate(options = {}) { options = Object.assign({}, options, process.env) BATCH_DESCENDING = options.BATCH_DESCENDING === 'true' - BATCH_SIZE = parseInt(options.BATCH_SIZE, 10) || 1000 + BATCH_SIZE = parseInt(options.BATCH_SIZE || '1000', 10) || 1000 VERBOSE_LOGGING = options.VERBOSE_LOGGING === 'true' if (options.BATCH_LAST_ID) { BATCH_RANGE_START = new ObjectId(options.BATCH_LAST_ID) @@ -33,8 +56,10 @@ function refreshGlobalOptionsForBatchedUpdate(options = {}) { BATCH_RANGE_START = ID_EDGE_PAST } } - BATCH_MAX_TIME_SPAN_IN_MS = - parseInt(options.BATCH_MAX_TIME_SPAN_IN_MS, 10) || ONE_MONTH_IN_MS + BATCH_MAX_TIME_SPAN_IN_MS = parseInt( + options.BATCH_MAX_TIME_SPAN_IN_MS || ONE_MONTH_IN_MS.toString(), + 10 + ) if (options.BATCH_RANGE_END) { BATCH_RANGE_END = new ObjectId(options.BATCH_RANGE_END) } else { @@ -46,14 +71,23 @@ function 
refreshGlobalOptionsForBatchedUpdate(options = {}) { } } -async function getNextBatch({ +/** + * @param {Collection} collection + * @param {Document} query + * @param {ObjectId} start + * @param {ObjectId} end + * @param {Document} projection + * @param {FindOptions} findOptions + * @return {Promise<Array<Document>>} + */ +async function getNextBatch( collection, query, start, end, projection, - findOptions, -}) { + findOptions +) { if (BATCH_DESCENDING) { query._id = { $gt: end, @@ -73,21 +107,39 @@ async function getNextBatch({ .toArray() } +/** + * @param {Collection} collection + * @param {Array} nextBatch + * @param {UpdateDocument} update + * @return {Promise} + */ async function performUpdate(collection, nextBatch, update) { - return collection.updateMany( + await collection.updateMany( { _id: { $in: nextBatch.map(entry => entry._id) } }, update ) } +/** + * @param {number} ms + * @return {ObjectId} + */ function objectIdFromMs(ms) { return ObjectId.createFromTime(ms / 1000) } +/** + * @param {ObjectId} id + * @return {number} + */ function getMsFromObjectId(id) { return id.getTimestamp().getTime() } +/** + * @param {ObjectId} start + * @return {ObjectId} + */ function getNextEnd(start) { let end if (BATCH_DESCENDING) { @@ -104,6 +156,10 @@ function getNextEnd(start) { return end } +/** + * @param {Collection} collection + * @return {Promise} + */ async function getIdEdgePast(collection) { const [first] = await collection .find({}) @@ -112,11 +168,19 @@ async function getIdEdgePast(collection) { .limit(1) .toArray() if (!first) return null - // Go 1s further into the past in order to include the first entry via + // Go one second further into the past in order to include the first entry via // first._id > ID_EDGE_PAST return objectIdFromMs(Math.max(0, getMsFromObjectId(first._id) - 1000)) } +/** + * @param {string} collectionName + * @param {Document} query + * @param {UpdateDocument | ((batch: Array) => Promise)} update + * @param {Document} [projection] + * @param 
{FindOptions} [findOptions] + * @param {BatchedUpdateOptions} [batchedUpdateOptions] + */ async function batchedUpdate( collectionName, query, @@ -143,14 +207,14 @@ async function batchedUpdate( while (start !== BATCH_RANGE_END) { let end = getNextEnd(start) - nextBatch = await getNextBatch({ + nextBatch = await getNextBatch( collection, query, start, end, projection, - findOptions, - }) + findOptions + ) if (nextBatch.length > 0) { end = nextBatch[nextBatch.length - 1]._id updated += nextBatch.length @@ -177,14 +241,30 @@ async function batchedUpdate( return updated } +/** + * @param {string} collectionName + * @param {Document} query + * @param {UpdateDocument | ((batch: Array) => Promise)} update + * @param {Document} [projection] + * @param {FindOptions} [findOptions] + * @param {BatchedUpdateOptions} [batchedUpdateOptions] + */ function batchedUpdateWithResultHandling( - collection, + collectionName, query, update, projection, - options + findOptions, + batchedUpdateOptions ) { - batchedUpdate(collection, query, update, projection, options) + batchedUpdate( + collectionName, + query, + update, + projection, + findOptions, + batchedUpdateOptions + ) .then(processed => { console.error({ processed }) process.exit(0)