Merge pull request #12276 from overleaf/jpa-batched-update-window

[web] add time based window queries to batchedUpdate

GitOrigin-RevId: e56c01b888cd9749f39d42b77de09bc3fe2d0ec1
Jakob Ackermann 2023-03-21 09:09:06 +00:00 committed by Copybot
parent aee777ca1d
commit 03f45c02c3
6 changed files with 163 additions and 170 deletions
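The change below replaces the previous cursor-style iteration, which only tracked the last seen _id, with bounded time windows derived from ObjectId timestamps: the range between ID_EDGE_PAST (one second before the oldest document in the collection) and ID_EDGE_FUTURE (one second past the moment the helper is loaded) is walked in slices of at most BATCH_MAX_TIME_SPAN_IN_MS, one month by default. As a minimal sketch of that windowing idea (not part of the commit; objectIdFromMs and getMsFromObjectId mirror the helpers added in the diff, while idWindows is an invented name used only for illustration):

// Sketch of the ascending time-window walk (illustrative only)
const { ObjectId } = require('mongodb')

const ONE_MONTH_IN_MS = 1000 * 60 * 60 * 24 * 31

function objectIdFromMs(ms) {
  // ObjectId timestamps have one-second resolution
  return ObjectId.createFromTime(ms / 1000)
}

function getMsFromObjectId(id) {
  return id.getTimestamp().getTime()
}

// Split (start, rangeEnd) into _id filters that each span at most `spanMs`.
// `start` is exclusive, which is why the real helper places ID_EDGE_PAST one
// second before the oldest document.
function* idWindows(start, rangeEnd, spanMs = ONE_MONTH_IN_MS) {
  while (getMsFromObjectId(start) < getMsFromObjectId(rangeEnd)) {
    let end = objectIdFromMs(getMsFromObjectId(start) + spanMs)
    if (getMsFromObjectId(end) >= getMsFromObjectId(rangeEnd)) {
      end = rangeEnd
    }
    yield { _id: { $gt: start, $lt: end } }
    start = end
  }
}

With this scheme each find() only has to examine the _ids of one window, and the loop in the diff advances start to end even when a window matches nothing; under the old open-ended `_id > maxId` filter, a query matching few documents could scan a large part of the collection before filling a single batch.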

View file

@@ -1,12 +1,15 @@
 const { ReadPreference, ObjectId } = require('mongodb')
 const { db, waitForDb } = require('../../app/src/infrastructure/mongodb')
 
+const ONE_MONTH_IN_MS = 1000 * 60 * 60 * 24 * 31
+let ID_EDGE_PAST
+const ID_EDGE_FUTURE = objectIdFromMs(Date.now() + 1000)
 let BATCH_DESCENDING
 let BATCH_SIZE
 let VERBOSE_LOGGING
-let BATCH_LAST_ID
+let BATCH_RANGE_START
 let BATCH_RANGE_END
-refreshGlobalOptionsForBatchedUpdate()
+let BATCH_MAX_TIME_SPAN_IN_MS
 
 function refreshGlobalOptionsForBatchedUpdate(options = {}) {
   options = Object.assign({}, options, process.env)
@@ -15,42 +18,54 @@ function refreshGlobalOptionsForBatchedUpdate(options = {}) {
   BATCH_SIZE = parseInt(options.BATCH_SIZE, 10) || 1000
   VERBOSE_LOGGING = options.VERBOSE_LOGGING === 'true'
   if (options.BATCH_LAST_ID) {
-    BATCH_LAST_ID = ObjectId(options.BATCH_LAST_ID)
+    BATCH_RANGE_START = ObjectId(options.BATCH_LAST_ID)
   } else if (options.BATCH_RANGE_START) {
-    BATCH_LAST_ID = ObjectId(options.BATCH_RANGE_START)
+    BATCH_RANGE_START = ObjectId(options.BATCH_RANGE_START)
+  } else {
+    if (BATCH_DESCENDING) {
+      BATCH_RANGE_START = ID_EDGE_FUTURE
+    } else {
+      BATCH_RANGE_START = ID_EDGE_PAST
+    }
   }
+  BATCH_MAX_TIME_SPAN_IN_MS =
+    parseInt(options.BATCH_MAX_TIME_SPAN_IN_MS, 10) || ONE_MONTH_IN_MS
   if (options.BATCH_RANGE_END) {
     BATCH_RANGE_END = ObjectId(options.BATCH_RANGE_END)
+  } else {
+    if (BATCH_DESCENDING) {
+      BATCH_RANGE_END = ID_EDGE_PAST
+    } else {
+      BATCH_RANGE_END = ID_EDGE_FUTURE
+    }
   }
 }
 
-async function getNextBatch(collection, query, maxId, projection, options) {
-  const queryIdField = {}
-  maxId = maxId || BATCH_LAST_ID
-  if (maxId) {
-    if (BATCH_DESCENDING) {
-      queryIdField.$lt = maxId
-    } else {
-      queryIdField.$gt = maxId
-    }
-  }
-  if (BATCH_RANGE_END) {
-    if (BATCH_DESCENDING) {
-      queryIdField.$gt = BATCH_RANGE_END
-    } else {
-      queryIdField.$lt = BATCH_RANGE_END
-    }
-  }
-  if (queryIdField.$gt || queryIdField.$lt) {
-    query._id = queryIdField
-  }
-  const entries = await collection
-    .find(query, options)
+async function getNextBatch({
+  collection,
+  query,
+  start,
+  end,
+  projection,
+  findOptions,
+}) {
+  if (BATCH_DESCENDING) {
+    query._id = {
+      $gt: end,
+      $lt: start,
+    }
+  } else {
+    query._id = {
+      $gt: start,
+      $lt: end,
+    }
+  }
+  return await collection
+    .find(query, findOptions)
     .project(projection)
     .sort({ _id: BATCH_DESCENDING ? -1 : 1 })
     .limit(BATCH_SIZE)
     .toArray()
-  return entries
 }
 
 async function performUpdate(collection, nextBatch, update) {
@@ -60,6 +75,42 @@ async function performUpdate(collection, nextBatch, update) {
   )
 }
 
+function objectIdFromMs(ms) {
+  return ObjectId.createFromTime(ms / 1000)
+}
+
+function getMsFromObjectId(id) {
+  return id.getTimestamp().getTime()
+}
+
+function getNextEnd(start) {
+  let end
+  if (BATCH_DESCENDING) {
+    end = objectIdFromMs(getMsFromObjectId(start) - BATCH_MAX_TIME_SPAN_IN_MS)
+    if (getMsFromObjectId(end) <= getMsFromObjectId(BATCH_RANGE_END)) {
+      end = BATCH_RANGE_END
+    }
+  } else {
+    end = objectIdFromMs(getMsFromObjectId(start) + BATCH_MAX_TIME_SPAN_IN_MS)
+    if (getMsFromObjectId(end) >= getMsFromObjectId(BATCH_RANGE_END)) {
+      end = BATCH_RANGE_END
+    }
+  }
+  return end
+}
+
+async function getIdEdgePast(collection) {
+  const [first] = await collection
+    .find({})
+    .project({ _id: 1 })
+    .limit(1)
+    .toArray()
+  if (!first) return null
+  // Go 1s further into the past in order to include the first entry via
+  // first._id > ID_EDGE_PAST
+  return objectIdFromMs(Math.max(0, getMsFromObjectId(first._id) - 1000))
+}
+
 async function batchedUpdate(
   collectionName,
   query,
@@ -68,9 +119,14 @@ async function batchedUpdate(
   findOptions,
   batchedUpdateOptions
 ) {
-  refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)
   await waitForDb()
   const collection = db[collectionName]
+  ID_EDGE_PAST = await getIdEdgePast(collection)
+  if (!ID_EDGE_PAST) {
+    console.warn(`The collection ${collectionName} appears to be empty.`)
+    return 0
+  }
+  refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)
 
   findOptions = findOptions || {}
   findOptions.readPreference = ReadPreference.SECONDARY
@@ -78,35 +134,40 @@ async function batchedUpdate(
   projection = projection || { _id: 1 }
   let nextBatch
   let updated = 0
-  let maxId
-  while (
-    (nextBatch = await getNextBatch(
-      collection,
-      query,
-      maxId,
-      projection,
-      findOptions
-    )).length
-  ) {
-    maxId = nextBatch[nextBatch.length - 1]._id
-    updated += nextBatch.length
-    if (VERBOSE_LOGGING) {
-      console.log(
-        `Running update on batch with ids ${JSON.stringify(
-          nextBatch.map(entry => entry._id)
-        )}`
-      )
-    } else {
-      console.error(`Running update on batch ending ${maxId}`)
-    }
-
-    if (typeof update === 'function') {
-      await update(collection, nextBatch)
-    } else {
-      await performUpdate(collection, nextBatch, update)
-    }
-
-    console.error(`Completed batch ending ${maxId}`)
+  let start = BATCH_RANGE_START
+
+  while (start !== BATCH_RANGE_END) {
+    let end = getNextEnd(start)
+    nextBatch = await getNextBatch({
+      collection,
+      query,
+      start,
+      end,
+      projection,
+      findOptions,
+    })
+    if (nextBatch.length > 0) {
+      end = nextBatch[nextBatch.length - 1]._id
+      updated += nextBatch.length
+
+      if (VERBOSE_LOGGING) {
+        console.log(
+          `Running update on batch with ids ${JSON.stringify(
+            nextBatch.map(entry => entry._id)
+          )}`
+        )
+      } else {
+        console.error(`Running update on batch ending ${end}`)
+      }
+
+      if (typeof update === 'function') {
+        await update(collection, nextBatch)
+      } else {
+        await performUpdate(collection, nextBatch, update)
+      }
+    }
+    console.error(`Completed batch ending ${end}`)
+    start = end
   }
   return updated
 }
@@ -119,8 +180,8 @@ function batchedUpdateWithResultHandling(
   options
 ) {
   batchedUpdate(collection, query, update, projection, options)
-    .then(updated => {
-      console.error({ updated })
+    .then(processed => {
+      console.error({ processed })
       process.exit(0)
     })
     .catch(error => {
@@ -130,7 +191,6 @@ function batchedUpdateWithResultHandling(
 }
 
 module.exports = {
-  getNextBatch,
   batchedUpdate,
   batchedUpdateWithResultHandling,
 }
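For reference, a hypothetical caller that passes the new options programmatically (rather than through environment variables) might look like the sketch below, assuming it lives alongside the other scripts. The collection name, query and update are placeholders invented for illustration; only the parameter order and the option keys come from batchedUpdate above. Note that the helper merges process.env over the passed options, so environment variables still take precedence.

const { batchedUpdate } = require('./helpers/batchedUpdate')

async function main() {
  const updated = await batchedUpdate(
    'docs', // collection name -- placeholder
    { rev: { $exists: false } }, // query -- placeholder
    { $set: { rev: 1 } }, // update -- placeholder
    { _id: 1 }, // projection
    {}, // findOptions; readPreference is forced to SECONDARY by the helper
    {
      BATCH_MAX_TIME_SPAN_IN_MS: `${24 * 60 * 60 * 1000}`, // 1-day windows
      VERBOSE_LOGGING: 'true',
    }
  )
  console.error({ updated })
}

main()
  .then(() => process.exit(0))
  .catch(error => {
    console.error({ error })
    process.exit(1)
  })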

View file

@@ -1,5 +1,4 @@
-const { getNextBatch } = require('./helpers/batchedUpdate')
-const { db, waitForDb } = require('../app/src/infrastructure/mongodb')
+const { batchedUpdateWithResultHandling } = require('./helpers/batchedUpdate')
 
 const MODEL_NAME = process.argv.pop()
 const Model = require(`../app/src/models/${MODEL_NAME}`)[MODEL_NAME]
@@ -14,34 +13,11 @@ function processBatch(batch) {
   }
 }
 
-async function main() {
-  await waitForDb()
-  const collection = db[Model.collection.name]
-
-  const query = {}
-  const projection = {}
-
-  let nextBatch
-  let processed = 0
-  let maxId
-  while (
-    (nextBatch = await getNextBatch(collection, query, maxId, projection))
-      .length
-  ) {
-    processBatch(nextBatch)
-    maxId = nextBatch[nextBatch.length - 1]._id
-    processed += nextBatch.length
-    console.error(maxId, processed)
-  }
-  console.error('done')
-}
-
-main()
-  .then(() => {
-    process.exit(0)
-  })
-  .catch(error => {
-    console.error({ error })
-    process.exit(1)
-  })
+batchedUpdateWithResultHandling(
+  Model.collection.name,
+  {},
+  async (_, nextBatch) => {
+    await processBatch(nextBatch)
+  },
+  {}
+)

View file

@@ -75,7 +75,7 @@ describe('BackFillDeletedFiles', function () {
     let result
     try {
       result = await promisify(exec)(
-        ['LET_USER_DOUBLE_CHECK_INPUTS_FOR=1']
+        ['LET_USER_DOUBLE_CHECK_INPUTS_FOR=1', 'VERBOSE_LOGGING=true']
          .concat(['node', 'scripts/back_fill_deleted_files'])
          .concat(args)
          .join(' ')
@@ -85,9 +85,23 @@ describe('BackFillDeletedFiles', function () {
       logger.error({ error }, 'script failed')
       throw error
     }
 
-    const { stderr: stdErr } = result
-    expect(stdErr).to.include(`Completed batch ending ${projectId5}`)
+    const { stdout: stdOut } = result
+    expect(stdOut).to.match(
+      new RegExp(`Running update on batch with ids .+${projectId1}`)
+    )
+    expect(stdOut).to.match(
+      new RegExp(`Running update on batch with ids .+${projectId2}`)
+    )
+    expect(stdOut).to.not.match(
+      new RegExp(`Running update on batch with ids .+${projectId3}`)
+    )
+    expect(stdOut).to.not.match(
+      new RegExp(`Running update on batch with ids .+${projectId4}`)
+    )
+    expect(stdOut).to.match(
+      new RegExp(`Running update on batch with ids .+${projectId5}`)
+    )
   }
 
   function checkAreFilesBackFilled() {

View file

@@ -5,9 +5,9 @@ const logger = require('@overleaf/logger/logging-manager')
 const { expect } = require('chai')
 
 describe('BackFillDocRevTests', function () {
-  const docId1 = ObjectId.createFromTime(1)
-  const docId2 = ObjectId.createFromTime(2)
-  const docId3 = ObjectId.createFromTime(3)
+  const docId1 = ObjectId()
+  const docId2 = ObjectId()
+  const docId3 = ObjectId()
 
   beforeEach('insert docs', async function () {
     await db.docs.insertMany([
@@ -21,17 +21,30 @@ describe('BackFillDocRevTests', function () {
     let result
     try {
       result = await promisify(exec)(
-        ['node', 'scripts/back_fill_doc_rev', dryRun].join(' ')
+        [
+          'VERBOSE_LOGGING=true',
+          'node',
+          'scripts/back_fill_doc_rev',
+          dryRun,
+        ].join(' ')
       )
     } catch (error) {
       // dump details like exit code, stdErr and stdOut
       logger.error({ error }, 'script failed')
       throw error
     }
-    const { stdout: stdOut, stderr: stdErr } = result
+    const { stdout: stdOut } = result
     expect(stdOut).to.include('rev missing 2 | deleted=true 1')
-    expect(stdErr).to.include(`Completed batch ending ${docId2}`)
+    expect(stdOut).to.match(
+      new RegExp(`Running update on batch with ids .+${docId1}`)
+    )
+    expect(stdOut).to.match(
+      new RegExp(`Running update on batch with ids .+${docId2}`)
+    )
+    expect(stdOut).to.not.match(
+      new RegExp(`Running update on batch with ids .+${docId3}`)
+    )
   }
 
   describe('dry-run=true', function () {

View file

@@ -10,10 +10,6 @@ const DUMMY_TIME = new Date('2021-04-12T00:00:00.000Z')
 const ONE_DAY_IN_S = 60 * 60 * 24
 const BATCH_SIZE = 3
 
-function getSecondsFromObjectId(id) {
-  return id.getTimestamp().getTime() / 1000
-}
-
 function getObjectIdFromDate(date) {
   const seconds = new Date(date).getTime() / 1000
   return ObjectId.createFromTime(seconds)
@@ -137,87 +133,21 @@ describe('BackFillDummyDocMeta', function () {
     stdErr = stdErr.split('\n')
     stdOut = stdOut.split('\n').filter(filterOutput)
 
-    const oneDayFromProjectId9InSeconds =
-      getSecondsFromObjectId(projectIds[9]) + ONE_DAY_IN_S
-    const oneDayFromProjectId9AsObjectId = getObjectIdFromDate(
-      1000 * oneDayFromProjectId9InSeconds
-    )
-    let overlappingPartStdOut
-    let overlappingPartStdErr
-    if (dryRun) {
-      // In dry-run, the previous id will get processed again as the name has not been updated.
-      overlappingPartStdOut = [
-        `Back filling dummy meta data for ["${docIds[9]}","${docIds[10]}"]`,
-        `Orphaned deleted doc ${docIds[9]} (no deletedProjects entry)`,
-        `Orphaned deleted doc ${docIds[10]} (no deletedProjects entry)`,
-      ]
-      overlappingPartStdErr = [
-        `Processed 11 until ${oneDayFromProjectId9AsObjectId}`,
-      ]
-    } else {
-      // Outside dry-run, the previous id will not match again as the `name` has been back-filled.
-      overlappingPartStdOut = [
-        `Back filling dummy meta data for ["${docIds[10]}"]`,
-        `Orphaned deleted doc ${docIds[10]} (no deletedProjects entry)`,
-      ]
-      overlappingPartStdErr = [
-        `Processed 10 until ${oneDayFromProjectId9AsObjectId}`,
-      ]
-    }
-    expect(stdOut.filter(filterOutput)).to.deep.equal([
-      `Back filling dummy meta data for ["${docIds[0]}"]`,
+    expect(stdOut.filter(filterOutput)).to.include.members([
       `Orphaned deleted doc ${docIds[0]} (no deletedProjects entry)`,
-      `Back filling dummy meta data for ["${docIds[1]}"]`,
      `Orphaned deleted doc ${docIds[1]} (no deletedProjects entry)`,
-      `Back filling dummy meta data for ["${docIds[2]}"]`,
      `Orphaned deleted doc ${docIds[2]} (failed hard deletion)`,
-      `Back filling dummy meta data for ["${docIds[3]}"]`,
      `Missing deletedDoc for ${docIds[3]}`,
-      // two docs in the same project
-      `Back filling dummy meta data for ["${docIds[4]}","${docIds[11]}"]`,
      `Found deletedDoc for ${docIds[4]}`,
      `Found deletedDoc for ${docIds[11]}`,
-      // 7,8,9 are on the same day, but exceed the batch size of 2
-      `Back filling dummy meta data for ["${docIds[7]}","${docIds[8]}","${docIds[9]}"]`,
      `Orphaned deleted doc ${docIds[7]} (no deletedProjects entry)`,
      `Orphaned deleted doc ${docIds[8]} (no deletedProjects entry)`,
      `Orphaned deleted doc ${docIds[9]} (no deletedProjects entry)`,
-      // Potential double processing
-      ...overlappingPartStdOut,
-      '',
+      `Orphaned deleted doc ${docIds[10]} (no deletedProjects entry)`,
     ])
-    expect(stdErr.filter(filterOutput)).to.deep.equal([
-      `Options: {`,
-      `  "dryRun": ${options.DRY_RUN},`,
-      `  "cacheSize": ${options.CACHE_SIZE},`,
-      `  "firstProjectId": "${options.FIRST_PROJECT_ID}",`,
-      `  "incrementByS": ${options.INCREMENT_BY_S},`,
-      `  "batchSize": ${options.BATCH_SIZE},`,
-      `  "stopAtS": ${options.STOP_AT_S},`,
-      `  "letUserDoubleCheckInputsFor": ${options.LET_USER_DOUBLE_CHECK_INPUTS_FOR}`,
-      '}',
-      'Waiting for you to double check inputs for 1 ms',
-      `Processed 1 until ${getObjectIdFromDate('2021-04-02T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-03T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-04T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-05T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-06T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-07T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-08T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-09T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-10T00:00:00.000Z')}`,
-      `Processed 2 until ${getObjectIdFromDate('2021-04-11T00:00:00.000Z')}`,
-      `Processed 3 until ${getObjectIdFromDate('2021-04-12T00:00:00.000Z')}`,
-      `Processed 4 until ${getObjectIdFromDate('2021-04-13T00:00:00.000Z')}`,
-      `Processed 6 until ${getObjectIdFromDate('2021-04-14T00:00:00.000Z')}`,
-      `Processed 6 until ${getObjectIdFromDate('2021-04-15T00:00:00.000Z')}`,
-      `Processed 6 until ${getObjectIdFromDate('2021-04-16T00:00:00.000Z')}`,
-      // 7,8,9,10 are on the same day, but exceed the batch size of 3
+    expect(stdErr.filter(filterOutput)).to.include.members([
      `Processed 9 until ${projectIds[9]}`,
-      ...overlappingPartStdErr,
      'Done.',
-      '',
     ])
   }

View file

@@ -110,7 +110,7 @@ describe('RegenerateDuplicateReferralIds', function () {
     let { stderr: stdErr, stdout: stdOut } = result
     stdErr = stdErr.split('\n').filter(filterOutput)
     stdOut = stdOut.split('\n').filter(filterOutput)
-    expect(stdErr).to.deep.equal([
+    expect(stdErr).to.include.members([
       `Completed batch ending ${firstBatch[BATCH_SIZE - 1]}`,
       `Completed batch ending ${secondBatch[BATCH_SIZE - 1]}`,
       `Completed batch ending ${thirdBatch[BATCH_SIZE - 1]}`,
@@ -118,7 +118,7 @@ describe('RegenerateDuplicateReferralIds', function () {
       'Done.',
       '',
     ])
-    expect(stdOut.filter(filterOutput)).to.deep.equal([
+    expect(stdOut.filter(filterOutput)).to.include.members([
       // only duplicates
       `Running update on batch with ids ${JSON.stringify(firstBatch)}`,
       'Got duplicates from looking at batch.',