/**
 * Maintenance script: walks all users that have a `referal_id` and assigns
 * freshly generated referral ids to every user whose id is shared with
 * another user.
 *
 * Environment variables read below:
 *  - VERBOSE_LOGGING: the string 'true' enables extra console output.
 *  - WRITE_CONCURRENCY: integer, falls back to 10.
 *  - BATCH_SIZE: integer, falls back to 100.
 */

/**
 * Parse an integer (base 10) from the named environment variable.
 *
 * @param {string} name - environment variable to read
 * @param {number} fallback - used when parsing yields a falsy value
 *   (NaN or 0)
 * @returns {number}
 */
function intFromEnv(name, fallback) {
  return parseInt(process.env[name], 10) || fallback
}

const VERBOSE_LOGGING = process.env.VERBOSE_LOGGING === 'true'
const WRITE_CONCURRENCY = intFromEnv('WRITE_CONCURRENCY', 10)
const BATCH_SIZE = intFromEnv('BATCH_SIZE', 100)
// Write the effective value back so that batchedUpdate stays in sync with
// the fallback chosen here. This must happen before the batchedUpdate helper
// is required below.
process.env.BATCH_SIZE = BATCH_SIZE

const { ReadPreference } = require('mongodb')
const { db } = require('../app/src/infrastructure/mongodb')
const { promiseMapWithLimit } = require('../app/src/util/promises')
const TokenGenerator = require('../app/src/Features/TokenGenerator/TokenGenerator')
const { batchedUpdate } = require('./helpers/batchedUpdate')

/**
 * Look up every user holding `referralId`; unless exactly one user holds it,
 * give each of those users a newly generated referral id, one update at a
 * time. Failures are logged and not re-thrown, so one bad id does not stop
 * the processing of the others.
 *
 * @param {string} referralId
 * @returns {Promise<void>}
 */
async function regenerateIfShared(referralId) {
  try {
    const query = { referal_id: referralId }
    const findOptions = {
      projection: { _id: 1 },
      readPreference: ReadPreference.SECONDARY,
    }
    const holders = await db.users.find(query, findOptions).toArray()

    // A single holder means this id merely travelled along in a batch that
    // contained some other duplicate. Skip it to keep the write load low.
    if (holders.length === 1) {
      return
    }

    if (VERBOSE_LOGGING) {
      console.log('Found duplicate:', referralId)
    }

    for (const { _id } of holders) {
      const replacement = TokenGenerator.generateReferralId()
      await db.users.updateOne(
        { _id },
        { $set: { referal_id: replacement } }
      )
    }
  } catch (error) {
    console.error(
      { err: error },
      `Failed to generate new referral ID for duplicate ID: ${referralId}`
    )
  }
}

/**
 * Check each of the given referral ids and rewrite the ones that are shared
 * by several users.
 *
 * The list is expected to contain at least one duplicated id, but which one
 * is not known up front; every id is checked individually, with the work
 * handed to promiseMapWithLimit together with WRITE_CONCURRENCY.
 *
 * @param {Array<string>} duplicateReferralIds - candidate referral ids
 * @returns {Promise<void>}
 */
async function rewriteDuplicates(duplicateReferralIds) {
  await promiseMapWithLimit(
    WRITE_CONCURRENCY,
    duplicateReferralIds,
    regenerateIfShared
  )
}

/**
 * Batch handler passed to batchedUpdate.
 *
 * Duplicates are detected in two stages: first inside the batch itself
 * (fewer distinct ids than users), then against the whole collection (more
 * users matching the ids than there are ids). Either finding triggers a
 * rewrite pass over all distinct ids of the batch.
 *
 * @param {*} _ - unused first argument supplied by batchedUpdate
 * @param {Array<{_id: *, referal_id: *}>} users - users of the current batch
 * @returns {Promise<void>}
 */
async function processBatch(_, users) {
  const distinctIds = [...new Set(users.map(({ referal_id: id }) => id))]

  let finding = null
  if (distinctIds.length !== users.length) {
    finding = 'Got duplicates from looking at batch.'
  } else {
    // NOTE(review): Collection#count is marked deprecated in newer MongoDB
    // drivers in favour of countDocuments -- confirm the driver version in
    // use before switching.
    const nMatching = await db.users.count(
      { referal_id: { $in: distinctIds } },
      { readPreference: ReadPreference.SECONDARY }
    )
    if (nMatching !== distinctIds.length) {
      finding = 'Got duplicates from running count.'
    }
  }

  if (finding === null) {
    return
  }
  if (VERBOSE_LOGGING) {
    console.log(finding)
  }
  await rewriteDuplicates(distinctIds)
}

/**
 * Run processBatch over every user that has a `referal_id` field, fetching
 * only `_id` and `referal_id`.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const collectionName = 'users'
  const query = { referal_id: { $exists: true } }
  const projection = { _id: 1, referal_id: 1 }
  await batchedUpdate(collectionName, query, processBatch, projection)
}

/** Report completion and terminate with exit code 0. */
function exitWithSuccess() {
  console.error('Done.')
  process.exit(0)
}

/**
 * Report the error and terminate with exit code 1.
 *
 * @param {*} error - rejection reason from main()
 */
function exitWithFailure(error) {
  console.error({ error })
  process.exit(1)
}

main().then(exitWithSuccess).catch(exitWithFailure)