From 4964d6414b33588f8540dc392758d5ccd87f05af Mon Sep 17 00:00:00 2001
From: Jakob Ackermann
Date: Tue, 19 Nov 2024 17:45:05 +0100
Subject: [PATCH] Merge pull request #21982 from overleaf/jpa-cleanup

[history-v1] back_fill_file_hash: cleanup fs and graceful shutdown

GitOrigin-RevId: 362669ff988ad71fda713bf4896b1abcb36caf35
---
 .../storage/scripts/back_fill_file_hash.mjs | 112 ++++++++++++------
 1 file changed, 74 insertions(+), 38 deletions(-)

diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
index e927fea86b..3e919018f4 100644
--- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs
+++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
@@ -223,14 +223,45 @@ function printStats() {
 
 setInterval(printStats, LOGGING_INTERVAL)
 
+let gracefulShutdownInitiated = false
+
+process.on('SIGINT', handleSignal)
+process.on('SIGTERM', handleSignal)
+
+function handleSignal() {
+  gracefulShutdownInitiated = true
+  console.warn('graceful shutdown initiated, draining queue')
+}
+
 /**
  * @param {QueueEntry} entry
  * @return {Promise}
  */
-async function processFile(entry) {
+async function processFileWithCleanup(entry) {
+  const {
+    ctx: { projectId },
+    cacheKey,
+  } = entry
+  const filePath = Path.join(BUFFER_DIR, projectId.toString() + cacheKey)
+  try {
+    return await processFile(entry, filePath)
+  } finally {
+    await Promise.all([
+      fs.promises.rm(filePath, { force: true }),
+      fs.promises.rm(filePath + GZ_SUFFIX, { force: true }),
+    ])
+  }
+}
+
+/**
+ * @param {QueueEntry} entry
+ * @param {string} filePath
+ * @return {Promise}
+ */
+async function processFile(entry, filePath) {
   for (let attempt = 0; attempt < RETRIES; attempt++) {
     try {
-      return await processFileOnce(entry)
+      return await processFileOnce(entry, filePath)
     } catch (err) {
       if (err instanceof NotFoundError) {
         const { bucketName } = OError.getFullInfo(err)
@@ -245,26 +276,29 @@
       const {
         ctx: { projectId },
         fileId,
+        hash,
         path,
       } = entry
       logger.warn(
-        { err, projectId, fileId, path, attempt },
+        { err, projectId, fileId, hash, path, attempt },
         'failed to process file, trying again'
      )
       await setTimeout(RETRY_DELAY_MS)
     }
   }
-  return await processFileOnce(entry)
+  return await processFileOnce(entry, filePath)
 }
 
 /**
  * @param {QueueEntry} entry
+ * @param {string} filePath
  * @return {Promise}
  */
-async function processFileOnce(entry) {
-  const { projectId, historyId } = entry.ctx
-  const { fileId, cacheKey } = entry
-  const filePath = Path.join(BUFFER_DIR, projectId.toString() + cacheKey)
+async function processFileOnce(entry, filePath) {
+  const {
+    ctx: { projectId, historyId },
+    fileId,
+  } = entry
   const blobStore = new BlobStore(historyId)
   if (entry.blob) {
     const { blob } = entry
@@ -353,6 +387,8 @@ async function uploadBlobToGCS(blobStore, entry, blob, hash, filePath) {
   entry.ctx.recordHistoryBlob(hash)
 }
 
+const GZ_SUFFIX = '.gz'
+
 /**
  * @param {QueueEntry} entry
  * @param {Blob} blob
@@ -366,7 +402,7 @@ async function uploadBlobToAWS(entry, blob, filePath) {
   const md5 = Crypto.createHash('md5')
   let size
   if (blob.getStringLength()) {
-    const filePathCompressed = filePath + '.gz'
+    const filePathCompressed = filePath + GZ_SUFFIX
     backupSource = filePathCompressed
     contentEncoding = 'gzip'
     size = 0
@@ -429,35 +465,32 @@ async function uploadBlobToAWS(entry, blob, filePath) {
  */
 async function processFiles(files) {
   if (files.length === 0) return // all processed
-  await fs.promises.mkdir(BUFFER_DIR, { recursive: true })
-  try {
-    await promiseMapWithLimit(
-      CONCURRENCY,
-      files,
-      /**
-       * @param {QueueEntry} entry
-       * @return {Promise}
-       */
-      async function (entry) {
-        try {
-          await entry.ctx.processFile(entry)
-        } catch (err) {
-          STATS.filesFailed++
-          const {
-            ctx: { projectId },
-            fileId,
-            path,
-          } = entry
-          logger.error(
-            { err, projectId, fileId, path },
-            'failed to process file'
-          )
-        }
+  await promiseMapWithLimit(
+    CONCURRENCY,
+    files,
+    /**
+     * @param {QueueEntry} entry
+     * @return {Promise}
+     */
+    async function (entry) {
+      if (gracefulShutdownInitiated) return
+      try {
+        await entry.ctx.processFile(entry)
+      } catch (err) {
+        STATS.filesFailed++
+        const {
+          ctx: { projectId },
+          fileId,
+          hash,
+          path,
+        } = entry
+        logger.error(
+          { err, projectId, fileId, hash, path },
+          'failed to process file'
+        )
       }
-    )
-  } finally {
-    await fs.promises.rm(BUFFER_DIR, { recursive: true, force: true })
-  }
+    }
+  )
 }
 
 /**
@@ -513,6 +546,9 @@ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') {
       await entry.ctx.flushMongoQueues()
     }
   )
+  if (gracefulShutdownInitiated) {
+    throw new Error('graceful shutdown: aborting batch processing')
+  }
 }
 
 /**
@@ -983,7 +1019,7 @@ class ProjectContext {
     if (this.#pendingFiles.has(entry.cacheKey)) {
      STATS.filesDuplicated++
     } else {
-      this.#pendingFiles.set(entry.cacheKey, processFile(entry))
+      this.#pendingFiles.set(entry.cacheKey, processFileWithCleanup(entry))
     }
     entry.hash = await this.#pendingFiles.get(entry.cacheKey)
     this.queueFileForWritingHash(entry)
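
Below is a minimal standalone sketch (not part of the patch) of the two patterns this change introduces: a module-level flag flipped by SIGINT/SIGTERM handlers so the queue drains instead of being killed mid-file, and a per-item try/finally that removes scratch files whether or not processing succeeds. The names SCRATCH_DIR, withCleanup and processQueue are hypothetical stand-ins; the real script wires the flag and cleanup into processFiles, handleLiveTreeBatch and processFileWithCleanup as shown in the hunks above.

// drain-sketch.mjs -- illustrative only; hypothetical names, Node 18+ assumed
import fs from 'node:fs'
import Path from 'node:path'
import os from 'node:os'
import { setTimeout } from 'node:timers/promises'

const SCRATCH_DIR = Path.join(os.tmpdir(), 'drain-sketch') // stand-in for BUFFER_DIR

let gracefulShutdownInitiated = false
process.on('SIGINT', handleSignal)
process.on('SIGTERM', handleSignal)
function handleSignal() {
  gracefulShutdownInitiated = true
  console.warn('graceful shutdown initiated, draining queue')
}

// Per-item wrapper: the scratch file is removed even when the work throws,
// mirroring the try/finally added around processFile in the patch.
async function withCleanup(id, work) {
  const filePath = Path.join(SCRATCH_DIR, id)
  try {
    return await work(filePath)
  } finally {
    await fs.promises.rm(filePath, { force: true })
  }
}

// Drain loop: an item already started runs to completion; once a signal has
// been seen, no new items are picked up.
async function processQueue(ids) {
  await fs.promises.mkdir(SCRATCH_DIR, { recursive: true })
  for (const id of ids) {
    if (gracefulShutdownInitiated) break
    await withCleanup(id, async filePath => {
      await fs.promises.writeFile(filePath, `payload for ${id}`)
      await setTimeout(10) // stand-in for hashing/upload work
    })
  }
}

await processQueue(['a', 'b', 'c'])

Whether the drain check breaks out of a sequential loop (as here) or returns early inside each concurrent callback (as the patch does inside promiseMapWithLimit) is only a question of where the concurrency lives; the flag-and-drain idea is the same.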