diff --git a/services/filestore/app/js/GcsPersistor.js b/services/filestore/app/js/GcsPersistor.js index ae1c2dd53a..99a8c1a513 100644 --- a/services/filestore/app/js/GcsPersistor.js +++ b/services/filestore/app/js/GcsPersistor.js @@ -126,10 +126,10 @@ async function getFileStream(bucketName, key, _opts = {}) { const observer = new PersistorHelper.ObserverStream({ metric: 'gcs.ingress' }) - pipeline(stream, observer) try { - await PersistorHelper.waitForStreamReady(stream) + // wait for the pipeline to be ready, to catch non-200s + await PersistorHelper.getReadyPipeline(stream, observer) return observer } catch (err) { throw PersistorHelper.wrapError( diff --git a/services/filestore/app/js/PersistorHelper.js b/services/filestore/app/js/PersistorHelper.js index ad5152374f..f2d0013915 100644 --- a/services/filestore/app/js/PersistorHelper.js +++ b/services/filestore/app/js/PersistorHelper.js @@ -15,6 +15,7 @@ const pipeline = promisify(Stream.pipeline) // the number of bytes transferred class ObserverStream extends Stream.Transform { constructor(options) { + options.autoDestroy = true super(options) this.bytes = 0 @@ -49,7 +50,7 @@ module.exports = { ObserverStream, calculateStreamMd5, verifyMd5, - waitForStreamReady, + getReadyPipeline, wrapError, hexToBase64, base64ToHex @@ -94,18 +95,27 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) { // resolves when a stream is 'readable', or rejects if the stream throws an error // before that happens - this lets us handle protocol-level errors before trying // to read them -function waitForStreamReady(stream) { +function getReadyPipeline(...streams) { return new Promise((resolve, reject) => { - const onError = function(err) { - reject(wrapError(err, 'error before stream became ready', {}, ReadError)) + const lastStream = streams.slice(-1)[0] + let resolvedOrErrored = false + + const handler = function(err) { + if (!resolvedOrErrored) { + resolvedOrErrored = true + + lastStream.removeListener('readable', handler) + if (err) { + return reject( + wrapError(err, 'error before stream became ready', {}, ReadError) + ) + } + resolve(lastStream) + } } - const onStreamReady = function() { - stream.removeListener('readable', onStreamReady) - stream.removeListener('error', onError) - resolve(stream) - } - stream.on('readable', onStreamReady) - stream.on('error', onError) + + pipeline(...streams).catch(handler) + lastStream.on('readable', handler) }) } diff --git a/services/filestore/app/js/S3Persistor.js b/services/filestore/app/js/S3Persistor.js index ba82db31e2..8216c5f7cb 100644 --- a/services/filestore/app/js/S3Persistor.js +++ b/services/filestore/app/js/S3Persistor.js @@ -131,10 +131,10 @@ async function getFileStream(bucketName, key, opts) { // ingress from S3 to us const observer = new PersistorHelper.ObserverStream({ metric: 's3.ingress' }) - pipeline(stream, observer) try { - await PersistorHelper.waitForStreamReady(stream) + // wait for the pipeline to be ready, to catch non-200s + await PersistorHelper.getReadyPipeline(stream, observer) return observer } catch (err) { throw PersistorHelper.wrapError( diff --git a/services/filestore/test/acceptance/js/FilestoreTests.js b/services/filestore/test/acceptance/js/FilestoreTests.js index 668570e7cb..7e3b197a9c 100644 --- a/services/filestore/test/acceptance/js/FilestoreTests.js +++ b/services/filestore/test/acceptance/js/FilestoreTests.js @@ -26,6 +26,12 @@ if (!process.env.AWS_ACCESS_KEY_ID) { throw new Error('please provide credentials for the AWS S3 test server') } +process.on('unhandledRejection', e => { + // eslint-disable-next-line no-console + console.log('** Unhandled Promise Rejection **\n', e) + throw e +}) + // store settings for multiple backends, so that we can test each one. // fs will always be available - add others if they are configured const BackendSettings = require('./TestConfig') diff --git a/services/filestore/test/unit/js/GcsPersistorTests.js b/services/filestore/test/unit/js/GcsPersistorTests.js index 0ca0f39d0f..cc13c45ce7 100644 --- a/services/filestore/test/unit/js/GcsPersistorTests.js +++ b/services/filestore/test/unit/js/GcsPersistorTests.js @@ -5,7 +5,6 @@ const modulePath = '../../../app/js/GcsPersistor.js' const SandboxedModule = require('sandboxed-module') const { ObjectId } = require('mongodb') const asyncPool = require('tiny-async-pool') -const StreamModule = require('stream') const Errors = require('../../../app/js/Errors') @@ -21,6 +20,7 @@ describe('GcsPersistorTests', function() { let Metrics, Logger, + Transform, Storage, Fs, GcsNotFoundError, @@ -68,9 +68,20 @@ describe('GcsPersistorTests', function() { removeListener: sinon.stub() } + Transform = class { + on(event, callback) { + if (event === 'readable') { + callback() + } + } + + once() {} + removeListener() {} + } + Stream = { pipeline: sinon.stub().yields(), - Transform: StreamModule.Transform + Transform: Transform } Metrics = { @@ -147,7 +158,7 @@ describe('GcsPersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.be.instanceOf(StreamModule.Transform) + expect(stream).to.be.instanceOf(Transform) }) it('fetches the right key from the right bucket', function() { @@ -159,7 +170,7 @@ describe('GcsPersistorTests', function() { it('pipes the stream through the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - sinon.match.instanceOf(StreamModule.Transform) + sinon.match.instanceOf(Transform) ) }) }) @@ -175,7 +186,7 @@ describe('GcsPersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.be.instanceOf(StreamModule.Transform) + expect(stream).to.be.instanceOf(Transform) }) it('passes the byte range on to GCS', function() { @@ -190,8 +201,8 @@ describe('GcsPersistorTests', function() { let error, stream beforeEach(async function() { - ReadStream.on = sinon.stub() - ReadStream.on.withArgs('error').yields(GcsNotFoundError) + Transform.prototype.on = sinon.stub() + Stream.pipeline.yields(GcsNotFoundError) try { stream = await GcsPersistor.promises.getFileStream(bucket, key) } catch (err) { @@ -220,8 +231,8 @@ describe('GcsPersistorTests', function() { let error, stream beforeEach(async function() { - ReadStream.on = sinon.stub() - ReadStream.on.withArgs('error').yields(genericError) + Transform.prototype.on = sinon.stub() + Stream.pipeline.yields(genericError) try { stream = await GcsPersistor.promises.getFileStream(bucket, key) } catch (err) { @@ -330,7 +341,7 @@ describe('GcsPersistorTests', function() { it('should meter the stream and pass it to GCS', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - sinon.match.instanceOf(StreamModule.Transform), + sinon.match.instanceOf(Transform), WriteStream ) }) @@ -375,7 +386,7 @@ describe('GcsPersistorTests', function() { Stream.pipeline .withArgs( ReadStream, - sinon.match.instanceOf(StreamModule.Transform), + sinon.match.instanceOf(Transform), WriteStream, sinon.match.any ) @@ -416,7 +427,7 @@ describe('GcsPersistorTests', function() { it('should upload the stream via the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - sinon.match.instanceOf(StreamModule.Transform), + sinon.match.instanceOf(Transform), WriteStream ) }) diff --git a/services/filestore/test/unit/js/S3PersistorTests.js b/services/filestore/test/unit/js/S3PersistorTests.js index 2117164d74..565e3e0bc9 100644 --- a/services/filestore/test/unit/js/S3PersistorTests.js +++ b/services/filestore/test/unit/js/S3PersistorTests.js @@ -3,7 +3,6 @@ const chai = require('chai') const { expect } = chai const modulePath = '../../../app/js/S3Persistor.js' const SandboxedModule = require('sandboxed-module') -const StreamModule = require('stream') const Errors = require('../../../app/js/Errors') @@ -31,6 +30,7 @@ describe('S3PersistorTests', function() { let Metrics, Logger, + Transform, S3, Fs, ReadStream, @@ -61,9 +61,20 @@ describe('S3PersistorTests', function() { } } + Transform = class { + on(event, callback) { + if (event === 'readable') { + callback() + } + } + + once() {} + removeListener() {} + } + Stream = { pipeline: sinon.stub().yields(), - Transform: StreamModule.Transform + Transform: Transform } EmptyPromise = { @@ -100,7 +111,6 @@ describe('S3PersistorTests', function() { pipe: sinon.stub(), removeListener: sinon.stub() } - S3ReadStream.on.withArgs('readable').yields() S3Client = { getObject: sinon.stub().returns({ createReadStream: sinon.stub().returns(S3ReadStream) @@ -163,7 +173,7 @@ describe('S3PersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.be.instanceOf(StreamModule.Transform) + expect(stream).to.be.instanceOf(Transform) }) it('sets the AWS client up with credentials from settings', function() { @@ -180,7 +190,7 @@ describe('S3PersistorTests', function() { it('pipes the stream through the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( S3ReadStream, - sinon.match.instanceOf(StreamModule.Transform) + sinon.match.instanceOf(Transform) ) }) }) @@ -281,8 +291,8 @@ describe('S3PersistorTests', function() { let error, stream beforeEach(async function() { - S3ReadStream.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(S3NotFoundError) + Transform.prototype.on = sinon.stub() + Stream.pipeline.yields(S3NotFoundError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { @@ -311,8 +321,8 @@ describe('S3PersistorTests', function() { let error, stream beforeEach(async function() { - S3ReadStream.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(S3AccessDeniedError) + Transform.prototype.on = sinon.stub() + Stream.pipeline.yields(S3AccessDeniedError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { @@ -341,8 +351,8 @@ describe('S3PersistorTests', function() { let error, stream beforeEach(async function() { - S3ReadStream.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(genericError) + Transform.prototype.on = sinon.stub() + Stream.pipeline.yields(genericError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { @@ -485,7 +495,7 @@ describe('S3PersistorTests', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: sinon.match.instanceOf(StreamModule.Transform), + Body: sinon.match.instanceOf(Transform), ContentMD5: 'qqqqqru7u7uqqqqqu7u7uw==' }) }) @@ -560,7 +570,7 @@ describe('S3PersistorTests', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: sinon.match.instanceOf(StreamModule.Transform) + Body: sinon.match.instanceOf(Transform) }) }) })