Merge pull request #21948 from overleaf/bg-jpa-back-fill-project-blobs

[history-v1] back_fill_file_hash: process blobs

GitOrigin-RevId: e54d0f8ab537ce43a12f9c972ba2ee82836073c8
Jakob Ackermann 2024-11-19 11:49:00 +01:00 committed by Copybot
parent 99f77b2205
commit 24f2388aa2
3 changed files with 303 additions and 45 deletions
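The script gains two opt-in passes, both driven by extra CLI arguments: 'blobs' makes it iterate a project's existing history-v1 blobs and back them up to AWS, and 'collectBackedUpBlobs' pre-fetches the backedUpBlobs records so blobs that are already backed up get skipped. Mirroring the acceptance test below, the full invocation is roughly:
node storage/scripts/back_fill_file_hash.mjs collectBackedUpBlobs live blobs deleted deletedFiles
(run from the history-v1 service directory, which is an assumption here).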

@@ -177,7 +177,7 @@ class BlobStore {
* temporary file).
*
* @param {string} pathname
* @return {Promise.<core.Blob>}
* @return {Promise<core.Blob>}
*/
async putFile(pathname) {
assert.string(pathname, 'bad pathname')
@@ -191,11 +191,28 @@ class BlobStore {
pathname
)
newBlob.setStringLength(stringLength)
await uploadBlob(this.projectId, newBlob, fs.createReadStream(pathname))
await this.backend.insertBlob(this.projectId, newBlob)
await this.putBlob(pathname, newBlob)
return newBlob
}
/**
* Write a new blob. The stringLength must have been set already, and the
* caller should have checked that the blob does not exist yet. Consider using
* {@link putFile} instead of this lower-level method.
*
* @param {string} pathname
* @param {core.Blob} finalizedBlob
* @return {Promise<void>}
*/
async putBlob(pathname, finalizedBlob) {
await uploadBlob(
this.projectId,
finalizedBlob,
fs.createReadStream(pathname)
)
await this.backend.insertBlob(this.projectId, finalizedBlob)
}
/**
* Stores an object as a JSON string in a blob.
*
@@ -347,4 +364,11 @@ class BlobStore {
}
}
module.exports = { BlobStore, loadGlobalBlobs, makeProjectKey, GLOBAL_BLOBS }
module.exports = {
BlobStore,
loadGlobalBlobs,
makeProjectKey,
makeBlobForFile,
getStringLengthOfFile,
GLOBAL_BLOBS,
}
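Together with putBlob, the newly exported helpers let a caller split putFile into its parts: hash the file, attach the string length, then upload only if the blob is missing. A minimal sketch of that call pattern, mirroring processFileOnce in the backfill script below (filePath and historyId are placeholders):
const blobStore = new BlobStore(historyId)
const blob = await makeBlobForFile(filePath) // computes the git blob hash and byte length
blob.setStringLength(
await getStringLengthOfFile(blob.getByteLength(), filePath)
)
if (!(await blobStore.getBlob(blob.getHash()))) {
await blobStore.putBlob(filePath, blob) // upload to GCS, then persist in postgres/mongo
}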

@@ -25,6 +25,8 @@ import {
BlobStore,
GLOBAL_BLOBS,
loadGlobalBlobs,
getStringLengthOfFile,
makeBlobForFile,
makeProjectKey,
} from '../lib/blob_store/index.js'
import { backedUpBlobs, db } from '../lib/mongodb.js'
@@ -67,17 +69,23 @@ ObjectId.cacheHexString = true
* @property {ObjectId} _id
* @property {Array<Folder>} rootFolder
* @property {Array<string>} deletedFileIds
* @property {Array<Blob>} blobs
* @property {{history: {id: string}}} overleaf
* @property {Array<string>} [backedUpBlobs]
*/
/**
* @typedef {Object} QueueEntry
* @property {ProjectContext} ctx
* @property {string} fileId
* @property {string} cacheKey
* @property {string} [fileId]
* @property {string} path
* @property {string} [hash]
* @property {Blob} [blob]
*/
const COLLECT_BLOBS = process.argv.includes('blobs')
// Time of closing the ticket for adding hashes: https://github.com/overleaf/internal/issues/464#issuecomment-492668129
const ALL_PROJECTS_HAVE_FILE_HASHES_AFTER = new Date('2019-05-15T14:02:00Z')
const PUBLIC_LAUNCH_DATE = new Date('2012-01-01T00:00:00Z')
@@ -120,6 +128,8 @@ const deletedFilesCollection = db.collection('deletedFiles')
const STATS = {
projects: 0,
blobs: 0,
backedUpBlobs: 0,
filesWithoutHash: 0,
filesDuplicated: 0,
filesRetries: 0,
@@ -139,6 +149,8 @@ const STATS = {
readFromGCSIngress: 0,
writeToAWSCount: 0,
writeToAWSEgress: 0,
writeToGCSCount: 0,
writeToGCSEgress: 0,
}
const processStart = performance.now()
@@ -250,28 +262,50 @@ async function processFile(entry) {
* @return {Promise<string>}
*/
async function processFileOnce(entry) {
const { fileId } = entry
const { projectId, historyId } = entry.ctx
const filePath = Path.join(
BUFFER_DIR,
projectId.toString() + fileId.toString()
)
const dst = fs.createWriteStream(filePath, {
highWaterMark: STREAM_HIGH_WATER_MARK,
})
const { fileId, cacheKey } = entry
const filePath = Path.join(BUFFER_DIR, projectId.toString() + cacheKey)
const blobStore = new BlobStore(historyId)
if (entry.blob) {
const { blob } = entry
const hash = blob.getHash()
if (entry.ctx.hasBackedUpBlob(hash)) {
STATS.deduplicatedWriteToAWSLocalCount++
STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
return hash
}
entry.ctx.recordPendingBlob(hash)
STATS.readFromGCSCount++
const src = await blobStore.getStream(hash)
const dst = fs.createWriteStream(filePath, {
highWaterMark: STREAM_HIGH_WATER_MARK,
})
try {
await Stream.promises.pipeline(src, dst)
} finally {
STATS.readFromGCSIngress += dst.bytesWritten
}
await uploadBlobToAWS(entry, blob, filePath)
return hash
}
STATS.readFromGCSCount++
const src = await filestorePersistor.getObjectStream(
USER_FILES_BUCKET_NAME,
`${projectId}/${fileId}`
)
const dst = fs.createWriteStream(filePath, {
highWaterMark: STREAM_HIGH_WATER_MARK,
})
try {
await Stream.promises.pipeline(src, dst)
} finally {
STATS.readFromGCSIngress += dst.bytesWritten
}
const blobStore = new BlobStore(historyId)
const blob = await blobStore.putFile(filePath)
const blob = await makeBlobForFile(filePath)
blob.setStringLength(
await getStringLengthOfFile(blob.getByteLength(), filePath)
)
const hash = blob.getHash()
if (GLOBAL_BLOBS.has(hash)) {
@@ -279,7 +313,6 @@ async function processFileOnce(entry) {
STATS.globalBlobsEgress += estimateBlobSize(blob)
return hash
}
if (entry.ctx.hasBackedUpBlob(hash)) {
STATS.deduplicatedWriteToAWSLocalCount++
STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
@@ -287,6 +320,47 @@ async function processFileOnce(entry) {
}
entry.ctx.recordPendingBlob(hash)
try {
await uploadBlobToGCS(blobStore, entry, blob, hash, filePath)
await uploadBlobToAWS(entry, blob, filePath)
} catch (err) {
entry.ctx.recordFailedBlob(hash)
throw err
}
return hash
}
/**
* @param {BlobStore} blobStore
* @param {QueueEntry} entry
* @param {Blob} blob
* @param {string} hash
* @param {string} filePath
* @return {Promise<void>}
*/
async function uploadBlobToGCS(blobStore, entry, blob, hash, filePath) {
if (entry.ctx.hasHistoryBlob(hash)) {
return // fast-path using hint from pre-fetched blobs
}
if (!COLLECT_BLOBS && (await blobStore.getBlob(hash))) {
entry.ctx.recordHistoryBlob(hash)
return // round trip to postgres/mongo when not pre-fetched
}
// blob missing in history-v1, create in GCS and persist in postgres/mongo
STATS.writeToGCSCount++
STATS.writeToGCSEgress += blob.getByteLength()
await blobStore.putBlob(filePath, blob)
entry.ctx.recordHistoryBlob(hash)
}
/**
* @param {QueueEntry} entry
* @param {Blob} blob
* @param {string} filePath
* @return {Promise<void>}
*/
async function uploadBlobToAWS(entry, blob, filePath) {
const { historyId } = entry.ctx
let backupSource
let contentEncoding
const md5 = Crypto.createHash('md5')
@@ -343,12 +417,10 @@ async function processFileOnce(entry) {
STATS.deduplicatedWriteToAWSRemoteEgress += size
} else {
STATS.writeToAWSEgress += size
entry.ctx.recordFailedBlob(hash)
throw err
}
}
entry.ctx.recordBackedUpBlob(hash)
return hash
entry.ctx.recordBackedUpBlob(blob.getHash())
}
/**
@@ -394,18 +466,41 @@ async function processFiles(files) {
* @return {Promise<void>}
*/
async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') {
let nBackedUpBlobs = 0
if (process.argv.includes('collectBackedUpBlobs')) {
nBackedUpBlobs = await collectBackedUpBlobs(batch)
}
if (process.argv.includes('deletedFiles')) {
await collectDeletedFiles(batch)
}
let blobs = 0
if (COLLECT_BLOBS) {
blobs = await collectBlobs(batch)
}
const files = Array.from(findFileInBatch(batch, prefix))
STATS.projects += batch.length
STATS.filesWithoutHash += files.length
STATS.blobs += blobs
STATS.backedUpBlobs += nBackedUpBlobs
STATS.filesWithoutHash += files.length - (blobs - nBackedUpBlobs)
batch.length = 0 // GC
// The files are currently ordered by project-id.
// Order them by file-id to
// Order them by file-id ASC then blobs ASC to
// - process files before blobs
// - avoid head-of-line blocking from many project-files waiting on the generation of the projects DEK (round trip to AWS)
// - bonus: increase chance of de-duplicating write to AWS
files.sort((a, b) => (a.fileId > b.fileId ? 1 : -1))
files.sort(
/**
* @param {QueueEntry} a
* @param {QueueEntry} b
* @return {number}
*/
function (a, b) {
if (a.fileId && b.fileId) return a.fileId > b.fileId ? 1 : -1
if (a.hash && b.hash) return a.hash > b.hash ? 1 : -1
if (a.fileId) return -1
return 1
}
)
await processFiles(files)
await promiseMapWithLimit(
CONCURRENCY,
@@ -566,6 +661,7 @@ function* findFiles(ctx, folder, path) {
if (!fileRef.hash) {
yield {
ctx,
cacheKey: fileRef._id.toString(),
fileId: fileRef._id.toString(),
path: `${path}.fileRefs.${i}`,
}
@@ -577,17 +673,43 @@ function* findFiles(ctx, folder, path) {
/**
* @param {Array<Project>} projects
* @param {string} prefix
* @return {Generator<QueueEntry>}
*/
function* findFileInBatch(projects, prefix) {
for (const project of projects) {
const ctx = new ProjectContext(project)
yield* findFiles(ctx, project.rootFolder[0], prefix)
for (const fileId of project.deletedFileIds || []) {
yield { ctx, fileId, path: '' }
yield { ctx, cacheKey: fileId, fileId, path: '' }
}
for (const blob of project.blobs || []) {
if (ctx.hasBackedUpBlob(blob.getHash())) continue
yield {
ctx,
cacheKey: blob.getHash(),
path: 'blob',
blob,
hash: blob.getHash(),
}
}
}
}
/**
* @param {Array<Project>} projects
* @return {Promise<number>}
*/
async function collectBlobs(projects) {
let blobs = 0
for (const project of projects) {
const historyId = project.overleaf.history.id.toString()
const blobStore = new BlobStore(historyId)
project.blobs = await blobStore.getProjectBlobs()
blobs += project.blobs.length
}
return blobs
}
/**
* @param {Array<Project>} projects
* @return {Promise<void>}
@@ -621,6 +743,37 @@ async function collectDeletedFiles(projects) {
}
}
/**
* @param {Array<Project>} projects
* @return {Promise<number>}
*/
async function collectBackedUpBlobs(projects) {
const cursor = backedUpBlobs.find(
{ _id: { $in: projects.map(p => p._id) } },
{
readPreference: READ_PREFERENCE_SECONDARY,
sort: { _id: 1 },
}
)
let nBackedUpBlobs = 0
const processed = projects.slice()
for await (const record of cursor) {
const idx = processed.findIndex(
p => p._id.toString() === record._id.toString()
)
if (idx === -1) {
throw new Error(
`bug: order of backedUpBlobs mongo records does not match batch of projects (${record._id} out of order)`
)
}
processed.splice(0, idx)
const project = processed[0]
project.backedUpBlobs = record.blobs.map(b => b.toString('hex'))
nBackedUpBlobs += record.blobs.length
}
return nBackedUpBlobs
}
const BATCH_HASH_WRITES = 1_000
const BATCH_FILE_UPDATES = 100
@@ -628,12 +781,27 @@ class ProjectContext {
/** @type {Promise<CachedPerProjectEncryptedS3Persistor> | null} */
#cachedPersistorPromise = null
/** @type {Set<string>} */
#backedUpBlobs
/** @type {Set<string>} */
#historyBlobs
/**
* @param {Project} project
*/
constructor(project) {
this.projectId = project._id
this.historyId = project.overleaf.history.id.toString()
this.#backedUpBlobs = new Set(project.backedUpBlobs || [])
this.#historyBlobs = new Set((project.blobs || []).map(b => b.getHash()))
}
hasHistoryBlob(hash) {
return this.#historyBlobs.has(hash)
}
recordHistoryBlob(hash) {
this.#historyBlobs.add(hash)
}
/**
@@ -722,6 +890,7 @@ class ProjectContext {
* @param {string} hash
*/
recordBackedUpBlob(hash) {
this.#backedUpBlobs.add(hash)
this.#completedBlobs.add(hash)
this.#pendingBlobs.delete(hash)
}
@@ -731,7 +900,11 @@ class ProjectContext {
* @return {boolean}
*/
hasBackedUpBlob(hash) {
return this.#pendingBlobs.has(hash) || this.#completedBlobs.has(hash)
return (
this.#pendingBlobs.has(hash) ||
this.#completedBlobs.has(hash) ||
this.#backedUpBlobs.has(hash)
)
}
/** @type {Array<QueueEntry>} */
@@ -741,6 +914,7 @@ class ProjectContext {
* @param {QueueEntry} entry
*/
queueFileForWritingHash(entry) {
if (entry.path === 'blob') return
this.#pendingFileWrites.push(entry)
}
@@ -806,12 +980,12 @@ class ProjectContext {
* @param {QueueEntry} entry
*/
async processFile(entry) {
if (this.#pendingFiles.has(entry.fileId)) {
if (this.#pendingFiles.has(entry.cacheKey)) {
STATS.filesDuplicated++
} else {
this.#pendingFiles.set(entry.fileId, processFile(entry))
this.#pendingFiles.set(entry.cacheKey, processFile(entry))
}
entry.hash = await this.#pendingFiles.get(entry.fileId)
entry.hash = await this.#pendingFiles.get(entry.cacheKey)
this.queueFileForWritingHash(entry)
await this.flushMongoQueuesIfNeeded()
}
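The ProjectContext bookkeeping above is what drives the AWS de-duplication. Pieced together from the call sites in processFileOnce and uploadBlobToAWS, the lifecycle of a single hash looks roughly like this (a simplified sketch, not the full control flow):
if (entry.ctx.hasBackedUpBlob(hash)) {
// pending, completed, or listed in backedUpBlobs: count a local dedup and skip
} else {
entry.ctx.recordPendingBlob(hash) // later entries with the same hash now dedup
try {
// ... stream the file to AWS ...
entry.ctx.recordBackedUpBlob(hash) // marks the hash completed and drops the pending mark
} catch (err) {
entry.ctx.recordFailedBlob(hash) // undoes the pending mark (definition not shown in this diff)
throw err
}
}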

@@ -122,6 +122,12 @@ describe('back_fill_file_hash script', function () {
const fileIdDeleted3 = objectIdFromTime('2017-02-01T00:09:00Z')
const fileIdDeleted4 = objectIdFromTime('2024-02-01T00:10:00Z')
const fileIdDeleted5 = objectIdFromTime('2024-02-01T00:11:00Z')
const contentTextBlob0 = Buffer.from('Hello 0')
const hashTextBlob0 = gitBlobHashBuffer(contentTextBlob0)
const contentTextBlob1 = Buffer.from('Hello 1')
const hashTextBlob1 = gitBlobHashBuffer(contentTextBlob1)
const contentTextBlob2 = Buffer.from('Hello 2')
const hashTextBlob2 = gitBlobHashBuffer(contentTextBlob2)
const deleteProjectsRecordId0 = new ObjectId()
const deleteProjectsRecordId1 = new ObjectId()
const deleteProjectsRecordId2 = new ObjectId()
@@ -138,8 +144,27 @@ describe('back_fill_file_hash script', function () {
historyId: historyId0,
fileId: fileId7,
hash: hashFile7,
content: contentFile7,
},
{ projectId: projectId0, historyId: historyId0, fileId: fileIdDeleted5 },
{
projectId: projectId0,
historyId: historyId0,
hash: hashTextBlob0,
content: contentTextBlob0,
},
{
projectId: projectId1,
historyId: historyId1,
hash: hashTextBlob1,
content: contentTextBlob1,
},
{
projectId: projectId2,
historyId: historyId2,
hash: hashTextBlob2,
content: contentTextBlob2,
},
{ projectId: projectId1, historyId: historyId1, fileId: fileId1 },
{ projectId: projectId1, historyId: historyId1, fileId: fileIdDeleted1 },
// { historyId: historyId2, fileId: fileId2 }, // already has hash
@@ -371,6 +396,13 @@ describe('back_fill_file_hash script', function () {
await testProjects.createEmptyProject(historyId3)
await testProjects.createEmptyProject(historyIdDeleted0)
await testProjects.createEmptyProject(historyIdDeleted1)
const blobStore0 = new BlobStore(historyId0.toString())
await blobStore0.putString(contentTextBlob0.toString())
const blobStore1 = new BlobStore(historyId1.toString())
await blobStore1.putString(contentTextBlob1.toString())
const blobStore2 = new BlobStore(historyId2.toString())
await blobStore2.putString(contentTextBlob2.toString())
})
beforeEach('populate filestore', async function () {
@@ -454,7 +486,9 @@ describe('back_fill_file_hash script', function () {
process.argv0,
[
'storage/scripts/back_fill_file_hash.mjs',
'collectBackedUpBlobs',
'live',
'blobs',
'deleted',
'deletedFiles',
],
@@ -726,6 +760,7 @@ describe('back_fill_file_hash script', function () {
binaryForGitBlobHash(gitBlobHash(fileId0)),
binaryForGitBlobHash(hashFile7),
binaryForGitBlobHash(gitBlobHash(fileIdDeleted5)),
binaryForGitBlobHash(hashTextBlob0),
].sort(),
},
{
@@ -733,8 +768,13 @@ describe('back_fill_file_hash script', function () {
blobs: [
binaryForGitBlobHash(gitBlobHash(fileId1)),
binaryForGitBlobHash(gitBlobHash(fileIdDeleted1)),
binaryForGitBlobHash(hashTextBlob1),
].sort(),
},
{
_id: projectId2,
blobs: [binaryForGitBlobHash(hashTextBlob2)].sort(),
},
{
_id: projectIdDeleted0,
blobs: [
@@ -748,8 +788,10 @@ describe('back_fill_file_hash script', function () {
const rerun = await runScript({}, false)
expect(rerun.stats).deep.equal({
...STATS_ALL_ZERO,
// We still need to iterate over all the projects.
// We still need to iterate over all the projects and blobs.
projects: 4,
blobs: 10,
backedUpBlobs: 10,
})
})
it('should have backed up all the files', async function () {
@@ -762,7 +804,7 @@ describe('back_fill_file_hash script', function () {
)
.sort()
)
for (let { historyId, fileId, hash } of writtenBlobs) {
for (let { historyId, fileId, hash, content } of writtenBlobs) {
hash = hash || gitBlobHash(fileId.toString())
const s = await backupPersistor.getObjectStream(
projectBlobsBucket,
@@ -772,7 +814,9 @@ describe('back_fill_file_hash script', function () {
const buf = new WritableBuffer()
await Stream.promises.pipeline(s, buf)
expect(gitBlobHashBuffer(buf.getContents())).to.equal(hash)
if (fileId !== fileId7) {
if (content) {
expect(buf.getContents()).to.deep.equal(content)
} else {
const id = buf.getContents().toString('utf-8')
expect(id).to.equal(fileId.toString())
// double check we are not comparing 'undefined' or '[object Object]' above
@@ -791,16 +835,18 @@ describe('back_fill_file_hash script', function () {
)
})
it('should have written the back filled files to history v1', async function () {
for (const { historyId, fileId } of writtenBlobs) {
for (const { historyId, hash, fileId, content } of writtenBlobs) {
const blobStore = new BlobStore(historyId.toString())
if (fileId === fileId7) {
const s = await blobStore.getStream(hashFile7)
if (content) {
const s = await blobStore.getStream(hash)
const buf = new WritableBuffer()
await Stream.promises.pipeline(s, buf)
expect(buf.getContents()).to.deep.equal(contentFile7)
expect(buf.getContents()).to.deep.equal(content)
continue
}
const id = await blobStore.getString(gitBlobHash(fileId.toString()))
const id = await blobStore.getString(
hash || gitBlobHash(fileId.toString())
)
expect(id).to.equal(fileId.toString())
// double check we are not comparing 'undefined' or '[object Object]' above
expect(id).to.match(/^[a-f0-9]{24}$/)
@@ -826,6 +872,8 @@ describe('back_fill_file_hash script', function () {
const STATS_ALL_ZERO = {
projects: 0,
blobs: 0,
backedUpBlobs: 0,
filesWithoutHash: 0,
filesDuplicated: 0,
filesRetries: 0,
@@ -845,9 +893,13 @@ describe('back_fill_file_hash script', function () {
readFromGCSIngress: 0,
writeToAWSCount: 0,
writeToAWSEgress: 0,
writeToGCSCount: 0,
writeToGCSEgress: 0,
}
const STATS_UP_TO_PROJECT1 = {
projects: 2,
blobs: 2,
backedUpBlobs: 0,
filesWithoutHash: 7,
filesDuplicated: 1,
filesRetries: 0,
@@ -863,13 +915,17 @@ describe('back_fill_file_hash script', function () {
deduplicatedWriteToAWSLocalEgress: 0,
deduplicatedWriteToAWSRemoteCount: 0,
deduplicatedWriteToAWSRemoteEgress: 0,
readFromGCSCount: 6,
readFromGCSIngress: 4000120,
writeToAWSCount: 5,
writeToAWSEgress: 4032,
readFromGCSCount: 8,
readFromGCSIngress: 4000134,
writeToAWSCount: 7,
writeToAWSEgress: 4086,
writeToGCSCount: 5,
writeToGCSEgress: 4000096,
}
const STATS_UP_FROM_PROJECT1_ONWARD = {
projects: 2,
blobs: 1,
backedUpBlobs: 0,
filesWithoutHash: 3,
filesDuplicated: 0,
filesRetries: 0,
@@ -880,15 +936,17 @@ describe('back_fill_file_hash script', function () {
projectDeleted: 0,
projectHardDeleted: 0,
fileHardDeleted: 0,
mongoUpdates: 4,
mongoUpdates: 5,
deduplicatedWriteToAWSLocalCount: 1,
deduplicatedWriteToAWSLocalEgress: 30,
deduplicatedWriteToAWSRemoteCount: 0,
deduplicatedWriteToAWSRemoteEgress: 0,
readFromGCSCount: 3,
readFromGCSIngress: 72,
writeToAWSCount: 2,
writeToAWSEgress: 58,
readFromGCSCount: 4,
readFromGCSIngress: 79,
writeToAWSCount: 3,
writeToAWSEgress: 85,
writeToGCSCount: 2,
writeToGCSEgress: 48,
}
function sumStats(a, b) {
@@ -920,6 +978,8 @@ describe('back_fill_file_hash script', function () {
readFromGCSIngress: -24,
writeToAWSCount: -1,
writeToAWSEgress: -28,
writeToGCSCount: -1,
writeToGCSEgress: -24,
})
)
// should not retry 404