Add new scheduled-jobs queue dedicated to delayed jobs (#5496)

* Add new scheduled-jobs queue dedicated to delayed jobs

* Extract createScheduledJob and enqueue to a dedicated QueueHandler

* Queues cleanup & refactoring (review suggestions)

GitOrigin-RevId: f7b9380388430e878def83cd44e7d086d0fb92ad
This commit is contained in:
Alexandre Bourdin 2021-10-25 14:07:45 +02:00 committed by Copybot
parent 43c381e4d5
commit 2037adf808
9 changed files with 130 additions and 101 deletions

View file

@ -8,9 +8,13 @@ const _ = require('lodash')
const { expressify } = require('../../util/promises')
const { logger } = require('logger-sharelatex')
const analyticsEventsQueue = Queues.getAnalyticsEventsQueue()
const analyticsEditingSessionsQueue = Queues.getAnalyticsEditingSessionsQueue()
const analyticsUserPropertiesQueue = Queues.getAnalyticsUserPropertiesQueue()
const analyticsEventsQueue = Queues.getQueue('analytics-events')
const analyticsEditingSessionsQueue = Queues.getQueue(
'analytics-editing-sessions'
)
const analyticsUserPropertiesQueue = Queues.getQueue(
'analytics-user-properties'
)
const ONE_MINUTE_MS = 60 * 1000
@ -24,12 +28,14 @@ function identifyUser(userId, analyticsId, isNewUser) {
return
}
Metrics.analyticsQueue.inc({ status: 'adding', event_type: 'identify' })
analyticsEventsQueue
.add(
'identify',
{ userId, analyticsId, isNewUser, createdAt: new Date() },
{ delay: ONE_MINUTE_MS }
)
Queues.createScheduledJob(
'analytics-events',
{
name: 'identify',
data: { userId, analyticsId, isNewUser, createdAt: new Date() },
},
ONE_MINUTE_MS
)
.then(() => {
Metrics.analyticsQueue.inc({ status: 'added', event_type: 'identify' })
})

View file

@ -18,7 +18,7 @@ const Queues = require('../../infrastructure/Queues')
* Enqueue a job for refreshing features for the given user
*/
async function scheduleRefreshFeatures(userId, reason) {
const queue = Queues.getRefreshFeaturesQueue()
const queue = Queues.getQueue('refresh-features')
await queue.add({ userId, reason })
}

View file

@ -6,9 +6,10 @@ const UserGetter = require('./UserGetter')
const ONE_DAY_MS = 24 * 60 * 60 * 1000
async function scheduleOnboardingEmail(user) {
await Queues.getOnboardingEmailsQueue().add(
{ userId: user._id },
{ delay: ONE_DAY_MS }
await Queues.createScheduledJob(
'emails-onboarding',
{ data: { userId: user._id } },
ONE_DAY_MS
)
}

View file

@ -8,9 +8,10 @@ const AnalyticsManager = require('../Analytics/AnalyticsManager')
const ONE_DAY_MS = 24 * 60 * 60 * 1000
async function schedulePostRegistrationAnalytics(user) {
await Queues.getPostRegistrationAnalyticsQueue().add(
{ userId: user._id },
{ delay: ONE_DAY_MS }
await Queues.createScheduledJob(
'post-registration-analytics',
{ data: { userId: user._id } },
ONE_DAY_MS
)
}

View file

@ -9,19 +9,32 @@ function start() {
return
}
const onboardingEmailsQueue = Queues.getOnboardingEmailsQueue()
const scheduledJobsQueue = Queues.getQueue('scheduled-jobs')
scheduledJobsQueue.process(async job => {
const { queueName, name, data, options } = job.data
const queue = Queues.getQueue(queueName)
if (name) {
await queue.add(name, data || {}, options || {})
} else {
await queue.add(data || {}, options || {})
}
})
const onboardingEmailsQueue = Queues.getQueue('emails-onboarding')
onboardingEmailsQueue.process(async job => {
const { userId } = job.data
await UserOnboardingEmailManager.sendOnboardingEmail(userId)
})
const postRegistrationAnalyticsQueue = Queues.getPostRegistrationAnalyticsQueue()
const postRegistrationAnalyticsQueue = Queues.getQueue(
'post-registration-analytics'
)
postRegistrationAnalyticsQueue.process(async job => {
const { userId } = job.data
await UserPostRegistrationAnalyticsManager.postRegistrationAnalytics(userId)
})
const refreshFeaturesQueue = Queues.getRefreshFeaturesQueue()
const refreshFeaturesQueue = Queues.getQueue('refresh-features')
refreshFeaturesQueue.process(async job => {
const { userId, reason } = job.data
await FeaturesUpdater.promises.refreshFeatures(userId, reason)

View file

@ -7,40 +7,25 @@ const Settings = require('@overleaf/settings')
const MAX_COMPLETED_JOBS_RETAINED = 10000
const MAX_FAILED_JOBS_RETAINED = 50000
const QUEUES_JOB_OPTIONS = {
'analytics-events': {},
'analytics-editing-sessions': {},
'analytics-user-properties': {},
'refresh-features': {
attempts: 3,
},
'emails-onboarding': {},
'post-registration-analytics': {},
'scheduled-jobs': {
attempts: 1,
},
}
const queues = {}
function getAnalyticsEventsQueue() {
if (Settings.analytics.enabled) {
return getOrCreateQueue('analytics-events')
}
}
function getAnalyticsEditingSessionsQueue() {
if (Settings.analytics.enabled) {
return getOrCreateQueue('analytics-editing-sessions')
}
}
function getAnalyticsUserPropertiesQueue() {
if (Settings.analytics.enabled) {
return getOrCreateQueue('analytics-user-properties')
}
}
function getRefreshFeaturesQueue() {
return getOrCreateQueue('refresh-features', { attempts: 3 })
}
function getOnboardingEmailsQueue() {
return getOrCreateQueue('emails-onboarding')
}
function getPostRegistrationAnalyticsQueue() {
return getOrCreateQueue('post-registration-analytics')
}
function getOrCreateQueue(queueName, jobOptions = {}) {
function getQueue(queueName) {
if (!queues[queueName]) {
const jobOptions = QUEUES_JOB_OPTIONS[queueName] || {}
queues[queueName] = new Queue(queueName, {
// this configuration is duplicated in /services/analytics/app/js/Queues.js
// and needs to be manually kept in sync whenever modified
@ -60,11 +45,16 @@ function getOrCreateQueue(queueName, jobOptions = {}) {
return queues[queueName]
}
module.exports = {
getAnalyticsEventsQueue,
getAnalyticsEditingSessionsQueue,
getAnalyticsUserPropertiesQueue,
getRefreshFeaturesQueue,
getOnboardingEmailsQueue,
getPostRegistrationAnalyticsQueue,
async function createScheduledJob(queueName, { name, data, options }, delay) {
await getQueue('scheduled-jobs').add(
{ queueName, name, data, options },
{
delay,
}
)
}
module.exports = {
getQueue,
createScheduledJob,
}

View file

@ -36,18 +36,21 @@ describe('AnalyticsManager', function () {
}
const self = this
this.Queues = {
getAnalyticsEventsQueue: () => {
return self.analyticsEventsQueue
},
getAnalyticsEditingSessionsQueue: () => {
return self.analyticsEditingSessionQueue
},
getOnboardingEmailsQueue: () => {
return self.onboardingEmailsQueue
},
getAnalyticsUserPropertiesQueue: () => {
return self.analyticsUserPropertiesQueue
getQueue: queueName => {
switch (queueName) {
case 'analytics-events':
return self.analyticsEventsQueue
case 'analytics-editing-sessions':
return self.analyticsEditingSessionQueue
case 'emails-onboarding':
return self.onboardingEmailsQueue
case 'analytics-user-properties':
return self.analyticsUserPropertiesQueue
default:
throw new Error('Unexpected queue name')
}
},
createScheduledJob: sinon.stub().resolves(),
}
this.backgroundRequest = sinon.stub().yields()
this.request = sinon.stub().yields()
@ -66,18 +69,18 @@ describe('AnalyticsManager', function () {
it('user is smoke test user', function () {
this.Settings.smokeTest = { userId: this.fakeUserId }
this.AnalyticsManager.identifyUser(this.fakeUserId, '')
sinon.assert.notCalled(this.analyticsEventsQueue.add)
sinon.assert.notCalled(this.Queues.createScheduledJob)
})
it('analytics service is disabled', function () {
this.Settings.analytics.enabled = false
this.AnalyticsManager.identifyUser(this.fakeUserId, '')
sinon.assert.notCalled(this.analyticsEventsQueue.add)
sinon.assert.notCalled(this.Queues.createScheduledJob)
})
it('userId is missing', function () {
this.AnalyticsManager.identifyUser(undefined, this.analyticsId)
sinon.assert.notCalled(this.analyticsEventsQueue.add)
sinon.assert.notCalled(this.Queues.createScheduledJob)
})
it('analyticsId is missing', function () {
@ -85,7 +88,7 @@ describe('AnalyticsManager', function () {
new ObjectID(this.fakeUserId),
undefined
)
sinon.assert.notCalled(this.analyticsEventsQueue.add)
sinon.assert.notCalled(this.Queues.createScheduledJob)
})
it('analyticsId is not a valid UUID', function () {
@ -93,7 +96,7 @@ describe('AnalyticsManager', function () {
new ObjectID(this.fakeUserId),
this.fakeUserId
)
sinon.assert.notCalled(this.analyticsEventsQueue.add)
sinon.assert.notCalled(this.Queues.createScheduledJob)
})
it('userId and analyticsId are the same Mongo ID', function () {
@ -101,18 +104,28 @@ describe('AnalyticsManager', function () {
new ObjectID(this.fakeUserId),
new ObjectID(this.fakeUserId)
)
sinon.assert.notCalled(this.analyticsEventsQueue.add)
sinon.assert.notCalled(this.Queues.createScheduledJob)
})
})
describe('queues the appropriate message for', function () {
it('identifyUser', function () {
const analyticsId = 'bd101c4c-722f-4204-9e2d-8303e5d9c120'
this.AnalyticsManager.identifyUser(this.fakeUserId, analyticsId)
sinon.assert.calledWithMatch(this.analyticsEventsQueue.add, 'identify', {
userId: this.fakeUserId,
analyticsId,
})
this.AnalyticsManager.identifyUser(this.fakeUserId, analyticsId, true)
sinon.assert.calledWithMatch(
this.Queues.createScheduledJob,
'analytics-events',
{
name: 'identify',
data: {
userId: this.fakeUserId,
analyticsId,
isNewUser: true,
createdAt: sinon.match.date,
},
},
60000
)
})
it('recordEventForUser', async function () {
@ -154,14 +167,25 @@ describe('AnalyticsManager', function () {
beforeEach(function () {
this.userId = '123abc'
this.analyticsId = 'bccd308c-5d72-426e-a106-662e88557795'
const self = this
this.AnalyticsManager = SandboxedModule.require(MODULE_PATH, {
requires: {
'@overleaf/settings': {},
'../../infrastructure/Queues': {
getAnalyticsEventsQueue: () => {},
getAnalyticsEditingSessionsQueue: () => {},
getOnboardingEmailsQueue: () => {},
getAnalyticsUserPropertiesQueue: () => {},
getQueue: queueName => {
switch (queueName) {
case 'analytics-events':
return self.analyticsEventsQueue
case 'analytics-editing-sessions':
return self.analyticsEditingSessionQueue
case 'emails-onboarding':
return self.onboardingEmailsQueue
case 'analytics-user-properties':
return self.analyticsUserPropertiesQueue
default:
throw new Error('Unexpected queue name')
}
},
},
'./UserAnalyticsIdCache': {},
uuid: {

View file

@ -19,9 +19,7 @@ describe('UserOnboardingEmailManager', function () {
},
}
this.Queues = {
getOnboardingEmailsQueue: sinon
.stub()
.returns(this.onboardingEmailsQueue),
createScheduledJob: sinon.stub().resolves(),
}
this.UserGetter = {
promises: {
@ -60,9 +58,11 @@ describe('UserOnboardingEmailManager', function () {
await this.UserOnboardingEmailManager.scheduleOnboardingEmail({
_id: this.fakeUserId,
})
expect(this.onboardingEmailsQueue.add).to.have.been.calledWith(
{ userId: this.fakeUserId },
{ delay: 24 * 60 * 60 * 1000 }
sinon.assert.calledWith(
this.Queues.createScheduledJob,
'emails-onboarding',
{ data: { userId: this.fakeUserId } },
24 * 60 * 60 * 1000
)
})
})

View file

@ -11,16 +11,8 @@ const MODULE_PATH = path.join(
describe('UserPostRegistrationAnalyticsManager', function () {
beforeEach(function () {
this.fakeUserId = '123abc'
this.postRegistrationAnalyticsQueue = {
add: sinon.stub().resolves(),
process: callback => {
this.queueProcessFunction = callback
},
}
this.Queues = {
getPostRegistrationAnalyticsQueue: sinon
.stub()
.returns(this.postRegistrationAnalyticsQueue),
createScheduledJob: sinon.stub().resolves(),
}
this.UserGetter = {
promises: {
@ -58,9 +50,11 @@ describe('UserPostRegistrationAnalyticsManager', function () {
_id: this.fakeUserId,
}
)
expect(this.postRegistrationAnalyticsQueue.add).to.have.been.calledWith(
{ userId: this.fakeUserId },
{ delay: 24 * 60 * 60 * 1000 }
sinon.assert.calledWith(
this.Queues.createScheduledJob,
'post-registration-analytics',
{ data: { userId: this.fakeUserId } },
24 * 60 * 60 * 1000
)
})
})