diff --git a/libraries/object-persistor/src/AbstractPersistor.js b/libraries/object-persistor/src/AbstractPersistor.js index a856d388a0..b17d3e7325 100644 --- a/libraries/object-persistor/src/AbstractPersistor.js +++ b/libraries/object-persistor/src/AbstractPersistor.js @@ -19,7 +19,14 @@ module.exports = class AbstractPersistor { }) } - // opts may be {start: Number, end: Number} + /** + * @param location + * @param name + * @param {Object} opts + * @param {Number} opts.start + * @param {Number} opts.end + * @return {Promise} + */ async getObjectStream(location, name, opts) { throw new NotImplementedError('method not implemented in persistor', { method: 'getObjectStream', diff --git a/libraries/object-persistor/src/GcsPersistor.js b/libraries/object-persistor/src/GcsPersistor.js index 54215734f9..dab155930b 100644 --- a/libraries/object-persistor/src/GcsPersistor.js +++ b/libraries/object-persistor/src/GcsPersistor.js @@ -1,5 +1,6 @@ const fs = require('fs') const { pipeline } = require('stream/promises') +const { PassThrough } = require('stream') const { Storage } = require('@google-cloud/storage') const { WriteError, ReadError, NotFoundError } = require('./Errors') const asyncPool = require('tiny-async-pool') @@ -101,16 +102,22 @@ module.exports = class GcsPersistor extends AbstractPersistor { .file(key) .createReadStream({ decompress: false, ...opts }) - // ingress to us from gcs - const observer = new PersistorHelper.ObserverStream({ - metric: 'gcs.ingress', - Metrics: this.settings.Metrics, - }) - try { - // wait for the pipeline to be ready, to catch non-200s - await PersistorHelper.getReadyPipeline(stream, observer) - return observer + await new Promise((resolve, reject) => { + stream.on('response', res => { + switch (res.statusCode) { + case 200: // full response + case 206: // partial response + return resolve() + case 404: + return reject(new NotFoundError()) + default: + return reject(new Error('non success status: ' + res.statusCode)) + } + }) + stream.on('error', reject) + stream.read(0) // kick off request + }) } catch (err) { throw PersistorHelper.wrapError( err, @@ -119,6 +126,16 @@ module.exports = class GcsPersistor extends AbstractPersistor { ReadError ) } + + // ingress to us from gcs + const observer = new PersistorHelper.ObserverStream({ + metric: 'gcs.ingress', + Metrics: this.settings.Metrics, + }) + + const pass = new PassThrough() + pipeline(stream, observer, pass).catch(() => {}) + return pass } async getRedirectUrl(bucketName, key) { diff --git a/libraries/object-persistor/src/PersistorHelper.js b/libraries/object-persistor/src/PersistorHelper.js index 5079ebcceb..015fedd0d8 100644 --- a/libraries/object-persistor/src/PersistorHelper.js +++ b/libraries/object-persistor/src/PersistorHelper.js @@ -2,7 +2,7 @@ const Crypto = require('crypto') const Stream = require('stream') const { pipeline } = require('stream/promises') const Logger = require('@overleaf/logger') -const { WriteError, ReadError, NotFoundError } = require('./Errors') +const { WriteError, NotFoundError } = require('./Errors') // Observes data that passes through and computes some metadata for it // - specifically, it computes the number of bytes transferred, and optionally @@ -47,7 +47,6 @@ module.exports = { ObserverStream, calculateStreamMd5, verifyMd5, - getReadyPipeline, wrapError, hexToBase64, base64ToHex, @@ -87,70 +86,6 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) { } } -// resolves when a stream is 'readable', or rejects if the stream throws an error -// before that happens - this lets us handle protocol-level errors before trying -// to read them -function getReadyPipeline(...streams) { - return new Promise((resolve, reject) => { - const lastStream = streams.slice(-1)[0] - - // in case of error or stream close, we must ensure that we drain the - // previous stream so that it can clean up its socket (if it has one) - const drainPreviousStream = function (previousStream) { - // this stream is no longer reliable, so don't pipe anything more into it - previousStream.unpipe(this) - previousStream.resume() - } - - // handler to resolve when either: - // - an error happens, or - // - the last stream in the chain is readable - // for example, in the case of a 4xx error an error will occur and the - // streams will not become readable - const handler = function (err) { - // remove handler from all streams because we don't want to do this on - // later errors - lastStream.removeListener('readable', handler) - for (const stream of streams) { - stream.removeListener('error', handler) - } - - // return control to the caller - if (err) { - reject( - wrapError(err, 'error before stream became ready', {}, ReadError) - ) - } else { - resolve(lastStream) - } - } - - // ensure the handler fires when the last strem becomes readable - lastStream.on('readable', handler) - - for (const stream of streams) { - // when a stream receives a pipe, set up the drain handler to drain the - // connection if an error occurs or the stream is closed - stream.on('pipe', previousStream => { - stream.on('error', x => { - drainPreviousStream(previousStream) - }) - stream.on('close', () => { - drainPreviousStream(previousStream) - }) - }) - // add the handler function to resolve this method on error if we can't - // set up the pipeline - stream.on('error', handler) - } - - // begin the pipeline - for (let index = 0; index < streams.length - 1; index++) { - streams[index].pipe(streams[index + 1]) - } - }) -} - function wrapError(error, message, params, ErrorType) { if ( error instanceof NotFoundError || diff --git a/libraries/object-persistor/src/S3Persistor.js b/libraries/object-persistor/src/S3Persistor.js index 8b1a63c44e..048cff6564 100644 --- a/libraries/object-persistor/src/S3Persistor.js +++ b/libraries/object-persistor/src/S3Persistor.js @@ -10,6 +10,7 @@ if (https.globalAgent.maxSockets < 300) { const AbstractPersistor = require('./AbstractPersistor') const PersistorHelper = require('./PersistorHelper') +const { pipeline, PassThrough } = require('stream') const fs = require('fs') const S3 = require('aws-sdk/clients/s3') const { URL } = require('url') @@ -41,7 +42,7 @@ module.exports = class S3Persistor extends AbstractPersistor { const observer = new PersistorHelper.ObserverStream(observeOptions) // observer will catch errors, clean up and log a warning - readStream.pipe(observer) + pipeline(readStream, observer, () => {}) // if we have an md5 hash, pass this to S3 to verify the upload const uploadOptions = { @@ -90,20 +91,28 @@ module.exports = class S3Persistor extends AbstractPersistor { params.Range = `bytes=${opts.start}-${opts.end}` } - const stream = this._getClientForBucket(bucketName) - .getObject(params) - .createReadStream() - - // ingress from S3 to us - const observer = new PersistorHelper.ObserverStream({ - metric: 's3.ingress', - Metrics: this.settings.Metrics, - }) + const req = this._getClientForBucket(bucketName).getObject(params) + const stream = req.createReadStream() try { - // wait for the pipeline to be ready, to catch non-200s - await PersistorHelper.getReadyPipeline(stream, observer) - return observer + await new Promise((resolve, reject) => { + req.on('httpHeaders', statusCode => { + switch (statusCode) { + case 200: // full response + case 206: // partial response + return resolve() + case 403: // AccessDenied is handled the same as NoSuchKey + case 404: // NoSuchKey + return reject(new NotFoundError()) + default: + return reject(new Error('non success status: ' + statusCode)) + } + }) + // The AWS SDK is forwarding any errors from the request to the stream. + // The AWS SDK is emitting additional errors on the stream ahead of starting to stream. + stream.on('error', reject) + // The AWS SDK is kicking off the request in the next event loop cycle. + }) } catch (err) { throw PersistorHelper.wrapError( err, @@ -112,6 +121,18 @@ module.exports = class S3Persistor extends AbstractPersistor { ReadError ) } + + // ingress from S3 to us + const observer = new PersistorHelper.ObserverStream({ + metric: 's3.ingress', + Metrics: this.settings.Metrics, + }) + + const pass = new PassThrough() + pipeline(stream, observer, pass, err => { + if (err) req.abort() + }) + return pass } async getRedirectUrl(bucketName, key) { diff --git a/libraries/object-persistor/test/unit/GcsPersistorTests.js b/libraries/object-persistor/test/unit/GcsPersistorTests.js index 027d08d638..f6cc616041 100644 --- a/libraries/object-persistor/test/unit/GcsPersistorTests.js +++ b/libraries/object-persistor/test/unit/GcsPersistorTests.js @@ -1,3 +1,4 @@ +const { EventEmitter } = require('events') const sinon = require('sinon') const chai = require('chai') const { expect } = chai @@ -21,6 +22,7 @@ describe('GcsPersistorTests', function () { let Logger, Transform, + PassThrough, Storage, Fs, GcsNotFoundError, @@ -55,30 +57,28 @@ describe('GcsPersistorTests', function () { }, ] - ReadStream = { - pipe: sinon.stub().returns('readStream'), - on: sinon.stub(), - removeListener: sinon.stub(), - } - ReadStream.on.withArgs('end').yields() - ReadStream.on.withArgs('pipe').yields({ - unpipe: sinon.stub(), - resume: sinon.stub(), - on: sinon.stub(), - }) - - Transform = class { - on(event, callback) { - if (event === 'readable') { - callback() - } + class FakeGCSResponse extends EventEmitter { + constructor() { + super() + this.statusCode = 200 + this.err = null } + read() { + if (this.err) return this.emit('error', this.err) + this.emit('response', { statusCode: this.statusCode }) + } + } + + ReadStream = new FakeGCSResponse() + PassThrough = class {} + + Transform = class { once() {} - removeListener() {} } Stream = { + PassThrough, Transform, } @@ -155,8 +155,8 @@ describe('GcsPersistorTests', function () { stream = await GcsPersistor.getObjectStream(bucket, key) }) - it('returns a metered stream', function () { - expect(stream).to.be.instanceOf(Transform) + it('returns a PassThrough stream', function () { + expect(stream).to.be.instanceOf(PassThrough) }) it('fetches the right key from the right bucket', function () { @@ -172,8 +172,10 @@ describe('GcsPersistorTests', function () { }) it('pipes the stream through the meter', function () { - expect(ReadStream.pipe).to.have.been.calledWith( - sinon.match.instanceOf(Transform) + expect(StreamPromises.pipeline).to.have.been.calledWith( + ReadStream, + sinon.match.instanceOf(Transform), + sinon.match.instanceOf(PassThrough) ) }) }) @@ -188,8 +190,8 @@ describe('GcsPersistorTests', function () { }) }) - it('returns a metered stream', function () { - expect(stream).to.be.instanceOf(Transform) + it('returns a PassThrough stream', function () { + expect(stream).to.be.instanceOf(PassThrough) }) it('passes the byte range on to GCS', function () { @@ -205,8 +207,7 @@ describe('GcsPersistorTests', function () { let error, stream beforeEach(async function () { - Transform.prototype.on = sinon.stub() - ReadStream.on.withArgs('error').yields(GcsNotFoundError) + ReadStream.statusCode = 404 try { stream = await GcsPersistor.getObjectStream(bucket, key) } catch (e) { @@ -231,12 +232,11 @@ describe('GcsPersistorTests', function () { }) }) - describe('when Gcs encounters an unkown error', function () { + describe('when Gcs encounters an unknown error', function () { let error, stream beforeEach(async function () { - Transform.prototype.on = sinon.stub() - ReadStream.on.withArgs('error').yields(genericError) + ReadStream.err = genericError try { stream = await GcsPersistor.getObjectStream(bucket, key) } catch (err) { diff --git a/libraries/object-persistor/test/unit/S3PersistorTests.js b/libraries/object-persistor/test/unit/S3PersistorTests.js index 0c109202a1..c374d873d7 100644 --- a/libraries/object-persistor/test/unit/S3PersistorTests.js +++ b/libraries/object-persistor/test/unit/S3PersistorTests.js @@ -3,6 +3,7 @@ const chai = require('chai') const { expect } = chai const SandboxedModule = require('sandboxed-module') const Errors = require('../../src/Errors') +const { EventEmitter } = require('events') const MODULE_PATH = '../../src/S3Persistor.js' @@ -31,14 +32,15 @@ describe('S3PersistorTests', function () { let Logger, Transform, + PassThrough, S3, Fs, ReadStream, Stream, StreamPromises, + S3GetObjectRequest, S3Persistor, S3Client, - S3ReadStream, S3NotFoundError, S3AccessDeniedError, FileNotFoundError, @@ -55,18 +57,15 @@ describe('S3PersistorTests', function () { } Transform = class { - on(event, callback) { - if (event === 'readable') { - callback() - } - } - once() {} - removeListener() {} } + PassThrough = class {} + Stream = { Transform, + PassThrough, + pipeline: sinon.stub().yields(), } StreamPromises = { @@ -77,16 +76,28 @@ describe('S3PersistorTests', function () { promise: sinon.stub().resolves(), } - ReadStream = { - pipe: sinon.stub().returns('readStream'), - on: sinon.stub(), - removeListener: sinon.stub(), + ReadStream = new EventEmitter() + class FakeS3GetObjectRequest extends EventEmitter { + constructor() { + super() + this.statusCode = 200 + this.err = null + this.aborted = false + } + + abort() { + this.aborted = true + } + + createReadStream() { + setTimeout(() => { + if (this.err) return ReadStream.emit('error', this.err) + this.emit('httpHeaders', this.statusCode) + }) + return ReadStream + } } - ReadStream.on.withArgs('end').yields() - ReadStream.on.withArgs('pipe').yields({ - unpipe: sinon.stub(), - resume: sinon.stub(), - }) + S3GetObjectRequest = new FakeS3GetObjectRequest() FileNotFoundError = new Error('File not found') FileNotFoundError.code = 'ENOENT' @@ -101,20 +112,8 @@ describe('S3PersistorTests', function () { S3AccessDeniedError = new Error('access denied') S3AccessDeniedError.code = 'AccessDenied' - S3ReadStream = { - on: sinon.stub(), - pipe: sinon.stub(), - removeListener: sinon.stub(), - } - S3ReadStream.on.withArgs('end').yields() - S3ReadStream.on.withArgs('pipe').yields({ - unpipe: sinon.stub(), - resume: sinon.stub(), - }) S3Client = { - getObject: sinon.stub().returns({ - createReadStream: sinon.stub().returns(S3ReadStream), - }), + getObject: sinon.stub().returns(S3GetObjectRequest), headObject: sinon.stub().returns({ promise: sinon.stub().resolves({ ContentLength: objectSize, @@ -171,8 +170,8 @@ describe('S3PersistorTests', function () { stream = await S3Persistor.getObjectStream(bucket, key) }) - it('returns a metered stream', function () { - expect(stream).to.be.instanceOf(Transform) + it('returns a PassThrough stream', function () { + expect(stream).to.be.instanceOf(PassThrough) }) it('sets the AWS client up with credentials from settings', function () { @@ -187,10 +186,16 @@ describe('S3PersistorTests', function () { }) it('pipes the stream through the meter', async function () { - expect(S3ReadStream.pipe).to.have.been.calledWith( - sinon.match.instanceOf(Transform) + expect(Stream.pipeline).to.have.been.calledWith( + ReadStream, + sinon.match.instanceOf(Transform), + sinon.match.instanceOf(PassThrough) ) }) + + it('does not abort the request', function () { + expect(S3GetObjectRequest.aborted).to.equal(false) + }) }) describe('when called with a byte range', function () { @@ -203,8 +208,8 @@ describe('S3PersistorTests', function () { }) }) - it('returns a metered stream', function () { - expect(stream).to.be.instanceOf(Stream.Transform) + it('returns a PassThrough stream', function () { + expect(stream).to.be.instanceOf(Stream.PassThrough) }) it('passes the byte range on to S3', function () { @@ -216,6 +221,23 @@ describe('S3PersistorTests', function () { }) }) + describe('when streaming fails', function () { + let stream + + beforeEach(async function () { + Stream.pipeline.yields(new Error()) + stream = await S3Persistor.getObjectStream(bucket, key) + }) + + it('returns a PassThrough stream', function () { + expect(stream).to.be.instanceOf(Stream.PassThrough) + }) + + it('aborts the request', function () { + expect(S3GetObjectRequest.aborted).to.equal(true) + }) + }) + describe('when there are alternative credentials', function () { let stream const alternativeSecret = 'giraffe' @@ -237,8 +259,8 @@ describe('S3PersistorTests', function () { stream = await S3Persistor.getObjectStream(bucket, key) }) - it('returns a metered stream', function () { - expect(stream).to.be.instanceOf(Stream.Transform) + it('returns a PassThrough stream', function () { + expect(stream).to.be.instanceOf(Stream.PassThrough) }) it('sets the AWS client up with the alternative credentials', function () { @@ -289,8 +311,7 @@ describe('S3PersistorTests', function () { let error, stream beforeEach(async function () { - Transform.prototype.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(S3NotFoundError) + S3GetObjectRequest.statusCode = 404 try { stream = await S3Persistor.getObjectStream(bucket, key) } catch (err) { @@ -319,8 +340,7 @@ describe('S3PersistorTests', function () { let error, stream beforeEach(async function () { - Transform.prototype.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(S3AccessDeniedError) + S3GetObjectRequest.statusCode = 403 try { stream = await S3Persistor.getObjectStream(bucket, key) } catch (err) { @@ -345,12 +365,11 @@ describe('S3PersistorTests', function () { }) }) - describe('when S3 encounters an unkown error', function () { + describe('when S3 encounters an unknown error', function () { let error, stream beforeEach(async function () { - Transform.prototype.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(genericError) + S3GetObjectRequest.err = genericError try { stream = await S3Persistor.getObjectStream(bucket, key) } catch (err) { @@ -480,7 +499,8 @@ describe('S3PersistorTests', function () { }) it('should meter the stream', function () { - expect(ReadStream.pipe).to.have.been.calledWith( + expect(Stream.pipeline).to.have.been.calledWith( + ReadStream, sinon.match.instanceOf(Stream.Transform) ) }) @@ -608,12 +628,6 @@ describe('S3PersistorTests', function () { }) }) - it('should meter the download', function () { - expect(S3ReadStream.pipe).to.have.been.calledWith( - sinon.match.instanceOf(Stream.Transform) - ) - }) - it('should calculate the md5 hash from the file', function () { expect(Hash.read).to.have.been.called }) diff --git a/patches/@google-cloud+storage++retry-request+5.0.2.patch b/patches/@google-cloud+storage++retry-request+5.0.2.patch new file mode 100644 index 0000000000..bbf8a8f66b --- /dev/null +++ b/patches/@google-cloud+storage++retry-request+5.0.2.patch @@ -0,0 +1,30 @@ +diff --git a/node_modules/@google-cloud/storage/node_modules/retry-request/index.js b/node_modules/@google-cloud/storage/node_modules/retry-request/index.js +index a293298..df21af6 100644 +--- a/node_modules/@google-cloud/storage/node_modules/retry-request/index.js ++++ b/node_modules/@google-cloud/storage/node_modules/retry-request/index.js +@@ -1,6 +1,6 @@ + 'use strict'; + +-const {PassThrough} = require('stream'); ++const { PassThrough, pipeline } = require('stream'); + const debug = require('debug')('retry-request'); + const extend = require('extend'); + +@@ -166,7 +166,7 @@ function retryRequest(requestOpts, opts, callback) { + }) + .on('complete', retryStream.emit.bind(retryStream, 'complete')); + +- requestStream.pipe(delayStream); ++ pipeline(requestStream, delayStream, () => {}); + } else { + activeRequest = opts.request(requestOpts, onResponse); + } +@@ -232,7 +232,7 @@ function retryRequest(requestOpts, opts, callback) { + // No more attempts need to be made, just continue on. + if (streamMode) { + retryStream.emit('response', response); +- delayStream.pipe(retryStream); ++ pipeline(delayStream, retryStream, () => {}); + requestStream.on('error', err => { + retryStream.destroy(err); + }); diff --git a/patches/@google-cloud+storage++teeny-request+8.0.2.patch b/patches/@google-cloud+storage++teeny-request+8.0.2.patch new file mode 100644 index 0000000000..738eef51dc --- /dev/null +++ b/patches/@google-cloud+storage++teeny-request+8.0.2.patch @@ -0,0 +1,50 @@ +diff --git a/node_modules/@google-cloud/storage/node_modules/teeny-request/build/src/index.js b/node_modules/@google-cloud/storage/node_modules/teeny-request/build/src/index.js +index a2251ca..e29e796 100644 +--- a/node_modules/@google-cloud/storage/node_modules/teeny-request/build/src/index.js ++++ b/node_modules/@google-cloud/storage/node_modules/teeny-request/build/src/index.js +@@ -166,27 +166,27 @@ function teenyRequest(reqOpts, callback) { + } + if (callback === undefined) { + // Stream mode +- const requestStream = streamEvents(new stream_1.PassThrough()); +- // eslint-disable-next-line @typescript-eslint/no-explicit-any +- let responseStream; +- requestStream.once('reading', () => { +- if (responseStream) { +- responseStream.pipe(requestStream); +- } +- else { +- requestStream.once('response', () => { +- responseStream.pipe(requestStream); +- }); +- } +- }); ++ const requestStream = new stream_1.PassThrough(); ++ // // eslint-disable-next-line @typescript-eslint/no-explicit-any ++ // let responseStream; ++ // requestStream.once('reading', () => { ++ // if (responseStream) { ++ // responseStream.pipe(requestStream); ++ // } ++ // else { ++ // requestStream.once('response', () => { ++ // responseStream.pipe(requestStream); ++ // }); ++ // } ++ // }); + options.compress = false; + teenyRequest.stats.requestStarting(); + (0, node_fetch_1.default)(uri, options).then(res => { +- teenyRequest.stats.requestFinished(); +- responseStream = res.body; +- responseStream.on('error', (err) => { +- requestStream.emit('error', err); +- }); ++ teenyRequest.stats.requestFinished(); stream_1.pipeline(res.body, requestStream, () => {}); ++ // responseStream = res.body; ++ // responseStream.on('error', (err) => { ++ // requestStream.emit('error', err); ++ // }); + const response = fetchToRequestResponse(options, res); + requestStream.emit('response', response); + }, err => { diff --git a/patches/retry-request+4.2.2.patch b/patches/retry-request+4.2.2.patch new file mode 100644 index 0000000000..f3096b54d4 --- /dev/null +++ b/patches/retry-request+4.2.2.patch @@ -0,0 +1,30 @@ +diff --git a/node_modules/retry-request/index.js b/node_modules/retry-request/index.js +index 6cd6f65..39efb89 100644 +--- a/node_modules/retry-request/index.js ++++ b/node_modules/retry-request/index.js +@@ -1,6 +1,6 @@ + 'use strict'; + +-var { PassThrough } = require('stream'); ++var { PassThrough, pipeline } = require('stream'); + var debug = require('debug')('retry-request'); + var extend = require('extend'); + +@@ -164,7 +164,7 @@ function retryRequest(requestOpts, opts, callback) { + }) + .on('complete', retryStream.emit.bind(retryStream, 'complete')); + +- requestStream.pipe(delayStream); ++ pipeline(requestStream, delayStream, () => {}); + } else { + activeRequest = opts.request(requestOpts, onResponse); + } +@@ -220,7 +220,7 @@ function retryRequest(requestOpts, opts, callback) { + // No more attempts need to be made, just continue on. + if (streamMode) { + retryStream.emit('response', response); +- delayStream.pipe(retryStream); ++ pipeline(delayStream, retryStream, () => {}); + requestStream.on('error', function (err) { + retryStream.destroy(err); + }); diff --git a/patches/teeny-request+7.1.3.patch b/patches/teeny-request+7.1.3.patch new file mode 100644 index 0000000000..213ed046e3 --- /dev/null +++ b/patches/teeny-request+7.1.3.patch @@ -0,0 +1,49 @@ +diff --git a/node_modules/teeny-request/build/src/index.js b/node_modules/teeny-request/build/src/index.js +index f209888..e9fe982 100644 +--- a/node_modules/teeny-request/build/src/index.js ++++ b/node_modules/teeny-request/build/src/index.js +@@ -166,27 +166,27 @@ function teenyRequest(reqOpts, callback) { + } + if (callback === undefined) { + // Stream mode +- const requestStream = streamEvents(new stream_1.PassThrough()); ++ const requestStream = new stream_1.PassThrough(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any +- let responseStream; +- requestStream.once('reading', () => { +- if (responseStream) { +- responseStream.pipe(requestStream); +- } +- else { +- requestStream.once('response', () => { +- responseStream.pipe(requestStream); +- }); +- } +- }); ++ // let responseStream; ++ // requestStream.once('reading', () => { ++ // if (responseStream) { ++ // responseStream.pipe(requestStream); ++ // } ++ // else { ++ // requestStream.once('response', () => { ++ // responseStream.pipe(requestStream); ++ // }); ++ // } ++ // }); + options.compress = false; + teenyRequest.stats.requestStarting(); + node_fetch_1.default(uri, options).then(res => { +- teenyRequest.stats.requestFinished(); +- responseStream = res.body; +- responseStream.on('error', (err) => { +- requestStream.emit('error', err); +- }); ++ teenyRequest.stats.requestFinished(); stream_1.pipeline(res.body, requestStream, () => {}); ++ // responseStream = res.body; ++ // responseStream.on('error', (err) => { ++ // requestStream.emit('error', err); ++ // }); + const response = fetchToRequestResponse(options, res); + requestStream.emit('response', response); + }, err => {