Merge pull request #12899 from overleaf/jpa-object-persistor-pipe

[object-persistor] move away from manual .pipe()

GitOrigin-RevId: 5407d48fbbb026ba81f43c891499bd3a5ff59518
This commit is contained in:
Jakob Ackermann 2023-05-23 08:07:06 +01:00 committed by Copybot
parent 5638f06b3a
commit 2760ad35fc
10 changed files with 323 additions and 170 deletions

View file

@ -19,7 +19,14 @@ module.exports = class AbstractPersistor {
})
}
// opts may be {start: Number, end: Number}
/**
* @param location
* @param name
* @param {Object} opts
* @param {Number} opts.start
* @param {Number} opts.end
* @return {Promise<Readable>}
*/
async getObjectStream(location, name, opts) {
throw new NotImplementedError('method not implemented in persistor', {
method: 'getObjectStream',

View file

@ -1,5 +1,6 @@
const fs = require('fs')
const { pipeline } = require('stream/promises')
const { PassThrough } = require('stream')
const { Storage } = require('@google-cloud/storage')
const { WriteError, ReadError, NotFoundError } = require('./Errors')
const asyncPool = require('tiny-async-pool')
@ -101,16 +102,22 @@ module.exports = class GcsPersistor extends AbstractPersistor {
.file(key)
.createReadStream({ decompress: false, ...opts })
// ingress to us from gcs
const observer = new PersistorHelper.ObserverStream({
metric: 'gcs.ingress',
Metrics: this.settings.Metrics,
})
try {
// wait for the pipeline to be ready, to catch non-200s
await PersistorHelper.getReadyPipeline(stream, observer)
return observer
await new Promise((resolve, reject) => {
stream.on('response', res => {
switch (res.statusCode) {
case 200: // full response
case 206: // partial response
return resolve()
case 404:
return reject(new NotFoundError())
default:
return reject(new Error('non success status: ' + res.statusCode))
}
})
stream.on('error', reject)
stream.read(0) // kick off request
})
} catch (err) {
throw PersistorHelper.wrapError(
err,
@ -119,6 +126,16 @@ module.exports = class GcsPersistor extends AbstractPersistor {
ReadError
)
}
// ingress to us from gcs
const observer = new PersistorHelper.ObserverStream({
metric: 'gcs.ingress',
Metrics: this.settings.Metrics,
})
const pass = new PassThrough()
pipeline(stream, observer, pass).catch(() => {})
return pass
}
async getRedirectUrl(bucketName, key) {

View file

@ -2,7 +2,7 @@ const Crypto = require('crypto')
const Stream = require('stream')
const { pipeline } = require('stream/promises')
const Logger = require('@overleaf/logger')
const { WriteError, ReadError, NotFoundError } = require('./Errors')
const { WriteError, NotFoundError } = require('./Errors')
// Observes data that passes through and computes some metadata for it
// - specifically, it computes the number of bytes transferred, and optionally
@ -47,7 +47,6 @@ module.exports = {
ObserverStream,
calculateStreamMd5,
verifyMd5,
getReadyPipeline,
wrapError,
hexToBase64,
base64ToHex,
@ -87,70 +86,6 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
}
}
// resolves when a stream is 'readable', or rejects if the stream throws an error
// before that happens - this lets us handle protocol-level errors before trying
// to read them
function getReadyPipeline(...streams) {
return new Promise((resolve, reject) => {
const lastStream = streams.slice(-1)[0]
// in case of error or stream close, we must ensure that we drain the
// previous stream so that it can clean up its socket (if it has one)
const drainPreviousStream = function (previousStream) {
// this stream is no longer reliable, so don't pipe anything more into it
previousStream.unpipe(this)
previousStream.resume()
}
// handler to resolve when either:
// - an error happens, or
// - the last stream in the chain is readable
// for example, in the case of a 4xx error an error will occur and the
// streams will not become readable
const handler = function (err) {
// remove handler from all streams because we don't want to do this on
// later errors
lastStream.removeListener('readable', handler)
for (const stream of streams) {
stream.removeListener('error', handler)
}
// return control to the caller
if (err) {
reject(
wrapError(err, 'error before stream became ready', {}, ReadError)
)
} else {
resolve(lastStream)
}
}
// ensure the handler fires when the last strem becomes readable
lastStream.on('readable', handler)
for (const stream of streams) {
// when a stream receives a pipe, set up the drain handler to drain the
// connection if an error occurs or the stream is closed
stream.on('pipe', previousStream => {
stream.on('error', x => {
drainPreviousStream(previousStream)
})
stream.on('close', () => {
drainPreviousStream(previousStream)
})
})
// add the handler function to resolve this method on error if we can't
// set up the pipeline
stream.on('error', handler)
}
// begin the pipeline
for (let index = 0; index < streams.length - 1; index++) {
streams[index].pipe(streams[index + 1])
}
})
}
function wrapError(error, message, params, ErrorType) {
if (
error instanceof NotFoundError ||

View file

@ -10,6 +10,7 @@ if (https.globalAgent.maxSockets < 300) {
const AbstractPersistor = require('./AbstractPersistor')
const PersistorHelper = require('./PersistorHelper')
const { pipeline, PassThrough } = require('stream')
const fs = require('fs')
const S3 = require('aws-sdk/clients/s3')
const { URL } = require('url')
@ -41,7 +42,7 @@ module.exports = class S3Persistor extends AbstractPersistor {
const observer = new PersistorHelper.ObserverStream(observeOptions)
// observer will catch errors, clean up and log a warning
readStream.pipe(observer)
pipeline(readStream, observer, () => {})
// if we have an md5 hash, pass this to S3 to verify the upload
const uploadOptions = {
@ -90,20 +91,28 @@ module.exports = class S3Persistor extends AbstractPersistor {
params.Range = `bytes=${opts.start}-${opts.end}`
}
const stream = this._getClientForBucket(bucketName)
.getObject(params)
.createReadStream()
// ingress from S3 to us
const observer = new PersistorHelper.ObserverStream({
metric: 's3.ingress',
Metrics: this.settings.Metrics,
})
const req = this._getClientForBucket(bucketName).getObject(params)
const stream = req.createReadStream()
try {
// wait for the pipeline to be ready, to catch non-200s
await PersistorHelper.getReadyPipeline(stream, observer)
return observer
await new Promise((resolve, reject) => {
req.on('httpHeaders', statusCode => {
switch (statusCode) {
case 200: // full response
case 206: // partial response
return resolve()
case 403: // AccessDenied is handled the same as NoSuchKey
case 404: // NoSuchKey
return reject(new NotFoundError())
default:
return reject(new Error('non success status: ' + statusCode))
}
})
// The AWS SDK is forwarding any errors from the request to the stream.
// The AWS SDK is emitting additional errors on the stream ahead of starting to stream.
stream.on('error', reject)
// The AWS SDK is kicking off the request in the next event loop cycle.
})
} catch (err) {
throw PersistorHelper.wrapError(
err,
@ -112,6 +121,18 @@ module.exports = class S3Persistor extends AbstractPersistor {
ReadError
)
}
// ingress from S3 to us
const observer = new PersistorHelper.ObserverStream({
metric: 's3.ingress',
Metrics: this.settings.Metrics,
})
const pass = new PassThrough()
pipeline(stream, observer, pass, err => {
if (err) req.abort()
})
return pass
}
async getRedirectUrl(bucketName, key) {

View file

@ -1,3 +1,4 @@
const { EventEmitter } = require('events')
const sinon = require('sinon')
const chai = require('chai')
const { expect } = chai
@ -21,6 +22,7 @@ describe('GcsPersistorTests', function () {
let Logger,
Transform,
PassThrough,
Storage,
Fs,
GcsNotFoundError,
@ -55,30 +57,28 @@ describe('GcsPersistorTests', function () {
},
]
ReadStream = {
pipe: sinon.stub().returns('readStream'),
on: sinon.stub(),
removeListener: sinon.stub(),
}
ReadStream.on.withArgs('end').yields()
ReadStream.on.withArgs('pipe').yields({
unpipe: sinon.stub(),
resume: sinon.stub(),
on: sinon.stub(),
})
Transform = class {
on(event, callback) {
if (event === 'readable') {
callback()
}
class FakeGCSResponse extends EventEmitter {
constructor() {
super()
this.statusCode = 200
this.err = null
}
read() {
if (this.err) return this.emit('error', this.err)
this.emit('response', { statusCode: this.statusCode })
}
}
ReadStream = new FakeGCSResponse()
PassThrough = class {}
Transform = class {
once() {}
removeListener() {}
}
Stream = {
PassThrough,
Transform,
}
@ -155,8 +155,8 @@ describe('GcsPersistorTests', function () {
stream = await GcsPersistor.getObjectStream(bucket, key)
})
it('returns a metered stream', function () {
expect(stream).to.be.instanceOf(Transform)
it('returns a PassThrough stream', function () {
expect(stream).to.be.instanceOf(PassThrough)
})
it('fetches the right key from the right bucket', function () {
@ -172,8 +172,10 @@ describe('GcsPersistorTests', function () {
})
it('pipes the stream through the meter', function () {
expect(ReadStream.pipe).to.have.been.calledWith(
sinon.match.instanceOf(Transform)
expect(StreamPromises.pipeline).to.have.been.calledWith(
ReadStream,
sinon.match.instanceOf(Transform),
sinon.match.instanceOf(PassThrough)
)
})
})
@ -188,8 +190,8 @@ describe('GcsPersistorTests', function () {
})
})
it('returns a metered stream', function () {
expect(stream).to.be.instanceOf(Transform)
it('returns a PassThrough stream', function () {
expect(stream).to.be.instanceOf(PassThrough)
})
it('passes the byte range on to GCS', function () {
@ -205,8 +207,7 @@ describe('GcsPersistorTests', function () {
let error, stream
beforeEach(async function () {
Transform.prototype.on = sinon.stub()
ReadStream.on.withArgs('error').yields(GcsNotFoundError)
ReadStream.statusCode = 404
try {
stream = await GcsPersistor.getObjectStream(bucket, key)
} catch (e) {
@ -231,12 +232,11 @@ describe('GcsPersistorTests', function () {
})
})
describe('when Gcs encounters an unkown error', function () {
describe('when Gcs encounters an unknown error', function () {
let error, stream
beforeEach(async function () {
Transform.prototype.on = sinon.stub()
ReadStream.on.withArgs('error').yields(genericError)
ReadStream.err = genericError
try {
stream = await GcsPersistor.getObjectStream(bucket, key)
} catch (err) {

View file

@ -3,6 +3,7 @@ const chai = require('chai')
const { expect } = chai
const SandboxedModule = require('sandboxed-module')
const Errors = require('../../src/Errors')
const { EventEmitter } = require('events')
const MODULE_PATH = '../../src/S3Persistor.js'
@ -31,14 +32,15 @@ describe('S3PersistorTests', function () {
let Logger,
Transform,
PassThrough,
S3,
Fs,
ReadStream,
Stream,
StreamPromises,
S3GetObjectRequest,
S3Persistor,
S3Client,
S3ReadStream,
S3NotFoundError,
S3AccessDeniedError,
FileNotFoundError,
@ -55,18 +57,15 @@ describe('S3PersistorTests', function () {
}
Transform = class {
on(event, callback) {
if (event === 'readable') {
callback()
}
}
once() {}
removeListener() {}
}
PassThrough = class {}
Stream = {
Transform,
PassThrough,
pipeline: sinon.stub().yields(),
}
StreamPromises = {
@ -77,16 +76,28 @@ describe('S3PersistorTests', function () {
promise: sinon.stub().resolves(),
}
ReadStream = {
pipe: sinon.stub().returns('readStream'),
on: sinon.stub(),
removeListener: sinon.stub(),
ReadStream = new EventEmitter()
class FakeS3GetObjectRequest extends EventEmitter {
constructor() {
super()
this.statusCode = 200
this.err = null
this.aborted = false
}
abort() {
this.aborted = true
}
createReadStream() {
setTimeout(() => {
if (this.err) return ReadStream.emit('error', this.err)
this.emit('httpHeaders', this.statusCode)
})
return ReadStream
}
}
ReadStream.on.withArgs('end').yields()
ReadStream.on.withArgs('pipe').yields({
unpipe: sinon.stub(),
resume: sinon.stub(),
})
S3GetObjectRequest = new FakeS3GetObjectRequest()
FileNotFoundError = new Error('File not found')
FileNotFoundError.code = 'ENOENT'
@ -101,20 +112,8 @@ describe('S3PersistorTests', function () {
S3AccessDeniedError = new Error('access denied')
S3AccessDeniedError.code = 'AccessDenied'
S3ReadStream = {
on: sinon.stub(),
pipe: sinon.stub(),
removeListener: sinon.stub(),
}
S3ReadStream.on.withArgs('end').yields()
S3ReadStream.on.withArgs('pipe').yields({
unpipe: sinon.stub(),
resume: sinon.stub(),
})
S3Client = {
getObject: sinon.stub().returns({
createReadStream: sinon.stub().returns(S3ReadStream),
}),
getObject: sinon.stub().returns(S3GetObjectRequest),
headObject: sinon.stub().returns({
promise: sinon.stub().resolves({
ContentLength: objectSize,
@ -171,8 +170,8 @@ describe('S3PersistorTests', function () {
stream = await S3Persistor.getObjectStream(bucket, key)
})
it('returns a metered stream', function () {
expect(stream).to.be.instanceOf(Transform)
it('returns a PassThrough stream', function () {
expect(stream).to.be.instanceOf(PassThrough)
})
it('sets the AWS client up with credentials from settings', function () {
@ -187,10 +186,16 @@ describe('S3PersistorTests', function () {
})
it('pipes the stream through the meter', async function () {
expect(S3ReadStream.pipe).to.have.been.calledWith(
sinon.match.instanceOf(Transform)
expect(Stream.pipeline).to.have.been.calledWith(
ReadStream,
sinon.match.instanceOf(Transform),
sinon.match.instanceOf(PassThrough)
)
})
it('does not abort the request', function () {
expect(S3GetObjectRequest.aborted).to.equal(false)
})
})
describe('when called with a byte range', function () {
@ -203,8 +208,8 @@ describe('S3PersistorTests', function () {
})
})
it('returns a metered stream', function () {
expect(stream).to.be.instanceOf(Stream.Transform)
it('returns a PassThrough stream', function () {
expect(stream).to.be.instanceOf(Stream.PassThrough)
})
it('passes the byte range on to S3', function () {
@ -216,6 +221,23 @@ describe('S3PersistorTests', function () {
})
})
describe('when streaming fails', function () {
let stream
beforeEach(async function () {
Stream.pipeline.yields(new Error())
stream = await S3Persistor.getObjectStream(bucket, key)
})
it('returns a PassThrough stream', function () {
expect(stream).to.be.instanceOf(Stream.PassThrough)
})
it('aborts the request', function () {
expect(S3GetObjectRequest.aborted).to.equal(true)
})
})
describe('when there are alternative credentials', function () {
let stream
const alternativeSecret = 'giraffe'
@ -237,8 +259,8 @@ describe('S3PersistorTests', function () {
stream = await S3Persistor.getObjectStream(bucket, key)
})
it('returns a metered stream', function () {
expect(stream).to.be.instanceOf(Stream.Transform)
it('returns a PassThrough stream', function () {
expect(stream).to.be.instanceOf(Stream.PassThrough)
})
it('sets the AWS client up with the alternative credentials', function () {
@ -289,8 +311,7 @@ describe('S3PersistorTests', function () {
let error, stream
beforeEach(async function () {
Transform.prototype.on = sinon.stub()
S3ReadStream.on.withArgs('error').yields(S3NotFoundError)
S3GetObjectRequest.statusCode = 404
try {
stream = await S3Persistor.getObjectStream(bucket, key)
} catch (err) {
@ -319,8 +340,7 @@ describe('S3PersistorTests', function () {
let error, stream
beforeEach(async function () {
Transform.prototype.on = sinon.stub()
S3ReadStream.on.withArgs('error').yields(S3AccessDeniedError)
S3GetObjectRequest.statusCode = 403
try {
stream = await S3Persistor.getObjectStream(bucket, key)
} catch (err) {
@ -345,12 +365,11 @@ describe('S3PersistorTests', function () {
})
})
describe('when S3 encounters an unkown error', function () {
describe('when S3 encounters an unknown error', function () {
let error, stream
beforeEach(async function () {
Transform.prototype.on = sinon.stub()
S3ReadStream.on.withArgs('error').yields(genericError)
S3GetObjectRequest.err = genericError
try {
stream = await S3Persistor.getObjectStream(bucket, key)
} catch (err) {
@ -480,7 +499,8 @@ describe('S3PersistorTests', function () {
})
it('should meter the stream', function () {
expect(ReadStream.pipe).to.have.been.calledWith(
expect(Stream.pipeline).to.have.been.calledWith(
ReadStream,
sinon.match.instanceOf(Stream.Transform)
)
})
@ -608,12 +628,6 @@ describe('S3PersistorTests', function () {
})
})
it('should meter the download', function () {
expect(S3ReadStream.pipe).to.have.been.calledWith(
sinon.match.instanceOf(Stream.Transform)
)
})
it('should calculate the md5 hash from the file', function () {
expect(Hash.read).to.have.been.called
})

View file

@ -0,0 +1,30 @@
diff --git a/node_modules/@google-cloud/storage/node_modules/retry-request/index.js b/node_modules/@google-cloud/storage/node_modules/retry-request/index.js
index a293298..df21af6 100644
--- a/node_modules/@google-cloud/storage/node_modules/retry-request/index.js
+++ b/node_modules/@google-cloud/storage/node_modules/retry-request/index.js
@@ -1,6 +1,6 @@
'use strict';
-const {PassThrough} = require('stream');
+const { PassThrough, pipeline } = require('stream');
const debug = require('debug')('retry-request');
const extend = require('extend');
@@ -166,7 +166,7 @@ function retryRequest(requestOpts, opts, callback) {
})
.on('complete', retryStream.emit.bind(retryStream, 'complete'));
- requestStream.pipe(delayStream);
+ pipeline(requestStream, delayStream, () => {});
} else {
activeRequest = opts.request(requestOpts, onResponse);
}
@@ -232,7 +232,7 @@ function retryRequest(requestOpts, opts, callback) {
// No more attempts need to be made, just continue on.
if (streamMode) {
retryStream.emit('response', response);
- delayStream.pipe(retryStream);
+ pipeline(delayStream, retryStream, () => {});
requestStream.on('error', err => {
retryStream.destroy(err);
});

View file

@ -0,0 +1,50 @@
diff --git a/node_modules/@google-cloud/storage/node_modules/teeny-request/build/src/index.js b/node_modules/@google-cloud/storage/node_modules/teeny-request/build/src/index.js
index a2251ca..e29e796 100644
--- a/node_modules/@google-cloud/storage/node_modules/teeny-request/build/src/index.js
+++ b/node_modules/@google-cloud/storage/node_modules/teeny-request/build/src/index.js
@@ -166,27 +166,27 @@ function teenyRequest(reqOpts, callback) {
}
if (callback === undefined) {
// Stream mode
- const requestStream = streamEvents(new stream_1.PassThrough());
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- let responseStream;
- requestStream.once('reading', () => {
- if (responseStream) {
- responseStream.pipe(requestStream);
- }
- else {
- requestStream.once('response', () => {
- responseStream.pipe(requestStream);
- });
- }
- });
+ const requestStream = new stream_1.PassThrough();
+ // // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ // let responseStream;
+ // requestStream.once('reading', () => {
+ // if (responseStream) {
+ // responseStream.pipe(requestStream);
+ // }
+ // else {
+ // requestStream.once('response', () => {
+ // responseStream.pipe(requestStream);
+ // });
+ // }
+ // });
options.compress = false;
teenyRequest.stats.requestStarting();
(0, node_fetch_1.default)(uri, options).then(res => {
- teenyRequest.stats.requestFinished();
- responseStream = res.body;
- responseStream.on('error', (err) => {
- requestStream.emit('error', err);
- });
+ teenyRequest.stats.requestFinished(); stream_1.pipeline(res.body, requestStream, () => {});
+ // responseStream = res.body;
+ // responseStream.on('error', (err) => {
+ // requestStream.emit('error', err);
+ // });
const response = fetchToRequestResponse(options, res);
requestStream.emit('response', response);
}, err => {

View file

@ -0,0 +1,30 @@
diff --git a/node_modules/retry-request/index.js b/node_modules/retry-request/index.js
index 6cd6f65..39efb89 100644
--- a/node_modules/retry-request/index.js
+++ b/node_modules/retry-request/index.js
@@ -1,6 +1,6 @@
'use strict';
-var { PassThrough } = require('stream');
+var { PassThrough, pipeline } = require('stream');
var debug = require('debug')('retry-request');
var extend = require('extend');
@@ -164,7 +164,7 @@ function retryRequest(requestOpts, opts, callback) {
})
.on('complete', retryStream.emit.bind(retryStream, 'complete'));
- requestStream.pipe(delayStream);
+ pipeline(requestStream, delayStream, () => {});
} else {
activeRequest = opts.request(requestOpts, onResponse);
}
@@ -220,7 +220,7 @@ function retryRequest(requestOpts, opts, callback) {
// No more attempts need to be made, just continue on.
if (streamMode) {
retryStream.emit('response', response);
- delayStream.pipe(retryStream);
+ pipeline(delayStream, retryStream, () => {});
requestStream.on('error', function (err) {
retryStream.destroy(err);
});

View file

@ -0,0 +1,49 @@
diff --git a/node_modules/teeny-request/build/src/index.js b/node_modules/teeny-request/build/src/index.js
index f209888..e9fe982 100644
--- a/node_modules/teeny-request/build/src/index.js
+++ b/node_modules/teeny-request/build/src/index.js
@@ -166,27 +166,27 @@ function teenyRequest(reqOpts, callback) {
}
if (callback === undefined) {
// Stream mode
- const requestStream = streamEvents(new stream_1.PassThrough());
+ const requestStream = new stream_1.PassThrough();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
- let responseStream;
- requestStream.once('reading', () => {
- if (responseStream) {
- responseStream.pipe(requestStream);
- }
- else {
- requestStream.once('response', () => {
- responseStream.pipe(requestStream);
- });
- }
- });
+ // let responseStream;
+ // requestStream.once('reading', () => {
+ // if (responseStream) {
+ // responseStream.pipe(requestStream);
+ // }
+ // else {
+ // requestStream.once('response', () => {
+ // responseStream.pipe(requestStream);
+ // });
+ // }
+ // });
options.compress = false;
teenyRequest.stats.requestStarting();
node_fetch_1.default(uri, options).then(res => {
- teenyRequest.stats.requestFinished();
- responseStream = res.body;
- responseStream.on('error', (err) => {
- requestStream.emit('error', err);
- });
+ teenyRequest.stats.requestFinished(); stream_1.pipeline(res.body, requestStream, () => {});
+ // responseStream = res.body;
+ // responseStream.on('error', (err) => {
+ // requestStream.emit('error', err);
+ // });
const response = fetchToRequestResponse(options, res);
requestStream.emit('response', response);
}, err => {