Merge pull request #21982 from overleaf/jpa-cleanup

[history-v1] back_fill_file_hash: cleanup fs and graceful shutdown

GitOrigin-RevId: 362669ff988ad71fda713bf4896b1abcb36caf35
This commit is contained in:
Jakob Ackermann 2024-11-19 17:45:05 +01:00 committed by Copybot
parent 0b9b7da0e9
commit 4964d6414b

View file

@ -223,14 +223,45 @@ function printStats() {
setInterval(printStats, LOGGING_INTERVAL)
let gracefulShutdownInitiated = false
process.on('SIGINT', handleSignal)
process.on('SIGTERM', handleSignal)
function handleSignal() {
gracefulShutdownInitiated = true
console.warn('graceful shutdown initiated, draining queue')
}
/**
* @param {QueueEntry} entry
* @return {Promise<string>}
*/
async function processFile(entry) {
async function processFileWithCleanup(entry) {
const {
ctx: { projectId },
cacheKey,
} = entry
const filePath = Path.join(BUFFER_DIR, projectId.toString() + cacheKey)
try {
return await processFile(entry, filePath)
} finally {
await Promise.all([
fs.promises.rm(filePath, { force: true }),
fs.promises.rm(filePath + GZ_SUFFIX, { force: true }),
])
}
}
/**
* @param {QueueEntry} entry
* @param {string} filePath
* @return {Promise<string>}
*/
async function processFile(entry, filePath) {
for (let attempt = 0; attempt < RETRIES; attempt++) {
try {
return await processFileOnce(entry)
return await processFileOnce(entry, filePath)
} catch (err) {
if (err instanceof NotFoundError) {
const { bucketName } = OError.getFullInfo(err)
@ -245,26 +276,29 @@ async function processFile(entry) {
const {
ctx: { projectId },
fileId,
hash,
path,
} = entry
logger.warn(
{ err, projectId, fileId, path, attempt },
{ err, projectId, fileId, hash, path, attempt },
'failed to process file, trying again'
)
await setTimeout(RETRY_DELAY_MS)
}
}
return await processFileOnce(entry)
return await processFileOnce(entry, filePath)
}
/**
* @param {QueueEntry} entry
* @param {string} filePath
* @return {Promise<string>}
*/
async function processFileOnce(entry) {
const { projectId, historyId } = entry.ctx
const { fileId, cacheKey } = entry
const filePath = Path.join(BUFFER_DIR, projectId.toString() + cacheKey)
async function processFileOnce(entry, filePath) {
const {
ctx: { projectId, historyId },
fileId,
} = entry
const blobStore = new BlobStore(historyId)
if (entry.blob) {
const { blob } = entry
@ -353,6 +387,8 @@ async function uploadBlobToGCS(blobStore, entry, blob, hash, filePath) {
entry.ctx.recordHistoryBlob(hash)
}
const GZ_SUFFIX = '.gz'
/**
* @param {QueueEntry} entry
* @param {Blob} blob
@ -366,7 +402,7 @@ async function uploadBlobToAWS(entry, blob, filePath) {
const md5 = Crypto.createHash('md5')
let size
if (blob.getStringLength()) {
const filePathCompressed = filePath + '.gz'
const filePathCompressed = filePath + GZ_SUFFIX
backupSource = filePathCompressed
contentEncoding = 'gzip'
size = 0
@ -429,35 +465,32 @@ async function uploadBlobToAWS(entry, blob, filePath) {
*/
async function processFiles(files) {
if (files.length === 0) return // all processed
await fs.promises.mkdir(BUFFER_DIR, { recursive: true })
try {
await promiseMapWithLimit(
CONCURRENCY,
files,
/**
* @param {QueueEntry} entry
* @return {Promise<void>}
*/
async function (entry) {
try {
await entry.ctx.processFile(entry)
} catch (err) {
STATS.filesFailed++
const {
ctx: { projectId },
fileId,
path,
} = entry
logger.error(
{ err, projectId, fileId, path },
'failed to process file'
)
}
await promiseMapWithLimit(
CONCURRENCY,
files,
/**
* @param {QueueEntry} entry
* @return {Promise<void>}
*/
async function (entry) {
if (gracefulShutdownInitiated) return
try {
await entry.ctx.processFile(entry)
} catch (err) {
STATS.filesFailed++
const {
ctx: { projectId },
fileId,
hash,
path,
} = entry
logger.error(
{ err, projectId, fileId, hash, path },
'failed to process file'
)
}
)
} finally {
await fs.promises.rm(BUFFER_DIR, { recursive: true, force: true })
}
}
)
}
/**
@ -513,6 +546,9 @@ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') {
await entry.ctx.flushMongoQueues()
}
)
if (gracefulShutdownInitiated) {
throw new Error('graceful shutdown: aborting batch processing')
}
}
/**
@ -983,7 +1019,7 @@ class ProjectContext {
if (this.#pendingFiles.has(entry.cacheKey)) {
STATS.filesDuplicated++
} else {
this.#pendingFiles.set(entry.cacheKey, processFile(entry))
this.#pendingFiles.set(entry.cacheKey, processFileWithCleanup(entry))
}
entry.hash = await this.#pendingFiles.get(entry.cacheKey)
this.queueFileForWritingHash(entry)