Merge pull request #24204 from overleaf/ar-jpa-add-chunk-verification

[history-v1] add chunk verification

GitOrigin-RevId: 7208ad20872386813bb1c6946283afddb5e8b1cf
Andrew Rumble 2025-03-10 15:37:39 +00:00 committed by Copybot
parent 7ec4cbd841
commit 441c7a89a7
6 changed files with 454 additions and 23 deletions

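A minimal sketch of the flow this commit adds, assuming import paths relative to the history-v1 service root and an illustrative history id (it mirrors the new storage/scripts/verify_project.mjs further down): load the global blobs table, then verify one project's backed-up chunks and blobs.

import { verifyProjectWithErrorContext } from './storage/lib/backupVerifier.mjs'
import { loadGlobalBlobs } from './storage/lib/blob_store/index.js'

// Global blobs are skipped during verification, so they must be loaded first.
await loadGlobalBlobs()
try {
  // Verifies the first chunk and the last chunk active before the RPO cutoff,
  // plus one backed-up blob per verified chunk.
  await verifyProjectWithErrorContext('000000000000000000000042')
  console.log('backup OK')
} catch (err) {
  // Failures are tagged with { historyId, endTimestamp } and surface as
  // BackupCorruptedError, BackupRPOViolationError or a persistor NotFoundError.
  console.error('backup verification failed', err)
}
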
View file

@@ -23,6 +23,7 @@
}
}
},
"backupRPOInMS": "360000",
"chunkStore": {
"historyStoreConcurrency": "4"
},

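The new backupRPOInMS setting (360000 ms, i.e. a six-minute recovery point objective) is read by backupVerifier.mjs below; a small sketch of how the cutoff date is derived from it, mirroring getEndDateForRPO:

// config.get('backupRPOInMS') returns the string '360000'
const RPO = parseInt('360000', 10)
const endTimestamp = new Date(Date.now() - RPO) // verify state as of ~6 minutes ago
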
View file

@@ -1,16 +1,35 @@
// @ts-check
import config from 'config'
import OError from '@overleaf/o-error'
import { backupPersistor, projectBlobsBucket } from './backupPersistor.mjs'
import { Blob } from 'overleaf-editor-core'
import { BlobStore, makeProjectKey } from './blob_store/index.js'
import chunkStore from '../lib/chunk_store/index.js'
import {
backupPersistor,
chunksBucket,
projectBlobsBucket,
} from './backupPersistor.mjs'
import { Blob, Chunk, History } from 'overleaf-editor-core'
import { BlobStore, GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js'
import blobHash from './blob_hash.js'
import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js'
import logger from '@overleaf/logger'
import { text } from 'node:stream/consumers'
import { createGunzip } from 'node:zlib'
import path from 'node:path'
import projectKey from './project_key.js'
const RPO = parseInt(config.get('backupRPOInMS'), 10)
/**
* @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor
* @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor
*/
/**
* @return {Date}
*/
export function getEndDateForRPO() {
return new Date(Date.now() - RPO)
}
/**
* @param {string} historyId
* @param {string} hash
@@ -20,13 +39,13 @@ export async function verifyBlob(historyId, hash) {
}
/**
*
* @param {string} historyId
* @param {Array<string>} hashes
* @return {Promise<CachedPerProjectEncryptedS3Persistor>}
*/
export async function verifyBlobs(historyId, hashes) {
let projectCache
async function getProjectPersistor(historyId) {
try {
projectCache = await backupPersistor.forProjectRO(
return await backupPersistor.forProjectRO(
projectBlobsBucket,
makeProjectKey(historyId, '')
)
@@ -36,16 +55,19 @@ export async function verifyBlobs(historyId, hashes) {
}
throw err
}
await verifyBlobsWithCache(historyId, projectCache, hashes)
}
/**
* @param {string} historyId
* @param {CachedPerProjectEncryptedS3Persistor} projectCache
* @param {Array<string>} hashes
* @param {CachedPerProjectEncryptedS3Persistor} [projectCache]
*/
export async function verifyBlobsWithCache(historyId, projectCache, hashes) {
export async function verifyBlobs(historyId, hashes, projectCache) {
if (hashes.length === 0) throw new Error('bug: empty hashes')
if (!projectCache) {
projectCache = await getProjectPersistor(historyId)
}
const blobStore = new BlobStore(historyId)
for (const hash of hashes) {
const path = makeProjectKey(historyId, hash)
@@ -58,7 +80,7 @@ export async function verifyBlobsWithCache(historyId, projectCache, hashes) {
})
} catch (err) {
if (err instanceof NotFoundError) {
throw new BackupCorruptedError('missing blob')
throw new BackupCorruptedError('missing blob', { path, hash })
}
throw err
}
@@ -73,7 +95,114 @@ export async function verifyBlobsWithCache(historyId, projectCache, hashes) {
}
}
/**
* @param {string} historyId
* @param {Date} [endTimestamp]
*/
export async function verifyProjectWithErrorContext(
historyId,
endTimestamp = getEndDateForRPO()
) {
try {
await verifyProject(historyId, endTimestamp)
} catch (err) {
// @ts-ignore err is Error instance
throw OError.tag(err, 'verifyProject', { historyId, endTimestamp })
}
}
/**
*
* @param {string} historyId
* @param {number} startVersion
* @param {CachedPerProjectEncryptedS3Persistor} backupPersistorForProject
* @return {Promise<any>}
*/
async function loadChunk(historyId, startVersion, backupPersistorForProject) {
const key = path.join(
projectKey.format(historyId),
projectKey.pad(startVersion)
)
const backupChunkStream = await backupPersistorForProject.getObjectStream(
chunksBucket,
key
)
const raw = await text(backupChunkStream.pipe(createGunzip()))
return JSON.parse(raw)
}
/**
* @param {string} historyId
* @param {Date} endTimestamp
*/
export async function verifyProject(historyId, endTimestamp) {
const backend = chunkStore.getBackend(historyId)
const [first, last] = await Promise.all([
backend.getFirstChunkBeforeTimestamp(historyId, endTimestamp),
backend.getLastActiveChunkBeforeTimestamp(historyId, endTimestamp),
])
const chunksRecordsToVerify = [
{
chunkId: first.id,
chunkLabel: 'first',
startVersion: first.startVersion,
},
]
if (first.startVersion !== last.startVersion) {
chunksRecordsToVerify.push({
chunkId: last.id,
chunkLabel: 'last before RPO',
startVersion: last.startVersion,
})
}
const projectCache = await getProjectPersistor(historyId)
const chunks = await Promise.all(
chunksRecordsToVerify.map(async chunk => {
try {
return History.fromRaw(
await loadChunk(historyId, chunk.startVersion, projectCache)
)
} catch (err) {
if (err instanceof Chunk.NotPersistedError) {
throw new BackupRPOViolationError('backup RPO violation', chunk)
}
throw err
}
})
)
const seenBlobs = new Set()
const blobsToVerify = []
for (const chunk of chunks) {
/** @type {Set<string>} */
const chunkBlobs = new Set()
chunk.findBlobHashes(chunkBlobs)
let hasAddedBlobFromThisChunk = false
for (const blobHash of chunkBlobs) {
if (seenBlobs.has(blobHash)) continue // old blob
if (GLOBAL_BLOBS.has(blobHash)) continue // global blob
seenBlobs.add(blobHash)
if (!hasAddedBlobFromThisChunk) {
blobsToVerify.push(blobHash)
hasAddedBlobFromThisChunk = true
}
}
}
if (blobsToVerify.length === 0) {
logger.debug(
{
historyId,
chunksRecordsToVerify: chunksRecordsToVerify.map(c => c.chunkId),
},
'chunks contain no blobs to verify'
)
return
}
await verifyBlobs(historyId, blobsToVerify, projectCache)
}
export class BackupCorruptedError extends OError {}
export class BackupRPOViolationError extends OError {}
export async function healthCheck() {
/** @type {Array<string>} */

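A worked sketch of the object key loadChunk reads from the chunks bucket, using the same helpers as above; the module path is illustrative and the resulting string matches the key asserted in the acceptance test at the end of this commit.

import path from 'node:path'
import projectKey from './storage/lib/project_key.js' // illustrative path

// e.g. history id '000000000000000000000043' with chunk start version 0
const key = path.join(
  projectKey.format('000000000000000000000043'),
  projectKey.pad(0)
)
// key === '340/000/000000000000000000/000000000'
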
View file

@@ -54,6 +54,35 @@ async function getChunkForVersion(projectId, version) {
return chunkFromRecord(record)
}
/**
* Get the metadata for the project's first chunk (the chunk starting at version 0) whose end timestamp is at or before the given timestamp.
*/
async function getFirstChunkBeforeTimestamp(projectId, timestamp) {
assert.mongoId(projectId, 'bad projectId')
assert.date(timestamp, 'bad timestamp')
const recordActive = await getChunkForVersion(projectId, 0)
if (recordActive && recordActive.endTimestamp <= timestamp) {
return recordActive
}
// fallback to deleted chunk
const recordDeleted = await mongodb.chunks.findOne(
{
projectId: new ObjectId(projectId),
state: 'deleted',
startVersion: 0,
updatedAt: { $lte: timestamp }, // indexed for state=deleted
endTimestamp: { $lte: timestamp },
},
{ sort: { updatedAt: -1 } }
)
if (recordDeleted) {
return chunkFromRecord(recordDeleted)
}
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
/**
* Get the metadata for the chunk that contains the version that was current at
* the given timestamp.
@@ -86,6 +115,39 @@ async function getChunkForTimestamp(projectId, timestamp) {
return chunkFromRecord(record)
}
/**
* Get the metadata for the chunk that contains the version that was current before
* the given timestamp.
*/
async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) {
assert.mongoId(projectId, 'bad projectId')
assert.date(timestamp, 'bad timestamp')
const record = await mongodb.chunks.findOne(
{
projectId: new ObjectId(projectId),
state: 'active',
$or: [
{
endTimestamp: {
$lte: timestamp,
},
},
{
endTimestamp: null,
},
],
},
// We use the index on the startVersion for sorting records. This assumes
// that timestamps go up with each version.
{ sort: { startVersion: -1 } }
)
if (record == null) {
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
return chunkFromRecord(record)
}
/**
* Get all of a project's chunk ids
*/
@@ -310,6 +372,8 @@ function chunkFromRecord(record) {
module.exports = {
getLatestChunk,
getFirstChunkBeforeTimestamp,
getLastActiveChunkBeforeTimestamp,
getChunkForVersion,
getChunkForTimestamp,
getProjectChunkIds,

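A sketch of how verifyProject (earlier in this commit) consumes the two queries added here; the module path and history id are illustrative.

import chunkStore from './storage/lib/chunk_store/index.js' // illustrative path

const historyId = '000000000000000000000042' // illustrative
const endTimestamp = new Date(Date.now() - 360_000) // RPO cutoff
const backend = chunkStore.getBackend(historyId)
const [first, last] = await Promise.all([
  backend.getFirstChunkBeforeTimestamp(historyId, endTimestamp),
  backend.getLastActiveChunkBeforeTimestamp(historyId, endTimestamp),
])
// Either call throws Chunk.BeforeTimestampNotFoundError when no chunk
// qualifies; verifyProject only needs the id and startVersion of each.
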
View file

@@ -46,6 +46,59 @@ async function getChunkForVersion(projectId, version) {
return chunkFromRecord(record)
}
/**
* Get the metadata for the project's first chunk (the chunk starting at version 0) whose end timestamp is at or before the given timestamp.
*/
async function getFirstChunkBeforeTimestamp(projectId, timestamp) {
assert.date(timestamp, 'bad timestamp')
const recordActive = await getChunkForVersion(projectId, 0)
// projectId must be valid if getChunkForVersion did not throw
projectId = parseInt(projectId, 10)
if (recordActive && recordActive.endTimestamp <= timestamp) {
return recordActive
}
// fallback to deleted chunk
const recordDeleted = await knex('old_chunks')
.where('doc_id', projectId)
.where('start_version', '=', 0)
.where('end_timestamp', '<=', timestamp)
.orderBy('end_version', 'desc')
.first()
if (recordDeleted) {
return chunkFromRecord(recordDeleted)
}
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
/**
* Get the metadata for the chunk that contains the version that was current before
* the given timestamp.
*/
async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) {
assert.date(timestamp, 'bad timestamp')
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
const query = knex('chunks')
.where('doc_id', projectId)
.where(function () {
this.where('end_timestamp', '<=', timestamp).orWhere(
'end_timestamp',
null
)
})
.orderBy('end_version', 'desc', 'last')
const record = await query.first()
if (!record) {
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
return chunkFromRecord(record)
}
/**
* Get the metadata for the chunk that contains the version that was current at
* the given timestamp.
@@ -280,6 +333,8 @@ async function generateProjectId() {
module.exports = {
getLatestChunk,
getFirstChunkBeforeTimestamp,
getLastActiveChunkBeforeTimestamp,
getChunkForVersion,
getChunkForTimestamp,
getProjectChunkIds,

View file

@@ -0,0 +1,33 @@
import commandLineArgs from 'command-line-args'
import { verifyProjectWithErrorContext } from '../lib/backupVerifier.mjs'
import knex from '../lib/knex.js'
import { client } from '../lib/mongodb.js'
import { setTimeout } from 'node:timers/promises'
import { loadGlobalBlobs } from '../lib/blob_store/index.js'
const { historyId } = commandLineArgs([{ name: 'historyId', type: String }])
async function gracefulShutdown(code = process.exitCode) {
await knex.destroy()
await client.close()
await setTimeout(1_000)
process.exit(code)
}
if (!historyId) {
console.error('missing --historyId')
process.exitCode = 1
await gracefulShutdown()
}
await loadGlobalBlobs()
try {
await verifyProjectWithErrorContext(historyId)
console.log('OK')
} catch (error) {
console.error('error verifying', error)
process.exitCode = 1
} finally {
await gracefulShutdown()
}

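As the acceptance tests below exercise, the script takes a single --historyId flag and reports the result through its exit status; a sketch of driving it from Node, assuming the working directory is the history-v1 service root and an illustrative history id.

import { execFile } from 'node:child_process'
import { promisify } from 'node:util'

const run = promisify(execFile)
try {
  const { stdout } = await run(process.argv0, [
    'storage/scripts/verify_project.mjs',
    '--historyId=000000000000000000000042', // illustrative
  ])
  console.log(stdout.trim()) // 'OK' when the backup verified cleanly
} catch (err) {
  // A non-zero exit rejects: err.code is the exit status (1 on verification
  // failure) and err.stderr carries the verifier's error output.
  console.error(err.code, err.stderr)
}
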
View file

@@ -6,23 +6,63 @@ import { expect } from 'chai'
import testProjects from './support/test_projects.js'
import {
backupPersistor,
chunksBucket,
projectBlobsBucket,
} from '../../../../storage/lib/backupPersistor.mjs'
import {
BlobStore,
makeProjectKey,
} from '../../../../storage/lib/blob_store/index.js'
import Stream from 'stream'
import Stream from 'node:stream'
import * as zlib from 'node:zlib'
import { promisify } from 'node:util'
import { execFile } from 'node:child_process'
import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js'
import { chunkStore } from '../../../../storage/index.js'
import { Change, File, Operation } from 'overleaf-editor-core'
import Crypto from 'node:crypto'
import path from 'node:path'
import projectKey from '../../../../storage/lib/project_key.js'
import { historyStore } from '../../../../storage/lib/history_store.js'
/**
* @typedef {import("node-fetch").Response} Response
* @typedef {import("overleaf-editor-core").Blob} Blob
*/
async function verifyProjectScript(historyId) {
try {
const result = await promisify(execFile)(
process.argv0,
['storage/scripts/verify_project.mjs', `--historyId=${historyId}`],
{
encoding: 'utf-8',
timeout: 5_000,
env: {
...process.env,
LOG_LEVEL: 'warn',
},
}
)
return { status: 0, stdout: result.stdout, stderr: result.stderr }
} catch (err) {
if (
err &&
typeof err === 'object' &&
'stdout' in err &&
'code' in err &&
'stderr' in err
) {
return {
stdout: typeof err.stdout === 'string' ? err.stdout : '',
status: typeof err.code === 'number' ? err.code : -1,
stderr: typeof err.stderr === 'string' ? err.stderr : '',
}
}
throw err
}
}
/**
* @param {string} historyId
* @param {string} hash
@@ -69,22 +109,84 @@ async function verifyBlobHTTP(historyId, hash) {
)
}
async function backupChunk(historyId) {
const newChunk = await chunkStore.loadLatestRaw(historyId)
const { buffer: chunkBuffer } = await historyStore.loadRawWithBuffer(
historyId,
newChunk.id
)
const md5 = Crypto.createHash('md5').update(chunkBuffer)
await backupPersistor.sendStream(
chunksBucket,
path.join(
projectKey.format(historyId),
projectKey.pad(newChunk.startVersion)
),
Stream.Readable.from([chunkBuffer]),
{
contentType: 'application/json',
contentEncoding: 'gzip',
contentLength: chunkBuffer.byteLength,
sourceMd5: md5.digest('hex'),
}
)
}
const FIFTEEN_MINUTES_IN_MS = 900_000
async function addFileInNewChunk(
fileContents,
filePath,
historyId,
{ creationDate = new Date() }
) {
const chunk = await chunkStore.loadLatest(historyId)
const operation = Operation.addFile(
filePath,
File.fromString(fileContents)
)
const changes = [new Change([operation], creationDate, [])]
chunk.pushChanges(changes)
await chunkStore.update(historyId, 0, chunk)
}
/**
* @param {string} historyId
* @param {{shouldBackupBlob?: boolean, shouldBackupChunk?: boolean, shouldCreateChunk?: boolean}} [options]
* @return {Promise<string>}
*/
async function prepareProjectAndBlob(historyId) {
async function prepareProjectAndBlob(
historyId,
{ shouldBackupBlob, shouldBackupChunk, shouldCreateChunk } = {
shouldBackupBlob: true,
shouldBackupChunk: true,
shouldCreateChunk: true,
}
) {
await testProjects.createEmptyProject(historyId)
const blobStore = new BlobStore(historyId)
const blob = await blobStore.putString(historyId)
const gzipped = zlib.gzipSync(Buffer.from(historyId))
await backupPersistor.sendStream(
projectBlobsBucket,
makeProjectKey(historyId, blob.getHash()),
Stream.Readable.from([gzipped]),
{ contentLength: gzipped.byteLength, contentEncoding: 'gzip' }
)
await checkDEKExists(historyId)
const fileContents = historyId
const blob = await blobStore.putString(fileContents)
if (shouldCreateChunk) {
await addFileInNewChunk(fileContents, `${historyId}.txt`, historyId, {
creationDate: new Date(new Date().getTime() - FIFTEEN_MINUTES_IN_MS),
})
}
if (shouldBackupBlob) {
const gzipped = zlib.gzipSync(Buffer.from(historyId))
await backupPersistor.sendStream(
projectBlobsBucket,
makeProjectKey(historyId, blob.getHash()),
Stream.Readable.from([gzipped]),
{ contentLength: gzipped.byteLength, contentEncoding: 'gzip' }
)
await checkDEKExists(historyId)
}
if (shouldCreateChunk && shouldBackupChunk) {
await backupChunk(historyId)
}
return blob.getHash()
}
@@ -123,6 +225,53 @@ describe('backupVerifier', function () {
const response = await fetch(testServer.testUrl('/health_check'))
expect(response.status).to.equal(200)
})
describe('storage/scripts/verify_project.mjs', function () {
describe('when the project is appropriately backed up', function () {
it('should return 0', async function () {
const response = await verifyProjectScript(historyIdPostgres)
expect(response.status).to.equal(0)
})
})
describe('when the project chunk is not backed up', function () {
let response
beforeEach(async function () {
await prepareProjectAndBlob('000000000000000000000043', {
shouldBackupChunk: false,
shouldBackupBlob: true,
shouldCreateChunk: true,
})
response = await verifyProjectScript('000000000000000000000043')
})
it('should return 1', async function () {
expect(response.status).to.equal(1)
})
it('should emit an error message referring to a missing chunk', async function () {
const stderr = response.stderr
expect(stderr).to.include('NotFoundError: no such file')
expect(stderr).to.include("bucketName: 'overleaf-test-history-chunks'")
expect(stderr).to.include("key: '340/000/000000000000000000/000000000'")
})
})
describe('when a project blob is not backed up', function () {
let response
beforeEach(async function () {
await prepareProjectAndBlob('43', {
shouldBackupChunk: true,
shouldBackupBlob: false,
shouldCreateChunk: true,
})
response = await verifyProjectScript('43')
})
it('should return 1', function () {
expect(response.status).to.equal(1)
})
it('includes a BackupCorruptedError in stderr', function () {
expect(response.stderr).to.include('BackupCorruptedError: missing blob')
})
})
})
describe('storage/scripts/verify_backup_blob.mjs', function () {
it('throws and does not create DEK if missing', async function () {
const historyId = '404'