2020-08-19 05:11:39 -04:00
|
|
|
const { ReadPreference, ObjectId } = require('mongodb')
|
2020-10-07 09:17:49 -04:00
|
|
|
const { db, waitForDb } = require('../../app/src/infrastructure/mongodb')
|
2020-08-17 10:14:11 -04:00
|
|
|
|
2023-02-16 05:41:58 -05:00
|
|
|
let BATCH_DESCENDING
|
|
|
|
let BATCH_SIZE
|
|
|
|
let VERBOSE_LOGGING
|
2020-08-19 05:11:39 -04:00
|
|
|
let BATCH_LAST_ID
|
2021-11-30 05:41:53 -05:00
|
|
|
let BATCH_RANGE_END
|
2023-02-16 05:41:58 -05:00
|
|
|
refreshGlobalOptionsForBatchedUpdate()
|
|
|
|
|
|
|
|
function refreshGlobalOptionsForBatchedUpdate(options = {}) {
|
|
|
|
options = Object.assign({}, options, process.env)
|
|
|
|
|
|
|
|
BATCH_DESCENDING = options.BATCH_DESCENDING === 'true'
|
|
|
|
BATCH_SIZE = parseInt(options.BATCH_SIZE, 10) || 1000
|
|
|
|
VERBOSE_LOGGING = options.VERBOSE_LOGGING === 'true'
|
|
|
|
if (options.BATCH_LAST_ID) {
|
|
|
|
BATCH_LAST_ID = ObjectId(options.BATCH_LAST_ID)
|
|
|
|
} else if (options.BATCH_RANGE_START) {
|
|
|
|
BATCH_LAST_ID = ObjectId(options.BATCH_RANGE_START)
|
|
|
|
}
|
|
|
|
if (options.BATCH_RANGE_END) {
|
|
|
|
BATCH_RANGE_END = ObjectId(options.BATCH_RANGE_END)
|
|
|
|
}
|
2020-08-19 05:11:39 -04:00
|
|
|
}
|
2020-08-17 10:14:11 -04:00
|
|
|
|
2021-08-04 09:09:06 -04:00
|
|
|
async function getNextBatch(collection, query, maxId, projection, options) {
|
2021-11-30 05:41:53 -05:00
|
|
|
const queryIdField = {}
|
2020-10-21 05:48:30 -04:00
|
|
|
maxId = maxId || BATCH_LAST_ID
|
2020-08-17 10:14:11 -04:00
|
|
|
if (maxId) {
|
2021-11-11 10:10:01 -05:00
|
|
|
if (BATCH_DESCENDING) {
|
2021-11-30 05:41:53 -05:00
|
|
|
queryIdField.$lt = maxId
|
|
|
|
} else {
|
|
|
|
queryIdField.$gt = maxId
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (BATCH_RANGE_END) {
|
|
|
|
if (BATCH_DESCENDING) {
|
|
|
|
queryIdField.$gt = BATCH_RANGE_END
|
2021-11-11 10:10:01 -05:00
|
|
|
} else {
|
2021-11-30 05:41:53 -05:00
|
|
|
queryIdField.$lt = BATCH_RANGE_END
|
2021-11-11 10:10:01 -05:00
|
|
|
}
|
2020-08-17 10:14:11 -04:00
|
|
|
}
|
2021-11-30 05:41:53 -05:00
|
|
|
if (queryIdField.$gt || queryIdField.$lt) {
|
|
|
|
query._id = queryIdField
|
|
|
|
}
|
2020-08-17 10:14:11 -04:00
|
|
|
const entries = await collection
|
2021-08-04 09:09:06 -04:00
|
|
|
.find(query, options)
|
2020-10-07 09:17:49 -04:00
|
|
|
.project(projection)
|
2021-11-11 10:10:01 -05:00
|
|
|
.sort({ _id: BATCH_DESCENDING ? -1 : 1 })
|
2020-08-17 10:14:11 -04:00
|
|
|
.limit(BATCH_SIZE)
|
|
|
|
.toArray()
|
2020-09-25 04:40:07 -04:00
|
|
|
return entries
|
2020-08-17 10:14:11 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
async function performUpdate(collection, nextBatch, update) {
|
2020-09-25 04:40:07 -04:00
|
|
|
return collection.updateMany(
|
|
|
|
{ _id: { $in: nextBatch.map(entry => entry._id) } },
|
|
|
|
update
|
|
|
|
)
|
2020-08-17 10:14:11 -04:00
|
|
|
}
|
|
|
|
|
2021-08-04 09:09:06 -04:00
|
|
|
async function batchedUpdate(
|
|
|
|
collectionName,
|
|
|
|
query,
|
|
|
|
update,
|
|
|
|
projection,
|
2023-02-16 05:41:58 -05:00
|
|
|
findOptions,
|
|
|
|
batchedUpdateOptions
|
2021-08-04 09:09:06 -04:00
|
|
|
) {
|
2023-02-16 05:41:58 -05:00
|
|
|
refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)
|
2020-10-07 09:17:49 -04:00
|
|
|
await waitForDb()
|
|
|
|
const collection = db[collectionName]
|
2020-08-17 10:14:11 -04:00
|
|
|
|
2023-02-16 05:41:58 -05:00
|
|
|
findOptions = findOptions || {}
|
|
|
|
findOptions.readPreference = ReadPreference.SECONDARY
|
2021-08-04 09:09:06 -04:00
|
|
|
|
2020-09-25 04:40:07 -04:00
|
|
|
projection = projection || { _id: 1 }
|
2020-08-17 10:14:11 -04:00
|
|
|
let nextBatch
|
|
|
|
let updated = 0
|
2020-10-21 05:48:30 -04:00
|
|
|
let maxId
|
2020-09-25 04:40:07 -04:00
|
|
|
while (
|
2021-08-04 09:09:06 -04:00
|
|
|
(nextBatch = await getNextBatch(
|
|
|
|
collection,
|
|
|
|
query,
|
|
|
|
maxId,
|
|
|
|
projection,
|
2023-02-16 05:41:58 -05:00
|
|
|
findOptions
|
2021-08-04 09:09:06 -04:00
|
|
|
)).length
|
2020-09-25 04:40:07 -04:00
|
|
|
) {
|
|
|
|
maxId = nextBatch[nextBatch.length - 1]._id
|
2020-08-17 10:14:11 -04:00
|
|
|
updated += nextBatch.length
|
2022-06-08 07:03:21 -04:00
|
|
|
if (VERBOSE_LOGGING) {
|
|
|
|
console.log(
|
|
|
|
`Running update on batch with ids ${JSON.stringify(
|
|
|
|
nextBatch.map(entry => entry._id)
|
|
|
|
)}`
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
console.error(`Running update on batch ending ${maxId}`)
|
|
|
|
}
|
2020-09-25 04:40:07 -04:00
|
|
|
|
|
|
|
if (typeof update === 'function') {
|
|
|
|
await update(collection, nextBatch)
|
|
|
|
} else {
|
|
|
|
await performUpdate(collection, nextBatch, update)
|
|
|
|
}
|
2020-11-04 04:53:26 -05:00
|
|
|
|
|
|
|
console.error(`Completed batch ending ${maxId}`)
|
2020-08-17 10:14:11 -04:00
|
|
|
}
|
|
|
|
return updated
|
|
|
|
}
|
|
|
|
|
2022-01-25 05:18:07 -05:00
|
|
|
function batchedUpdateWithResultHandling(
|
|
|
|
collection,
|
|
|
|
query,
|
|
|
|
update,
|
|
|
|
projection,
|
|
|
|
options
|
|
|
|
) {
|
|
|
|
batchedUpdate(collection, query, update, projection, options)
|
2020-08-17 10:14:11 -04:00
|
|
|
.then(updated => {
|
|
|
|
console.error({ updated })
|
|
|
|
process.exit(0)
|
|
|
|
})
|
|
|
|
.catch(error => {
|
|
|
|
console.error({ error })
|
|
|
|
process.exit(1)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = {
|
2020-10-21 05:48:30 -04:00
|
|
|
getNextBatch,
|
2020-08-17 10:14:11 -04:00
|
|
|
batchedUpdate,
|
2021-04-27 03:52:58 -04:00
|
|
|
batchedUpdateWithResultHandling,
|
2020-08-17 10:14:11 -04:00
|
|
|
}
|