Merge pull request #20302 from overleaf/jpa-object-persistor-metrics

[object-persistor] add more metrics to getObjectStream and sendStream

GitOrigin-RevId: 9fe6b9d205de6ad27838f91d92d2b3a3d6c2f129
Jakob Ackermann 2024-09-23 13:09:58 +02:00 committed by Copybot
parent 8fa676082e
commit baaac44172
4 changed files with 87 additions and 27 deletions

FSPersistor.js

@@ -3,9 +3,9 @@ const fs = require('fs')
 const fsPromises = require('fs/promises')
 const globCallbacks = require('glob')
 const Path = require('path')
+const { PassThrough } = require('stream')
 const { pipeline } = require('stream/promises')
 const { promisify } = require('util')
-const Metrics = require('@overleaf/metrics')
 
 const AbstractPersistor = require('./AbstractPersistor')
 const { ReadError, WriteError } = require('./Errors')
@@ -63,6 +63,10 @@ module.exports = class FSPersistor extends AbstractPersistor {
   // opts may be {start: Number, end: Number}
   async getObjectStream(location, name, opts = {}) {
+    const observer = new PersistorHelper.ObserverStream({
+      metric: 'fs.ingress', // ingress to us from disk
+      bucket: location,
+    })
     const fsPath = this._getFsPath(location, name)
     try {
@@ -76,7 +80,11 @@ module.exports = class FSPersistor extends AbstractPersistor {
       )
     }
-    return fs.createReadStream(null, opts)
+    const stream = fs.createReadStream(null, opts)
+    // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along).
+    const pass = new PassThrough()
+    pipeline(stream, observer, pass).catch(() => {})
+    return pass
   }
 
   async getRedirectUrl() {
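Aside: the wrapping added here is a small, reusable Node pattern, so it is worth seeing in isolation. A minimal sketch of it outside the persistor; the ByteCounter transform and the file path are illustrative, not part of this module:

const fs = require('fs')
const { PassThrough, Transform } = require('stream')
const { pipeline } = require('stream/promises')

// Illustrative observer: counts bytes while passing data through unchanged.
class ByteCounter extends Transform {
  constructor() {
    super()
    this.bytes = 0
  }

  _transform(chunk, encoding, done) {
    this.bytes += chunk.length
    done(null, chunk)
  }
}

function getObservedStream(path) {
  const source = fs.createReadStream(path) // hypothetical source
  const observer = new ByteCounter()
  const pass = new PassThrough()
  // pipeline destroys `pass` with the source's error, so consumers still see
  // failures; the catch only silences the promise to avoid an unhandled rejection.
  pipeline(source, observer, pass).catch(() => {})
  return pass
}

Handing back the PassThrough instead of the raw file stream keeps the caller's interface unchanged while every byte is observed in flight.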
@@ -221,22 +229,25 @@ module.exports = class FSPersistor extends AbstractPersistor {
   }
 
   async _writeStreamToTempFile(location, stream, opts = {}) {
+    const observerOptions = {
+      metric: 'fs.egress', // egress from us to disk
+      bucket: location,
+    }
+    const observer = new PersistorHelper.ObserverStream(observerOptions)
     const tempDirPath = await fsPromises.mkdtemp(Path.join(location, 'tmp-'))
     const tempFilePath = Path.join(tempDirPath, 'uploaded-file')
-    const transforms = []
+    const transforms = [observer]
     let md5Observer
     if (opts.sourceMd5) {
       md5Observer = createMd5Observer()
       transforms.push(md5Observer.transform)
     }
-    const timer = new Metrics.Timer('writingFile')
     try {
       const writeStream = fs.createWriteStream(tempFilePath)
       await pipeline(stream, ...transforms, writeStream)
-      timer.done()
     } catch (err) {
       await this._cleanupTempFile(tempFilePath)
       throw new WriteError(
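On the egress side, the observer simply joins the transforms array, so one pipeline call composes observation, optional hashing, and the file write. A hedged sketch of that composition; createHashObserver is an illustrative stand-in for the module's createMd5Observer, not its actual implementation:

const crypto = require('crypto')
const fs = require('fs')
const { Transform } = require('stream')
const { pipeline } = require('stream/promises')

// Hash chunks as they pass through without altering them. A plain
// crypto.Hash cannot sit mid-pipeline, because reading it yields the
// digest rather than the original data.
function createHashObserver(algorithm) {
  const hash = crypto.createHash(algorithm)
  const transform = new Transform({
    transform(chunk, encoding, done) {
      hash.update(chunk)
      done(null, chunk)
    },
  })
  return { hash, transform }
}

async function writeObserved(source, destPath, extraTransforms = []) {
  const md5 = createHashObserver('md5')
  const transforms = [...extraTransforms, md5.transform]
  // pipeline wires source -> transforms -> file and tears everything down on error.
  await pipeline(source, ...transforms, fs.createWriteStream(destPath))
  return md5.hash.digest('hex')
}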

GcsPersistor.js

@@ -47,9 +47,9 @@ module.exports = class GcsPersistor extends AbstractPersistor {
   async sendStream(bucketName, key, readStream, opts = {}) {
     try {
-      // egress from us to gcs
       const observeOptions = {
-        metric: 'gcs.egress',
+        metric: 'gcs.egress', // egress from us to GCS
+        bucket: bucketName,
       }
       let sourceMd5 = opts.sourceMd5
@@ -104,6 +104,10 @@ module.exports = class GcsPersistor extends AbstractPersistor {
   }
 
   async getObjectStream(bucketName, key, opts = {}) {
+    const observer = new PersistorHelper.ObserverStream({
+      metric: 'gcs.ingress', // ingress to us from GCS
+      bucket: bucketName,
+    })
     const stream = this.storage
       .bucket(bucketName)
       .file(key)
@@ -133,12 +137,7 @@ module.exports = class GcsPersistor extends AbstractPersistor {
         ReadError
       )
     }
-    // ingress to us from gcs
-    const observer = new PersistorHelper.ObserverStream({
-      metric: 'gcs.ingress',
-    })
     // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along).
     const pass = new PassThrough()
     pipeline(stream, observer, pass).catch(() => {})
     return pass
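One subtlety in this hunk: the observer is now constructed before the request to GCS is issued rather than once the response stream exists. ObserverStream records performance.now() in its constructor (see PersistorHelper.js below), so this ordering presumably lets the latency and first-byte metrics cover the round trip to the backend rather than just the local stream plumbing. A toy illustration of why construction time is the baseline; the class and names are illustrative:

const { performance } = require('perf_hooks')
const { Transform } = require('stream')

// The clock starts at construction, so anything that happens between `new`
// and the first chunk (request setup, server latency) is included in
// firstByteAfterMs.
class FirstByteTimer extends Transform {
  constructor() {
    super()
    this.start = performance.now()
  }

  _transform(chunk, encoding, done) {
    if (this.firstByteAfterMs === undefined) {
      this.firstByteAfterMs = performance.now() - this.start
    }
    done(null, chunk)
  }
}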

PersistorHelper.js

@@ -5,31 +5,82 @@ const Logger = require('@overleaf/logger')
 const Metrics = require('@overleaf/metrics')
 const { WriteError, NotFoundError } = require('./Errors')
 
+const _128KiB = 128 * 1024
+const TIMING_BUCKETS = [
+  0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000,
+]
+const SIZE_BUCKETS = [
+  0,
+  1_000,
+  10_000,
+  100_000,
+  _128KiB,
+  1_000_000,
+  10_000_000,
+  50_000_000,
+  100_000_000,
+]
+
 /**
  * Observes data that passes through and optionally computes hash for content.
  */
 class ObserverStream extends Stream.Transform {
   /**
    * @param {string} metric prefix for metrics
+   * @param {string} bucket name of source/target bucket
    * @param {string} hash optional hash algorithm, e.g. 'md5'
    */
-  constructor({ metric, hash = '' }) {
+  constructor({ metric, bucket, hash = '' }) {
     super({ autoDestroy: true })
     this.bytes = 0
+    this.start = performance.now()
 
     if (hash) {
       this.hash = Crypto.createHash(hash)
     }
 
-    const onEnd = () => {
-      Metrics.count(metric, this.bytes)
+    const onEnd = status => {
+      const size = this.bytes < _128KiB ? 'lt-128KiB' : 'gte-128KiB'
+      const labels = { size, bucket, status }
+      // Keep this counter metric to allow rendering long-term charts.
+      Metrics.count(metric, this.bytes, 1, labels)
+      Metrics.inc(`${metric}.hit`, 1, labels)
+
+      if (status === 'error') return
+      // The below metrics are only relevant for successfully fetched objects.
+      Metrics.histogram(`${metric}.size`, this.bytes, SIZE_BUCKETS, {
+        status,
+        bucket,
+      })
+      if (this.firstByteAfterMs) {
+        Metrics.histogram(
+          `${metric}.latency.first-byte`,
+          this.firstByteAfterMs,
+          TIMING_BUCKETS,
+          labels
+        )
+      }
+      Metrics.histogram(
+        `${metric}.latency`,
+        this.#getMsSinceStart(),
+        TIMING_BUCKETS,
+        labels
+      )
     }
-    this.once('error', onEnd)
-    this.once('end', onEnd)
+    this.once('error', () => onEnd('error'))
+    this.once('end', () => onEnd('success'))
   }
 
+  #getMsSinceStart() {
+    return performance.now() - this.start
+  }
+
   _transform(chunk, encoding, done) {
+    if (this.bytes === 0) {
+      this.firstByteAfterMs = this.#getMsSinceStart()
+    }
     if (this.hash) {
       this.hash.update(chunk)
     }
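For orientation, a hedged usage sketch of the reworked ObserverStream; it assumes @overleaf/metrics is initialised by the host service, and the metric and bucket names here are invented:

const { pipeline } = require('stream/promises')
const PersistorHelper = require('./PersistorHelper')

async function copyWithMetrics(source, sink) {
  const observer = new PersistorHelper.ObserverStream({
    metric: 'example.ingress', // prefix for the .hit, .size and .latency* series
    bucket: 'example-bucket',
    hash: 'md5', // optional: hash the content as it streams past
  })
  await pipeline(source, observer, sink)
}

For a 64 KiB object streamed successfully, onEnd labels the counter and hit metrics with { size: 'lt-128KiB', bucket: 'example-bucket', status: 'success' }; a 3 ms first byte falls under the 5 bound of TIMING_BUCKETS for example.ingress.latency.first-byte, and a 7 ms total under the 10 bound for example.ingress.latency.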

S3Persistor.js

@@ -30,9 +30,9 @@ module.exports = class S3Persistor extends AbstractPersistor {
   async sendStream(bucketName, key, readStream, opts = {}) {
     try {
-      // egress from us to S3
       const observeOptions = {
-        metric: 's3.egress',
+        metric: 's3.egress', // egress from us to S3
+        bucket: bucketName,
       }
       const observer = new PersistorHelper.ObserverStream(observeOptions)
@@ -85,6 +85,10 @@ module.exports = class S3Persistor extends AbstractPersistor {
     if (opts.start != null && opts.end != null) {
       params.Range = `bytes=${opts.start}-${opts.end}`
     }
+    const observer = new PersistorHelper.ObserverStream({
+      metric: 's3.ingress', // ingress from S3 to us
+      bucket: bucketName,
+    })
     const req = this._getClientForBucket(bucketName).getObject(params)
     const stream = req.createReadStream()
@@ -116,12 +120,7 @@ module.exports = class S3Persistor extends AbstractPersistor {
         ReadError
       )
     }
-    // ingress from S3 to us
-    const observer = new PersistorHelper.ObserverStream({
-      metric: 's3.ingress',
-    })
     // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along).
     const pass = new PassThrough()
     pipeline(stream, observer, pass, err => {
       if (err) req.abort()
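The S3 variant differs from GCS in one detail: it uses the callback form of Stream.pipeline so the in-flight request can be aborted when the pipeline fails. A minimal sketch of the pattern; wrapWithAbort is an illustrative name, and req.abort() is the call the diff itself makes on the AWS request object:

const { PassThrough, pipeline } = require('stream')

// If the pipeline fails (consumer gone, network error), abort the underlying
// request so S3 stops sending bytes nobody will read.
function wrapWithAbort(req, source, observer) {
  const pass = new PassThrough()
  pipeline(source, observer, pass, err => {
    if (err) req.abort()
  })
  return pass
}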