Merge pull request #9099 from overleaf/jpa-web-graceful-shutdown

[web] introduce graceful shutdown

GitOrigin-RevId: f42793a96f1e0304c57a855241bffa32bb291864
This commit is contained in:
Jakob Ackermann 2022-08-04 09:16:38 +01:00 committed by Copybot
parent ab4a01f15b
commit e5e6be99f8
16 changed files with 273 additions and 35 deletions

View file

@ -38,6 +38,9 @@ const Server = require('./app/src/infrastructure/Server')
const QueueWorkers = require('./app/src/infrastructure/QueueWorkers') const QueueWorkers = require('./app/src/infrastructure/QueueWorkers')
const mongodb = require('./app/src/infrastructure/mongodb') const mongodb = require('./app/src/infrastructure/mongodb')
const mongoose = require('./app/src/infrastructure/Mongoose') const mongoose = require('./app/src/infrastructure/Mongoose')
const {
triggerGracefulShutdown,
} = require('./app/src/infrastructure/GracefulShutdown')
if (Settings.catchErrors) { if (Settings.catchErrors) {
process.removeAllListeners('uncaughtException') process.removeAllListeners('uncaughtException')
@ -77,8 +80,7 @@ if (!module.parent) {
// handle SIGTERM for graceful shutdown in kubernetes // handle SIGTERM for graceful shutdown in kubernetes
process.on('SIGTERM', function (signal) { process.on('SIGTERM', function (signal) {
logger.warn({ signal }, 'received signal, shutting down') triggerGracefulShutdown(Server.server, signal)
Settings.shuttingDown = true
}) })
module.exports = Server.server module.exports = Server.server

View file

@ -24,6 +24,9 @@ const TpdsUpdateSender = require('../ThirdPartyDataStore/TpdsUpdateSender')
const TpdsProjectFlusher = require('../ThirdPartyDataStore/TpdsProjectFlusher') const TpdsProjectFlusher = require('../ThirdPartyDataStore/TpdsProjectFlusher')
const EditorRealTimeController = require('../Editor/EditorRealTimeController') const EditorRealTimeController = require('../Editor/EditorRealTimeController')
const SystemMessageManager = require('../SystemMessages/SystemMessageManager') const SystemMessageManager = require('../SystemMessages/SystemMessageManager')
const {
addOptionalCleanupHandlerAfterDrainingConnections,
} = require('../../infrastructure/GracefulShutdown')
const oneMinInMs = 60 * 1000 const oneMinInMs = 60 * 1000
@ -46,10 +49,15 @@ function updateOpenConnetionsMetrics() {
'open_connections.https', 'open_connections.https',
_.size(__guard__(require('https').globalAgent, x4 => x4.sockets)) _.size(__guard__(require('https').globalAgent, x4 => x4.sockets))
) )
return setTimeout(updateOpenConnetionsMetrics, oneMinInMs)
} }
setTimeout(updateOpenConnetionsMetrics, oneMinInMs) const intervalHandle = setInterval(updateOpenConnetionsMetrics, oneMinInMs)
addOptionalCleanupHandlerAfterDrainingConnections(
'collect connection metrics',
() => {
clearInterval(intervalHandle)
}
)
const AdminController = { const AdminController = {
_sendDisconnectAllUsersMessage: delay => { _sendDisconnectAllUsersMessage: delay => {

View file

@ -11,6 +11,9 @@
*/ */
let SystemMessageManager let SystemMessageManager
const { SystemMessage } = require('../../models/SystemMessage') const { SystemMessage } = require('../../models/SystemMessage')
const {
addRequiredCleanupHandlerBeforeDrainingConnections,
} = require('../../infrastructure/GracefulShutdown')
module.exports = SystemMessageManager = { module.exports = SystemMessageManager = {
getMessages(callback) { getMessages(callback) {
@ -53,4 +56,14 @@ module.exports = SystemMessageManager = {
const CACHE_TIMEOUT = 10 * 1000 * (Math.random() + 2) // 20-30 seconds const CACHE_TIMEOUT = 10 * 1000 * (Math.random() + 2) // 20-30 seconds
SystemMessageManager.refreshCache() SystemMessageManager.refreshCache()
setInterval(() => SystemMessageManager.refreshCache(), CACHE_TIMEOUT) const intervalHandle = setInterval(
() => SystemMessageManager.refreshCache(),
CACHE_TIMEOUT
)
addRequiredCleanupHandlerBeforeDrainingConnections(
'update system messages',
() => {
clearInterval(intervalHandle)
}
)

View file

@ -10,9 +10,14 @@ const FileTypeManager = require('../Uploads/FileTypeManager')
const CooldownManager = require('../Cooldown/CooldownManager') const CooldownManager = require('../Cooldown/CooldownManager')
const Errors = require('../Errors/Errors') const Errors = require('../Errors/Errors')
const Modules = require('../../infrastructure/Modules') const Modules = require('../../infrastructure/Modules')
const {
BackgroundTaskTracker,
} = require('../../infrastructure/GracefulShutdown')
const ROOT_DOC_TIMEOUT_LENGTH = 30 * 1000 const ROOT_DOC_TIMEOUT_LENGTH = 30 * 1000
const rootDocResets = new BackgroundTaskTracker('root doc resets')
function newUpdate(userId, projectName, path, updateRequest, source, callback) { function newUpdate(userId, projectName, path, updateRequest, source, callback) {
getOrCreateProject(userId, projectName, (err, project) => { getOrCreateProject(userId, projectName, (err, project) => {
if (err) { if (err) {
@ -113,8 +118,11 @@ function getOrCreateProject(userId, projectName, callback) {
// have a crack at setting the root doc after a while, on creation // have a crack at setting the root doc after a while, on creation
// we won't have it yet, but should have been sent it it within 30 // we won't have it yet, but should have been sent it it within 30
// seconds // seconds
rootDocResets.add()
setTimeout(() => { setTimeout(() => {
ProjectRootDocManager.setRootDocAutomatically(project._id) ProjectRootDocManager.setRootDocAutomatically(project._id, () => {
rootDocResets.done()
})
}, ROOT_DOC_TIMEOUT_LENGTH) }, ROOT_DOC_TIMEOUT_LENGTH)
callback(err, project) callback(err, project)
} }

View file

@ -16,6 +16,9 @@ const {
canRedirectToAdminDomain, canRedirectToAdminDomain,
hasAdminAccess, hasAdminAccess,
} = require('../Features/Helpers/AdminAuthorizationHelper') } = require('../Features/Helpers/AdminAuthorizationHelper')
const {
addOptionalCleanupHandlerAfterDrainingConnections,
} = require('./GracefulShutdown')
let webpackManifest let webpackManifest
switch (process.env.NODE_ENV) { switch (process.env.NODE_ENV) {
@ -23,11 +26,21 @@ switch (process.env.NODE_ENV) {
// Only load webpack manifest file in production. // Only load webpack manifest file in production.
webpackManifest = require(`../../../public/manifest.json`) webpackManifest = require(`../../../public/manifest.json`)
break break
case 'development': case 'development': {
// In dev, fetch the manifest from the webpack container. // In dev, fetch the manifest from the webpack container.
loadManifestFromWebpackDevServer() loadManifestFromWebpackDevServer()
setInterval(loadManifestFromWebpackDevServer, 10 * 1000) const intervalHandle = setInterval(
loadManifestFromWebpackDevServer,
10 * 1000
)
addOptionalCleanupHandlerAfterDrainingConnections(
'refresh webpack manifest',
() => {
clearInterval(intervalHandle)
}
)
break break
}
default: default:
// In ci, all entries are undefined. // In ci, all entries are undefined.
webpackManifest = {} webpackManifest = {}

View file

@ -0,0 +1,154 @@
/*
Graceful shutdown sequence:
- Stop background tasks that depend on the DB, like redis queues
- Stop processing new HTTP requests
- Wait for background tasks that depend on the DB, like polling that was
triggered by HTTP requests
- Drain/Close db connections
- Cleanup other background tasks, like metrics collectors
- By now the node app should exit on its own.
*/
const logger = require('@overleaf/logger')
const OError = require('@overleaf/o-error')
const Settings = require('@overleaf/settings')
const Metrics = require('@overleaf/metrics')
const sleep = require('util').promisify(setTimeout)
const optionalCleanupHandlersBeforeStoppingTraffic = []
const requiredCleanupHandlersBeforeDrainingConnections = []
const optionalCleanupHandlersAfterDrainingConnections = []
const connectionDrainer = []
function addConnectionDrainer(label, handler) {
connectionDrainer.push({ label, handler })
}
function addOptionalCleanupHandlerBeforeStoppingTraffic(label, handler) {
optionalCleanupHandlersBeforeStoppingTraffic.push({ label, handler })
}
function addRequiredCleanupHandlerBeforeDrainingConnections(label, handler) {
requiredCleanupHandlersBeforeDrainingConnections.push({ label, handler })
}
function addOptionalCleanupHandlerAfterDrainingConnections(label, handler) {
optionalCleanupHandlersAfterDrainingConnections.push({ label, handler })
}
async function runHandlers(stage, handlers, logOnly) {
logger.info({ stage }, 'graceful shutdown: run handlers')
for (const { label, handler } of handlers) {
try {
await handler()
} catch (e) {
const err = OError.tag(e, 'handler failed', { stage, label })
if (logOnly) {
logger.err({ err }, 'graceful shutdown: incomplete cleanup')
} else {
throw err
}
}
}
}
/**
* @param {import('net').Server} server
* @param {string} signal
*/
async function gracefulShutdown(server, signal) {
logger.warn({ signal }, 'graceful shutdown: started shutdown sequence')
Settings.shuttingDown = true
await runHandlers(
'optionalBeforeStoppingTraffic',
optionalCleanupHandlersBeforeStoppingTraffic,
true
)
await sleep(Settings.gracefulShutdownDelayInMs)
try {
await new Promise((resolve, reject) => {
logger.warn({}, 'graceful shutdown: closing http server')
server.close(err => {
if (err) {
reject(OError.tag(err, 'http.Server.close failed'))
} else {
resolve()
}
})
})
} catch (err) {
throw OError.tag(err, 'stop traffic')
}
await runHandlers(
'requiredBeforeDrainingConnections',
requiredCleanupHandlersBeforeDrainingConnections
)
try {
await runHandlers('connectionDrainer', connectionDrainer)
await runHandlers(
'optionalAfterDrainingConnections',
optionalCleanupHandlersAfterDrainingConnections.concat([
{ label: 'metrics module', handler: () => Metrics.close() },
{
label: 'logger module',
handler: () => logger.logLevelChecker?.stop(),
},
])
)
} catch (err) {
logger.err(
{ err },
'graceful shutdown: failed after stopping traffic, exiting'
)
// wait for logs to flush
await sleep(1000)
process.exit(1)
return
}
logger.info({}, 'graceful shutdown: ready to exit')
}
function triggerGracefulShutdown(server, signal) {
gracefulShutdown(server, signal).catch(err => {
logger.err(
{ err },
'graceful shutdown: incomplete cleanup, waiting for kill'
)
})
}
class BackgroundTaskTracker {
constructor(label) {
// Do not leak any handles, just record the number of pending jobs.
// In case we miss the cleanup of one job, the worst thing that can happen
// is that we do not stop web "gracefully" before k8s kills it forcefully.
this.pendingBackgroundTasks = 0
addRequiredCleanupHandlerBeforeDrainingConnections(label, async () => {
while (this.pendingBackgroundTasks > 0) {
await sleep(100) // try again in 100ms.
}
})
}
add() {
this.pendingBackgroundTasks++
}
done() {
this.pendingBackgroundTasks--
}
}
module.exports = {
BackgroundTaskTracker,
addConnectionDrainer,
addOptionalCleanupHandlerBeforeStoppingTraffic,
addOptionalCleanupHandlerAfterDrainingConnections,
addRequiredCleanupHandlerBeforeDrainingConnections,
triggerGracefulShutdown,
gracefulShutdown,
}

View file

@ -1,6 +1,7 @@
const mongoose = require('mongoose') const mongoose = require('mongoose')
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const logger = require('@overleaf/logger') const logger = require('@overleaf/logger')
const { addConnectionDrainer } = require('./GracefulShutdown')
if ( if (
typeof global.beforeEach === 'function' && typeof global.beforeEach === 'function' &&
@ -25,6 +26,10 @@ const connectionPromise = mongoose.connect(
Settings.mongo.options Settings.mongo.options
) )
) )
addConnectionDrainer('mongoose', async () => {
await connectionPromise
await mongoose.disconnect()
})
mongoose.connection.on('connected', () => mongoose.connection.on('connected', () =>
logger.debug('mongoose default connection open') logger.debug('mongoose default connection open')

View file

@ -3,6 +3,10 @@ const Queues = require('./Queues')
const UserOnboardingEmailManager = require('../Features/User/UserOnboardingEmailManager') const UserOnboardingEmailManager = require('../Features/User/UserOnboardingEmailManager')
const UserPostRegistrationAnalyticsManager = require('../Features/User/UserPostRegistrationAnalyticsManager') const UserPostRegistrationAnalyticsManager = require('../Features/User/UserPostRegistrationAnalyticsManager')
const FeaturesUpdater = require('../Features/Subscription/FeaturesUpdater') const FeaturesUpdater = require('../Features/Subscription/FeaturesUpdater')
const {
addOptionalCleanupHandlerBeforeStoppingTraffic,
addRequiredCleanupHandlerBeforeDrainingConnections,
} = require('./GracefulShutdown')
function start() { function start() {
if (!Features.hasFeature('saas')) { if (!Features.hasFeature('saas')) {
@ -19,12 +23,14 @@ function start() {
await queue.add(data || {}, options || {}) await queue.add(data || {}, options || {})
} }
}) })
registerCleanup(scheduledJobsQueue)
const onboardingEmailsQueue = Queues.getQueue('emails-onboarding') const onboardingEmailsQueue = Queues.getQueue('emails-onboarding')
onboardingEmailsQueue.process(async job => { onboardingEmailsQueue.process(async job => {
const { userId } = job.data const { userId } = job.data
await UserOnboardingEmailManager.sendOnboardingEmail(userId) await UserOnboardingEmailManager.sendOnboardingEmail(userId)
}) })
registerCleanup(onboardingEmailsQueue)
const postRegistrationAnalyticsQueue = Queues.getQueue( const postRegistrationAnalyticsQueue = Queues.getQueue(
'post-registration-analytics' 'post-registration-analytics'
@ -33,12 +39,31 @@ function start() {
const { userId } = job.data const { userId } = job.data
await UserPostRegistrationAnalyticsManager.postRegistrationAnalytics(userId) await UserPostRegistrationAnalyticsManager.postRegistrationAnalytics(userId)
}) })
registerCleanup(postRegistrationAnalyticsQueue)
const refreshFeaturesQueue = Queues.getQueue('refresh-features') const refreshFeaturesQueue = Queues.getQueue('refresh-features')
refreshFeaturesQueue.process(async job => { refreshFeaturesQueue.process(async job => {
const { userId, reason } = job.data const { userId, reason } = job.data
await FeaturesUpdater.promises.refreshFeatures(userId, reason) await FeaturesUpdater.promises.refreshFeatures(userId, reason)
}) })
registerCleanup(refreshFeaturesQueue)
}
function registerCleanup(queue) {
const label = `bull queue ${queue.name}`
// Stop accepting new jobs.
addOptionalCleanupHandlerBeforeStoppingTraffic(label, async () => {
const justThisWorker = true
await queue.pause(justThisWorker)
})
// Wait for all jobs to process before shutting down connections.
addRequiredCleanupHandlerBeforeDrainingConnections(label, async () => {
await queue.close()
})
// Disconnect from redis is scheduled in queue setup.
} }
module.exports = { start } module.exports = { start }

View file

@ -1,5 +1,6 @@
const Queue = require('bull') const Queue = require('bull')
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const { addConnectionDrainer } = require('./GracefulShutdown')
// Bull will keep a fixed number of the most recently completed jobs. This is // Bull will keep a fixed number of the most recently completed jobs. This is
// useful to inspect recently completed jobs. The bull prometheus exporter also // useful to inspect recently completed jobs. The bull prometheus exporter also
@ -63,6 +64,11 @@ function getQueue(queueName) {
...jobOptions, ...jobOptions,
}, },
}) })
// Disconnect from redis eventually.
addConnectionDrainer(`bull queue ${queueName}`, async () => {
await queues[queueName].disconnect()
})
} }
return queues[queueName] return queues[queueName]
} }

View file

@ -1,21 +0,0 @@
/* eslint-disable
max-len,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let trackOpenSockets
const _ = require('underscore')
const metrics = require('@overleaf/metrics')
;(trackOpenSockets = function () {
metrics.gauge(
'http.open-sockets',
_.size(require('http').globalAgent.sockets.length),
0.5
)
return setTimeout(trackOpenSockets, 1000)
})()

View file

@ -1,5 +1,6 @@
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const redis = require('@overleaf/redis-wrapper') const redis = require('@overleaf/redis-wrapper')
const { addConnectionDrainer } = require('./GracefulShutdown')
if ( if (
typeof global.beforeEach === 'function' && typeof global.beforeEach === 'function' &&
@ -19,6 +20,10 @@ module.exports = {
// feature = 'websessions' | 'ratelimiter' | ... // feature = 'websessions' | 'ratelimiter' | ...
client(feature) { client(feature) {
const redisFeatureSettings = Settings.redis[feature] || Settings.redis.web const redisFeatureSettings = Settings.redis[feature] || Settings.redis.web
return redis.createClient(redisFeatureSettings) const client = redis.createClient(redisFeatureSettings)
addConnectionDrainer(`redis ${feature}`, async () => {
await client.disconnect()
})
return client
}, },
} }

View file

@ -1,5 +1,6 @@
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const { MongoClient, ObjectId } = require('mongodb') const { MongoClient, ObjectId } = require('mongodb')
const { addConnectionDrainer } = require('./GracefulShutdown')
if ( if (
typeof global.beforeEach === 'function' && typeof global.beforeEach === 'function' &&
@ -14,6 +15,10 @@ const clientPromise = MongoClient.connect(
Settings.mongo.url, Settings.mongo.url,
Settings.mongo.options Settings.mongo.options
) )
addConnectionDrainer('mongodb', async () => {
const client = await clientPromise
client.close()
})
let setupDbPromise let setupDbPromise
async function waitForDb() { async function waitForDb() {

View file

@ -506,6 +506,12 @@ module.exports = {
// address and http/https protocol information. // address and http/https protocol information.
behindProxy: false, behindProxy: false,
// Delay before closing the http server upon receiving a SIGTERM process signal.
gracefulShutdownDelayInMs: parseInt(
process.env.GRACEFUL_SHUTDOWN_DELAY || 30 * seconds,
10
),
// Expose the hostname in the `X-Served-By` response header // Expose the hostname in the `X-Served-By` response header
exposeHostname: process.env.EXPOSE_HOSTNAME === 'true', exposeHostname: process.env.EXPOSE_HOSTNAME === 'true',

View file

@ -7,7 +7,7 @@
"public": "./public" "public": "./public"
}, },
"scripts": { "scripts": {
"test:acceptance:run_dir": "mocha --recursive --timeout 25000 --exit --grep=$MOCHA_GREP --require test/acceptance/bootstrap.js", "test:acceptance:run_dir": "mocha --recursive --timeout 25000 --grep=$MOCHA_GREP --require test/acceptance/bootstrap.js",
"test:acceptance:app": "npm run test:acceptance:run_dir -- test/acceptance/src", "test:acceptance:app": "npm run test:acceptance:run_dir -- test/acceptance/src",
"test:unit:run_dir": "mocha --recursive --timeout 25000 --exit --grep=$MOCHA_GREP --require test/unit/bootstrap.js", "test:unit:run_dir": "mocha --recursive --timeout 25000 --exit --grep=$MOCHA_GREP --require test/unit/bootstrap.js",
"test:unit:all": "npm run test:unit:run_dir -- test/unit/src modules/*/test/unit/src", "test:unit:all": "npm run test:unit:run_dir -- test/unit/src modules/*/test/unit/src",

View file

@ -28,8 +28,12 @@ describe('LinkedFiles', function () {
let sourceDocId let sourceDocId
let owner let owner
let server
before(function (done) { before(function (done) {
LinkedUrlProxy.listen(6543, done) server = LinkedUrlProxy.listen(6543, done)
})
after(function (done) {
server.close(done)
}) })
beforeEach(async function () { beforeEach(async function () {

View file

@ -3,7 +3,11 @@ const QueueWorkers = require('../../../../app/src/infrastructure/QueueWorkers')
const MongoHelper = require('./MongoHelper') const MongoHelper = require('./MongoHelper')
const RedisHelper = require('./RedisHelper') const RedisHelper = require('./RedisHelper')
const { logger } = require('@overleaf/logger') const { logger } = require('@overleaf/logger')
const Settings = require('@overleaf/settings')
const MockReCAPTCHAApi = require('../mocks/MockReCaptchaApi') const MockReCAPTCHAApi = require('../mocks/MockReCaptchaApi')
const {
gracefulShutdown,
} = require('../../../../app/src/infrastructure/GracefulShutdown')
logger.level('error') logger.level('error')
@ -21,9 +25,10 @@ before('start queue workers', function () {
QueueWorkers.start() QueueWorkers.start()
}) })
after('stop main app', function (done) { after('stop main app', async function () {
if (!server) { if (!server) {
return done() return
} }
server.close(done) Settings.gracefulShutdownDelayInMs = 1
await gracefulShutdown(server, 'tests')
}) })