Merge pull request #21947 from overleaf/bg-jpa-back-fill-script-tweaks

[history-v1] back_fill_file_hash: performance tweaks

GitOrigin-RevId: c3d0c7906707fc902addcde64eaf41c24ceeece7
Jakob Ackermann authored 2024-11-19 10:46:48 +01:00, committed by Copybot
parent 87e7e3017a
commit 8e74d3c58c
4 changed files with 195 additions and 28 deletions

View file

@@ -61,6 +61,9 @@ class SSECOptions {
 }

 class S3Persistor extends AbstractPersistor {
+  /** @type {Map<string, S3>} */
+  #clients = new Map()
+
   constructor(settings = {}) {
     super()
@@ -131,19 +134,19 @@ class S3Persistor extends AbstractPersistor {
     // if we have an md5 hash, pass this to S3 to verify the upload - otherwise
     // we rely on the S3 client's checksum calculation to validate the upload
-    const clientOptions = {}
+    let computeChecksums = false
     if (opts.sourceMd5) {
       uploadOptions.ContentMD5 = PersistorHelper.hexToBase64(opts.sourceMd5)
     } else {
-      clientOptions.computeChecksums = true
+      computeChecksums = true
     }

     if (this.settings.disableMultiPartUpload) {
-      await this._getClientForBucket(bucketName, clientOptions)
+      await this._getClientForBucket(bucketName, computeChecksums)
         .putObject(uploadOptions)
         .promise()
     } else {
-      await this._getClientForBucket(bucketName, clientOptions)
+      await this._getClientForBucket(bucketName, computeChecksums)
         .upload(uploadOptions, { partSize: this.settings.partSize })
         .promise()
     }
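The boolean threaded through here only selects which client configuration serves the request: a caller-supplied MD5 is forwarded as `ContentMD5` so S3 verifies the payload server-side, and only uploads without a known hash fall back to SDK-computed checksums. A minimal standalone sketch of that decision, assuming the aws-sdk v2 client this persistor wraps (`bucket`, `key`, `body` and `sourceMd5` are hypothetical inputs):

```js
// Sketch only - not the persistor's actual API surface.
import S3 from 'aws-sdk/clients/s3'

async function uploadVerified(bucket, key, body, sourceMd5) {
  const params = { Bucket: bucket, Key: key, Body: body }
  const options = {}
  if (sourceMd5) {
    // S3 rejects the PUT with a BadDigest error if the payload does not match.
    params.ContentMD5 = Buffer.from(sourceMd5, 'hex').toString('base64')
  } else {
    // Otherwise let the SDK compute and attach checksums itself.
    options.computeChecksums = true
  }
  await new S3(options).putObject(params).promise()
}
```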
@@ -517,23 +520,34 @@ class S3Persistor extends AbstractPersistor {
   /**
    * @param {string} bucket
-   * @param {Object} [clientOptions]
+   * @param {boolean} computeChecksums
    * @return {S3}
    * @private
    */
-  _getClientForBucket(bucket, clientOptions) {
-    return new S3(
-      this._buildClientOptions(
-        this.settings.bucketCreds?.[bucket],
-        clientOptions
-      )
-    )
+  _getClientForBucket(bucket, computeChecksums = false) {
+    /** @type {S3.Types.ClientConfiguration} */
+    const clientOptions = {}
+    const cacheKey = `${bucket}:${computeChecksums}`
+    if (computeChecksums) {
+      clientOptions.computeChecksums = true
+    }
+    let client = this.#clients.get(cacheKey)
+    if (!client) {
+      client = new S3(
+        this._buildClientOptions(
+          this.settings.bucketCreds?.[bucket],
+          clientOptions
+        )
+      )
+      this.#clients.set(cacheKey, client)
+    }
+    return client
   }

   /**
    * @param {Object} bucketCredentials
-   * @param {Object} clientOptions
-   * @return {Object}
+   * @param {S3.Types.ClientConfiguration} clientOptions
+   * @return {S3.Types.ClientConfiguration}
    * @private
    */
   _buildClientOptions(bucketCredentials, clientOptions) {
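Previously every call constructed a fresh `S3` instance, and with it a fresh HTTP agent and connection pool, per request. Memoizing on `bucket:computeChecksums` reuses one client per combination. A minimal sketch of the same shape outside the class, assuming aws-sdk v2 (`buildOptionsFor` is a hypothetical stand-in for `_buildClientOptions`):

```js
import S3 from 'aws-sdk/clients/s3'

const clients = new Map()

function buildOptionsFor(bucket) {
  return {} // hypothetical: per-bucket credentials/endpoint would go here
}

function getClientForBucket(bucket, computeChecksums = false) {
  const cacheKey = `${bucket}:${computeChecksums}`
  let client = clients.get(cacheKey)
  if (!client) {
    client = new S3({ ...buildOptionsFor(bucket), computeChecksums })
    clients.set(cacheKey, client) // reuse the client and its agent next time
  }
  return client
}
```

Since the boolean is part of the key, the map holds at most two clients per configured bucket, so the cache stays small for arbitrarily long runs.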

View file

@@ -147,7 +147,7 @@ describe('S3PersistorTests', function () {
       deleteObjects: sinon.stub().returns(EmptyPromise),
       getSignedUrlPromise: sinon.stub().resolves(redirectUrl),
     }
-    S3 = sinon.stub().returns(S3Client)
+    S3 = sinon.stub().callsFake(() => Object.assign({}, S3Client))

     Hash = {
       end: sinon.stub(),
@@ -1027,4 +1027,22 @@ describe('S3PersistorTests', function () {
       })
     })
   })
+
+  describe('_getClientForBucket', function () {
+    it('should return same instance for same bucket', function () {
+      const a = S3Persistor._getClientForBucket('foo')
+      const b = S3Persistor._getClientForBucket('foo')
+      expect(a).to.equal(b)
+    })
+    it('should return different instance for different bucket', function () {
+      const a = S3Persistor._getClientForBucket('foo')
+      const b = S3Persistor._getClientForBucket('bar')
+      expect(a).to.not.equal(b)
+    })
+    it('should return different instance for same bucket different computeChecksums', function () {
+      const a = S3Persistor._getClientForBucket('foo', false)
+      const b = S3Persistor._getClientForBucket('foo', true)
+      expect(a).to.not.equal(b)
+    })
+  })
 })
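The `callsFake` change above is what makes these identity assertions meaningful: a stub built with `.returns(S3Client)` hands the same object to every `new S3()` call, so the "different instance" tests could never fail. In isolation:

```js
// Why the stub was changed; sinon is the real library, `shared` is hypothetical.
import sinon from 'sinon'

const shared = { tag: 'client' }

const always = sinon.stub().returns(shared)
console.log(always() === always()) // true - one shared object, identity proves nothing

const fresh = sinon.stub().callsFake(() => Object.assign({}, shared))
console.log(fresh() === fresh()) // false - a distinct copy per construction
```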

View file

@@ -3,6 +3,7 @@ import Crypto from 'node:crypto'
 import Events from 'node:events'
 import fs from 'node:fs'
 import Path from 'node:path'
+import { performance } from 'node:perf_hooks'
 import Stream from 'node:stream'
 import zLib from 'node:zlib'
 import { setTimeout } from 'node:timers/promises'
@@ -37,6 +38,7 @@ ObjectId.cacheHexString = true
 /**
  * @typedef {import("overleaf-editor-core").Blob} Blob
+ * @typedef {import("perf_hooks").EventLoopUtilization} EventLoopUtilization
  * @typedef {import("mongodb").Collection} Collection
  * @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor
  */
@@ -105,6 +107,12 @@ const RETRY_FILESTORE_404 = process.env.RETRY_FILESTORE_404 === 'true'
 const BUFFER_DIR = fs.mkdtempSync(
   process.env.BUFFER_DIR_PREFIX || '/tmp/back_fill_file_hash-'
 )
+// https://nodejs.org/api/stream.html#streamgetdefaulthighwatermarkobjectmode
+const STREAM_HIGH_WATER_MARK = parseInt(
+  process.env.STREAM_HIGH_WATER_MARK || (64 * 1024).toString(),
+  10
+)
+const LOGGING_INTERVAL = parseInt(process.env.LOGGING_INTERVAL || '60000', 10)
 const projectsCollection = db.collection('projects')
 const deletedProjectsCollection = db.collection('deletedProjects')
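`highWaterMark` sets a stream's internal buffer size, and thus the typical chunk size it reads and writes in; fs read streams default to 64 KiB, which matches the fallback above. Raising it trades memory per in-flight file for fewer, larger I/O calls. A sketch of what the knob controls, using only Node core (file names hypothetical):

```js
import fs from 'node:fs'
import Stream from 'node:stream'

const STREAM_HIGH_WATER_MARK = parseInt(
  process.env.STREAM_HIGH_WATER_MARK || (64 * 1024).toString(),
  10
)

// With a 1 MiB mark a copy moves data in chunks up to 16x larger than the
// 64 KiB default - fewer I/O calls per file, at the cost of up to roughly
// 2 MiB buffered per concurrent read/write pipeline.
await Stream.promises.pipeline(
  fs.createReadStream('input.bin', { highWaterMark: STREAM_HIGH_WATER_MARK }),
  fs.createWriteStream('copy.bin', { highWaterMark: STREAM_HIGH_WATER_MARK })
)
```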
@@ -127,20 +135,81 @@ const STATS = {
   deduplicatedWriteToAWSLocalEgress: 0,
   deduplicatedWriteToAWSRemoteCount: 0,
   deduplicatedWriteToAWSRemoteEgress: 0,
+  readFromGCSCount: 0,
+  readFromGCSIngress: 0,
   writeToAWSCount: 0,
   writeToAWSEgress: 0,
 }

+const processStart = performance.now()
+let lastLogTS = processStart
+let lastLog = Object.assign({}, STATS)
+let lastEventLoopStats = performance.eventLoopUtilization()
+
+/**
+ * @param {number} v
+ * @param {number} ms
+ */
+function toMiBPerSecond(v, ms) {
+  const ONE_MiB = 1024 * 1024
+  return v / ONE_MiB / (ms / 1000)
+}
+
+/**
+ * @param {any} stats
+ * @param {number} ms
+ * @return {{writeToAWSThroughputMiBPerSecond: number, readFromGCSThroughputMiBPerSecond: number}}
+ */
+function bandwidthStats(stats, ms) {
+  return {
+    readFromGCSThroughputMiBPerSecond: toMiBPerSecond(
+      stats.readFromGCSIngress,
+      ms
+    ),
+    writeToAWSThroughputMiBPerSecond: toMiBPerSecond(
+      stats.writeToAWSEgress,
+      ms
+    ),
+  }
+}
+
+/**
+ * @param {EventLoopUtilization} nextEventLoopStats
+ * @param {number} now
+ * @return {Object}
+ */
+function computeDiff(nextEventLoopStats, now) {
+  const ms = now - lastLogTS
+  lastLogTS = now
+  const diff = {
+    eventLoop: performance.eventLoopUtilization(
+      nextEventLoopStats,
+      lastEventLoopStats
+    ),
+  }
+  for (const [name, v] of Object.entries(STATS)) {
+    diff[name] = v - lastLog[name]
+  }
+  return Object.assign(diff, bandwidthStats(diff, ms))
+}
+
 function printStats() {
+  const now = performance.now()
+  const nextEventLoopStats = performance.eventLoopUtilization()
   console.log(
     JSON.stringify({
       time: new Date(),
       ...STATS,
+      ...bandwidthStats(STATS, now - processStart),
+      eventLoop: nextEventLoopStats,
+      diff: computeDiff(nextEventLoopStats, now),
     })
   )
+  lastEventLoopStats = nextEventLoopStats
+  lastLog = Object.assign({}, STATS)
 }
-setInterval(printStats, 60_000)
+setInterval(printStats, LOGGING_INTERVAL)

 /**
  * @param {QueueEntry} entry
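Two measurement details worth spelling out: throughput is bytes divided by MiB and by seconds, and `performance.eventLoopUtilization()` returns the delta between two snapshots when both are passed, which is how `computeDiff` reports per-interval rather than since-process-start utilization. A small self-contained illustration, Node core only:

```js
import { performance } from 'node:perf_hooks'

// Throughput: 4,000,120 bytes in 2,000 ms
// => 4000120 / (1024 * 1024) / 2 ≈ 1.91 MiB/s
const miBPerSecond = 4_000_120 / (1024 * 1024) / (2_000 / 1000)
console.log(miBPerSecond.toFixed(2)) // "1.91"

// Event-loop utilization over a window: passing the newer and the older
// snapshot yields the delta between them.
const before = performance.eventLoopUtilization()
setTimeout(() => {
  const after = performance.eventLoopUtilization()
  const delta = performance.eventLoopUtilization(after, before)
  console.log(delta.utilization) // 0..1 share of busy event-loop time in the window
}, 1000)
```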
@@ -187,12 +256,19 @@ async function processFileOnce(entry) {
     BUFFER_DIR,
     projectId.toString() + fileId.toString()
   )
-  const dst = fs.createWriteStream(filePath)
+  const dst = fs.createWriteStream(filePath, {
+    highWaterMark: STREAM_HIGH_WATER_MARK,
+  })
+  STATS.readFromGCSCount++
   const src = await filestorePersistor.getObjectStream(
     USER_FILES_BUCKET_NAME,
     `${projectId}/${fileId}`
   )
-  await Stream.promises.pipeline(src, dst)
+  try {
+    await Stream.promises.pipeline(src, dst)
+  } finally {
+    STATS.readFromGCSIngress += dst.bytesWritten
+  }

   const blobStore = new BlobStore(historyId)
   const blob = await blobStore.putFile(filePath)
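Counting ingress in a `finally` block means partially downloaded files still show up in the stats when a transfer fails; `fs.WriteStream` exposes the running `bytesWritten` total. The same pattern in isolation (`src` and `stats` are hypothetical):

```js
import fs from 'node:fs'
import Stream from 'node:stream'

async function downloadAndCount(src, filePath, stats) {
  const dst = fs.createWriteStream(filePath)
  try {
    await Stream.promises.pipeline(src, dst)
  } finally {
    // Runs on success and failure alike, so aborted transfers still
    // contribute the bytes that made it to disk.
    stats.ingress += dst.bytesWritten
  }
}
```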
@@ -221,7 +297,7 @@
     contentEncoding = 'gzip'
     size = 0
     await Stream.promises.pipeline(
-      fs.createReadStream(filePath),
+      fs.createReadStream(filePath, { highWaterMark: STREAM_HIGH_WATER_MARK }),
       zLib.createGzip(),
       async function* (source) {
         for await (const chunk of source) {
@@ -230,12 +306,17 @@
           yield chunk
         }
       },
-      fs.createWriteStream(filePathCompressed)
+      fs.createWriteStream(filePathCompressed, {
+        highWaterMark: STREAM_HIGH_WATER_MARK,
+      })
     )
   } else {
     backupSource = filePath
     size = blob.getByteLength()
-    await Stream.promises.pipeline(fs.createReadStream(filePath), md5)
+    await Stream.promises.pipeline(
+      fs.createReadStream(filePath, { highWaterMark: STREAM_HIGH_WATER_MARK }),
+      md5
+    )
   }
   const backendKeyPath = makeProjectKey(historyId, blob.getHash())
   const persistor = await entry.ctx.getCachedPersistor(backendKeyPath)
@@ -244,7 +325,9 @@
   await persistor.sendStream(
     projectBlobsBucket,
     backendKeyPath,
-    fs.createReadStream(backupSource),
+    fs.createReadStream(backupSource, {
+      highWaterMark: STREAM_HIGH_WATER_MARK,
+    }),
     {
       contentEncoding,
       contentType: 'application/octet-stream',
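Because eligible files are gzipped before upload (`contentEncoding = 'gzip'` above), `writeToAWSEgress` counts compressed bytes rather than blob sizes. A sketch of counting post-compression bytes through a transform, mirroring the pipeline above (paths hypothetical):

```js
import fs from 'node:fs'
import Stream from 'node:stream'
import zLib from 'node:zlib'

let egress = 0
await Stream.promises.pipeline(
  fs.createReadStream('blob.bin'),
  zLib.createGzip(),
  async function* (source) {
    for await (const chunk of source) {
      egress += chunk.byteLength // count the bytes that actually leave the process
      yield chunk
    }
  },
  fs.createWriteStream('blob.bin.gz')
)
console.log(egress, 'compressed bytes written')
```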

View file

@@ -127,7 +127,8 @@ describe('back_fill_file_hash script', function () {
   const deleteProjectsRecordId2 = new ObjectId()
   const deleteProjectsRecordId3 = new ObjectId()
   const deleteProjectsRecordId4 = new ObjectId()
-  const contentFile7 = Buffer.alloc(11_000_000)
+  const twoByteUTF8Symbol = 'ö'
+  const contentFile7 = Buffer.alloc(4_000_000, twoByteUTF8Symbol)
   const hashFile7 = gitBlobHashBuffer(contentFile7)
   const writtenBlobs = [
     { projectId: projectId0, historyId: historyId0, fileId: fileId0 },
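`Buffer.alloc` with a string fill repeats the string's UTF-8 bytes, so the new fixture is 4 MB of non-ASCII content instead of 11 MB of zeros:

```js
const twoByteUTF8Symbol = 'ö' // encodes as 0xc3 0xb6
console.log(Buffer.alloc(6, twoByteUTF8Symbol)) // <Buffer c3 b6 c3 b6 c3 b6>
// A 4,000,000-byte buffer thus holds 2,000,000 copies of the symbol - highly
// repetitive input that gzips down to a few KiB, which is why the expected
// writeToAWSEgress below drops from 11000118 to 4032 while readFromGCSIngress
// stays at the uncompressed 4000120 (4,000,000 plus 120 bytes of other files).
```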
@@ -441,7 +442,12 @@ describe('back_fill_file_hash script', function () {
     )
   })

-  async function tryRunScript(env = {}) {
+  /**
+   * @param {Record<string, string>} env
+   * @param {boolean} shouldHaveWritten
+   * @return {Promise<{result, stats: any}>}
+   */
+  async function tryRunScript(env = {}, shouldHaveWritten) {
     let result
     try {
       result = await promisify(execFile)(
@@ -471,14 +477,35 @@ describe('back_fill_file_hash script', function () {
       }
       result = { stdout, stderr, status: code }
     }
+    const extraStatsKeys = [
+      'eventLoop',
+      'readFromGCSThroughputMiBPerSecond',
+      'writeToAWSThroughputMiBPerSecond',
+    ]
     const stats = JSON.parse(result.stdout.trimEnd().split('\n').pop())
+    expect(Object.keys(stats.diff).sort()).to.deep.equal(
+      [...extraStatsKeys, ...Object.keys(STATS_ALL)].sort()
+    )
+    delete stats.diff
+    expect(new Date(stats.time).toISOString()).to.equal(stats.time)
+    delete stats.time
+    if (shouldHaveWritten) {
+      expect(stats.readFromGCSThroughputMiBPerSecond).to.be.greaterThan(0)
+      expect(stats.writeToAWSThroughputMiBPerSecond).to.be.greaterThan(0)
+    }
+    for (const key of extraStatsKeys) {
+      delete stats[key]
+    }
     return { stats, result }
   }

-  async function runScript(env = {}) {
-    const { stats, result } = await tryRunScript(env)
+  /**
+   * @param {Record<string, string>} env
+   * @param {boolean} shouldHaveWritten
+   * @return {Promise<{result, stats: any}>}
+   */
+  async function runScript(env = {}, shouldHaveWritten = true) {
+    const { stats, result } = await tryRunScript(env, shouldHaveWritten)
     if (result.status !== 0) {
       console.log(result)
       expect(result).to.have.property('status', 0)
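The stats parsing above leans on two small conventions: the script logs one JSON object per line, so the last stdout line is the final snapshot, and a string equals `new Date(string).toISOString()` exactly when it is a canonical ISO-8601 UTC timestamp. For instance (`stdout` hypothetical):

```js
const stdout =
  '{"projects":1}\n{"projects":2,"time":"2024-11-19T09:46:48.000Z"}\n'

// 1. JSON-lines logging: the last line is the final stats snapshot.
const stats = JSON.parse(stdout.trimEnd().split('\n').pop())

// 2. Round-trip check: true only for a canonical ISO-8601 UTC timestamp.
console.log(new Date(stats.time).toISOString() === stats.time) // true
```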
@@ -718,7 +745,7 @@ describe('back_fill_file_hash script', function () {
     ])
   })
   it('should process nothing on re-run', async function () {
-    const rerun = await runScript()
+    const rerun = await runScript({}, false)
     expect(rerun.stats).deep.equal({
       ...STATS_ALL_ZERO,
       // We still need to iterate over all the projects.
@@ -814,6 +841,8 @@ describe('back_fill_file_hash script', function () {
   deduplicatedWriteToAWSLocalEgress: 0,
   deduplicatedWriteToAWSRemoteCount: 0,
   deduplicatedWriteToAWSRemoteEgress: 0,
+  readFromGCSCount: 0,
+  readFromGCSIngress: 0,
   writeToAWSCount: 0,
   writeToAWSEgress: 0,
 }
@@ -834,8 +863,10 @@ describe('back_fill_file_hash script', function () {
   deduplicatedWriteToAWSLocalEgress: 0,
   deduplicatedWriteToAWSRemoteCount: 0,
   deduplicatedWriteToAWSRemoteEgress: 0,
+  readFromGCSCount: 6,
+  readFromGCSIngress: 4000120,
   writeToAWSCount: 5,
-  writeToAWSEgress: 11000118,
+  writeToAWSEgress: 4032,
 }
 const STATS_UP_FROM_PROJECT1_ONWARD = {
   projects: 2,
@@ -854,6 +885,8 @@ describe('back_fill_file_hash script', function () {
   deduplicatedWriteToAWSLocalEgress: 30,
   deduplicatedWriteToAWSRemoteCount: 0,
   deduplicatedWriteToAWSRemoteEgress: 0,
+  readFromGCSCount: 3,
+  readFromGCSIngress: 72,
   writeToAWSCount: 2,
   writeToAWSEgress: 58,
 }
@@ -884,6 +917,7 @@ describe('back_fill_file_hash script', function () {
       sumStats(STATS_ALL, {
         ...STATS_ALL_ZERO,
         filesFailed: 1,
+        readFromGCSIngress: -24,
         writeToAWSCount: -1,
         writeToAWSEgress: -28,
       })
@@ -921,8 +955,13 @@ describe('back_fill_file_hash script', function () {
       ])
       expectNotFoundError(result, 'failed to process file, trying again')
       expect(result.status).to.equal(0)
-      expect({ ...stats, filesRetries: 0 }).to.deep.equal(STATS_ALL)
+      expect({ ...stats, filesRetries: 0, readFromGCSCount: 0 }).to.deep.equal({
+        ...STATS_ALL,
+        filesRetries: 0,
+        readFromGCSCount: 0,
+      })
       expect(stats.filesRetries).to.be.greaterThan(0)
+      expect(stats.filesRetries).to.be.greaterThan(STATS_ALL.readFromGCSCount)
     })

   describe('full run CONCURRENCY=1', function () {
@@ -952,6 +991,19 @@ describe('back_fill_file_hash script', function () {
       commonAssertions()
     })

+    describe('full run STREAM_HIGH_WATER_MARK=1MB', function () {
+      let output
+      beforeEach('run script', async function () {
+        output = await runScript({
+          STREAM_HIGH_WATER_MARK: (1024 * 1024).toString(),
+        })
+      })
+      it('should print stats', function () {
+        expect(output.stats).deep.equal(STATS_ALL)
+      })
+      commonAssertions()
+    })
+
     describe('with something in the bucket already', function () {
       beforeEach('create a file in s3', async function () {
         const buf = Buffer.from(fileId0.toString())