From 2be894c18a1fc70b843fbec5654d378dbc36e231 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Fri, 15 Nov 2024 09:45:16 +0100 Subject: [PATCH] Merge pull request #21892 from overleaf/jpa-auto-gunzip [object-persistor] add autoGunzip option to getObjectStream GitOrigin-RevId: 21cd6c9ab349017ddb28d165238371d967ab9a59 --- libraries/object-persistor/src/FSPersistor.js | 5 + .../object-persistor/src/GcsPersistor.js | 9 +- .../src/PerProjectEncryptedS3Persistor.js | 2 +- libraries/object-persistor/src/S3Persistor.js | 13 ++- .../test/unit/GcsPersistorTests.js | 2 +- .../test/unit/S3PersistorTests.js | 6 +- .../test/acceptance/js/FilestoreTests.js | 93 +++++++++++++++++++ 7 files changed, 121 insertions(+), 9 deletions(-) diff --git a/libraries/object-persistor/src/FSPersistor.js b/libraries/object-persistor/src/FSPersistor.js index f9c20b12be..05b251b114 100644 --- a/libraries/object-persistor/src/FSPersistor.js +++ b/libraries/object-persistor/src/FSPersistor.js @@ -71,6 +71,11 @@ module.exports = class FSPersistor extends AbstractPersistor { // opts may be {start: Number, end: Number} async getObjectStream(location, name, opts = {}) { + if (opts.autoGunzip) { + throw new NotImplementedError( + 'opts.autoGunzip is not supported by FS backend. Configure GCS or S3 backend instead, get in touch with support for further information.' + ) + } const observer = new PersistorHelper.ObserverStream({ metric: 'fs.ingress', // ingress to us from disk bucket: location, diff --git a/libraries/object-persistor/src/GcsPersistor.js b/libraries/object-persistor/src/GcsPersistor.js index 94b77ba8b8..542755ee6d 100644 --- a/libraries/object-persistor/src/GcsPersistor.js +++ b/libraries/object-persistor/src/GcsPersistor.js @@ -7,6 +7,7 @@ const asyncPool = require('tiny-async-pool') const AbstractPersistor = require('./AbstractPersistor') const PersistorHelper = require('./PersistorHelper') const Logger = require('@overleaf/logger') +const zlib = require('node:zlib') module.exports = class GcsPersistor extends AbstractPersistor { constructor(settings) { @@ -117,12 +118,14 @@ module.exports = class GcsPersistor extends AbstractPersistor { .file(key) .createReadStream({ decompress: false, ...opts }) + let contentEncoding try { await new Promise((resolve, reject) => { stream.on('response', res => { switch (res.statusCode) { case 200: // full response case 206: // partial response + contentEncoding = res.headers['content-encoding'] return resolve() case 404: return reject(new NotFoundError()) @@ -143,7 +146,11 @@ module.exports = class GcsPersistor extends AbstractPersistor { } // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along). 
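    // Editorial note, not part of the original patch: createReadStream() is
    // invoked with decompress:false above, so GCS never gunzips transparently.
    // The content-encoding captured from the 200/206 response headers decides,
    // together with opts.autoGunzip, whether a gunzip stage is spliced in below.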
const pass = new PassThrough() - pipeline(stream, observer, pass).catch(() => {}) + const transformer = [] + if (contentEncoding === 'gzip' && opts.autoGunzip) { + transformer.push(zlib.createGunzip()) + } + pipeline(stream, observer, ...transformer, pass).catch(() => {}) return pass } diff --git a/libraries/object-persistor/src/PerProjectEncryptedS3Persistor.js b/libraries/object-persistor/src/PerProjectEncryptedS3Persistor.js index 46e9273171..6c13a611d1 100644 --- a/libraries/object-persistor/src/PerProjectEncryptedS3Persistor.js +++ b/libraries/object-persistor/src/PerProjectEncryptedS3Persistor.js @@ -412,7 +412,7 @@ class CachedPerProjectEncryptedS3Persistor { * @param {Object} opts * @param {number} [opts.start] * @param {number} [opts.end] - * @param {string} [opts.contentEncoding] + * @param {boolean} [opts.autoGunzip] * @param {SSECOptions} [opts.ssecOptions] * @return {Promise} */ diff --git a/libraries/object-persistor/src/S3Persistor.js b/libraries/object-persistor/src/S3Persistor.js index d31f5831d1..65d4358dac 100644 --- a/libraries/object-persistor/src/S3Persistor.js +++ b/libraries/object-persistor/src/S3Persistor.js @@ -18,6 +18,7 @@ const fs = require('node:fs') const S3 = require('aws-sdk/clients/s3') const { URL } = require('node:url') const { WriteError, ReadError, NotFoundError } = require('./Errors') +const zlib = require('node:zlib') /** * Wrapper with private fields to avoid revealing them on console, JSON.stringify or similar. @@ -147,7 +148,7 @@ class S3Persistor extends AbstractPersistor { * @param {Object} opts * @param {number} [opts.start] * @param {number} [opts.end] - * @param {string} [opts.contentEncoding] + * @param {boolean} [opts.autoGunzip] * @param {SSECOptions} [opts.ssecOptions] * @return {Promise} */ @@ -172,12 +173,14 @@ class S3Persistor extends AbstractPersistor { const req = this._getClientForBucket(bucketName).getObject(params) const stream = req.createReadStream() + let contentEncoding try { await new Promise((resolve, reject) => { - req.on('httpHeaders', statusCode => { + req.on('httpHeaders', (statusCode, headers) => { switch (statusCode) { case 200: // full response case 206: // partial response + contentEncoding = headers['content-encoding'] return resolve(undefined) case 403: // AccessDenied return // handled by stream.on('error') handler below @@ -202,7 +205,11 @@ class S3Persistor extends AbstractPersistor { } // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along). 
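    // Editorial note, not part of the original patch: the pipeline assembled
    // below mirrors the GCS path above, but uses pipeline's callback form so a
    // failed transfer aborts the in-flight S3 request (req.abort()) instead of
    // leaving it to drain.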
const pass = new PassThrough() - pipeline(stream, observer, pass, err => { + const transformer = [] + if (contentEncoding === 'gzip' && opts.autoGunzip) { + transformer.push(zlib.createGunzip()) + } + pipeline(stream, observer, ...transformer, pass, err => { if (err) req.abort() }) return pass diff --git a/libraries/object-persistor/test/unit/GcsPersistorTests.js b/libraries/object-persistor/test/unit/GcsPersistorTests.js index 4d16bd4ca5..16a42c772c 100644 --- a/libraries/object-persistor/test/unit/GcsPersistorTests.js +++ b/libraries/object-persistor/test/unit/GcsPersistorTests.js @@ -63,7 +63,7 @@ describe('GcsPersistorTests', function () { read() { if (this.err) return this.emit('error', this.err) - this.emit('response', { statusCode: this.statusCode }) + this.emit('response', { statusCode: this.statusCode, headers: {} }) } } diff --git a/libraries/object-persistor/test/unit/S3PersistorTests.js b/libraries/object-persistor/test/unit/S3PersistorTests.js index 0012839169..2f2ba3ea5c 100644 --- a/libraries/object-persistor/test/unit/S3PersistorTests.js +++ b/libraries/object-persistor/test/unit/S3PersistorTests.js @@ -93,14 +93,14 @@ describe('S3PersistorTests', function () { setTimeout(() => { if (this.notFoundSSEC) { // special case for AWS S3: 404 NoSuchKey wrapped in a 400. A single request received a single response, and multiple httpHeaders events are triggered. Don't ask. - this.emit('httpHeaders', 400) - this.emit('httpHeaders', 404) + this.emit('httpHeaders', 400, {}) + this.emit('httpHeaders', 404, {}) ReadStream.emit('error', S3NotFoundError) return } if (this.err) return ReadStream.emit('error', this.err) - this.emit('httpHeaders', this.statusCode) + this.emit('httpHeaders', this.statusCode, {}) if (this.statusCode === 403) { ReadStream.emit('error', S3AccessDeniedError) } diff --git a/services/filestore/test/acceptance/js/FilestoreTests.js b/services/filestore/test/acceptance/js/FilestoreTests.js index 6984be4caf..615d423780 100644 --- a/services/filestore/test/acceptance/js/FilestoreTests.js +++ b/services/filestore/test/acceptance/js/FilestoreTests.js @@ -44,6 +44,8 @@ const { } = require('@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor') const { S3Persistor } = require('@overleaf/object-persistor/src/S3Persistor') const crypto = require('node:crypto') +const { WritableBuffer } = require('@overleaf/stream-utils') +const { gzipSync } = require('node:zlib') describe('Filestore', function () { this.timeout(1000 * 10) @@ -1398,6 +1400,97 @@ describe('Filestore', function () { ).to.equal(true) }) }) + + describe('autoGunzip', function () { + let key + beforeEach('new key', function () { + key = `${projectId}/${new ObjectId().toString()}` + }) + this.timeout(60 * 1000) + const body = Buffer.alloc(10 * 1024 * 1024, 'hello') + const gzippedBody = gzipSync(body) + + /** + * @param {string} key + * @param {Buffer} wantBody + * @param {boolean} autoGunzip + * @return {Promise} + */ + async function checkBodyIsTheSame(key, wantBody, autoGunzip) { + const s = await app.persistor.getObjectStream( + Settings.filestore.stores.user_files, + key, + { autoGunzip } + ) + const buf = new WritableBuffer() + await Stream.promises.pipeline(s, buf) + expect(buf.getContents()).to.deep.equal(wantBody) + } + + if (backendSettings.backend === 'fs') { + it('should refuse to handle autoGunzip', async function () { + await expect( + app.persistor.getObjectStream( + Settings.filestore.stores.user_files, + key, + { autoGunzip: true } + ) + ).to.be.rejectedWith(NotImplementedError) + }) + } 
else {
+        it('should return the raw body for a gzip-encoded object', async function () {
+          await app.persistor.sendStream(
+            Settings.filestore.stores.user_files,
+            key,
+            Stream.Readable.from([gzippedBody]),
+            { contentEncoding: 'gzip' }
+          )
+          expect(
+            await app.persistor.getObjectSize(
+              Settings.filestore.stores.user_files,
+              key
+            )
+          ).to.equal(gzippedBody.byteLength)
+          // autoGunzip=true returns the decompressed body
+          await checkBodyIsTheSame(key, body, true)
+          // autoGunzip=false returns the stored gzipped body
+          await checkBodyIsTheSame(key, gzippedBody, false)
+        })
+        it('should return the raw body for an uncompressed object', async function () {
+          await app.persistor.sendStream(
+            Settings.filestore.stores.user_files,
+            key,
+            Stream.Readable.from([body])
+          )
+          expect(
+            await app.persistor.getObjectSize(
+              Settings.filestore.stores.user_files,
+              key
+            )
+          ).to.equal(body.byteLength)
+          // an uncompressed body is returned as-is with either autoGunzip setting
+          await checkBodyIsTheSame(key, body, true)
+          await checkBodyIsTheSame(key, body, false)
+        })
+
+        it('should return the gzipped body when stored without a gzip content-encoding', async function () {
+          await app.persistor.sendStream(
+            Settings.filestore.stores.user_files,
+            key,
+            Stream.Readable.from([gzippedBody])
+          )
+          expect(
+            await app.persistor.getObjectSize(
+              Settings.filestore.stores.user_files,
+              key
+            )
+          ).to.equal(gzippedBody.byteLength)
+          // without a content-encoding header, autoGunzip must not decompress
+          await checkBodyIsTheSame(key, gzippedBody, true)
+          await checkBodyIsTheSame(key, gzippedBody, false)
+        })
+      }
+    })
 })
 }
})
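Usage note (editorial, not part of the patch): a minimal caller-side sketch of the behaviour the acceptance tests pin down. The persistor instance and the bucket, key, and destPath names are hypothetical; only getObjectStream and its autoGunzip option come from this change.

const Stream = require('node:stream')
const fs = require('node:fs')

// Hypothetical helper: download an object, decompressing it only if it was
// stored with Content-Encoding: gzip. Gzipped bodies stored without that
// header, and plain bodies, are returned byte-for-byte. On the FS backend
// the autoGunzip option rejects with NotImplementedError.
async function downloadMaybeGzipped(persistor, bucket, key, destPath) {
  const src = await persistor.getObjectStream(bucket, key, { autoGunzip: true })
  await Stream.promises.pipeline(src, fs.createWriteStream(destPath))
}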