Mirror of https://github.com/overleaf/overleaf.git, synced 2024-11-21 20:47:08 -05:00
Merge pull request #21668 from overleaf/jpa-mongo-utils
[mongo-utils] create new home for batchedUpdate helper
GitOrigin-RevId: 9f61c5e367a9f4cef63b5cc4c0bbbd3ef57c8ca8
This commit is contained in:
parent a3d8caf87b
commit 252533b2fd
23 changed files with 96 additions and 355 deletions
38 package-lock.json (generated)
@@ -237,6 +237,23 @@
"node": ">=6"
}
},
"libraries/mongo-utils": {
"name": "@overleaf/mongo-utils",
"version": "0.0.1",
"license": "AGPL-3.0-only",
"dependencies": {
"mongodb": "6.7.0",
"mongodb-legacy": "6.1.0"
},
"devDependencies": {
"chai": "^4.3.6",
"mocha": "^10.2.0",
"sandboxed-module": "^2.0.4",
"sinon": "^9.2.4",
"sinon-chai": "^3.7.0",
"typescript": "^5.0.4"
}
},
"libraries/o-error": {
"name": "@overleaf/o-error",
"version": "3.4.0",
@@ -8348,6 +8365,10 @@
"resolved": "libraries/metrics",
"link": true
},
"node_modules/@overleaf/mongo-utils": {
"resolved": "libraries/mongo-utils",
"link": true
},
"node_modules/@overleaf/notifications": {
"resolved": "services/notifications",
"link": true
@@ -39624,6 +39645,7 @@
"@overleaf/access-token-encryptor": "*",
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/mongo-utils": "*",
"@overleaf/o-error": "*",
"@overleaf/promise-utils": "*",
"@overleaf/settings": "*",
@@ -41331,6 +41353,7 @@
"@overleaf/fetch-utils": "*",
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/mongo-utils": "*",
"@overleaf/o-error": "*",
"@overleaf/object-persistor": "*",
"@overleaf/promise-utils": "*",
@@ -50168,6 +50191,7 @@
"@overleaf/fetch-utils": "*",
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/mongo-utils": "*",
"@overleaf/o-error": "*",
"@overleaf/promise-utils": "*",
"@overleaf/settings": "*",
@@ -50259,6 +50283,19 @@
}
}
},
"@overleaf/mongo-utils": {
"version": "file:libraries/mongo-utils",
"requires": {
"chai": "^4.3.6",
"mocha": "^10.2.0",
"mongodb": "6.7.0",
"mongodb-legacy": "6.1.0",
"sandboxed-module": "^2.0.4",
"sinon": "^9.2.4",
"sinon-chai": "^3.7.0",
"typescript": "^5.0.4"
}
},
"@overleaf/notifications": {
"version": "file:services/notifications",
"requires": {
@@ -50960,6 +50997,7 @@
"@overleaf/fetch-utils": "*",
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/mongo-utils": "*",
"@overleaf/o-error": "*",
"@overleaf/object-persistor": "*",
"@overleaf/promise-utils": "*",
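The lockfile hunks above register a new workspace package at libraries/mongo-utils. As a rough sketch, its package.json presumably looks like the following, reconstructed only from the fields visible in these lockfile entries (the manifest itself is not part of this excerpt):

{
  "name": "@overleaf/mongo-utils",
  "version": "0.0.1",
  "license": "AGPL-3.0-only",
  "dependencies": {
    "mongodb": "6.7.0",
    "mongodb-legacy": "6.1.0"
  }
}

Services that need the helper then declare "@overleaf/mongo-utils": "*" in their own dependencies, which is what the remaining package-lock and package.json hunks in this diff add.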
@@ -1,5 +1,5 @@
import BatchedUpdateScript from '../scripts/helpers/batchedUpdate.mjs'
const { batchedUpdate } = BatchedUpdateScript
import { db } from '../app/src/infrastructure/mongodb.js'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'

const tags = ['saas']

@@ -10,7 +10,7 @@ const batchedUpdateOptions = {

const migrate = async () => {
await batchedUpdate(
'users',
db.users,
{ 'twoFactorAuthentication.secret': { $exists: true } },
{ $unset: { twoFactorAuthentication: true } },
null,

@@ -21,7 +21,7 @@ const migrate = async () => {

const rollback = async () => {
await batchedUpdate(
'users',
db.users,
{ 'twoFactorAuthentication.secretEncrypted': { $exists: true } },
{ $unset: { twoFactorAuthentication: true } },
null,
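The hunks above also show the helper's new call convention: batchedUpdate no longer takes a collection name string as its first argument, it takes the collection handle from mongodb.js directly. A minimal before/after sketch, using the users collection and the query from this migration:

// before: the helper looked the collection up by name
// await batchedUpdate(
//   'users',
//   { 'twoFactorAuthentication.secret': { $exists: true } },
//   { $unset: { twoFactorAuthentication: true } }
// )

// after: the collection object is passed in
import { db } from '../app/src/infrastructure/mongodb.js'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'

await batchedUpdate(
  db.users,
  { 'twoFactorAuthentication.secret': { $exists: true } },
  { $unset: { twoFactorAuthentication: true } }
)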
@@ -1,11 +1,11 @@
import BatchedUpdateScript from '../scripts/helpers/batchedUpdate.mjs'
import { db } from '../app/src/infrastructure/mongodb.js'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'

const { batchedUpdate } = BatchedUpdateScript
const tags = ['server-ce', 'server-pro', 'saas']

const migrate = async () => {
await batchedUpdate(
'users',
db.users,
{ 'features.templates': { $exists: true } },
{ $unset: { 'features.templates': true } }
)
@@ -79,6 +79,7 @@
"@overleaf/fetch-utils": "*",
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/mongo-utils": "*",
"@overleaf/o-error": "*",
"@overleaf/object-persistor": "*",
"@overleaf/promise-utils": "*",
@@ -1,10 +1,9 @@
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import { promiseMapWithLimit, promisify } from '@overleaf/promise-utils'
import { db } from '../app/src/infrastructure/mongodb.js'
import _ from 'lodash'
import { fileURLToPath } from 'node:url'

const { batchedUpdate } = BatchedUpdateModule
const sleep = promisify(setTimeout)

async function main(options) {

@@ -24,7 +23,7 @@ async function main(options) {
await letUserDoubleCheckInputs(options)

await batchedUpdate(
'projects',
db.projects,
// array is not empty ~ array has one item
{ 'deletedFiles.0': { $exists: true } },
async projects => {
@@ -1,11 +1,10 @@
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import { promiseMapWithLimit, promisify } from '@overleaf/promise-utils'
import { db } from '../app/src/infrastructure/mongodb.js'
import { fileURLToPath } from 'node:url'
import _ from 'lodash'

const sleep = promisify(setTimeout)
const { batchedUpdate } = BatchedUpdateModule

async function main(options) {
if (!options) {

@@ -23,7 +22,7 @@ async function main(options) {
await letUserDoubleCheckInputs(options)

await batchedUpdate(
'projects',
db.projects,
// array is not empty ~ array has one item
{ 'deletedDocs.0': { $exists: true } },
async projects => {
@@ -1,9 +1,7 @@
import { db } from '../app/src/infrastructure/mongodb.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import { fileURLToPath } from 'node:url'

const { batchedUpdate } = BatchedUpdateModule

const DRY_RUN = !process.argv.includes('--dry-run=false')
const LOG_EVERY_IN_S = parseInt(process.env.LOG_EVERY_IN_S, 10) || 5

@@ -17,7 +15,7 @@ async function main(DRY_RUN) {
}

await batchedUpdate(
'docs',
db.docs,
{ rev: { $exists: false } },
async docs => {
if (!DRY_RUN) {
@@ -1,8 +1,7 @@
import NotificationsBuilder from '../app/src/Features/Notifications/NotificationsBuilder.js'
import { db } from '../app/src/infrastructure/mongodb.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'

const { batchedUpdate } = BatchedUpdateModule
const DRY_RUN = !process.argv.includes('--dry-run=false')

if (DRY_RUN) {

@@ -57,7 +56,7 @@ async function processBatch(groupSubscriptionsBatch) {
}

async function main() {
await batchedUpdate('subscriptions', { groupPlan: true }, processBatch, {
await batchedUpdate(db.subscriptions, { groupPlan: true }, processBatch, {
member_ids: 1,
})
}
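As this hunk shows, the update argument can also be an async handler that receives each batch, and a projection can limit the fields that are fetched. A short sketch of that call shape under the new signature, taken from this script's own arguments (processBatch is the script's handler, defined outside the excerpt):

// the third argument may be a function invoked once per batch of documents;
// the fourth argument is a Mongo projection applied to the fetched batch
await batchedUpdate(
  db.subscriptions,
  { groupPlan: true },
  processBatch,
  { member_ids: 1 }
)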
@@ -1,14 +1,14 @@
// @ts-check
import '../app/src/models/User.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdateWithResultHandling } from '@overleaf/mongo-utils/batchedUpdate.js'
import { promiseMapWithLimit } from '@overleaf/promise-utils'
import { getQueue } from '../app/src/infrastructure/Queues.js'
import SubscriptionLocator from '../app/src/Features/Subscription/SubscriptionLocator.js'
import PlansLocator from '../app/src/Features/Subscription/PlansLocator.js'
import FeaturesHelper from '../app/src/Features/Subscription/FeaturesHelper.js'
import { db } from '../app/src/infrastructure/mongodb.js'

const { batchedUpdateWithResultHandling } = BatchedUpdateModule

const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY, 10) || 10
const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY || '10', 10)

const mixpanelSinkQueue = getQueue('analytics-mixpanel-sink')

@@ -99,7 +99,7 @@ async function processBatch(_, users) {
}

batchedUpdateWithResultHandling(
'users',
db.users,
{
$nor: [
{ thirdPartyIdentifiers: { $exists: false } },
@@ -1,7 +1,7 @@
// @ts-check
import minimist from 'minimist'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'

const { batchedUpdateWithResultHandling } = BatchedUpdateModule
import { batchedUpdateWithResultHandling } from '@overleaf/mongo-utils/batchedUpdate.js'
import { db } from '../app/src/infrastructure/mongodb.js'

const argv = minimist(process.argv.slice(2))
const commit = argv.commit !== undefined

@@ -50,7 +50,7 @@ if (!commit) {
}

batchedUpdateWithResultHandling(
'projects',
db.projects,
{ imageName: null },
{ $set: { imageName } }
)
@@ -1,11 +1,9 @@
import { db } from '../app/src/infrastructure/mongodb.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import minimist from 'minimist'
import CollaboratorsInviteHelper from '../app/src/Features/Collaborators/CollaboratorsInviteHelper.js'
import { fileURLToPath } from 'node:url'

const { batchedUpdate } = BatchedUpdateModule

const argv = minimist(process.argv.slice(2), {
boolean: ['dry-run', 'help'],
default: {

@@ -19,7 +17,7 @@ async function addTokenHmacField(DRY_RUN) {
const query = { tokenHmac: { $exists: false } }

await batchedUpdate(
'projectInvites',
db.projectInvites,
query,
async invites => {
for (const invite of invites) {
@@ -1,11 +1,10 @@
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdateWithResultHandling } from '@overleaf/mongo-utils/batchedUpdate.js'
import { promiseMapWithLimit } from '@overleaf/promise-utils'
import SubscriptionLocator from '../app/src/Features/Subscription/SubscriptionLocator.js'
import PlansLocator from '../app/src/Features/Subscription/PlansLocator.js'
import FeaturesHelper from '../app/src/Features/Subscription/FeaturesHelper.js'
import AnalyticsManager from '../app/src/Features/Analytics/AnalyticsManager.js'

const { batchedUpdateWithResultHandling } = BatchedUpdateModule
import { db } from '../app/src/infrastructure/mongodb.js'

const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY, 10) || 10

@@ -55,7 +54,7 @@ async function processBatch(users) {
})
}

batchedUpdateWithResultHandling('users', {}, processBatch, {
batchedUpdateWithResultHandling(db.users, {}, processBatch, {
_id: true,
analyticsId: true,
features: true,
@@ -1,11 +1,9 @@
import _ from 'lodash'
import { db } from '../app/src/infrastructure/mongodb.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import { promiseMapWithLimit } from '@overleaf/promise-utils'
import { fileURLToPath } from 'node:url'

const { batchedUpdate } = BatchedUpdateModule

const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY, 10) || 10

// $ node scripts/convert_archived_state.mjs FIRST,SECOND

@@ -14,7 +12,7 @@ async function main(STAGE) {
for (const FIELD of ['archived', 'trashed']) {
if (STAGE.includes('FIRST')) {
await batchedUpdate(
'projects',
db.projects,
{ [FIELD]: false },
{
$set: { [FIELD]: [] },

@@ -26,7 +24,7 @@ async function main(STAGE) {

if (STAGE.includes('SECOND')) {
await batchedUpdate(
'projects',
db.projects,
{ [FIELD]: true },
async function performUpdate(nextBatch) {
await promiseMapWithLimit(
@@ -1,11 +1,11 @@
import mongodb from 'mongodb-legacy'
import { promiseMapWithLimit } from '@overleaf/promise-utils'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import ChatApiHandler from '../app/src/Features/Chat/ChatApiHandler.js'
import DeleteOrphanedDataHelper from './delete_orphaned_data_helper.mjs'
import { ensureMongoTimeout } from './helpers/env_variable_helper.mjs'
import { db } from '../app/src/infrastructure/mongodb.js'

const { batchedUpdate } = BatchedUpdateModule
const { ObjectId } = mongodb
const { getHardDeletedProjectIds } = DeleteOrphanedDataHelper

@@ -86,7 +86,7 @@ async function main() {
_id: 1,
project_id: 1,
}
await batchedUpdate('rooms', {}, processBatch, projection)
await batchedUpdate(db.rooms, {}, processBatch, projection)
console.log('Final')
console.log(RESULT)
}
@@ -1,7 +1,6 @@
import { db } from '../app/src/infrastructure/mongodb.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'

const { batchedUpdate } = BatchedUpdateModule
const DRY_RUN = process.env.DRY_RUN !== 'false'

console.log({

@@ -55,7 +54,7 @@ async function main() {
$exists: true,
},
}
await batchedUpdate('subscriptions', query, processBatch, projection)
await batchedUpdate(db.subscriptions, query, processBatch, projection)
}

try {
@@ -1,281 +0,0 @@
// @ts-check
import mongodb from 'mongodb-legacy'
import {
db,
READ_PREFERENCE_SECONDARY,
} from '../../app/src/infrastructure/mongodb.js'

const { ObjectId } = mongodb

const ONE_MONTH_IN_MS = 1000 * 60 * 60 * 24 * 31
let ID_EDGE_PAST
const ID_EDGE_FUTURE = objectIdFromMs(Date.now() + 1000)
let BATCH_DESCENDING
let BATCH_SIZE
let VERBOSE_LOGGING
let BATCH_RANGE_START
let BATCH_RANGE_END
let BATCH_MAX_TIME_SPAN_IN_MS

/**
* @typedef {import("mongodb").Collection} Collection
* @typedef {import("mongodb").Document} Document
* @typedef {import("mongodb").FindOptions} FindOptions
* @typedef {import("mongodb").UpdateFilter<Document>} UpdateDocument
* @typedef {import("mongodb").ObjectId} ObjectId
*/

/**
* @typedef {Object} BatchedUpdateOptions
* @property {string} [BATCH_DESCENDING]
* @property {string} [BATCH_LAST_ID]
* @property {string} [BATCH_MAX_TIME_SPAN_IN_MS]
* @property {string} [BATCH_RANGE_END]
* @property {string} [BATCH_RANGE_START]
* @property {string} [BATCH_SIZE]
* @property {string} [VERBOSE_LOGGING]
*/

/**
* @param {BatchedUpdateOptions} options
*/
function refreshGlobalOptionsForBatchedUpdate(options = {}) {
options = Object.assign({}, options, process.env)

BATCH_DESCENDING = options.BATCH_DESCENDING === 'true'
BATCH_SIZE = parseInt(options.BATCH_SIZE || '1000', 10) || 1000
VERBOSE_LOGGING = options.VERBOSE_LOGGING === 'true'
if (options.BATCH_LAST_ID) {
BATCH_RANGE_START = new ObjectId(options.BATCH_LAST_ID)
} else if (options.BATCH_RANGE_START) {
BATCH_RANGE_START = new ObjectId(options.BATCH_RANGE_START)
} else {
if (BATCH_DESCENDING) {
BATCH_RANGE_START = ID_EDGE_FUTURE
} else {
BATCH_RANGE_START = ID_EDGE_PAST
}
}
BATCH_MAX_TIME_SPAN_IN_MS = parseInt(
options.BATCH_MAX_TIME_SPAN_IN_MS || ONE_MONTH_IN_MS.toString(),
10
)
if (options.BATCH_RANGE_END) {
BATCH_RANGE_END = new ObjectId(options.BATCH_RANGE_END)
} else {
if (BATCH_DESCENDING) {
BATCH_RANGE_END = ID_EDGE_PAST
} else {
BATCH_RANGE_END = ID_EDGE_FUTURE
}
}
}

/**
* @param {Collection} collection
* @param {Document} query
* @param {ObjectId} start
* @param {ObjectId} end
* @param {Document} projection
* @param {FindOptions} findOptions
* @return {Promise<Array<Document>>}
*/
async function getNextBatch(
collection,
query,
start,
end,
projection,
findOptions
) {
if (BATCH_DESCENDING) {
query._id = {
$gt: end,
$lte: start,
}
} else {
query._id = {
$gt: start,
$lte: end,
}
}
return await collection
.find(query, findOptions)
.project(projection)
.sort({ _id: BATCH_DESCENDING ? -1 : 1 })
.limit(BATCH_SIZE)
.toArray()
}

/**
* @param {Collection} collection
* @param {Array<Document>} nextBatch
* @param {UpdateDocument} update
* @return {Promise<void>}
*/
async function performUpdate(collection, nextBatch, update) {
await collection.updateMany(
{ _id: { $in: nextBatch.map(entry => entry._id) } },
update
)
}

/**
* @param {number} ms
* @return {ObjectId}
*/
function objectIdFromMs(ms) {
return ObjectId.createFromTime(ms / 1000)
}

/**
* @param {ObjectId} id
* @return {number}
*/
function getMsFromObjectId(id) {
return id.getTimestamp().getTime()
}

/**
* @param {ObjectId} start
* @return {ObjectId}
*/
function getNextEnd(start) {
let end
if (BATCH_DESCENDING) {
end = objectIdFromMs(getMsFromObjectId(start) - BATCH_MAX_TIME_SPAN_IN_MS)
if (getMsFromObjectId(end) <= getMsFromObjectId(BATCH_RANGE_END)) {
end = BATCH_RANGE_END
}
} else {
end = objectIdFromMs(getMsFromObjectId(start) + BATCH_MAX_TIME_SPAN_IN_MS)
if (getMsFromObjectId(end) >= getMsFromObjectId(BATCH_RANGE_END)) {
end = BATCH_RANGE_END
}
}
return end
}

/**
* @param {Collection} collection
* @return {Promise<ObjectId|null>}
*/
async function getIdEdgePast(collection) {
const [first] = await collection
.find({})
.project({ _id: 1 })
.sort({ _id: 1 })
.limit(1)
.toArray()
if (!first) return null
// Go one second further into the past in order to include the first entry via
// first._id > ID_EDGE_PAST
return objectIdFromMs(Math.max(0, getMsFromObjectId(first._id) - 1000))
}

/**
* @param {string} collectionName
* @param {Document} query
* @param {UpdateDocument | ((batch: Array<Document>) => Promise<void>)} update
* @param {Document} [projection]
* @param {FindOptions} [findOptions]
* @param {BatchedUpdateOptions} [batchedUpdateOptions]
*/
async function batchedUpdate(
collectionName,
query,
update,
projection,
findOptions,
batchedUpdateOptions
) {
const collection = db[collectionName]
ID_EDGE_PAST = await getIdEdgePast(collection)
if (!ID_EDGE_PAST) {
console.warn(`The collection ${collectionName} appears to be empty.`)
return 0
}
refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)

findOptions = findOptions || {}
findOptions.readPreference = READ_PREFERENCE_SECONDARY

projection = projection || { _id: 1 }
let nextBatch
let updated = 0
let start = BATCH_RANGE_START

while (start !== BATCH_RANGE_END) {
let end = getNextEnd(start)
nextBatch = await getNextBatch(
collection,
query,
start,
end,
projection,
findOptions
)
if (nextBatch.length > 0) {
end = nextBatch[nextBatch.length - 1]._id
updated += nextBatch.length

if (VERBOSE_LOGGING) {
console.log(
`Running update on batch with ids ${JSON.stringify(
nextBatch.map(entry => entry._id)
)}`
)
} else {
console.error(`Running update on batch ending ${end}`)
}

if (typeof update === 'function') {
await update(nextBatch)
} else {
await performUpdate(collection, nextBatch, update)
}
}
console.error(`Completed batch ending ${end}`)
start = end
}
return updated
}

/**
* @param {string} collectionName
* @param {Document} query
* @param {UpdateDocument | ((batch: Array<Object>) => Promise<void>)} update
* @param {Document} [projection]
* @param {FindOptions} [findOptions]
* @param {BatchedUpdateOptions} [batchedUpdateOptions]
*/
function batchedUpdateWithResultHandling(
collectionName,
query,
update,
projection,
findOptions,
batchedUpdateOptions
) {
batchedUpdate(
collectionName,
query,
update,
projection,
findOptions,
batchedUpdateOptions
)
.then(processed => {
console.error({ processed })
process.exit(0)
})
.catch(error => {
console.error({ error })
process.exit(1)
})
}

export default {
batchedUpdate,
batchedUpdateWithResultHandling,
}
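For reference, a minimal usage sketch of the relocated helper, based only on the import paths and call sites visible in this diff and on the behaviour documented in the deleted file above (batch size, ordering and range can be tuned through environment variables such as BATCH_SIZE, BATCH_DESCENDING, BATCH_RANGE_START and VERBOSE_LOGGING); the new module's source is not part of this diff, so treat the details as assumptions:

// sketch only: mirrors the call sites in this PR, not the new library's source
import { db } from '../app/src/infrastructure/mongodb.js'
import {
  batchedUpdate,
  batchedUpdateWithResultHandling,
} from '@overleaf/mongo-utils/batchedUpdate.js'

// apply an update document batch by batch, reading from a secondary
await batchedUpdate(
  db.projects,
  { imageName: null }, // query
  { $set: { imageName: 'some-texlive-image' } } // update; the value here is illustrative
)

// or let the helper log the processed count and set the process exit code
batchedUpdateWithResultHandling(
  db.systemmessages,
  { content: { $ne: '42' } },
  { $set: { content: '42' } }
)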
@@ -1,11 +1,10 @@
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import { promiseMapWithLimit, promisify } from '@overleaf/promise-utils'
import { db, ObjectId } from '../app/src/infrastructure/mongodb.js'
import _ from 'lodash'
import { fileURLToPath } from 'node:url'

const sleep = promisify(setTimeout)
const { batchedUpdate } = BatchedUpdateModule

async function main(options) {
if (!options) {

@@ -50,7 +49,7 @@ async function main(options) {
} else {
if (!options.skipUsersMigration) {
await batchedUpdate(
'users',
db.users,
{ auditLog: { $exists: true } },
async users => {
await processUsersBatch(users, options)

@@ -63,7 +62,7 @@ async function main(options) {
// users with an existing `auditLog` have been taken into consideration, leaving
// some projects orphan. This batched update processes all remaining projects.
await batchedUpdate(
'projects',
db.projects,
{ auditLog: { $exists: true } },
async projects => {
await processProjectsBatch(projects, options)
@@ -4,9 +4,8 @@ import {
} from '../app/src/infrastructure/mongodb.js'
import { promiseMapWithLimit } from '@overleaf/promise-utils'
import TokenGenerator from '../app/src/Features/TokenGenerator/TokenGenerator.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'

const { batchedUpdate } = BatchedUpdateModule
const VERBOSE_LOGGING = process.env.VERBOSE_LOGGING === 'true'
const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY, 10) || 10
const BATCH_SIZE = parseInt(process.env.BATCH_SIZE, 10) || 100

@@ -92,7 +91,7 @@ async function processBatch(users) {

async function main() {
await batchedUpdate(
'users',
db.users,
{ referal_id: { $exists: true } },
processBatch,
{ _id: 1, referal_id: 1 }
@@ -2,7 +2,7 @@ import {
db,
READ_PREFERENCE_SECONDARY,
} from '../app/src/infrastructure/mongodb.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import mongodb from 'mongodb-legacy'
import minimist from 'minimist'
import CollaboratorsHandler from '../app/src/Features/Collaborators/CollaboratorsHandler.js'

@@ -10,8 +10,6 @@ import { fileURLToPath } from 'node:url'

const { ObjectId } = mongodb

const { batchedUpdate } = BatchedUpdateModule

const argv = minimist(process.argv.slice(2), {
string: ['projects'],
boolean: ['dry-run', 'help'],

@@ -79,7 +77,7 @@ async function fixProjectsWithInvalidTokenAccessRefsIds(
}

await batchedUpdate(
'projects',
db.projects,
query,
async projects => {
for (const project of projects) {
@@ -1,11 +1,10 @@
import { db } from '../app/src/infrastructure/mongodb.js'
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import mongodb from 'mongodb-legacy'
import fs from 'node:fs'
import { fileURLToPath } from 'node:url'

const { ObjectId } = mongodb
const { batchedUpdate } = BatchedUpdateModule
const CHUNK_SIZE = 1000

// Function to chunk the array

@@ -28,7 +27,7 @@ async function main() {
console.log(`preserving opt-outs of ${optedOutList.length} users`)
// update all applicable user models
await batchedUpdate(
'users',
db.users,
{ 'writefull.enabled': false }, // and is false
{ $set: { 'writefull.enabled': null } }
)
@@ -1,6 +1,6 @@
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
import { db } from '../app/src/infrastructure/mongodb.js'

const { batchedUpdate } = BatchedUpdateModule
const oldImage = process.argv[2]
const newImage = process.argv[3]

@@ -34,7 +34,7 @@ if (!process.env.ALL_TEX_LIVE_DOCKER_IMAGES.split(',').includes(newImage)) {

try {
await batchedUpdate(
'projects',
db.projects,
{ imageName: oldImage },
{ $set: { imageName: newImage } }
)
@@ -1,6 +1,5 @@
import BatchedUpdateModule from './helpers/batchedUpdate.mjs'

const { batchedUpdateWithResultHandling } = BatchedUpdateModule
import { batchedUpdateWithResultHandling } from '@overleaf/mongo-utils/batchedUpdate.js'
import { db } from '../app/src/infrastructure/mongodb.js'

const MODEL_NAME = process.argv.pop()

@@ -20,10 +19,10 @@ function processBatch(batch) {
}

batchedUpdateWithResultHandling(
Model.collection.name,
db[Model.collection.name],
{},
async nextBatch => {
await processBatch(nextBatch)
processBatch(nextBatch)
},
{}
{} // fetch the entire record
)
@@ -24,7 +24,7 @@ describe('BatchedUpdateTests', function () {
spawnSync(process.argv0, [
'--input-type=module',
'-e',
'import BatchedUpdateModule from "./scripts/helpers/batchedUpdate.mjs"; BatchedUpdateModule.batchedUpdateWithResultHandling("systemmessages", { content: { $ne: "42" }}, { $set: { content: "42" } })',
'import { batchedUpdateWithResultHandling } from "@overleaf/mongo-utils/batchedUpdate.js"; import { db } from "./app/src/infrastructure/mongodb.js"; batchedUpdateWithResultHandling(db.systemmessages, { content: { $ne: "42" }}, { $set: { content: "42" } })',
])

await expect(

@@ -56,7 +56,7 @@ describe('BatchedUpdateTests', function () {
[
'--input-type=module',
'-e',
'import BatchedUpdateModule from "./scripts/helpers/batchedUpdate.mjs"; BatchedUpdateModule.batchedUpdateWithResultHandling("systemmessages", { content: { $ne: "42" }}, { $set: { content: "42" } })',
'import { batchedUpdateWithResultHandling } from "@overleaf/mongo-utils/batchedUpdate.js"; import { db } from "./app/src/infrastructure/mongodb.js"; batchedUpdateWithResultHandling(db.systemmessages, { content: { $ne: "42" }}, { $set: { content: "42" } })',
],
{ encoding: 'utf-8' }
)

@@ -87,7 +87,7 @@ describe('BatchedUpdateTests', function () {
[
'--input-type=module',
'-e',
'import BatchedUpdateModule from "./scripts/helpers/batchedUpdate.mjs"; BatchedUpdateModule.batchedUpdateWithResultHandling("systemmessages", { content: { $ne: "42" }}, { $set: { content: "42" } })',
'import { batchedUpdateWithResultHandling } from "@overleaf/mongo-utils/batchedUpdate.js"; import { db } from "./app/src/infrastructure/mongodb.js"; batchedUpdateWithResultHandling(db.systemmessages, { content: { $ne: "42" }}, { $set: { content: "42" } })',
],
{
encoding: 'utf-8',