2020-08-18 10:08:01 -04:00
|
|
|
const { promisify } = require('util')
|
2020-08-19 05:11:39 -04:00
|
|
|
const { ReadPreference, ObjectId } = require('mongodb')
|
2020-08-17 10:14:11 -04:00
|
|
|
const { getNativeDb } = require('../../app/src/infrastructure/Mongoose')
|
|
|
|
|
|
|
|
const BATCH_SIZE = parseInt(process.env.BATCH_SIZE, 10) || 1000
|
2020-08-19 05:11:39 -04:00
|
|
|
let BATCH_LAST_ID
|
|
|
|
if (process.env.BATCH_LAST_ID) {
|
|
|
|
BATCH_LAST_ID = ObjectId(process.env.BATCH_LAST_ID)
|
|
|
|
}
|
2020-08-17 10:14:11 -04:00
|
|
|
|
|
|
|
async function getNextBatch(collection, query, maxId) {
|
|
|
|
if (maxId) {
|
|
|
|
query['_id'] = { $gt: maxId }
|
|
|
|
}
|
|
|
|
const entries = await collection
|
|
|
|
.find(query, { _id: 1 })
|
|
|
|
.sort({ _id: 1 })
|
|
|
|
.limit(BATCH_SIZE)
|
|
|
|
.setReadPreference(ReadPreference.SECONDARY)
|
|
|
|
.toArray()
|
|
|
|
return entries.map(entry => entry._id)
|
|
|
|
}
|
|
|
|
|
|
|
|
async function performUpdate(collection, nextBatch, update) {
|
|
|
|
return collection.updateMany({ _id: { $in: nextBatch } }, update)
|
|
|
|
}
|
|
|
|
|
|
|
|
async function batchedUpdate(collectionName, query, update) {
|
2020-08-18 10:08:01 -04:00
|
|
|
// Apparently the mongo driver returns the connection too early.
|
|
|
|
// Some secondary connections are not ready as it returns, leading to
|
|
|
|
// failing cursor actions with a readPreference set to 'secondary'.
|
|
|
|
// TODO(das7pad): revisit/remove this delay after the mongo-driver update.
|
|
|
|
await Promise.all([getNativeDb(), promisify(setTimeout)(10 * 1000)])
|
|
|
|
|
2020-08-17 10:14:11 -04:00
|
|
|
const db = await getNativeDb()
|
|
|
|
const collection = db.collection(collectionName)
|
|
|
|
|
|
|
|
let nextBatch
|
|
|
|
let updated = 0
|
2020-08-19 05:11:39 -04:00
|
|
|
let maxId = BATCH_LAST_ID
|
2020-08-17 10:14:11 -04:00
|
|
|
while ((nextBatch = await getNextBatch(collection, query, maxId)).length) {
|
|
|
|
maxId = nextBatch[nextBatch.length - 1]
|
|
|
|
updated += nextBatch.length
|
|
|
|
console.log(JSON.stringify(nextBatch))
|
|
|
|
await performUpdate(collection, nextBatch, update)
|
|
|
|
}
|
|
|
|
return updated
|
|
|
|
}
|
|
|
|
|
|
|
|
function batchedUpdateWithResultHandling(collection, query, update) {
|
|
|
|
batchedUpdate(collection, query, update)
|
|
|
|
.then(updated => {
|
|
|
|
console.error({ updated })
|
|
|
|
process.exit(0)
|
|
|
|
})
|
|
|
|
.catch(error => {
|
|
|
|
console.error({ error })
|
|
|
|
process.exit(1)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = {
|
|
|
|
batchedUpdate,
|
|
|
|
batchedUpdateWithResultHandling
|
|
|
|
}
|