From baaac44172eb2422dc4075c0c40a247f72094ac4 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Mon, 23 Sep 2024 13:09:58 +0200 Subject: [PATCH] Merge pull request #20302 from overleaf/jpa-object-persistor-metrics [object-persistor] add more metrics to getObjectStream and sendStream GitOrigin-RevId: 9fe6b9d205de6ad27838f91d92d2b3a3d6c2f129 --- libraries/object-persistor/src/FSPersistor.js | 23 +++++-- .../object-persistor/src/GcsPersistor.js | 15 +++-- .../object-persistor/src/PersistorHelper.js | 61 +++++++++++++++++-- libraries/object-persistor/src/S3Persistor.js | 15 +++-- 4 files changed, 87 insertions(+), 27 deletions(-) diff --git a/libraries/object-persistor/src/FSPersistor.js b/libraries/object-persistor/src/FSPersistor.js index d806448f9c..3752ad3756 100644 --- a/libraries/object-persistor/src/FSPersistor.js +++ b/libraries/object-persistor/src/FSPersistor.js @@ -3,9 +3,9 @@ const fs = require('fs') const fsPromises = require('fs/promises') const globCallbacks = require('glob') const Path = require('path') +const { PassThrough } = require('stream') const { pipeline } = require('stream/promises') const { promisify } = require('util') -const Metrics = require('@overleaf/metrics') const AbstractPersistor = require('./AbstractPersistor') const { ReadError, WriteError } = require('./Errors') @@ -63,6 +63,10 @@ module.exports = class FSPersistor extends AbstractPersistor { // opts may be {start: Number, end: Number} async getObjectStream(location, name, opts = {}) { + const observer = new PersistorHelper.ObserverStream({ + metric: 'fs.ingress', // ingress to us from disk + bucket: location, + }) const fsPath = this._getFsPath(location, name) try { @@ -76,7 +80,11 @@ module.exports = class FSPersistor extends AbstractPersistor { ) } - return fs.createReadStream(null, opts) + const stream = fs.createReadStream(null, opts) + // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along). + const pass = new PassThrough() + pipeline(stream, observer, pass).catch(() => {}) + return pass } async getRedirectUrl() { @@ -221,22 +229,25 @@ module.exports = class FSPersistor extends AbstractPersistor { } async _writeStreamToTempFile(location, stream, opts = {}) { + const observerOptions = { + metric: 'fs.egress', // egress from us to disk + bucket: location, + } + const observer = new PersistorHelper.ObserverStream(observerOptions) + const tempDirPath = await fsPromises.mkdtemp(Path.join(location, 'tmp-')) const tempFilePath = Path.join(tempDirPath, 'uploaded-file') - const transforms = [] + const transforms = [observer] let md5Observer if (opts.sourceMd5) { md5Observer = createMd5Observer() transforms.push(md5Observer.transform) } - const timer = new Metrics.Timer('writingFile') - try { const writeStream = fs.createWriteStream(tempFilePath) await pipeline(stream, ...transforms, writeStream) - timer.done() } catch (err) { await this._cleanupTempFile(tempFilePath) throw new WriteError( diff --git a/libraries/object-persistor/src/GcsPersistor.js b/libraries/object-persistor/src/GcsPersistor.js index dabdab10ab..b8c542bb62 100644 --- a/libraries/object-persistor/src/GcsPersistor.js +++ b/libraries/object-persistor/src/GcsPersistor.js @@ -47,9 +47,9 @@ module.exports = class GcsPersistor extends AbstractPersistor { async sendStream(bucketName, key, readStream, opts = {}) { try { - // egress from us to gcs const observeOptions = { - metric: 'gcs.egress', + metric: 'gcs.egress', // egress from us to GCS + bucket: bucketName, } let sourceMd5 = opts.sourceMd5 @@ -104,6 +104,10 @@ module.exports = class GcsPersistor extends AbstractPersistor { } async getObjectStream(bucketName, key, opts = {}) { + const observer = new PersistorHelper.ObserverStream({ + metric: 'gcs.ingress', // ingress to us from GCS + bucket: bucketName, + }) const stream = this.storage .bucket(bucketName) .file(key) @@ -133,12 +137,7 @@ module.exports = class GcsPersistor extends AbstractPersistor { ReadError ) } - - // ingress to us from gcs - const observer = new PersistorHelper.ObserverStream({ - metric: 'gcs.ingress', - }) - + // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along). const pass = new PassThrough() pipeline(stream, observer, pass).catch(() => {}) return pass diff --git a/libraries/object-persistor/src/PersistorHelper.js b/libraries/object-persistor/src/PersistorHelper.js index 88c773cba9..ac34b768cb 100644 --- a/libraries/object-persistor/src/PersistorHelper.js +++ b/libraries/object-persistor/src/PersistorHelper.js @@ -5,31 +5,82 @@ const Logger = require('@overleaf/logger') const Metrics = require('@overleaf/metrics') const { WriteError, NotFoundError } = require('./Errors') +const _128KiB = 128 * 1024 +const TIMING_BUCKETS = [ + 0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, +] +const SIZE_BUCKETS = [ + 0, + 1_000, + 10_000, + 100_000, + _128KiB, + 1_000_000, + 10_000_000, + 50_000_000, + 100_000_000, +] + /** * Observes data that passes through and optionally computes hash for content. */ class ObserverStream extends Stream.Transform { /** * @param {string} metric prefix for metrics + * @param {string} bucket name of source/target bucket * @param {string} hash optional hash algorithm, e.g. 'md5' */ - constructor({ metric, hash = '' }) { + constructor({ metric, bucket, hash = '' }) { super({ autoDestroy: true }) this.bytes = 0 + this.start = performance.now() if (hash) { this.hash = Crypto.createHash(hash) } - const onEnd = () => { - Metrics.count(metric, this.bytes) + const onEnd = status => { + const size = this.bytes < _128KiB ? 'lt-128KiB' : 'gte-128KiB' + const labels = { size, bucket, status } + // Keep this counter metric to allow rendering long-term charts. + Metrics.count(metric, this.bytes, 1, labels) + Metrics.inc(`${metric}.hit`, 1, labels) + + if (status === 'error') return + // The below metrics are only relevant for successfully fetched objects. + + Metrics.histogram(`${metric}.size`, this.bytes, SIZE_BUCKETS, { + status, + bucket, + }) + if (this.firstByteAfterMs) { + Metrics.histogram( + `${metric}.latency.first-byte`, + this.firstByteAfterMs, + TIMING_BUCKETS, + labels + ) + } + Metrics.histogram( + `${metric}.latency`, + this.#getMsSinceStart(), + TIMING_BUCKETS, + labels + ) } - this.once('error', onEnd) - this.once('end', onEnd) + this.once('error', () => onEnd('error')) + this.once('end', () => onEnd('success')) + } + + #getMsSinceStart() { + return performance.now() - this.start } _transform(chunk, encoding, done) { + if (this.bytes === 0) { + this.firstByteAfterMs = this.#getMsSinceStart() + } if (this.hash) { this.hash.update(chunk) } diff --git a/libraries/object-persistor/src/S3Persistor.js b/libraries/object-persistor/src/S3Persistor.js index de3adcb67c..8df65d80f0 100644 --- a/libraries/object-persistor/src/S3Persistor.js +++ b/libraries/object-persistor/src/S3Persistor.js @@ -30,9 +30,9 @@ module.exports = class S3Persistor extends AbstractPersistor { async sendStream(bucketName, key, readStream, opts = {}) { try { - // egress from us to S3 const observeOptions = { - metric: 's3.egress', + metric: 's3.egress', // egress from us to S3 + bucket: bucketName, } const observer = new PersistorHelper.ObserverStream(observeOptions) @@ -85,6 +85,10 @@ module.exports = class S3Persistor extends AbstractPersistor { if (opts.start != null && opts.end != null) { params.Range = `bytes=${opts.start}-${opts.end}` } + const observer = new PersistorHelper.ObserverStream({ + metric: 's3.ingress', // ingress from S3 to us + bucket: bucketName, + }) const req = this._getClientForBucket(bucketName).getObject(params) const stream = req.createReadStream() @@ -116,12 +120,7 @@ module.exports = class S3Persistor extends AbstractPersistor { ReadError ) } - - // ingress from S3 to us - const observer = new PersistorHelper.ObserverStream({ - metric: 's3.ingress', - }) - + // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along). const pass = new PassThrough() pipeline(stream, observer, pass, err => { if (err) req.abort()