mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
Merge pull request #4760 from overleaf/em-explicitly-start-queue-workers
Start queue workers explicitly GitOrigin-RevId: 0f8b710e0f1c0d64efa04f46fec269fae53609b2
This commit is contained in:
parent
cb8a24162b
commit
0654805cbb
7 changed files with 149 additions and 223 deletions
|
@ -31,6 +31,7 @@ https.globalAgent.maxSockets = Settings.limits.httpsGlobalAgentMaxSockets
|
|||
metrics.memory.monitor(logger)
|
||||
|
||||
const Server = require('./app/src/infrastructure/Server')
|
||||
const QueueWorkers = require('./app/src/infrastructure/QueueWorkers')
|
||||
const mongodb = require('./app/src/infrastructure/mongodb')
|
||||
const mongoose = require('./app/src/infrastructure/Mongoose')
|
||||
|
||||
|
@ -60,6 +61,7 @@ if (!module.parent) {
|
|||
// wait until the process is ready before monitoring the event loop
|
||||
metrics.event_loop.monitor(logger)
|
||||
})
|
||||
QueueWorkers.start()
|
||||
})
|
||||
.catch(err => {
|
||||
logger.fatal({ err }, 'Cannot connect to mongo. Exiting.')
|
||||
|
|
|
@ -7,7 +7,7 @@ const UserDeleter = require('./UserDeleter')
|
|||
const UserGetter = require('./UserGetter')
|
||||
const UserUpdater = require('./UserUpdater')
|
||||
const Analytics = require('../Analytics/AnalyticsManager')
|
||||
const UserOnboardingEmailQueueManager = require('./UserOnboardingEmailManager')
|
||||
const UserOnboardingEmailManager = require('./UserOnboardingEmailManager')
|
||||
const UserPostRegistrationAnalyticsManager = require('./UserPostRegistrationAnalyticsManager')
|
||||
const OError = require('@overleaf/o-error')
|
||||
|
||||
|
@ -89,7 +89,7 @@ async function createNewUser(attributes, options = {}) {
|
|||
|
||||
if (Features.hasFeature('saas')) {
|
||||
try {
|
||||
await UserOnboardingEmailQueueManager.scheduleOnboardingEmail(user)
|
||||
await UserOnboardingEmailManager.scheduleOnboardingEmail(user)
|
||||
await UserPostRegistrationAnalyticsManager.schedulePostRegistrationAnalytics(
|
||||
user
|
||||
)
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
const Features = require('../../infrastructure/Features')
|
||||
const Queues = require('../../infrastructure/Queues')
|
||||
const EmailHandler = require('../Email/EmailHandler')
|
||||
const UserUpdater = require('./UserUpdater')
|
||||
|
@ -6,24 +5,15 @@ const UserGetter = require('./UserGetter')
|
|||
|
||||
const ONE_DAY_MS = 24 * 60 * 60 * 1000
|
||||
|
||||
class UserOnboardingEmailManager {
|
||||
constructor() {
|
||||
this.queue = Queues.getOnboardingEmailsQueue()
|
||||
this.queue.process(async job => {
|
||||
const { userId } = job.data
|
||||
await this._sendOnboardingEmail(userId)
|
||||
})
|
||||
}
|
||||
|
||||
async scheduleOnboardingEmail(user) {
|
||||
await this.queue.add({ userId: user._id }, { delay: ONE_DAY_MS })
|
||||
}
|
||||
|
||||
async _sendOnboardingEmail(userId) {
|
||||
const user = await UserGetter.promises.getUser(
|
||||
{ _id: userId },
|
||||
{ email: 1 }
|
||||
async function scheduleOnboardingEmail(user) {
|
||||
await Queues.getOnboardingEmailsQueue().add(
|
||||
{ userId: user._id },
|
||||
{ delay: ONE_DAY_MS }
|
||||
)
|
||||
}
|
||||
|
||||
async function sendOnboardingEmail(userId) {
|
||||
const user = await UserGetter.promises.getUser({ _id: userId }, { email: 1 })
|
||||
if (user) {
|
||||
await EmailHandler.promises.sendEmail('userOnboardingEmail', {
|
||||
to: user.email,
|
||||
|
@ -32,13 +22,6 @@ class UserOnboardingEmailManager {
|
|||
$set: { onboardingEmailSentAt: new Date() },
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class NoopManager {
|
||||
async scheduleOnboardingEmail() {}
|
||||
}
|
||||
|
||||
module.exports = Features.hasFeature('saas')
|
||||
? new UserOnboardingEmailManager()
|
||||
: new NoopManager()
|
||||
module.exports = { scheduleOnboardingEmail, sendOnboardingEmail }
|
||||
|
|
|
@ -4,22 +4,14 @@ const {
|
|||
promises: InstitutionsAPIPromises,
|
||||
} = require('../Institutions/InstitutionsAPI')
|
||||
const AnalyticsManager = require('../Analytics/AnalyticsManager')
|
||||
const Features = require('../../infrastructure/Features')
|
||||
|
||||
const ONE_DAY_MS = 24 * 60 * 60 * 1000
|
||||
|
||||
class UserPostRegistrationAnalyticsManager {
|
||||
constructor() {
|
||||
this.queue = Queues.getPostRegistrationAnalyticsQueue()
|
||||
this.queue.process(async job => {
|
||||
const { userId } = job.data
|
||||
await postRegistrationAnalytics(userId)
|
||||
})
|
||||
}
|
||||
|
||||
async schedulePostRegistrationAnalytics(user) {
|
||||
await this.queue.add({ userId: user._id }, { delay: ONE_DAY_MS })
|
||||
}
|
||||
async function schedulePostRegistrationAnalytics(user) {
|
||||
await Queues.getPostRegistrationAnalyticsQueue().add(
|
||||
{ userId: user._id },
|
||||
{ delay: ONE_DAY_MS }
|
||||
)
|
||||
}
|
||||
|
||||
async function postRegistrationAnalytics(userId) {
|
||||
|
@ -48,10 +40,7 @@ async function checkAffiliations(userId) {
|
|||
}
|
||||
}
|
||||
|
||||
class NoopManager {
|
||||
async schedulePostRegistrationAnalytics() {}
|
||||
module.exports = {
|
||||
schedulePostRegistrationAnalytics,
|
||||
postRegistrationAnalytics,
|
||||
}
|
||||
|
||||
module.exports = Features.hasFeature('saas')
|
||||
? new UserPostRegistrationAnalyticsManager()
|
||||
: new NoopManager()
|
||||
|
|
24
services/web/app/src/infrastructure/QueueWorkers.js
Normal file
24
services/web/app/src/infrastructure/QueueWorkers.js
Normal file
|
@ -0,0 +1,24 @@
|
|||
const Features = require('./Features')
|
||||
const Queues = require('./Queues')
|
||||
const UserOnboardingEmailManager = require('../Features/User/UserOnboardingEmailManager')
|
||||
const UserPostRegistrationAnalyticsManager = require('../Features/User/UserPostRegistrationAnalyticsManager')
|
||||
|
||||
function start() {
|
||||
if (!Features.hasFeature('saas')) {
|
||||
return
|
||||
}
|
||||
|
||||
const onboardingEmailsQueue = Queues.getOnboardingEmailsQueue()
|
||||
onboardingEmailsQueue.process(async job => {
|
||||
const { userId } = job.data
|
||||
await UserOnboardingEmailManager.sendOnboardingEmail(userId)
|
||||
})
|
||||
|
||||
const postRegistrationAnalyticsQueue = Queues.getPostRegistrationAnalyticsQueue()
|
||||
postRegistrationAnalyticsQueue.process(async job => {
|
||||
const { userId } = job.data
|
||||
await UserPostRegistrationAnalyticsManager.postRegistrationAnalytics(userId)
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = { start }
|
|
@ -25,12 +25,15 @@ describe('UserOnboardingEmailManager', function () {
|
|||
}
|
||||
this.UserGetter = {
|
||||
promises: {
|
||||
getUser: sinon.stub().resolves({
|
||||
_id: this.fakeUserId,
|
||||
email: this.fakeUserEmail,
|
||||
}),
|
||||
getUser: sinon.stub().resolves(null),
|
||||
},
|
||||
}
|
||||
this.UserGetter.promises.getUser
|
||||
.withArgs({ _id: this.fakeUserId })
|
||||
.resolves({
|
||||
_id: this.fakeUserId,
|
||||
email: this.fakeUserEmail,
|
||||
})
|
||||
this.EmailHandler = {
|
||||
promises: {
|
||||
sendEmail: sinon.stub().resolves(),
|
||||
|
@ -41,93 +44,50 @@ describe('UserOnboardingEmailManager', function () {
|
|||
updateUser: sinon.stub().resolves(),
|
||||
},
|
||||
}
|
||||
this.Features = {
|
||||
hasFeature: sinon.stub(),
|
||||
}
|
||||
this.request = sinon.stub().yields()
|
||||
|
||||
this.init = isSAAS => {
|
||||
this.Features.hasFeature.withArgs('saas').returns(isSAAS)
|
||||
this.UserOnboardingEmailManager = SandboxedModule.require(MODULE_PATH, {
|
||||
globals: {
|
||||
console: console,
|
||||
},
|
||||
requires: {
|
||||
'../../infrastructure/Features': this.Features,
|
||||
'../../infrastructure/Queues': this.Queues,
|
||||
'../Email/EmailHandler': this.EmailHandler,
|
||||
'./UserGetter': this.UserGetter,
|
||||
'./UserUpdater': this.UserUpdater,
|
||||
},
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
describe('in Server CE/Pro', function () {
|
||||
beforeEach(function () {
|
||||
this.init(false)
|
||||
})
|
||||
|
||||
it('should not create any queue', function () {
|
||||
expect(this.Queues.getOnboardingEmailsQueue).to.not.have.been.called
|
||||
})
|
||||
it('should not schedule any email', function () {
|
||||
this.UserOnboardingEmailManager.scheduleOnboardingEmail({
|
||||
describe('scheduleOnboardingEmail', function () {
|
||||
it('should schedule delayed job on queue', async function () {
|
||||
await this.UserOnboardingEmailManager.scheduleOnboardingEmail({
|
||||
_id: this.fakeUserId,
|
||||
})
|
||||
expect(this.onboardingEmailsQueue.add).to.not.have.been.called
|
||||
})
|
||||
})
|
||||
|
||||
describe('schedule email in SAAS', function () {
|
||||
beforeEach(function () {
|
||||
this.init(true)
|
||||
})
|
||||
|
||||
it('should schedule delayed job on queue', function () {
|
||||
this.UserOnboardingEmailManager.scheduleOnboardingEmail({
|
||||
_id: this.fakeUserId,
|
||||
})
|
||||
sinon.assert.calledWith(
|
||||
this.onboardingEmailsQueue.add,
|
||||
expect(this.onboardingEmailsQueue.add).to.have.been.calledWith(
|
||||
{ userId: this.fakeUserId },
|
||||
{ delay: 24 * 60 * 60 * 1000 }
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
it('queue process callback should send onboarding email and update user', async function () {
|
||||
await this.queueProcessFunction({ data: { userId: this.fakeUserId } })
|
||||
sinon.assert.calledWith(
|
||||
this.UserGetter.promises.getUser,
|
||||
{ _id: this.fakeUserId },
|
||||
{ email: 1 }
|
||||
)
|
||||
sinon.assert.calledWith(
|
||||
this.EmailHandler.promises.sendEmail,
|
||||
describe('sendOnboardingEmail', function () {
|
||||
it('should send onboarding email and update user', async function () {
|
||||
await this.UserOnboardingEmailManager.sendOnboardingEmail(this.fakeUserId)
|
||||
expect(this.EmailHandler.promises.sendEmail).to.have.been.calledWith(
|
||||
'userOnboardingEmail',
|
||||
{
|
||||
to: this.fakeUserEmail,
|
||||
}
|
||||
)
|
||||
sinon.assert.calledWith(
|
||||
this.UserUpdater.promises.updateUser,
|
||||
expect(this.UserUpdater.promises.updateUser).to.have.been.calledWith(
|
||||
this.fakeUserId,
|
||||
{
|
||||
$set: { onboardingEmailSentAt: sinon.match.date },
|
||||
}
|
||||
{ $set: { onboardingEmailSentAt: sinon.match.date } }
|
||||
)
|
||||
})
|
||||
|
||||
it('queue process callback should stop if user is not found', async function () {
|
||||
this.UserGetter.promises.getUser = sinon.stub().resolves()
|
||||
await this.queueProcessFunction({ data: { userId: 'deleted-user' } })
|
||||
sinon.assert.calledWith(
|
||||
this.UserGetter.promises.getUser,
|
||||
{ _id: 'deleted-user' },
|
||||
{ email: 1 }
|
||||
)
|
||||
sinon.assert.notCalled(this.EmailHandler.promises.sendEmail)
|
||||
sinon.assert.notCalled(this.UserUpdater.promises.updateUser)
|
||||
it('should stop if user is not found', async function () {
|
||||
await this.UserOnboardingEmailManager.sendOnboardingEmail({
|
||||
data: { userId: 'deleted-user' },
|
||||
})
|
||||
expect(this.EmailHandler.promises.sendEmail).not.to.have.been.called
|
||||
expect(this.UserUpdater.promises.updateUser).not.to.have.been.called
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
@ -24,9 +24,12 @@ describe('UserPostRegistrationAnalyticsManager', function () {
|
|||
}
|
||||
this.UserGetter = {
|
||||
promises: {
|
||||
getUser: sinon.stub().resolves({ _id: this.fakeUserId }),
|
||||
getUser: sinon.stub().resolves(),
|
||||
},
|
||||
}
|
||||
this.UserGetter.promises.getUser
|
||||
.withArgs({ _id: this.fakeUserId })
|
||||
.resolves({ _id: this.fakeUserId })
|
||||
this.InstitutionsAPI = {
|
||||
promises: {
|
||||
getUserAffiliations: sinon.stub().resolves([]),
|
||||
|
@ -35,16 +38,10 @@ describe('UserPostRegistrationAnalyticsManager', function () {
|
|||
this.AnalyticsManager = {
|
||||
setUserProperty: sinon.stub().resolves(),
|
||||
}
|
||||
this.Features = {
|
||||
hasFeature: sinon.stub().returns(true),
|
||||
}
|
||||
this.init = isSAAS => {
|
||||
this.Features.hasFeature.withArgs('saas').returns(isSAAS)
|
||||
this.UserPostRegistrationAnalyticsManager = SandboxedModule.require(
|
||||
MODULE_PATH,
|
||||
{
|
||||
requires: {
|
||||
'../../infrastructure/Features': this.Features,
|
||||
'../../infrastructure/Queues': this.Queues,
|
||||
'./UserGetter': this.UserGetter,
|
||||
'../Institutions/InstitutionsAPI': this.InstitutionsAPI,
|
||||
|
@ -52,54 +49,30 @@ describe('UserPostRegistrationAnalyticsManager', function () {
|
|||
},
|
||||
}
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
describe('in Server CE/Pro', function () {
|
||||
beforeEach(function () {
|
||||
this.init(false)
|
||||
})
|
||||
|
||||
it('should schedule delayed job on queue', function () {
|
||||
this.UserPostRegistrationAnalyticsManager.schedulePostRegistrationAnalytics(
|
||||
{ _id: this.fakeUserId }
|
||||
)
|
||||
expect(this.Queues.getPostRegistrationAnalyticsQueue).to.not.have.been
|
||||
.called
|
||||
expect(this.postRegistrationAnalyticsQueue.add).to.not.have.been.called
|
||||
})
|
||||
})
|
||||
|
||||
describe('in SAAS', function () {
|
||||
beforeEach(function () {
|
||||
this.init(true)
|
||||
})
|
||||
describe('schedule jobs in SAAS', function () {
|
||||
it('should schedule delayed job on queue', function () {
|
||||
this.UserPostRegistrationAnalyticsManager.schedulePostRegistrationAnalytics(
|
||||
describe('schedulePostRegistrationAnalytics', function () {
|
||||
it('should schedule delayed job on queue', async function () {
|
||||
await this.UserPostRegistrationAnalyticsManager.schedulePostRegistrationAnalytics(
|
||||
{
|
||||
_id: this.fakeUserId,
|
||||
}
|
||||
)
|
||||
sinon.assert.calledWithMatch(
|
||||
this.postRegistrationAnalyticsQueue.add,
|
||||
expect(this.postRegistrationAnalyticsQueue.add).to.have.been.calledWith(
|
||||
{ userId: this.fakeUserId },
|
||||
{ delay: 24 * 60 * 60 * 1000 }
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe('process jobs', function () {
|
||||
describe('postRegistrationAnalytics', function () {
|
||||
it('stops without errors if user is not found', async function () {
|
||||
this.UserGetter.promises.getUser.resolves(null)
|
||||
await this.queueProcessFunction({ data: { userId: this.fakeUserId } })
|
||||
sinon.assert.calledWith(this.UserGetter.promises.getUser, {
|
||||
_id: this.fakeUserId,
|
||||
})
|
||||
sinon.assert.notCalled(
|
||||
this.InstitutionsAPI.promises.getUserAffiliations
|
||||
await this.UserPostRegistrationAnalyticsManager.postRegistrationAnalytics(
|
||||
'not-a-user'
|
||||
)
|
||||
sinon.assert.notCalled(this.AnalyticsManager.setUserProperty)
|
||||
expect(this.InstitutionsAPI.promises.getUserAffiliations).not.to.have.been
|
||||
.called
|
||||
expect(this.AnalyticsManager.setUserProperty).not.to.have.been.called
|
||||
})
|
||||
|
||||
it('sets user property if user has commons account affiliationd', async function () {
|
||||
|
@ -116,16 +89,10 @@ describe('UserPostRegistrationAnalyticsManager', function () {
|
|||
},
|
||||
},
|
||||
])
|
||||
await this.queueProcessFunction({ data: { userId: this.fakeUserId } })
|
||||
sinon.assert.calledWith(this.UserGetter.promises.getUser, {
|
||||
_id: this.fakeUserId,
|
||||
})
|
||||
sinon.assert.calledWith(
|
||||
this.InstitutionsAPI.promises.getUserAffiliations,
|
||||
await this.UserPostRegistrationAnalyticsManager.postRegistrationAnalytics(
|
||||
this.fakeUserId
|
||||
)
|
||||
sinon.assert.calledWith(
|
||||
this.AnalyticsManager.setUserProperty,
|
||||
expect(this.AnalyticsManager.setUserProperty).to.have.been.calledWith(
|
||||
this.fakeUserId,
|
||||
'registered-from-commons-account',
|
||||
true
|
||||
|
@ -140,9 +107,10 @@ describe('UserPostRegistrationAnalyticsManager', function () {
|
|||
},
|
||||
},
|
||||
])
|
||||
await this.queueProcessFunction({ data: { userId: this.fakeUserId } })
|
||||
sinon.assert.notCalled(this.AnalyticsManager.setUserProperty)
|
||||
})
|
||||
await this.UserPostRegistrationAnalyticsManager.postRegistrationAnalytics(
|
||||
this.fakeUserId
|
||||
)
|
||||
expect(this.AnalyticsManager.setUserProperty).not.to.have.been.called
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
Loading…
Reference in a new issue