Merge pull request #21892 from overleaf/jpa-auto-gunzip

[object-persistor] add autoGunzip option to getObjectStream

GitOrigin-RevId: 21cd6c9ab349017ddb28d165238371d967ab9a59
Jakob Ackermann 2024-11-15 09:45:16 +01:00 committed by Copybot
parent bd855044af
commit 2be894c18a
7 changed files with 121 additions and 9 deletions
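As a usage sketch (not part of the diff): a caller that wants transparent decompression passes the new flag to getObjectStream. The persistor instance, bucket, and key below are placeholders.

async function readObject(persistor, bucket, key) {
  // autoGunzip only decompresses when the stored object carries
  // Content-Encoding: gzip; plain objects stream through unchanged.
  const stream = await persistor.getObjectStream(bucket, key, {
    autoGunzip: true,
  })
  const chunks = []
  for await (const chunk of stream) chunks.push(chunk)
  return Buffer.concat(chunks)
}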

@@ -71,6 +71,11 @@ module.exports = class FSPersistor extends AbstractPersistor {
   // opts may be {start: Number, end: Number}
   async getObjectStream(location, name, opts = {}) {
+    if (opts.autoGunzip) {
+      throw new NotImplementedError(
+        'opts.autoGunzip is not supported by FS backend. Configure GCS or S3 backend instead, get in touch with support for further information.'
+      )
+    }
     const observer = new PersistorHelper.ObserverStream({
       metric: 'fs.ingress', // ingress to us from disk
       bucket: location,
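
Callers that must also run against the FS backend can probe for support and fall back to the raw stream. A minimal sketch; the Errors import path is an assumption, not shown in this diff:

const { NotImplementedError } = require('@overleaf/object-persistor/src/Errors')

async function getStreamPreferGunzip(persistor, bucket, key) {
  try {
    return await persistor.getObjectStream(bucket, key, { autoGunzip: true })
  } catch (err) {
    // The FS backend rejects the option outright; fall back to raw bytes.
    if (err instanceof NotImplementedError) {
      return await persistor.getObjectStream(bucket, key, {})
    }
    throw err
  }
}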

@@ -7,6 +7,7 @@ const asyncPool = require('tiny-async-pool')
 const AbstractPersistor = require('./AbstractPersistor')
 const PersistorHelper = require('./PersistorHelper')
 const Logger = require('@overleaf/logger')
+const zlib = require('node:zlib')
 
 module.exports = class GcsPersistor extends AbstractPersistor {
   constructor(settings) {
@@ -117,12 +118,14 @@ module.exports = class GcsPersistor extends AbstractPersistor {
       .file(key)
       .createReadStream({ decompress: false, ...opts })
 
+    let contentEncoding
     try {
       await new Promise((resolve, reject) => {
         stream.on('response', res => {
           switch (res.statusCode) {
             case 200: // full response
             case 206: // partial response
+              contentEncoding = res.headers['content-encoding']
               return resolve()
             case 404:
               return reject(new NotFoundError())
@@ -143,7 +146,11 @@ module.exports = class GcsPersistor extends AbstractPersistor {
     }
 
     // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along).
     const pass = new PassThrough()
-    pipeline(stream, observer, pass).catch(() => {})
+    const transformer = []
+    if (contentEncoding === 'gzip' && opts.autoGunzip) {
+      transformer.push(zlib.createGunzip())
+    }
+    pipeline(stream, observer, ...transformer, pass).catch(() => {})
     return pass
   }
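
The conditional-transform trick above, isolated as a self-contained sketch (names here are illustrative, not part of the diff): the gunzip stage is spliced into the pipeline only when the object is actually gzip-encoded and the caller opted in, so everything else passes through untouched.

const { PassThrough, Readable, pipeline } = require('node:stream')
const zlib = require('node:zlib')

function withOptionalGunzip(source, contentEncoding, autoGunzip) {
  const pass = new PassThrough()
  const transformer = []
  if (contentEncoding === 'gzip' && autoGunzip) {
    transformer.push(zlib.createGunzip())
  }
  // With an empty transformer list this degenerates to pipeline(source, pass, cb).
  pipeline(source, ...transformer, pass, () => {})
  return pass
}

// Round-trip: a gzipped payload comes back decompressed.
const out = withOptionalGunzip(
  Readable.from([zlib.gzipSync('hello world')]),
  'gzip',
  true
)
out.on('data', chunk => console.log(chunk.toString())) // hello world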

@@ -412,7 +412,7 @@ class CachedPerProjectEncryptedS3Persistor {
    * @param {Object} opts
    * @param {number} [opts.start]
    * @param {number} [opts.end]
-   * @param {string} [opts.contentEncoding]
+   * @param {boolean} [opts.autoGunzip]
    * @param {SSECOptions} [opts.ssecOptions]
    * @return {Promise<NodeJS.ReadableStream>}
    */

@@ -18,6 +18,7 @@ const fs = require('node:fs')
 const S3 = require('aws-sdk/clients/s3')
 const { URL } = require('node:url')
 const { WriteError, ReadError, NotFoundError } = require('./Errors')
+const zlib = require('node:zlib')
 
 /**
  * Wrapper with private fields to avoid revealing them on console, JSON.stringify or similar.
@@ -147,7 +148,7 @@ class S3Persistor extends AbstractPersistor {
    * @param {Object} opts
    * @param {number} [opts.start]
    * @param {number} [opts.end]
-   * @param {string} [opts.contentEncoding]
+   * @param {boolean} [opts.autoGunzip]
    * @param {SSECOptions} [opts.ssecOptions]
    * @return {Promise<NodeJS.ReadableStream>}
    */
@@ -172,12 +173,14 @@ class S3Persistor extends AbstractPersistor {
     const req = this._getClientForBucket(bucketName).getObject(params)
     const stream = req.createReadStream()
 
+    let contentEncoding
     try {
       await new Promise((resolve, reject) => {
-        req.on('httpHeaders', statusCode => {
+        req.on('httpHeaders', (statusCode, headers) => {
           switch (statusCode) {
             case 200: // full response
             case 206: // partial response
+              contentEncoding = headers['content-encoding']
               return resolve(undefined)
             case 403: // AccessDenied
               return // handled by stream.on('error') handler below
@@ -202,7 +205,11 @@ class S3Persistor extends AbstractPersistor {
     }
 
     // Return a PassThrough stream with a minimal interface. It will buffer until the caller starts reading. It will emit errors from the source stream (Stream.pipeline passes errors along).
     const pass = new PassThrough()
-    pipeline(stream, observer, pass, err => {
+    const transformer = []
+    if (contentEncoding === 'gzip' && opts.autoGunzip) {
+      transformer.push(zlib.createGunzip())
+    }
+    pipeline(stream, observer, ...transformer, pass, err => {
       if (err) req.abort()
     })
     return pass
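
The contentEncoding === 'gzip' guard is load-bearing: zlib.createGunzip() rejects input that lacks a gzip header, so gunzipping unconditionally would break reads of uncompressed objects. A quick illustration:

const { Readable, pipeline } = require('node:stream')
const zlib = require('node:zlib')

pipeline(
  Readable.from([Buffer.from('plain, not gzipped')]),
  zlib.createGunzip(),
  process.stdout,
  err => {
    // Fails with a Z_DATA_ERROR, e.g. 'incorrect header check'.
    console.error(err && err.message)
  }
)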

@@ -63,7 +63,7 @@ describe('GcsPersistorTests', function () {
       read() {
         if (this.err) return this.emit('error', this.err)
-        this.emit('response', { statusCode: this.statusCode })
+        this.emit('response', { statusCode: this.statusCode, headers: {} })
       }
     }

@@ -93,14 +93,14 @@ describe('S3PersistorTests', function () {
       setTimeout(() => {
         if (this.notFoundSSEC) {
           // special case for AWS S3: 404 NoSuchKey wrapped in a 400. A single request received a single response, and multiple httpHeaders events are triggered. Don't ask.
-          this.emit('httpHeaders', 400)
-          this.emit('httpHeaders', 404)
+          this.emit('httpHeaders', 400, {})
+          this.emit('httpHeaders', 404, {})
           ReadStream.emit('error', S3NotFoundError)
           return
         }
         if (this.err) return ReadStream.emit('error', this.err)
-        this.emit('httpHeaders', this.statusCode)
+        this.emit('httpHeaders', this.statusCode, {})
         if (this.statusCode === 403) {
           ReadStream.emit('error', S3AccessDeniedError)
         }

@@ -44,6 +44,8 @@ const {
 } = require('@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor')
 const { S3Persistor } = require('@overleaf/object-persistor/src/S3Persistor')
 const crypto = require('node:crypto')
+const { WritableBuffer } = require('@overleaf/stream-utils')
+const { gzipSync } = require('node:zlib')
 
 describe('Filestore', function () {
   this.timeout(1000 * 10)
@@ -1398,6 +1400,97 @@ describe('Filestore', function () {
           ).to.equal(true)
         })
       })
+
+      describe('autoGunzip', function () {
+        let key
+        beforeEach('new key', function () {
+          key = `${projectId}/${new ObjectId().toString()}`
+        })
+
+        this.timeout(60 * 1000)
+        const body = Buffer.alloc(10 * 1024 * 1024, 'hello')
+        const gzippedBody = gzipSync(body)
+
+        /**
+         * @param {string} key
+         * @param {Buffer} wantBody
+         * @param {boolean} autoGunzip
+         * @return {Promise<void>}
+         */
+        async function checkBodyIsTheSame(key, wantBody, autoGunzip) {
+          const s = await app.persistor.getObjectStream(
+            Settings.filestore.stores.user_files,
+            key,
+            { autoGunzip }
+          )
+          const buf = new WritableBuffer()
+          await Stream.promises.pipeline(s, buf)
+          expect(buf.getContents()).to.deep.equal(wantBody)
+        }
+
+        if (backendSettings.backend === 'fs') {
+          it('should refuse to handle autoGunzip', async function () {
+            await expect(
+              app.persistor.getObjectStream(
+                Settings.filestore.stores.user_files,
+                key,
+                { autoGunzip: true }
+              )
+            ).to.be.rejectedWith(NotImplementedError)
+          })
+        } else {
+          it('should return the raw body with gzip', async function () {
+            await app.persistor.sendStream(
+              Settings.filestore.stores.user_files,
+              key,
+              Stream.Readable.from([gzippedBody]),
+              { contentEncoding: 'gzip' }
+            )
+            expect(
+              await app.persistor.getObjectSize(
+                Settings.filestore.stores.user_files,
+                key
+              )
+            ).to.equal(gzippedBody.byteLength)
+            // decompressed body with autoGunzip=true
+            await checkBodyIsTheSame(key, body, true)
+            // gzipped body with autoGunzip=false
+            await checkBodyIsTheSame(key, gzippedBody, false)
+          })
+          it('should return the raw body without gzip compression', async function () {
+            await app.persistor.sendStream(
+              Settings.filestore.stores.user_files,
+              key,
+              Stream.Readable.from([body])
+            )
+            expect(
+              await app.persistor.getObjectSize(
+                Settings.filestore.stores.user_files,
+                key
+              )
+            ).to.equal(body.byteLength)
+            // raw body with both autoGunzip options
+            await checkBodyIsTheSame(key, body, true)
+            await checkBodyIsTheSame(key, body, false)
+          })
+          it('should return the gzip body without gzip header', async function () {
+            await app.persistor.sendStream(
+              Settings.filestore.stores.user_files,
+              key,
+              Stream.Readable.from([gzippedBody])
+            )
+            expect(
+              await app.persistor.getObjectSize(
+                Settings.filestore.stores.user_files,
+                key
+              )
+            ).to.equal(gzippedBody.byteLength)
+            // gzipped body with both autoGunzip options
+            await checkBodyIsTheSame(key, gzippedBody, true)
+            await checkBodyIsTheSame(key, gzippedBody, false)
+          })
+        }
+      })
     })
   }
 })