Merge pull request #3877 from overleaf/jpa-script-regenerate-referreal-ids-optimize-queries

[scripts] regenerate_duplicate_referral_ids: optimize queries, add tests

GitOrigin-RevId: 5b6881b857b20a80bc8db6e01fb7668dc0377675
Timothée Alby authored 2021-04-19 14:35:39 +02:00, committed by Copybot
parent dcd6bd347f
commit 5520553e34
2 changed files with 226 additions and 38 deletions

scripts/regenerate_duplicate_referral_ids.js · View file

@@ -1,54 +1,53 @@
const VERBOSE_LOGGING = process.env.VERBOSE_LOGGING === 'true'
const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY, 10) || 10
const BATCH_SIZE = parseInt(process.env.BATCH_SIZE, 10) || 100
// persist fallback in order to keep batchedUpdate in-sync
process.env.BATCH_SIZE = BATCH_SIZE
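One subtlety behind the "persist fallback" comment: process.env entries are always strings, so the write-back stores e.g. '100' rather than 100. The point is presumably that the batchedUpdate helper reads the same variable and would otherwise compute its own, different fallback. A tiny illustration of the coercion:

// Illustration only: assignments to process.env are coerced to strings.
process.env.BATCH_SIZE = 100
console.log(typeof process.env.BATCH_SIZE) // 'string'
console.log(parseInt(process.env.BATCH_SIZE, 10)) // 100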
const { ReadPreference } = require('mongodb')
const { db } = require('../app/src/infrastructure/mongodb')
const { promiseMapWithLimit } = require('../app/src/util/promises')
const TokenGenerator = require('../app/src/Features/TokenGenerator/TokenGenerator')
const { batchedUpdate } = require('./helpers/batchedUpdate')
async function rewriteDuplicates(duplicateReferralIds) {
  // duplicateReferralIds holds the referral ids of a batch that contained
  // at least one duplicate. Work out, in parallel, which ids are actually
  // duplicated and rewrite the affected users' ids.
  await promiseMapWithLimit(
    WRITE_CONCURRENCY,
    duplicateReferralIds,
    async referralId => {
      try {
        const users = await db.users
          .find(
            { referal_id: referralId },
            {
              projection: { _id: 1 },
              readPreference: ReadPreference.SECONDARY
            }
          )
          .toArray()
        if (users.length === 1) {
          // Only one user holds this referral id: it merely shared a batch
          // with duplicated ids. Skip the update to keep the write load low.
          return
        }
        if (VERBOSE_LOGGING) {
          console.log('Found duplicate:', referralId)
        }
        for (const user of users) {
          const newReferralId = TokenGenerator.generateReferralId()
          await db.users.updateOne(
            { _id: user._id },
            { $set: { referal_id: newReferralId } }
          )
        }
      } catch (error) {
        console.error(
@@ -60,8 +59,41 @@ async function main() {
        )
      }
    }
  )
}
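For context: promiseMapWithLimit(concurrency, items, fn) from app/src/util/promises maps fn over items with at most `concurrency` calls in flight, so the script never runs more than WRITE_CONCURRENCY find/update sequences at once. A minimal sketch of the assumed semantics, not the actual helper:

async function promiseMapWithLimit(concurrency, items, fn) {
  const results = []
  let next = 0
  // Each worker repeatedly claims the next unprocessed index. Claiming is
  // safe without locks because JavaScript runs the synchronous part of each
  // iteration to completion before yielding at the await.
  async function worker() {
    while (next < items.length) {
      const index = next++
      results[index] = await fn(items[index])
    }
  }
  await Promise.all(
    Array.from({ length: Math.min(concurrency, items.length) }, worker)
  )
  return results
}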

async function processBatch(_, users) {
  const uniqueReferalIdsInBatch = Array.from(
    new Set(users.map(user => user.referal_id))
  )
  if (uniqueReferalIdsInBatch.length !== users.length) {
    if (VERBOSE_LOGGING) {
      console.log('Got duplicates from looking at batch.')
    }
    await rewriteDuplicates(uniqueReferalIdsInBatch)
    return
  }
  const nMatching = await db.users.count(
    { referal_id: { $in: uniqueReferalIdsInBatch } },
    { readPreference: ReadPreference.SECONDARY }
  )
  if (nMatching !== uniqueReferalIdsInBatch.length) {
    if (VERBOSE_LOGGING) {
      console.log('Got duplicates from running count.')
    }
    await rewriteDuplicates(uniqueReferalIdsInBatch)
  }
}
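The batch handler applies two increasingly expensive checks: comparing the batch against a Set of its own referral ids costs nothing beyond the documents already fetched, while the collection-wide count catches ids that are unique within this batch but also present in another one. A worked example with illustrative values:

// Three users, two distinct referral ids: the Set check fires.
const batch = [
  { referal_id: 'a' },
  { referal_id: 'a' },
  { referal_id: 'b' }
]
const unique = Array.from(new Set(batch.map(u => u.referal_id)))
console.log(unique.length !== batch.length) // true -> rewrite 'a' and 'b'

// Had the batch been internally clean (say ids ['a', 'b', 'c']), a
// collection-wide count of 4 for those three ids would mean some other
// batch holds a second copy, and the ids would still be rewritten.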

async function main() {
  await batchedUpdate(
    'users',
    { referal_id: { $exists: true } },
    processBatch,
    { _id: 1, referal_id: 1 }
  )
}

main()
  .then(() => {
    console.error('Done.')
    process.exit(0)
  })
  .catch(error => {
    console.error(error)
    process.exit(1)
  })
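All batching is delegated to scripts/helpers/batchedUpdate, which presumably also awaits the database connection itself (the script no longer calls waitForDb). Judging from the call in main() and the output asserted in the test below ('Running update on batch with ids …' on stdout, 'Completed batch ending …' on stderr), the helper walks the collection in _id order, BATCH_SIZE documents at a time, handing each batch to the callback. A rough sketch of that assumed contract, not the real implementation:

async function batchedUpdate(collectionName, query, processBatch, projection) {
  const collection = db[collectionName]
  // Re-reads the env var the script persisted its fallback into.
  const batchSize = parseInt(process.env.BATCH_SIZE, 10) || 1000
  let lastId
  while (true) {
    const filter = lastId ? { ...query, _id: { $gt: lastId } } : query
    const docs = await collection
      .find(filter, { projection })
      .sort({ _id: 1 })
      .limit(batchSize)
      .toArray()
    if (docs.length === 0) break
    const ids = docs.map(doc => doc._id)
    console.log(`Running update on batch with ids ${JSON.stringify(ids)}`)
    // The script ignores the first argument (`_`); what the real helper
    // passes there (likely the collection) is an assumption.
    await processBatch(collection, docs)
    lastId = ids[ids.length - 1]
    console.error(`Completed batch ending ${lastId}`)
  }
}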

View file

@@ -0,0 +1,156 @@
const { exec } = require('child_process')
const { promisify } = require('util')
const { expect } = require('chai')
const logger = require('logger-sharelatex')
const { db } = require('../../../app/src/infrastructure/mongodb')

const BATCH_SIZE = 100

let n = 0
function getUniqueReferralId() {
  return `unique_${n++}`
}

function getUserWithReferralId(referralId) {
  const email = `${Math.random()}@example.com`
  return {
    referal_id: referralId,
    // Make the unique indexes happy.
    email,
    emails: [{ email }]
  }
}

async function getBatch(batchCounter) {
  // Return the user ids of 1-indexed batch N, relying on the natural order
  // of the collection matching insertion order.
  return (
    await db.users
      .find(
        {},
        {
          projection: { _id: 1 },
          skip: BATCH_SIZE * (batchCounter - 1),
          limit: BATCH_SIZE
        }
      )
      .toArray()
  ).map(user => user._id)
}

describe('RegenerateDuplicateReferralIds', function() {
  let firstBatch, secondBatch, thirdBatch, fourthBatch, duplicateAcrossBatch
  beforeEach('insert duplicates', async function() {
    // full batch of duplicates
    await db.users.insertMany(
      Array(BATCH_SIZE)
        .fill(0)
        .map(() => {
          return getUserWithReferralId('duplicate1')
        })
    )
    firstBatch = await getBatch(1)

    // batch of BATCH_SIZE - 1 duplicates and 1 unique
    await db.users.insertMany(
      Array(BATCH_SIZE - 1)
        .fill(0)
        .map(() => {
          return getUserWithReferralId('duplicate2')
        })
        .concat([getUserWithReferralId(getUniqueReferralId())])
    )
    secondBatch = await getBatch(2)

    // duplicate shared between this batch and the next one
    duplicateAcrossBatch = getUniqueReferralId()
    await db.users.insertMany(
      Array(BATCH_SIZE - 1)
        .fill(0)
        .map(() => {
          return getUserWithReferralId(getUniqueReferralId())
        })
        .concat([getUserWithReferralId(duplicateAcrossBatch)])
    )
    thirdBatch = await getBatch(3)

    // no new duplicate ids; second occurrence of duplicateAcrossBatch
    await db.users.insertMany(
      Array(BATCH_SIZE - 1)
        .fill(0)
        .map(() => {
          return getUserWithReferralId(getUniqueReferralId())
        })
        .concat([getUserWithReferralId(duplicateAcrossBatch)])
    )
    fourthBatch = await getBatch(4)
  })
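Spelled out, the fixture produces four batches of 100 users each; a summary of which check each batch is expected to trip:

// Batch 1: 100 x 'duplicate1'              -> Set check ('looking at batch')
// Batch 2:  99 x 'duplicate2' + 1 unique   -> Set check ('looking at batch')
// Batch 3:  99 unique + duplicateAcrossBatch -> count check ('running count')
// Batch 4:  99 unique + duplicateAcrossBatch -> nothing new: batch 3's count
//           check already rewrote both copies before batch 4 was fetched.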

  let result
  beforeEach('run script', async function() {
    try {
      result = await promisify(exec)(
        [
          // set low BATCH_SIZE
          `BATCH_SIZE=${BATCH_SIZE}`,
          // log details on duplicate matching
          'VERBOSE_LOGGING=true',
          // disable verbose logging from logger-sharelatex
          'LOG_LEVEL=ERROR',
          // actual command
          'node',
          'scripts/regenerate_duplicate_referral_ids'
        ].join(' ')
      )
    } catch (err) {
      // dump details like the exit code, stdErr and stdOut
      logger.error({ err }, 'script failed')
      throw err
    }
  })
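The script is exercised end to end in a child process, which is why the configuration travels as environment variables on the command line. util.promisify(exec) resolves with { stdout, stderr } when the child exits with code 0, and rejects with an error carrying code, stdout and stderr otherwise, which is what the catch block above logs. For instance:

const { exec } = require('child_process')
const { promisify } = require('util')

async function demo() {
  // Resolves because the child exits cleanly; output arrives as strings.
  const { stdout, stderr } = await promisify(exec)('node -e "console.log(42)"')
  console.log(stdout.trim()) // '42'
  console.log(stderr) // ''
}
demo()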

  it('should do the correct operations', function() {
    let { stderr: stdErr, stdout: stdOut } = result
    stdErr = stdErr
      .split('\n')
      .filter(line => !line.includes('DeprecationWarning'))
    stdOut = stdOut
      .split('\n')
      .filter(line => !line.includes('Using settings from'))
    expect(stdErr).to.deep.equal([
      `Completed batch ending ${firstBatch[BATCH_SIZE - 1]}`,
      `Completed batch ending ${secondBatch[BATCH_SIZE - 1]}`,
      `Completed batch ending ${thirdBatch[BATCH_SIZE - 1]}`,
      `Completed batch ending ${fourthBatch[BATCH_SIZE - 1]}`,
      'Done.',
      ''
    ])
    expect(stdOut).to.deep.equal([
      // batch 1: all duplicates
      `Running update on batch with ids ${JSON.stringify(firstBatch)}`,
      'Got duplicates from looking at batch.',
      'Found duplicate: duplicate1',
      // batch 2: duplicates within the batch
      `Running update on batch with ids ${JSON.stringify(secondBatch)}`,
      'Got duplicates from looking at batch.',
      'Found duplicate: duplicate2',
      // batch 3: duplicate shared with the next batch
      `Running update on batch with ids ${JSON.stringify(thirdBatch)}`,
      'Got duplicates from running count.',
      `Found duplicate: ${duplicateAcrossBatch}`,
      // batch 4: no new duplicates
      `Running update on batch with ids ${JSON.stringify(fourthBatch)}`,
      ''
    ])
  })

  it('should give all users a unique referal_id', async function() {
    const users = await db.users
      .find({}, { projection: { referal_id: 1 } })
      .toArray()
    const uniqueReferralIds = Array.from(
      new Set(users.map(user => user.referal_id))
    )
    expect(users).to.have.length(4 * BATCH_SIZE)
    expect(uniqueReferralIds).to.have.length(users.length)
  })
})