Merge pull request #18850 from overleaf/ac-promisify-resync-subscriptions

Promisify resync_subscriptions

GitOrigin-RevId: c23a99683a22104815525b179d4e46d3ad568f94
This commit is contained in:
Jimmy Domagala-Tang 2024-06-25 11:30:18 -04:00 committed by Copybot
parent 86e2abc90b
commit 50b19c7793

View file

@ -1,11 +1,12 @@
const { Subscription } = require('../../app/src/models/Subscription') const { Subscription } = require('../../app/src/models/Subscription')
const RecurlyWrapper = require('../../app/src/Features/Subscription/RecurlyWrapper') const RecurlyWrapper = require('../../app/src/Features/Subscription/RecurlyWrapper')
const SubscriptionUpdater = require('../../app/src/Features/Subscription/SubscriptionUpdater') const SubscriptionUpdater = require('../../app/src/Features/Subscription/SubscriptionUpdater')
const async = require('async')
const minimist = require('minimist') const minimist = require('minimist')
const { setTimeout } = require('node:timers/promises')
// make sure all `allMismatchReasons` are displayed in the output // make sure all `allMismatchReasons` are displayed in the output
const util = require('util') const util = require('util')
const pLimit = require('p-limit')
util.inspect.defaultOptions.maxArrayLength = null util.inspect.defaultOptions.maxArrayLength = null
const ScriptLogger = { const ScriptLogger = {
@ -59,104 +60,109 @@ const ScriptLogger = {
}, },
} }
const slowCallback = callback => setTimeout(callback, 80) const handleSyncSubscriptionError = async (subscription, error) => {
const handleSyncSubscriptionError = (subscription, error, callback) => {
console.warn(`Errors with subscription id=${subscription._id}:`, error) console.warn(`Errors with subscription id=${subscription._id}:`, error)
if (typeof error === 'string' && error.match(/429$/)) { if (typeof error === 'string' && error.match(/429$/)) {
return setTimeout(callback, 1000 * 60 * 5) await setTimeout(1000 * 60 * 5)
return
} }
if (typeof error === 'string' && error.match(/5\d\d$/)) { if (typeof error === 'string' && error.match(/5\d\d$/)) {
return setTimeout(() => { await setTimeout(1000 * 60)
syncSubscription(subscription, callback) await syncSubscription(subscription)
}, 1000 * 60) return
} }
slowCallback(callback) await setTimeout(80)
} }
const syncSubscription = (subscription, callback) => { const syncSubscription = async subscription => {
RecurlyWrapper.getSubscription( let recurlySubscription
subscription.recurlySubscription_id, try {
(error, recurlySubscription) => { recurlySubscription = await RecurlyWrapper.promises.getSubscription(
if (error) { subscription.recurlySubscription_id
return handleSyncSubscriptionError(subscription, error, callback) )
} } catch (error) {
await handleSyncSubscriptionError(subscription, error)
return
}
ScriptLogger.recordMismatch(subscription, recurlySubscription) ScriptLogger.recordMismatch(subscription, recurlySubscription)
if (!COMMIT) { if (!COMMIT) {
return callback() return
} }
SubscriptionUpdater.updateSubscriptionFromRecurly( try {
recurlySubscription, await SubscriptionUpdater.promises.updateSubscriptionFromRecurly(
subscription, recurlySubscription,
{}, subscription,
error => { {}
if (error) { )
return handleSyncSubscriptionError(subscription, error, callback) } catch (error) {
} await handleSyncSubscriptionError(subscription, error)
slowCallback(callback) return
} }
) await setTimeout(80)
} }
const syncSubscriptions = async subscriptions => {
const limit = pLimit(ASYNC_LIMIT)
return await Promise.all(
subscriptions.map(subscription =>
limit(() => syncSubscription(subscription))
)
) )
} }
const syncSubscriptions = (subscriptions, callback) => { const loopForSubscriptions = async skipInitial => {
async.eachLimit(subscriptions, ASYNC_LIMIT, syncSubscription, callback) let skip = skipInitial
}
const loopForSubscriptions = (skip, callback) => { // iterate while there are more subscriptions to fetch
Subscription.find({ while (true) {
recurlySubscription_id: { $exists: true, $ne: '' }, const subscriptions = await Subscription.find({
}) recurlySubscription_id: { $exists: true, $ne: '' },
.sort('_id')
.skip(skip)
.limit(FETCH_LIMIT)
.exec((error, subscriptions) => {
if (error) {
return callback(error)
}
if (subscriptions.length === 0) {
console.warn('DONE')
return callback()
}
syncSubscriptions(subscriptions, error => {
if (error) {
return callback(error)
}
ScriptLogger.checkedSubscriptionsCount += subscriptions.length
retryCounter = 0
ScriptLogger.printProgress()
ScriptLogger.printSummary()
loopForSubscriptions(
MONGO_SKIP + ScriptLogger.checkedSubscriptionsCount,
callback
)
})
}) })
.sort('_id')
.skip(skip)
.limit(FETCH_LIMIT)
.exec()
if (subscriptions.length === 0) {
console.warn('DONE')
return
}
await syncSubscriptions(subscriptions)
ScriptLogger.checkedSubscriptionsCount += subscriptions.length
retryCounter = 0
ScriptLogger.printProgress()
ScriptLogger.printSummary()
skip += FETCH_LIMIT
}
} }
let retryCounter = 0 let retryCounter = 0
const run = () => const run = async () => {
loopForSubscriptions( while (true) {
MONGO_SKIP + ScriptLogger.checkedSubscriptionsCount, try {
error => { await loopForSubscriptions(
if (error) { MONGO_SKIP + ScriptLogger.checkedSubscriptionsCount
if (retryCounter < 3) { )
console.error(error) break
retryCounter += 1 } catch (error) {
console.warn(`RETRYING IN 60 SECONDS. (${retryCounter}/3)`) if (retryCounter < 3) {
return setTimeout(run, 60000) console.error(error)
} retryCounter += 1
console.warn(`RETRYING IN 60 SECONDS. (${retryCounter}/3)`)
await setTimeout(60000)
} else {
console.error('Failed after 3 retries')
throw error throw error
} }
process.exit()
} }
) }
}
let FETCH_LIMIT, ASYNC_LIMIT, COMMIT, MONGO_SKIP let FETCH_LIMIT, ASYNC_LIMIT, COMMIT, MONGO_SKIP
const setup = () => { const setup = () => {
@ -174,4 +180,6 @@ const setup = () => {
} }
setup() setup()
run() run().then(() => {
process.exit()
})