From 24f2388aa228c636bb201d542bed651a060fffca Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 19 Nov 2024 11:49:00 +0100 Subject: [PATCH] Merge pull request #21948 from overleaf/bg-jpa-back-fill-project-blobs [history-v1] back_fill_file_hash: process blobs GitOrigin-RevId: e54d0f8ab537ce43a12f9c972ba2ee82836073c8 --- .../storage/lib/blob_store/index.js | 32 ++- .../storage/scripts/back_fill_file_hash.mjs | 222 ++++++++++++++++-- .../js/storage/back_fill_file_hash.test.mjs | 94 ++++++-- 3 files changed, 303 insertions(+), 45 deletions(-) diff --git a/services/history-v1/storage/lib/blob_store/index.js b/services/history-v1/storage/lib/blob_store/index.js index ef52541e8c..9c762202a3 100644 --- a/services/history-v1/storage/lib/blob_store/index.js +++ b/services/history-v1/storage/lib/blob_store/index.js @@ -177,7 +177,7 @@ class BlobStore { * temporary file). * * @param {string} pathname - * @return {Promise.} + * @return {Promise} */ async putFile(pathname) { assert.string(pathname, 'bad pathname') @@ -191,11 +191,28 @@ class BlobStore { pathname ) newBlob.setStringLength(stringLength) - await uploadBlob(this.projectId, newBlob, fs.createReadStream(pathname)) - await this.backend.insertBlob(this.projectId, newBlob) + await this.putBlob(pathname, newBlob) return newBlob } + /** + * Write a new blob, the stringLength must have been added already. It should + * have been checked that the blob does not exist yet. Consider using + * {@link putFile} instead of this lower-level method. + * + * @param {string} pathname + * @param {core.Blob} finializedBlob + * @return {Promise} + */ + async putBlob(pathname, finializedBlob) { + await uploadBlob( + this.projectId, + finializedBlob, + fs.createReadStream(pathname) + ) + await this.backend.insertBlob(this.projectId, finializedBlob) + } + /** * Stores an object as a JSON string in a blob. 
* @@ -347,4 +364,11 @@ class BlobStore { } } -module.exports = { BlobStore, loadGlobalBlobs, makeProjectKey, GLOBAL_BLOBS } +module.exports = { + BlobStore, + loadGlobalBlobs, + makeProjectKey, + makeBlobForFile, + getStringLengthOfFile, + GLOBAL_BLOBS, +} diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs index dcd50f8dee..e927fea86b 100644 --- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs +++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs @@ -25,6 +25,8 @@ import { BlobStore, GLOBAL_BLOBS, loadGlobalBlobs, + getStringLengthOfFile, + makeBlobForFile, makeProjectKey, } from '../lib/blob_store/index.js' import { backedUpBlobs, db } from '../lib/mongodb.js' @@ -67,17 +69,23 @@ ObjectId.cacheHexString = true * @property {ObjectId} _id * @property {Array} rootFolder * @property {Array} deletedFileIds + * @property {Array} blobs * @property {{history: {id: string}}} overleaf + * @property {Array} [backedUpBlobs] */ /** * @typedef {Object} QueueEntry * @property {ProjectContext} ctx - * @property {string} fileId + * @property {string} cacheKey + * @property {string} [fileId] * @property {string} path * @property {string} [hash] + * @property {Blob} [blob] */ +const COLLECT_BLOBS = process.argv.includes('blobs') + // Time of closing the ticket for adding hashes: https://github.com/overleaf/internal/issues/464#issuecomment-492668129 const ALL_PROJECTS_HAVE_FILE_HASHES_AFTER = new Date('2019-05-15T14:02:00Z') const PUBLIC_LAUNCH_DATE = new Date('2012-01-01T00:00:00Z') @@ -120,6 +128,8 @@ const deletedFilesCollection = db.collection('deletedFiles') const STATS = { projects: 0, + blobs: 0, + backedUpBlobs: 0, filesWithoutHash: 0, filesDuplicated: 0, filesRetries: 0, @@ -139,6 +149,8 @@ const STATS = { readFromGCSIngress: 0, writeToAWSCount: 0, writeToAWSEgress: 0, + writeToGCSCount: 0, + writeToGCSEgress: 0, } const processStart = performance.now() @@ -250,28 +262,50 @@ async function processFile(entry) { * @return {Promise} */ async function processFileOnce(entry) { - const { fileId } = entry const { projectId, historyId } = entry.ctx - const filePath = Path.join( - BUFFER_DIR, - projectId.toString() + fileId.toString() - ) - const dst = fs.createWriteStream(filePath, { - highWaterMark: STREAM_HIGH_WATER_MARK, - }) + const { fileId, cacheKey } = entry + const filePath = Path.join(BUFFER_DIR, projectId.toString() + cacheKey) + const blobStore = new BlobStore(historyId) + if (entry.blob) { + const { blob } = entry + const hash = blob.getHash() + if (entry.ctx.hasBackedUpBlob(hash)) { + STATS.deduplicatedWriteToAWSLocalCount++ + STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob) + return hash + } + entry.ctx.recordPendingBlob(hash) + STATS.readFromGCSCount++ + const src = await blobStore.getStream(hash) + const dst = fs.createWriteStream(filePath, { + highWaterMark: STREAM_HIGH_WATER_MARK, + }) + try { + await Stream.promises.pipeline(src, dst) + } finally { + STATS.readFromGCSIngress += dst.bytesWritten + } + await uploadBlobToAWS(entry, blob, filePath) + return hash + } + STATS.readFromGCSCount++ const src = await filestorePersistor.getObjectStream( USER_FILES_BUCKET_NAME, `${projectId}/${fileId}` ) + const dst = fs.createWriteStream(filePath, { + highWaterMark: STREAM_HIGH_WATER_MARK, + }) try { await Stream.promises.pipeline(src, dst) } finally { STATS.readFromGCSIngress += dst.bytesWritten } - - const blobStore = new BlobStore(historyId) - const blob = await 
blobStore.putFile(filePath) + const blob = await makeBlobForFile(filePath) + blob.setStringLength( + await getStringLengthOfFile(blob.getByteLength(), filePath) + ) const hash = blob.getHash() if (GLOBAL_BLOBS.has(hash)) { @@ -279,7 +313,6 @@ async function processFileOnce(entry) { STATS.globalBlobsEgress += estimateBlobSize(blob) return hash } - if (entry.ctx.hasBackedUpBlob(hash)) { STATS.deduplicatedWriteToAWSLocalCount++ STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob) @@ -287,6 +320,47 @@ async function processFileOnce(entry) { } entry.ctx.recordPendingBlob(hash) + try { + await uploadBlobToGCS(blobStore, entry, blob, hash, filePath) + await uploadBlobToAWS(entry, blob, filePath) + } catch (err) { + entry.ctx.recordFailedBlob(hash) + throw err + } + return hash +} + +/** + * @param {BlobStore} blobStore + * @param {QueueEntry} entry + * @param {Blob} blob + * @param {string} hash + * @param {string} filePath + * @return {Promise} + */ +async function uploadBlobToGCS(blobStore, entry, blob, hash, filePath) { + if (entry.ctx.hasHistoryBlob(hash)) { + return // fast-path using hint from pre-fetched blobs + } + if (!COLLECT_BLOBS && (await blobStore.getBlob(hash))) { + entry.ctx.recordHistoryBlob(hash) + return // round trip to postgres/mongo when not pre-fetched + } + // blob missing in history-v1, create in GCS and persist in postgres/mongo + STATS.writeToGCSCount++ + STATS.writeToGCSEgress += blob.getByteLength() + await blobStore.putBlob(filePath, blob) + entry.ctx.recordHistoryBlob(hash) +} + +/** + * @param {QueueEntry} entry + * @param {Blob} blob + * @param {string} filePath + * @return {Promise} + */ +async function uploadBlobToAWS(entry, blob, filePath) { + const { historyId } = entry.ctx let backupSource let contentEncoding const md5 = Crypto.createHash('md5') @@ -343,12 +417,10 @@ async function processFileOnce(entry) { STATS.deduplicatedWriteToAWSRemoteEgress += size } else { STATS.writeToAWSEgress += size - entry.ctx.recordFailedBlob(hash) throw err } } - entry.ctx.recordBackedUpBlob(hash) - return hash + entry.ctx.recordBackedUpBlob(blob.getHash()) } /** @@ -394,18 +466,41 @@ async function processFiles(files) { * @return {Promise} */ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') { + let nBackedUpBlobs = 0 + if (process.argv.includes('collectBackedUpBlobs')) { + nBackedUpBlobs = await collectBackedUpBlobs(batch) + } if (process.argv.includes('deletedFiles')) { await collectDeletedFiles(batch) } + let blobs = 0 + if (COLLECT_BLOBS) { + blobs = await collectBlobs(batch) + } const files = Array.from(findFileInBatch(batch, prefix)) STATS.projects += batch.length - STATS.filesWithoutHash += files.length + STATS.blobs += blobs + STATS.backedUpBlobs += nBackedUpBlobs + STATS.filesWithoutHash += files.length - (blobs - nBackedUpBlobs) batch.length = 0 // GC // The files are currently ordered by project-id. - // Order them by file-id to + // Order them by file-id ASC then blobs ASC to + // - process files before blobs // - avoid head-of-line blocking from many project-files waiting on the generation of the projects DEK (round trip to AWS) // - bonus: increase chance of de-duplicating write to AWS - files.sort((a, b) => (a.fileId > b.fileId ? 1 : -1)) + files.sort( + /** + * @param {QueueEntry} a + * @param {QueueEntry} b + * @return {number} + */ + function (a, b) { + if (a.fileId && b.fileId) return a.fileId > b.fileId ? 1 : -1 + if (a.hash && b.hash) return a.hash > b.hash ? 
1 : -1 + if (a.fileId) return -1 + return 1 + } + ) await processFiles(files) await promiseMapWithLimit( CONCURRENCY, @@ -566,6 +661,7 @@ function* findFiles(ctx, folder, path) { if (!fileRef.hash) { yield { ctx, + cacheKey: fileRef._id.toString(), fileId: fileRef._id.toString(), path: `${path}.fileRefs.${i}`, } @@ -577,17 +673,43 @@ function* findFiles(ctx, folder, path) { /** * @param {Array} projects * @param {string} prefix + * @return Generator */ function* findFileInBatch(projects, prefix) { for (const project of projects) { const ctx = new ProjectContext(project) yield* findFiles(ctx, project.rootFolder[0], prefix) for (const fileId of project.deletedFileIds || []) { - yield { ctx, fileId, path: '' } + yield { ctx, cacheKey: fileId, fileId, path: '' } + } + for (const blob of project.blobs || []) { + if (ctx.hasBackedUpBlob(blob.getHash())) continue + yield { + ctx, + cacheKey: blob.getHash(), + path: 'blob', + blob, + hash: blob.getHash(), + } } } } +/** + * @param {Array} projects + * @return {Promise} + */ +async function collectBlobs(projects) { + let blobs = 0 + for (const project of projects) { + const historyId = project.overleaf.history.id.toString() + const blobStore = new BlobStore(historyId) + project.blobs = await blobStore.getProjectBlobs() + blobs += project.blobs.length + } + return blobs +} + /** * @param {Array} projects * @return {Promise} @@ -621,6 +743,37 @@ async function collectDeletedFiles(projects) { } } +/** + * @param {Array} projects + * @return {Promise} + */ +async function collectBackedUpBlobs(projects) { + const cursor = backedUpBlobs.find( + { _id: { $in: projects.map(p => p._id) } }, + { + readPreference: READ_PREFERENCE_SECONDARY, + sort: { _id: 1 }, + } + ) + let nBackedUpBlobs = 0 + const processed = projects.slice() + for await (const record of cursor) { + const idx = processed.findIndex( + p => p._id.toString() === record._id.toString() + ) + if (idx === -1) { + throw new Error( + `bug: order of backedUpBlobs mongo records does not match batch of projects (${record._id} out of order)` + ) + } + processed.splice(0, idx) + const project = processed[0] + project.backedUpBlobs = record.blobs.map(b => b.toString('hex')) + nBackedUpBlobs += record.blobs.length + } + return nBackedUpBlobs +} + const BATCH_HASH_WRITES = 1_000 const BATCH_FILE_UPDATES = 100 @@ -628,12 +781,27 @@ class ProjectContext { /** @type {Promise | null} */ #cachedPersistorPromise = null + /** @type {Set} */ + #backedUpBlobs + + /** @type {Set} */ + #historyBlobs + /** * @param {Project} project */ constructor(project) { this.projectId = project._id this.historyId = project.overleaf.history.id.toString() + this.#backedUpBlobs = new Set(project.backedUpBlobs || []) + this.#historyBlobs = new Set((project.blobs || []).map(b => b.getHash())) + } + + hasHistoryBlob(hash) { + return this.#historyBlobs.has(hash) + } + recordHistoryBlob(hash) { + this.#historyBlobs.add(hash) } /** @@ -722,6 +890,7 @@ class ProjectContext { * @param {string} hash */ recordBackedUpBlob(hash) { + this.#backedUpBlobs.add(hash) this.#completedBlobs.add(hash) this.#pendingBlobs.delete(hash) } @@ -731,7 +900,11 @@ class ProjectContext { * @return {boolean} */ hasBackedUpBlob(hash) { - return this.#pendingBlobs.has(hash) || this.#completedBlobs.has(hash) + return ( + this.#pendingBlobs.has(hash) || + this.#completedBlobs.has(hash) || + this.#backedUpBlobs.has(hash) + ) } /** @type {Array} */ @@ -741,6 +914,7 @@ class ProjectContext { * @param {QueueEntry} entry */ queueFileForWritingHash(entry) { + if 
(entry.path === 'blob') return this.#pendingFileWrites.push(entry) } @@ -806,12 +980,12 @@ class ProjectContext { * @param {QueueEntry} entry */ async processFile(entry) { - if (this.#pendingFiles.has(entry.fileId)) { + if (this.#pendingFiles.has(entry.cacheKey)) { STATS.filesDuplicated++ } else { - this.#pendingFiles.set(entry.fileId, processFile(entry)) + this.#pendingFiles.set(entry.cacheKey, processFile(entry)) } - entry.hash = await this.#pendingFiles.get(entry.fileId) + entry.hash = await this.#pendingFiles.get(entry.cacheKey) this.queueFileForWritingHash(entry) await this.flushMongoQueuesIfNeeded() } diff --git a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs index 899da8893e..dcd4434b85 100644 --- a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs +++ b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs @@ -122,6 +122,12 @@ describe('back_fill_file_hash script', function () { const fileIdDeleted3 = objectIdFromTime('2017-02-01T00:09:00Z') const fileIdDeleted4 = objectIdFromTime('2024-02-01T00:10:00Z') const fileIdDeleted5 = objectIdFromTime('2024-02-01T00:11:00Z') + const contentTextBlob0 = Buffer.from('Hello 0') + const hashTextBlob0 = gitBlobHashBuffer(contentTextBlob0) + const contentTextBlob1 = Buffer.from('Hello 1') + const hashTextBlob1 = gitBlobHashBuffer(contentTextBlob1) + const contentTextBlob2 = Buffer.from('Hello 2') + const hashTextBlob2 = gitBlobHashBuffer(contentTextBlob2) const deleteProjectsRecordId0 = new ObjectId() const deleteProjectsRecordId1 = new ObjectId() const deleteProjectsRecordId2 = new ObjectId() @@ -138,8 +144,27 @@ describe('back_fill_file_hash script', function () { historyId: historyId0, fileId: fileId7, hash: hashFile7, + content: contentFile7, }, { projectId: projectId0, historyId: historyId0, fileId: fileIdDeleted5 }, + { + projectId: projectId0, + historyId: historyId0, + hash: hashTextBlob0, + content: contentTextBlob0, + }, + { + projectId: projectId1, + historyId: historyId1, + hash: hashTextBlob1, + content: contentTextBlob1, + }, + { + projectId: projectId2, + historyId: historyId2, + hash: hashTextBlob2, + content: contentTextBlob2, + }, { projectId: projectId1, historyId: historyId1, fileId: fileId1 }, { projectId: projectId1, historyId: historyId1, fileId: fileIdDeleted1 }, // { historyId: historyId2, fileId: fileId2 }, // already has hash @@ -371,6 +396,13 @@ describe('back_fill_file_hash script', function () { await testProjects.createEmptyProject(historyId3) await testProjects.createEmptyProject(historyIdDeleted0) await testProjects.createEmptyProject(historyIdDeleted1) + + const blobStore0 = new BlobStore(historyId0.toString()) + await blobStore0.putString(contentTextBlob0.toString()) + const blobStore1 = new BlobStore(historyId1.toString()) + await blobStore1.putString(contentTextBlob1.toString()) + const blobStore2 = new BlobStore(historyId2.toString()) + await blobStore2.putString(contentTextBlob2.toString()) }) beforeEach('populate filestore', async function () { @@ -454,7 +486,9 @@ describe('back_fill_file_hash script', function () { process.argv0, [ 'storage/scripts/back_fill_file_hash.mjs', + 'collectBackedUpBlobs', 'live', + 'blobs', 'deleted', 'deletedFiles', ], @@ -726,6 +760,7 @@ describe('back_fill_file_hash script', function () { binaryForGitBlobHash(gitBlobHash(fileId0)), binaryForGitBlobHash(hashFile7), binaryForGitBlobHash(gitBlobHash(fileIdDeleted5)), + 
binaryForGitBlobHash(hashTextBlob0), ].sort(), }, { @@ -733,8 +768,13 @@ describe('back_fill_file_hash script', function () { blobs: [ binaryForGitBlobHash(gitBlobHash(fileId1)), binaryForGitBlobHash(gitBlobHash(fileIdDeleted1)), + binaryForGitBlobHash(hashTextBlob1), ].sort(), }, + { + _id: projectId2, + blobs: [binaryForGitBlobHash(hashTextBlob2)].sort(), + }, { _id: projectIdDeleted0, blobs: [ @@ -748,8 +788,10 @@ describe('back_fill_file_hash script', function () { const rerun = await runScript({}, false) expect(rerun.stats).deep.equal({ ...STATS_ALL_ZERO, - // We still need to iterate over all the projects. + // We still need to iterate over all the projects and blobs. projects: 4, + blobs: 10, + backedUpBlobs: 10, }) }) it('should have backed up all the files', async function () { @@ -762,7 +804,7 @@ describe('back_fill_file_hash script', function () { ) .sort() ) - for (let { historyId, fileId, hash } of writtenBlobs) { + for (let { historyId, fileId, hash, content } of writtenBlobs) { hash = hash || gitBlobHash(fileId.toString()) const s = await backupPersistor.getObjectStream( projectBlobsBucket, @@ -772,7 +814,9 @@ describe('back_fill_file_hash script', function () { const buf = new WritableBuffer() await Stream.promises.pipeline(s, buf) expect(gitBlobHashBuffer(buf.getContents())).to.equal(hash) - if (fileId !== fileId7) { + if (content) { + expect(buf.getContents()).to.deep.equal(content) + } else { const id = buf.getContents().toString('utf-8') expect(id).to.equal(fileId.toString()) // double check we are not comparing 'undefined' or '[object Object]' above @@ -791,16 +835,18 @@ describe('back_fill_file_hash script', function () { ) }) it('should have written the back filled files to history v1', async function () { - for (const { historyId, fileId } of writtenBlobs) { + for (const { historyId, hash, fileId, content } of writtenBlobs) { const blobStore = new BlobStore(historyId.toString()) - if (fileId === fileId7) { - const s = await blobStore.getStream(hashFile7) + if (content) { + const s = await blobStore.getStream(hash) const buf = new WritableBuffer() await Stream.promises.pipeline(s, buf) - expect(buf.getContents()).to.deep.equal(contentFile7) + expect(buf.getContents()).to.deep.equal(content) continue } - const id = await blobStore.getString(gitBlobHash(fileId.toString())) + const id = await blobStore.getString( + hash || gitBlobHash(fileId.toString()) + ) expect(id).to.equal(fileId.toString()) // double check we are not comparing 'undefined' or '[object Object]' above expect(id).to.match(/^[a-f0-9]{24}$/) @@ -826,6 +872,8 @@ describe('back_fill_file_hash script', function () { const STATS_ALL_ZERO = { projects: 0, + blobs: 0, + backedUpBlobs: 0, filesWithoutHash: 0, filesDuplicated: 0, filesRetries: 0, @@ -845,9 +893,13 @@ describe('back_fill_file_hash script', function () { readFromGCSIngress: 0, writeToAWSCount: 0, writeToAWSEgress: 0, + writeToGCSCount: 0, + writeToGCSEgress: 0, } const STATS_UP_TO_PROJECT1 = { projects: 2, + blobs: 2, + backedUpBlobs: 0, filesWithoutHash: 7, filesDuplicated: 1, filesRetries: 0, @@ -863,13 +915,17 @@ describe('back_fill_file_hash script', function () { deduplicatedWriteToAWSLocalEgress: 0, deduplicatedWriteToAWSRemoteCount: 0, deduplicatedWriteToAWSRemoteEgress: 0, - readFromGCSCount: 6, - readFromGCSIngress: 4000120, - writeToAWSCount: 5, - writeToAWSEgress: 4032, + readFromGCSCount: 8, + readFromGCSIngress: 4000134, + writeToAWSCount: 7, + writeToAWSEgress: 4086, + writeToGCSCount: 5, + writeToGCSEgress: 4000096, } const 
STATS_UP_FROM_PROJECT1_ONWARD = { projects: 2, + blobs: 1, + backedUpBlobs: 0, filesWithoutHash: 3, filesDuplicated: 0, filesRetries: 0, @@ -880,15 +936,17 @@ describe('back_fill_file_hash script', function () { projectDeleted: 0, projectHardDeleted: 0, fileHardDeleted: 0, - mongoUpdates: 4, + mongoUpdates: 5, deduplicatedWriteToAWSLocalCount: 1, deduplicatedWriteToAWSLocalEgress: 30, deduplicatedWriteToAWSRemoteCount: 0, deduplicatedWriteToAWSRemoteEgress: 0, - readFromGCSCount: 3, - readFromGCSIngress: 72, - writeToAWSCount: 2, - writeToAWSEgress: 58, + readFromGCSCount: 4, + readFromGCSIngress: 79, + writeToAWSCount: 3, + writeToAWSEgress: 85, + writeToGCSCount: 2, + writeToGCSEgress: 48, } function sumStats(a, b) { @@ -920,6 +978,8 @@ describe('back_fill_file_hash script', function () { readFromGCSIngress: -24, writeToAWSCount: -1, writeToAWSEgress: -28, + writeToGCSCount: -1, + writeToGCSEgress: -24, }) ) // should not retry 404
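
For reference, the newly exported blob_store helpers (makeBlobForFile, getStringLengthOfFile and the lower-level BlobStore.putBlob) can be composed to reproduce the old putFile behaviour while skipping the upload when a blob with the same hash already exists, which is the flow processFileOnce/uploadBlobToGCS follow in the script above. The sketch below is a minimal illustration under those assumptions only: putFileIfMissing is a hypothetical helper name, and it assumes loadGlobalBlobs() is awaitable and populates GLOBAL_BLOBS, as its use alongside GLOBAL_BLOBS in the script suggests.

import {
  BlobStore,
  GLOBAL_BLOBS,
  loadGlobalBlobs,
  getStringLengthOfFile,
  makeBlobForFile,
} from '../lib/blob_store/index.js'

/**
 * Hash a local file and upload it to history-v1 only if no blob with that
 * hash exists yet (hypothetical helper, mirroring processFileOnce above).
 *
 * @param {string} historyId
 * @param {string} pathname
 * @return {Promise} resolves to the git blob hash of the file
 */
async function putFileIfMissing(historyId, pathname) {
  await loadGlobalBlobs() // assumed to populate GLOBAL_BLOBS before lookups
  // Compute the hash, byte length and (for text files) string length locally,
  // without uploading anything yet.
  const blob = await makeBlobForFile(pathname)
  blob.setStringLength(
    await getStringLengthOfFile(blob.getByteLength(), pathname)
  )
  const hash = blob.getHash()
  if (GLOBAL_BLOBS.has(hash)) {
    return hash // served from the global blob store, nothing to upload
  }
  const blobStore = new BlobStore(historyId)
  if (await blobStore.getBlob(hash)) {
    return hash // already present in history-v1 for this project
  }
  // Blob is missing: upload the file to GCS and persist it in postgres/mongo.
  await blobStore.putBlob(pathname, blob)
  return hash
}

Splitting putFile into makeBlobForFile/getStringLengthOfFile plus the lower-level putBlob is what lets the back-fill script consult GLOBAL_BLOBS, the pre-fetched project blobs and its backedUpBlobs cache before paying for a GCS write.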