2022-12-20 11:09:28 -05:00
|
|
|
const GoogleBigQueryHelper = require('./helpers/GoogleBigQueryHelper')
|
|
|
|
const { Subscription } = require('../../app/src/models/Subscription')
|
|
|
|
const { waitForDb } = require('../../app/src/infrastructure/mongodb')
|
|
|
|
const AnalyticsManager = require('../../app/src/Features/Analytics/AnalyticsManager')
|
|
|
|
const {
|
|
|
|
DeletedSubscription,
|
|
|
|
} = require('../../app/src/models/DeletedSubscription')
|
|
|
|
const minimist = require('minimist')
|
|
|
|
const _ = require('lodash')
|
2023-06-26 11:27:27 -04:00
|
|
|
const { ObjectId } = require('mongodb')
|
2022-12-20 11:09:28 -05:00
|
|
|
|
|
|
|
let FETCH_LIMIT, COMMIT, VERBOSE
|
|
|
|
|
|
|
|
async function main() {
|
|
|
|
await waitForDb()
|
|
|
|
|
|
|
|
console.log('## Syncing group subscription memberships...')
|
|
|
|
|
|
|
|
const subscriptionsCount = await Subscription.count({ groupPlan: true })
|
|
|
|
const deletedSubscriptionsCount = await DeletedSubscription.count({
|
|
|
|
'subscription.groupPlan': true,
|
|
|
|
})
|
|
|
|
|
|
|
|
console.log(
|
|
|
|
`## Going to synchronize ${subscriptionsCount} subscriptions and ${deletedSubscriptionsCount} deleted subscriptions`
|
|
|
|
)
|
|
|
|
|
|
|
|
await checkActiveSubscriptions()
|
|
|
|
await checkDeletedSubscriptions()
|
|
|
|
}
|
|
|
|
|
|
|
|
async function checkActiveSubscriptions() {
|
|
|
|
let totalSubscriptionsChecked = 0
|
|
|
|
let subscriptions
|
2023-06-26 09:49:34 -04:00
|
|
|
const processedSubscriptionIds = new Set()
|
2022-12-20 11:09:28 -05:00
|
|
|
do {
|
|
|
|
subscriptions = await Subscription.find(
|
|
|
|
{ groupPlan: true },
|
|
|
|
{ recurlySubscription_id: 1, member_ids: 1 }
|
|
|
|
)
|
|
|
|
.sort('_id')
|
|
|
|
.skip(totalSubscriptionsChecked)
|
|
|
|
.limit(FETCH_LIMIT)
|
|
|
|
.lean()
|
|
|
|
|
|
|
|
if (subscriptions.length) {
|
|
|
|
const groupIds = subscriptions.map(sub => sub._id)
|
2024-03-25 06:51:40 -04:00
|
|
|
const bigQueryGroupMemberships =
|
|
|
|
await fetchBigQueryMembershipStatuses(groupIds)
|
2022-12-20 11:09:28 -05:00
|
|
|
const membershipsByGroupId = _.groupBy(
|
|
|
|
bigQueryGroupMemberships,
|
|
|
|
'group_id'
|
|
|
|
)
|
|
|
|
|
|
|
|
for (const subscription of subscriptions) {
|
2023-06-26 09:49:34 -04:00
|
|
|
const subscriptionId = subscription._id.toString()
|
|
|
|
if (!processedSubscriptionIds.has(subscriptionId)) {
|
|
|
|
await checkSubscriptionMemberships(
|
|
|
|
subscription,
|
|
|
|
membershipsByGroupId[subscriptionId] || []
|
|
|
|
)
|
|
|
|
processedSubscriptionIds.add(subscriptionId)
|
|
|
|
}
|
2022-12-20 11:09:28 -05:00
|
|
|
}
|
|
|
|
totalSubscriptionsChecked += subscriptions.length
|
|
|
|
}
|
|
|
|
} while (subscriptions.length > 0)
|
|
|
|
}
|
|
|
|
|
|
|
|
async function checkDeletedSubscriptions() {
|
|
|
|
let totalDeletedSubscriptionsChecked = 0
|
|
|
|
let deletedSubscriptions
|
2023-06-26 09:49:34 -04:00
|
|
|
const processedSubscriptionIds = new Set()
|
2022-12-20 11:09:28 -05:00
|
|
|
do {
|
|
|
|
deletedSubscriptions = (
|
|
|
|
await DeletedSubscription.find(
|
|
|
|
{ 'subscription.groupPlan': true },
|
|
|
|
{ subscription: 1 }
|
|
|
|
)
|
|
|
|
.sort('deletedAt')
|
|
|
|
.skip(totalDeletedSubscriptionsChecked)
|
|
|
|
.limit(FETCH_LIMIT)
|
|
|
|
).map(sub => sub.toObject().subscription)
|
|
|
|
|
|
|
|
if (deletedSubscriptions.length) {
|
|
|
|
const groupIds = deletedSubscriptions.map(sub => sub._id.toString())
|
2024-03-25 06:51:40 -04:00
|
|
|
const bigQueryGroupMemberships =
|
|
|
|
await fetchBigQueryMembershipStatuses(groupIds)
|
2022-12-20 11:09:28 -05:00
|
|
|
|
2023-06-26 09:49:34 -04:00
|
|
|
const membershipsByGroupId = _.groupBy(
|
|
|
|
bigQueryGroupMemberships,
|
|
|
|
'group_id'
|
|
|
|
)
|
|
|
|
|
2022-12-20 11:09:28 -05:00
|
|
|
for (const deletedSubscription of deletedSubscriptions) {
|
2023-06-26 09:49:34 -04:00
|
|
|
const subscriptionId = deletedSubscription._id.toString()
|
|
|
|
if (!processedSubscriptionIds.has(subscriptionId)) {
|
|
|
|
await checkDeletedSubscriptionMemberships(
|
|
|
|
deletedSubscription,
|
|
|
|
membershipsByGroupId[subscriptionId] || []
|
|
|
|
)
|
|
|
|
processedSubscriptionIds.add(subscriptionId)
|
|
|
|
}
|
2022-12-20 11:09:28 -05:00
|
|
|
}
|
|
|
|
totalDeletedSubscriptionsChecked += deletedSubscriptions.length
|
|
|
|
}
|
|
|
|
} while (deletedSubscriptions.length > 0)
|
|
|
|
}
|
|
|
|
|
2023-06-26 09:49:34 -04:00
|
|
|
async function checkSubscriptionMemberships(subscription, membershipStatuses) {
|
2022-12-20 11:09:28 -05:00
|
|
|
if (VERBOSE) {
|
|
|
|
console.log(
|
|
|
|
'\n###########################################################################################',
|
|
|
|
'\n# Subscription (mongo): ',
|
|
|
|
'\n# _id: \t\t\t\t',
|
|
|
|
subscription._id.toString(),
|
|
|
|
'\n# member_ids: \t\t\t',
|
2023-06-26 09:49:34 -04:00
|
|
|
subscription.member_ids.map(_id => _id.toString()),
|
2022-12-20 11:09:28 -05:00
|
|
|
'\n# recurlySubscription_id: \t',
|
|
|
|
subscription.recurlySubscription_id
|
|
|
|
)
|
|
|
|
console.log('#\n# Membership statuses found in BigQuery: ')
|
|
|
|
console.table(membershipStatuses)
|
|
|
|
}
|
|
|
|
// create missing `joined` events when membership status is missing
|
|
|
|
for (const memberId of subscription.member_ids) {
|
|
|
|
if (
|
|
|
|
!_.find(membershipStatuses, {
|
|
|
|
user_id: memberId.toString(),
|
|
|
|
is_member: true,
|
|
|
|
})
|
|
|
|
) {
|
2023-06-26 09:49:34 -04:00
|
|
|
await sendCorrectiveEvent(
|
|
|
|
memberId,
|
|
|
|
'group-subscription-joined',
|
|
|
|
subscription
|
|
|
|
)
|
2022-12-20 11:09:28 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
// create missing `left` events if user is not a member of the group anymore
|
|
|
|
for (const { user_id: userId, is_member: isMember } of membershipStatuses) {
|
|
|
|
if (
|
|
|
|
isMember &&
|
2023-06-26 09:49:34 -04:00
|
|
|
!subscription.member_ids.some(id => id.toString() === userId)
|
2022-12-20 11:09:28 -05:00
|
|
|
) {
|
2023-06-26 09:49:34 -04:00
|
|
|
await sendCorrectiveEvent(userId, 'group-subscription-left', subscription)
|
2022-12-20 11:09:28 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-26 09:49:34 -04:00
|
|
|
async function checkDeletedSubscriptionMemberships(
|
|
|
|
subscription,
|
|
|
|
membershipStatuses
|
|
|
|
) {
|
2022-12-20 11:09:28 -05:00
|
|
|
if (VERBOSE) {
|
|
|
|
console.log(
|
|
|
|
'\n###########################################################################################',
|
|
|
|
'\n# Deleted subscription (mongo): ',
|
|
|
|
'\n# _id: \t\t\t\t',
|
|
|
|
subscription._id.toString(),
|
|
|
|
'\n# member_ids: \t\t\t',
|
2023-06-26 09:49:34 -04:00
|
|
|
subscription.member_ids.map(_id => _id.toString()),
|
2022-12-20 11:09:28 -05:00
|
|
|
'\n# recurlySubscription_id: \t',
|
|
|
|
subscription.recurlySubscription_id
|
|
|
|
)
|
|
|
|
console.log('#\n# Membership statuses found in BigQuery: ')
|
|
|
|
console.table(membershipStatuses)
|
|
|
|
}
|
|
|
|
|
|
|
|
const updatedUserIds = new Set()
|
|
|
|
// create missing `left` events if user was a member of the group in BQ and status is not up-to-date
|
|
|
|
for (const memberId of subscription.member_ids.map(id => id.toString())) {
|
|
|
|
if (
|
|
|
|
_.find(membershipStatuses, {
|
|
|
|
user_id: memberId,
|
|
|
|
is_member: true,
|
|
|
|
})
|
|
|
|
) {
|
2023-06-26 09:49:34 -04:00
|
|
|
await sendCorrectiveEvent(
|
|
|
|
memberId,
|
|
|
|
'group-subscription-left',
|
|
|
|
subscription
|
|
|
|
)
|
2022-12-20 11:09:28 -05:00
|
|
|
updatedUserIds.add(memberId)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// for cases where the user has been removed from the subscription before it was deleted and status is not up-to-date
|
|
|
|
for (const { user_id: userId, is_member: isMember } of membershipStatuses) {
|
|
|
|
if (isMember && !updatedUserIds.has(userId)) {
|
2023-06-26 09:49:34 -04:00
|
|
|
await sendCorrectiveEvent(userId, 'group-subscription-left', subscription)
|
2022-12-20 11:09:28 -05:00
|
|
|
updatedUserIds.add(userId)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-26 09:49:34 -04:00
|
|
|
async function sendCorrectiveEvent(userId, event, subscription) {
|
2023-06-26 11:27:27 -04:00
|
|
|
if (!ObjectId.isValid(userId)) {
|
|
|
|
console.warn(`Skipping '${event}' for user ${userId}: invalid user ID`)
|
|
|
|
return
|
|
|
|
}
|
2022-12-20 11:09:28 -05:00
|
|
|
const segmentation = {
|
|
|
|
groupId: subscription._id.toString(),
|
|
|
|
subscriptionId: subscription.recurlySubscription_id,
|
|
|
|
source: 'sync',
|
|
|
|
}
|
|
|
|
if (COMMIT) {
|
|
|
|
console.log(
|
|
|
|
`Sending event '${event}' for user ${userId} with segmentation: ${JSON.stringify(
|
|
|
|
segmentation
|
|
|
|
)}`
|
|
|
|
)
|
2023-06-26 09:49:34 -04:00
|
|
|
await AnalyticsManager.recordEventForUser(userId, event, segmentation)
|
2022-12-20 11:09:28 -05:00
|
|
|
} else {
|
|
|
|
console.log(
|
|
|
|
`Dry run - would send event '${event}' for user ${userId} with segmentation: ${JSON.stringify(
|
|
|
|
segmentation
|
|
|
|
)}`
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async function fetchBigQueryMembershipStatuses(groupIds) {
|
|
|
|
const joinedGroupIds = groupIds.map(id => `"${id}"`).join(',')
|
|
|
|
const query = `\
|
2023-06-26 09:49:34 -04:00
|
|
|
WITH user_memberships AS (
|
2022-12-20 11:09:28 -05:00
|
|
|
SELECT
|
2023-06-26 09:49:34 -04:00
|
|
|
group_id,
|
|
|
|
COALESCE(user_aliases.user_id, ugm.user_id) AS user_id,
|
|
|
|
is_member,
|
|
|
|
ugm.created_at
|
|
|
|
FROM analytics.user_group_memberships ugm
|
|
|
|
LEFT JOIN analytics.user_aliases ON ugm.user_id = user_aliases.analytics_id
|
|
|
|
WHERE ugm.group_id IN (${joinedGroupIds})
|
|
|
|
),
|
|
|
|
ordered_status AS (
|
|
|
|
SELECT *,
|
2022-12-20 11:09:28 -05:00
|
|
|
ROW_NUMBER() OVER(PARTITION BY group_id, user_id ORDER BY created_at DESC) AS row_number
|
2023-06-26 09:49:34 -04:00
|
|
|
FROM user_memberships
|
2022-12-20 11:09:28 -05:00
|
|
|
)
|
2023-06-26 09:49:34 -04:00
|
|
|
SELECT group_id, user_id, is_member, created_at FROM ordered_status
|
2022-12-20 11:09:28 -05:00
|
|
|
WHERE row_number = 1;
|
|
|
|
`
|
|
|
|
|
|
|
|
return GoogleBigQueryHelper.query(query)
|
|
|
|
}
|
|
|
|
|
|
|
|
const setup = () => {
|
|
|
|
const argv = minimist(process.argv.slice(2))
|
|
|
|
FETCH_LIMIT = argv.fetch ? argv.fetch : 100
|
|
|
|
COMMIT = argv.commit !== undefined
|
|
|
|
VERBOSE = argv.debug !== undefined
|
|
|
|
if (!COMMIT) {
|
|
|
|
console.warn('Doing dry run without --commit')
|
|
|
|
}
|
|
|
|
if (VERBOSE) {
|
|
|
|
console.log('Running in verbose mode')
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
setup()
|
|
|
|
main()
|
|
|
|
.then(() => {
|
|
|
|
console.error('Done.')
|
|
|
|
process.exit(0)
|
|
|
|
})
|
|
|
|
.catch(error => {
|
|
|
|
console.error({ error })
|
|
|
|
process.exit(1)
|
|
|
|
})
|