Merge pull request #21972 from overleaf/jpa-get-project-blobs-batch

[history-v1] implement getProjectBlobsBatch

GitOrigin-RevId: f03dcc690ef63f72400ccf001c6e497bd4fbe790
Jakob Ackermann 2024-11-20 09:57:30 +01:00 committed by Copybot
parent 4964d6414b
commit 0253130c36
6 changed files with 293 additions and 73 deletions

View file

@@ -125,6 +125,34 @@ async function loadGlobalBlobs() {
}
}
/**
* Return metadata for all blobs in the given projects
* @param {Array<string|number>} projectIds
* @return {Promise<{nBlobs:number, blobs:Map<string,Array<core.Blob>>}>}
*/
async function getProjectBlobsBatch(projectIds) {
const mongoProjects = []
const postgresProjects = []
for (const projectId of projectIds) {
if (typeof projectId === 'number') {
postgresProjects.push(projectId)
} else {
mongoProjects.push(projectId)
}
}
const [
{ nBlobs: nBlobsPostgres, blobs: blobsPostgres },
{ nBlobs: nBlobsMongo, blobs: blobsMongo },
] = await Promise.all([
postgresBackend.getProjectBlobsBatch(postgresProjects),
mongoBackend.getProjectBlobsBatch(mongoProjects),
])
for (const [id, blobs] of blobsPostgres.entries()) {
blobsMongo.set(id.toString(), blobs)
}
return { nBlobs: nBlobsPostgres + nBlobsMongo, blobs: blobsMongo }
}
/**
* @classdesc
* Fetch and store the content of files using content-addressable hashing. The
@@ -366,6 +394,7 @@ class BlobStore {
module.exports = {
BlobStore,
getProjectBlobsBatch,
loadGlobalBlobs,
makeProjectKey,
makeBlobForFile,

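For orientation, a minimal usage sketch of the new combined entry point (the require path and ids are hypothetical; the real import path is shown in the acceptance test further down). Numeric ids are routed to the Postgres backend, hex-string ids to the Mongo backend, and the merged result is always keyed by the stringified id:

// Minimal sketch, assuming a hypothetical require path and project ids.
const { getProjectBlobsBatch } = require('./storage/lib/blob_store')

async function demo() {
  // 42 is a postgres-era history id (number); the hex string is a mongo project id.
  const { nBlobs, blobs } = await getProjectBlobsBatch([
    42,
    '507f191e810c19729de860ea',
  ])
  console.log(nBlobs) // total number of blobs across both projects
  // Every key is a string: postgres keys are converted via id.toString()
  // before the two backend maps are merged.
  for (const [id, projectBlobs] of blobs) {
    console.log(id, projectBlobs.map(blob => blob.getHash()))
  }
}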
View file

@@ -16,13 +16,17 @@
*/
const { Blob } = require('overleaf-editor-core')
const { ObjectId, Binary, MongoError } = require('mongodb')
const { ObjectId, Binary, MongoError, ReadPreference } = require('mongodb')
const assert = require('../assert')
const mongodb = require('../mongodb')
const MAX_BLOBS_IN_BUCKET = 8
const DUPLICATE_KEY_ERROR_CODE = 11000
/**
* @typedef {import('mongodb').ReadPreferenceLike} ReadPreferenceLike
*/
/**
* Set up the data structures for a given project.
* @param {string} projectId
@@ -246,6 +250,60 @@ async function getProjectBlobs(projectId) {
return blobs
}
/**
* Return metadata for all blobs in the given projects
* @param {Array<string>} projectIds
* @return {Promise<{ nBlobs: number, blobs: Map<string, Array<Blob>> }>}
*/
async function getProjectBlobsBatch(projectIds) {
for (const project of projectIds) {
assert.mongoId(project, 'bad projectId')
}
let nBlobs = 0
const blobs = new Map()
if (projectIds.length === 0) return { nBlobs, blobs }
// blobs
{
const cursor = await mongodb.blobs.find(
{ _id: { $in: projectIds.map(projectId => new ObjectId(projectId)) } },
{ readPreference: ReadPreference.secondaryPreferred }
)
for await (const record of cursor) {
const projectBlobs = Object.values(record.blobs).flat().map(recordToBlob)
blobs.set(record._id.toString(), projectBlobs)
nBlobs += projectBlobs.length
}
}
// sharded blobs
{
// @ts-ignore We are using a custom _id here.
const cursor = await mongodb.shardedBlobs.find(
{
_id: {
$gte: makeShardedId(projectIds[0], '0'),
$lte: makeShardedId(projectIds[projectIds.length - 1], 'f'),
},
},
{ readPreference: ReadPreference.secondaryPreferred }
)
for await (const record of cursor) {
const recordIdHex = record._id.toString('hex')
const recordProjectId = recordIdHex.slice(0, 24)
const projectBlobs = Object.values(record.blobs).flat().map(recordToBlob)
const found = blobs.get(recordProjectId)
if (found) {
found.push(...projectBlobs)
} else {
blobs.set(recordProjectId, projectBlobs)
}
nBlobs += projectBlobs.length
}
}
return { nBlobs, blobs }
}
/**
* Add a blob's metadata to the blobs collection after it has been uploaded.
* @param {string} projectId
@@ -373,6 +431,7 @@ module.exports = {
findBlob,
findBlobs,
getProjectBlobs,
getProjectBlobsBatch,
insertBlob,
deleteBlobs,
}

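The sharded-blobs query above leans on the layout of the sharded _id: its hex form starts with the 24 hex characters of the project id (hence recordIdHex.slice(0, 24)), followed by a shard suffix built by makeShardedId earlier in this file. A single $gte/$lte range from the first project's lowest shard ('0') to the last project's highest shard ('f') therefore covers every requested project in one index scan. As written this appears to assume that projectIds arrive in ascending order (the first and last elements bound the range), and records for ids that sort between those bounds are also returned and keyed under their own project id in the result map. A small illustrative sketch (not part of the commit):

// Hypothetical helper: sort ids before calling getProjectBlobsBatch when the
// input is not already an ascending batch. Equal-length lowercase hex ids
// compare correctly as plain strings.
function sortProjectIdsForBatch(projectIds) {
  return [...projectIds].sort()
}

// Decomposition mirrored from the loop above:
//   recordIdHex.slice(0, 24)  -> the mongo project id
//   recordIdHex.slice(24)     -> the shard suffix appended by makeShardedId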
View file

@@ -70,6 +70,35 @@ async function getProjectBlobs(projectId) {
return blobs
}
/**
* Return metadata for all blobs in the given projects
* @param {Array<number>} projectIds
* @return {Promise<{ nBlobs: number, blobs: Map<number, Array<Blob>> }>}
*/
async function getProjectBlobsBatch(projectIds) {
for (const projectId of projectIds) {
assert.integer(projectId, 'bad projectId')
}
let nBlobs = 0
const blobs = new Map()
if (projectIds.length === 0) return { nBlobs, blobs }
const records = await knex('project_blobs')
.select('project_id', 'hash_bytes', 'byte_length', 'string_length')
.whereIn('project_id', projectIds)
for (const record of records) {
const found = blobs.get(record.project_id)
if (found) {
found.push(recordToBlob(record))
} else {
blobs.set(record.project_id, [recordToBlob(record)])
}
nBlobs++
}
return { nBlobs, blobs }
}
/**
* Add a blob's metadata to the blobs table after it has been uploaded.
*/
@@ -126,6 +155,7 @@ module.exports = {
findBlob,
findBlobs,
getProjectBlobs,
getProjectBlobsBatch,
insertBlob,
deleteBlobs,
}

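On the Postgres side the batch lookup is a single knex query, compiling to roughly SELECT project_id, hash_bytes, byte_length, string_length FROM project_blobs WHERE project_id IN (...), with the per-project grouping done in memory. The same get-or-create grouping idiom recurs in the Mongo backend above and in the back-fill script below; as a generic sketch (not part of the commit):

// Append a value to the array stored under key, creating the array on first use.
function groupInto(map, key, value) {
  const found = map.get(key)
  if (found) {
    found.push(value)
  } else {
    map.set(key, [value])
  }
  return map
}

// e.g. groupInto(blobs, record.project_id, recordToBlob(record))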
View file

@@ -25,11 +25,12 @@ import {
BlobStore,
GLOBAL_BLOBS,
loadGlobalBlobs,
getProjectBlobsBatch,
getStringLengthOfFile,
makeBlobForFile,
makeProjectKey,
} from '../lib/blob_store/index.js'
import { backedUpBlobs, db } from '../lib/mongodb.js'
import { backedUpBlobs as backedUpBlobsCollection, db } from '../lib/mongodb.js'
import filestorePersistor from '../lib/persistor.js'
// Silence warning.
@@ -68,10 +69,7 @@ ObjectId.cacheHexString = true
* @typedef {Object} Project
* @property {ObjectId} _id
* @property {Array<Folder>} rootFolder
* @property {Array<string>} deletedFileIds
* @property {Array<Blob>} blobs
* @property {{history: {id: string}}} overleaf
* @property {Array<string>} [backedUpBlobs]
* @property {{history: {id: (number|string)}}} overleaf
*/
/**
@@ -499,22 +497,16 @@ async function processFiles(files) {
* @return {Promise<void>}
*/
async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') {
let nBackedUpBlobs = 0
if (process.argv.includes('collectBackedUpBlobs')) {
nBackedUpBlobs = await collectBackedUpBlobs(batch)
}
if (process.argv.includes('deletedFiles')) {
await collectDeletedFiles(batch)
}
let blobs = 0
if (COLLECT_BLOBS) {
blobs = await collectBlobs(batch)
}
const files = Array.from(findFileInBatch(batch, prefix))
const deletedFiles = await collectDeletedFiles(batch)
const { nBlobs, blobs } = await collectProjectBlobs(batch)
const { nBackedUpBlobs, backedUpBlobs } = await collectBackedUpBlobs(batch)
const files = Array.from(
findFileInBatch(batch, prefix, deletedFiles, blobs, backedUpBlobs)
)
STATS.projects += batch.length
STATS.blobs += blobs
STATS.blobs += nBlobs
STATS.backedUpBlobs += nBackedUpBlobs
STATS.filesWithoutHash += files.length - (blobs - nBackedUpBlobs)
STATS.filesWithoutHash += files.length - (nBlobs - nBackedUpBlobs)
batch.length = 0 // GC
// The files are currently ordered by project-id.
// Order them by file-id ASC then blobs ASC to
@@ -709,17 +701,36 @@ function* findFiles(ctx, folder, path) {
/**
* @param {Array<Project>} projects
* @param {string} prefix
* @param {Map<string,Array<string>>} deletedFiles
* @param {Map<string,Array<Blob>>} blobs
* @param {Map<string,Array<string>>} backedUpBlobs
* @return Generator<QueueEntry>
*/
function* findFileInBatch(projects, prefix) {
function* findFileInBatch(
projects,
prefix,
deletedFiles,
blobs,
backedUpBlobs
) {
for (const project of projects) {
const ctx = new ProjectContext(project)
const projectIdS = project._id.toString()
const historyIdS = project.overleaf.history.id.toString()
const projectBlobs = blobs.get(historyIdS) || []
const projectBackedUpBlobs = new Set(backedUpBlobs.get(projectIdS) || [])
const projectDeletedFiles = deletedFiles.get(projectIdS) || []
const ctx = new ProjectContext(
project._id,
historyIdS,
projectBlobs,
projectBackedUpBlobs
)
yield* findFiles(ctx, project.rootFolder[0], prefix)
for (const fileId of project.deletedFileIds || []) {
for (const fileId of projectDeletedFiles) {
yield { ctx, cacheKey: fileId, fileId, path: '' }
}
for (const blob of project.blobs || []) {
if (ctx.hasBackedUpBlob(blob.getHash())) continue
for (const blob of projectBlobs) {
if (projectBackedUpBlobs.has(blob.getHash())) continue
yield {
ctx,
cacheKey: blob.getHash(),
@@ -732,25 +743,22 @@ function* findFileInBatch(projects, prefix) {
}
/**
* @param {Array<Project>} projects
* @return {Promise<number>}
* @param {Array<Project>} batch
* @return {Promise<{nBlobs: number, blobs: Map<string, Array<Blob>>}>}
*/
async function collectBlobs(projects) {
let blobs = 0
for (const project of projects) {
const historyId = project.overleaf.history.id.toString()
const blobStore = new BlobStore(historyId)
project.blobs = await blobStore.getProjectBlobs()
blobs += project.blobs.length
}
return blobs
async function collectProjectBlobs(batch) {
if (!COLLECT_BLOBS) return { nBlobs: 0, blobs: new Map() }
return await getProjectBlobsBatch(batch.map(p => p.overleaf.history.id))
}
/**
* @param {Array<Project>} projects
* @return {Promise<void>}
* @return {Promise<Map<string, Array<string>>>}
*/
async function collectDeletedFiles(projects) {
const deletedFiles = new Map()
if (!process.argv.includes('deletedFiles')) return deletedFiles
const cursor = deletedFilesCollection.find(
{
projectId: { $in: projects.map(p => p._id) },
@@ -762,52 +770,42 @@ async function collectDeletedFiles(projects) {
sort: { projectId: 1 },
}
)
const processed = projects.slice()
for await (const deletedFileRef of cursor) {
const idx = processed.findIndex(
p => p._id.toString() === deletedFileRef.projectId.toString()
)
if (idx === -1) {
throw new Error(
`bug: order of deletedFiles mongo records does not match batch of projects (${deletedFileRef.projectId} out of order)`
)
const projectId = deletedFileRef.projectId.toString()
const fileId = deletedFileRef._id.toString()
const found = deletedFiles.get(projectId)
if (found) {
found.push(fileId)
} else {
deletedFiles.set(projectId, [fileId])
}
processed.splice(0, idx)
const project = processed[0]
project.deletedFileIds = project.deletedFileIds || []
project.deletedFileIds.push(deletedFileRef._id.toString())
}
return deletedFiles
}
/**
* @param {Array<Project>} projects
* @return {Promise<number>}
* @return {Promise<{nBackedUpBlobs:number,backedUpBlobs:Map<string,Array<string>>}>}
*/
async function collectBackedUpBlobs(projects) {
const cursor = backedUpBlobs.find(
let nBackedUpBlobs = 0
const backedUpBlobs = new Map()
if (!process.argv.includes('collectBackedUpBlobs')) {
return { nBackedUpBlobs, backedUpBlobs }
}
const cursor = backedUpBlobsCollection.find(
{ _id: { $in: projects.map(p => p._id) } },
{
readPreference: READ_PREFERENCE_SECONDARY,
sort: { _id: 1 },
}
)
let nBackedUpBlobs = 0
const processed = projects.slice()
for await (const record of cursor) {
const idx = processed.findIndex(
p => p._id.toString() === record._id.toString()
)
if (idx === -1) {
throw new Error(
`bug: order of backedUpBlobs mongo records does not match batch of projects (${record._id} out of order)`
)
}
processed.splice(0, idx)
const project = processed[0]
project.backedUpBlobs = record.blobs.map(b => b.toString('hex'))
nBackedUpBlobs += record.blobs.length
const blobs = record.blobs.map(b => b.toString('hex'))
backedUpBlobs.set(record._id.toString(), blobs)
nBackedUpBlobs += blobs.length
}
return nBackedUpBlobs
return { nBackedUpBlobs, backedUpBlobs }
}
const BATCH_HASH_WRITES = 1_000
@@ -824,13 +822,16 @@ class ProjectContext {
#historyBlobs
/**
* @param {Project} project
* @param {ObjectId} projectId
* @param {string} historyId
* @param {Array<Blob>} blobs
* @param {Set<string>} backedUpBlobs
*/
constructor(project) {
this.projectId = project._id
this.historyId = project.overleaf.history.id.toString()
this.#backedUpBlobs = new Set(project.backedUpBlobs || [])
this.#historyBlobs = new Set((project.blobs || []).map(b => b.getHash()))
constructor(projectId, historyId, blobs, backedUpBlobs) {
this.projectId = projectId
this.historyId = historyId
this.#backedUpBlobs = backedUpBlobs
this.#historyBlobs = new Set(blobs.map(b => b.getHash()))
}
hasHistoryBlob(hash) {
@@ -901,7 +902,7 @@ class ProjectContext {
)
this.#completedBlobs.clear()
STATS.mongoUpdates++
await backedUpBlobs.updateOne(
await backedUpBlobsCollection.updateOne(
{ _id: this.projectId },
{ $addToSet: { blobs: { $each: blobs } } },
{ upsert: true }

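One cross-file detail worth calling out in the script changes above: the blobs map returned by getProjectBlobsBatch is keyed by history id, which is a number for postgres-backed projects, but the merge step in blob_store/index.js converts those keys with id.toString(). That is what lets findFileInBatch look projects up with project.overleaf.history.id.toString() regardless of backend, while deleted files and backed-up blobs stay keyed by the project's Mongo _id string. A condensed sketch of the lookup convention (not part of the commit):

// Per project in a batch:
const projectIdS = project._id.toString()                 // key for deletedFiles / backedUpBlobs
const historyIdS = project.overleaf.history.id.toString() // key for blobs (postgres or mongo)
const projectBlobs = blobs.get(historyIdS) || []
const projectDeletedFiles = deletedFiles.get(projectIdS) || []
const projectBackedUpBlobs = new Set(backedUpBlobs.get(projectIdS) || [])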
View file

@@ -22,6 +22,7 @@ const {
} = require('../../../../storage')
const mongoBackend = require('../../../../storage/lib/blob_store/mongo')
const postgresBackend = require('../../../../storage/lib/blob_store/postgres')
const { getProjectBlobsBatch } = require('../../../../storage/lib/blob_store')
const mkTmpDir = promisify(temp.mkdir)
@@ -327,6 +328,31 @@ describe('BlobStore', function () {
'expected Blob.NotFoundError when calling blobStore.getStream()'
)
})
if (scenario.backend !== mongoBackend) {
// mongo backend has its own test for this, covering sharding
it('getProjectBlobsBatch() returns blobs per project', async function () {
const projects = [
parseInt(scenario.projectId, 10),
parseInt(scenario.projectId2, 10),
]
const { nBlobs, blobs } =
await postgresBackend.getProjectBlobsBatch(projects)
expect(nBlobs).to.equal(2)
expect(Object.fromEntries(blobs.entries())).to.deep.equal({
[parseInt(scenario.projectId, 10)]: [
new Blob(helloWorldHash, 11, 11),
],
[parseInt(scenario.projectId2, 10)]: [
new Blob(
testFiles.GRAPH_PNG_HASH,
testFiles.GRAPH_PNG_BYTE_LENGTH,
null
),
],
})
})
}
})
describe('a global blob', function () {
@@ -454,4 +480,42 @@ describe('BlobStore', function () {
})
})
}
it('getProjectBlobsBatch() with mixed projects', async function () {
for (const scenario of scenarios) {
const blobStore = new BlobStore(scenario.projectId)
const blobStore2 = new BlobStore(scenario.projectId2)
await blobStore.initialize()
await blobStore.putString(helloWorldString)
await blobStore2.initialize()
await blobStore2.putFile(testFiles.path('graph.png'))
}
const projects = [
parseInt(scenarios[0].projectId, 10),
scenarios[1].projectId,
parseInt(scenarios[0].projectId2, 10),
scenarios[1].projectId2,
]
const { nBlobs, blobs } = await getProjectBlobsBatch(projects)
expect(nBlobs).to.equal(4)
expect(Object.fromEntries(blobs.entries())).to.deep.equal({
[scenarios[0].projectId]: [new Blob(helloWorldHash, 11, 11)],
[scenarios[1].projectId]: [new Blob(helloWorldHash, 11, 11)],
[scenarios[0].projectId2]: [
new Blob(
testFiles.GRAPH_PNG_HASH,
testFiles.GRAPH_PNG_BYTE_LENGTH,
null
),
],
[scenarios[1].projectId2]: [
new Blob(
testFiles.GRAPH_PNG_HASH,
testFiles.GRAPH_PNG_BYTE_LENGTH,
null
),
],
})
})
})

View file

@@ -22,6 +22,7 @@ describe('BlobStore Mongo backend', function () {
'abcdaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
],
1234: ['1234000000000000000000000000000000000000'],
1337: ['1337000000000000000000000000000000000000'],
}
beforeEach('clean up', cleanup.everything)
@@ -77,6 +78,42 @@ describe('BlobStore Mongo backend', function () {
})
})
describe('getProjectBlobsBatch', function () {
it('finds all the blobs', async function () {
const projectId0 = new ObjectId().toString()
const hashesProject0 = hashes[1234].concat(hashes.abcd)
const projectId1 = new ObjectId().toString()
const hashesProject1 = hashes[1337].concat(hashes.abcd)
const projectId2 = new ObjectId().toString()
const hashesProject2 = [] // no hashes
const projectId3 = new ObjectId().toString()
const hashesProject3 = hashes[1337]
const projectBlobs = {
[projectId0]: hashesProject0,
[projectId1]: hashesProject1,
[projectId2]: hashesProject2,
[projectId3]: hashesProject3,
}
for (const [projectId, hashes] of Object.entries(projectBlobs)) {
for (const hash of hashes) {
const blob = new Blob(hash, 123, 99)
await mongoBackend.insertBlob(projectId, blob)
}
}
const projects = [projectId0, projectId1, projectId2, projectId3]
const { nBlobs, blobs } =
await mongoBackend.getProjectBlobsBatch(projects)
expect(nBlobs).to.equal(
hashesProject0.length + hashesProject1.length + hashesProject3.length
)
expect(Object.fromEntries(blobs.entries())).to.deep.equal({
[projectId0]: hashesProject0.map(hash => new Blob(hash, 123, 99)),
[projectId1]: hashesProject1.map(hash => new Blob(hash, 123, 99)),
[projectId3]: hashesProject3.map(hash => new Blob(hash, 123, 99)),
})
})
})
describe('with existing blobs', function () {
beforeEach(async function () {
for (const hash of hashes.abcd.concat(hashes[1234])) {