From 7e45a82c356bba713205bede583af4e843819269 Mon Sep 17 00:00:00 2001 From: Simon Detheridge Date: Thu, 26 Mar 2020 15:09:56 +0000 Subject: [PATCH 1/5] Use autodestroy on Transform stream --- services/filestore/app/js/PersistorHelper.js | 1 + 1 file changed, 1 insertion(+) diff --git a/services/filestore/app/js/PersistorHelper.js b/services/filestore/app/js/PersistorHelper.js index ad5152374f..9df7d42369 100644 --- a/services/filestore/app/js/PersistorHelper.js +++ b/services/filestore/app/js/PersistorHelper.js @@ -15,6 +15,7 @@ const pipeline = promisify(Stream.pipeline) // the number of bytes transferred class ObserverStream extends Stream.Transform { constructor(options) { + options.autoDestroy = true super(options) this.bytes = 0 From 238d96ed448787ff3e91a9f926f0bdb08a9b4c99 Mon Sep 17 00:00:00 2001 From: Simon Detheridge Date: Thu, 26 Mar 2020 15:11:22 +0000 Subject: [PATCH 2/5] Fail acceptance tests on unhandled promise rejection --- services/filestore/test/acceptance/js/FilestoreTests.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/services/filestore/test/acceptance/js/FilestoreTests.js b/services/filestore/test/acceptance/js/FilestoreTests.js index 668570e7cb..a2c710a382 100644 --- a/services/filestore/test/acceptance/js/FilestoreTests.js +++ b/services/filestore/test/acceptance/js/FilestoreTests.js @@ -26,6 +26,12 @@ if (!process.env.AWS_ACCESS_KEY_ID) { throw new Error('please provide credentials for the AWS S3 test server') } +process.on('unhandledRejection', e => { + // eslint-disable no-console + console.log('** Unhandled Promise Rejection **\n', e) + throw e +}) + // store settings for multiple backends, so that we can test each one. // fs will always be available - add others if they are configured const BackendSettings = require('./TestConfig') From 1f037ef653b6bdf9f97f01e3c1711bdbb58c7d6f Mon Sep 17 00:00:00 2001 From: Simon Detheridge Date: Thu, 26 Mar 2020 16:24:08 +0000 Subject: [PATCH 3/5] Catch errors from pipeline and stream when waiting for streams to be readable --- services/filestore/app/js/GcsPersistor.js | 4 +-- services/filestore/app/js/PersistorHelper.js | 32 ++++++++++++----- services/filestore/app/js/S3Persistor.js | 4 +-- .../test/unit/js/GcsPersistorTests.js | 35 +++++++++++------- .../test/unit/js/S3PersistorTests.js | 36 ++++++++++++------- 5 files changed, 73 insertions(+), 38 deletions(-) diff --git a/services/filestore/app/js/GcsPersistor.js b/services/filestore/app/js/GcsPersistor.js index ae1c2dd53a..99a8c1a513 100644 --- a/services/filestore/app/js/GcsPersistor.js +++ b/services/filestore/app/js/GcsPersistor.js @@ -126,10 +126,10 @@ async function getFileStream(bucketName, key, _opts = {}) { const observer = new PersistorHelper.ObserverStream({ metric: 'gcs.ingress' }) - pipeline(stream, observer) try { - await PersistorHelper.waitForStreamReady(stream) + // wait for the pipeline to be ready, to catch non-200s + await PersistorHelper.getReadyPipeline(stream, observer) return observer } catch (err) { throw PersistorHelper.wrapError( diff --git a/services/filestore/app/js/PersistorHelper.js b/services/filestore/app/js/PersistorHelper.js index 9df7d42369..f99dd78e45 100644 --- a/services/filestore/app/js/PersistorHelper.js +++ b/services/filestore/app/js/PersistorHelper.js @@ -50,7 +50,7 @@ module.exports = { ObserverStream, calculateStreamMd5, verifyMd5, - waitForStreamReady, + getReadyPipeline, wrapError, hexToBase64, base64ToHex @@ -94,19 +94,33 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) { // resolves when a stream is 'readable', or rejects if the stream throws an error // before that happens - this lets us handle protocol-level errors before trying -// to read them -function waitForStreamReady(stream) { +// to read them - these can come from the call to pipeline or the stream itself +function getReadyPipeline(...streams) { return new Promise((resolve, reject) => { + const lastStream = streams.slice(-1)[0] + let resolvedOrErrored = false + const onError = function(err) { - reject(wrapError(err, 'error before stream became ready', {}, ReadError)) + if (!resolvedOrErrored) { + resolvedOrErrored = true + reject( + wrapError(err, 'error before stream became ready', {}, ReadError) + ) + } } const onStreamReady = function() { - stream.removeListener('readable', onStreamReady) - stream.removeListener('error', onError) - resolve(stream) + if (!resolvedOrErrored) { + resolvedOrErrored = true + lastStream.removeListener('readable', onStreamReady) + lastStream.removeListener('error', onError) + resolve(lastStream) + } } - stream.on('readable', onStreamReady) - stream.on('error', onError) + + pipeline(...streams).catch(onError) + + lastStream.on('readable', onStreamReady) + lastStream.on('error', onError) }) } diff --git a/services/filestore/app/js/S3Persistor.js b/services/filestore/app/js/S3Persistor.js index ba82db31e2..8216c5f7cb 100644 --- a/services/filestore/app/js/S3Persistor.js +++ b/services/filestore/app/js/S3Persistor.js @@ -131,10 +131,10 @@ async function getFileStream(bucketName, key, opts) { // ingress from S3 to us const observer = new PersistorHelper.ObserverStream({ metric: 's3.ingress' }) - pipeline(stream, observer) try { - await PersistorHelper.waitForStreamReady(stream) + // wait for the pipeline to be ready, to catch non-200s + await PersistorHelper.getReadyPipeline(stream, observer) return observer } catch (err) { throw PersistorHelper.wrapError( diff --git a/services/filestore/test/unit/js/GcsPersistorTests.js b/services/filestore/test/unit/js/GcsPersistorTests.js index 0ca0f39d0f..2df42729ec 100644 --- a/services/filestore/test/unit/js/GcsPersistorTests.js +++ b/services/filestore/test/unit/js/GcsPersistorTests.js @@ -5,7 +5,6 @@ const modulePath = '../../../app/js/GcsPersistor.js' const SandboxedModule = require('sandboxed-module') const { ObjectId } = require('mongodb') const asyncPool = require('tiny-async-pool') -const StreamModule = require('stream') const Errors = require('../../../app/js/Errors') @@ -21,6 +20,7 @@ describe('GcsPersistorTests', function() { let Metrics, Logger, + Transform, Storage, Fs, GcsNotFoundError, @@ -68,9 +68,20 @@ describe('GcsPersistorTests', function() { removeListener: sinon.stub() } + Transform = class { + on(event, callback) { + if (event === 'readable') { + callback() + } + } + + once() {} + removeListener() {} + } + Stream = { pipeline: sinon.stub().yields(), - Transform: StreamModule.Transform + Transform: Transform } Metrics = { @@ -147,7 +158,7 @@ describe('GcsPersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.be.instanceOf(StreamModule.Transform) + expect(stream).to.be.instanceOf(Transform) }) it('fetches the right key from the right bucket', function() { @@ -159,7 +170,7 @@ describe('GcsPersistorTests', function() { it('pipes the stream through the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - sinon.match.instanceOf(StreamModule.Transform) + sinon.match.instanceOf(Transform) ) }) }) @@ -175,7 +186,7 @@ describe('GcsPersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.be.instanceOf(StreamModule.Transform) + expect(stream).to.be.instanceOf(Transform) }) it('passes the byte range on to GCS', function() { @@ -190,8 +201,8 @@ describe('GcsPersistorTests', function() { let error, stream beforeEach(async function() { - ReadStream.on = sinon.stub() - ReadStream.on.withArgs('error').yields(GcsNotFoundError) + Transform.prototype.on = sinon.stub() + Transform.prototype.on.withArgs('error').yields(GcsNotFoundError) try { stream = await GcsPersistor.promises.getFileStream(bucket, key) } catch (err) { @@ -220,8 +231,8 @@ describe('GcsPersistorTests', function() { let error, stream beforeEach(async function() { - ReadStream.on = sinon.stub() - ReadStream.on.withArgs('error').yields(genericError) + Transform.prototype.on = sinon.stub() + Transform.prototype.on.withArgs('error').yields(genericError) try { stream = await GcsPersistor.promises.getFileStream(bucket, key) } catch (err) { @@ -330,7 +341,7 @@ describe('GcsPersistorTests', function() { it('should meter the stream and pass it to GCS', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - sinon.match.instanceOf(StreamModule.Transform), + sinon.match.instanceOf(Transform), WriteStream ) }) @@ -375,7 +386,7 @@ describe('GcsPersistorTests', function() { Stream.pipeline .withArgs( ReadStream, - sinon.match.instanceOf(StreamModule.Transform), + sinon.match.instanceOf(Transform), WriteStream, sinon.match.any ) @@ -416,7 +427,7 @@ describe('GcsPersistorTests', function() { it('should upload the stream via the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - sinon.match.instanceOf(StreamModule.Transform), + sinon.match.instanceOf(Transform), WriteStream ) }) diff --git a/services/filestore/test/unit/js/S3PersistorTests.js b/services/filestore/test/unit/js/S3PersistorTests.js index 2117164d74..414179afd1 100644 --- a/services/filestore/test/unit/js/S3PersistorTests.js +++ b/services/filestore/test/unit/js/S3PersistorTests.js @@ -3,7 +3,6 @@ const chai = require('chai') const { expect } = chai const modulePath = '../../../app/js/S3Persistor.js' const SandboxedModule = require('sandboxed-module') -const StreamModule = require('stream') const Errors = require('../../../app/js/Errors') @@ -31,6 +30,7 @@ describe('S3PersistorTests', function() { let Metrics, Logger, + Transform, S3, Fs, ReadStream, @@ -61,9 +61,20 @@ describe('S3PersistorTests', function() { } } + Transform = class { + on(event, callback) { + if (event === 'readable') { + callback() + } + } + + once() {} + removeListener() {} + } + Stream = { pipeline: sinon.stub().yields(), - Transform: StreamModule.Transform + Transform: Transform } EmptyPromise = { @@ -100,7 +111,6 @@ describe('S3PersistorTests', function() { pipe: sinon.stub(), removeListener: sinon.stub() } - S3ReadStream.on.withArgs('readable').yields() S3Client = { getObject: sinon.stub().returns({ createReadStream: sinon.stub().returns(S3ReadStream) @@ -163,7 +173,7 @@ describe('S3PersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.be.instanceOf(StreamModule.Transform) + expect(stream).to.be.instanceOf(Transform) }) it('sets the AWS client up with credentials from settings', function() { @@ -180,7 +190,7 @@ describe('S3PersistorTests', function() { it('pipes the stream through the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( S3ReadStream, - sinon.match.instanceOf(StreamModule.Transform) + sinon.match.instanceOf(Transform) ) }) }) @@ -281,8 +291,8 @@ describe('S3PersistorTests', function() { let error, stream beforeEach(async function() { - S3ReadStream.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(S3NotFoundError) + Transform.prototype.on = sinon.stub() + Transform.prototype.on.withArgs('error').yields(S3NotFoundError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { @@ -311,8 +321,8 @@ describe('S3PersistorTests', function() { let error, stream beforeEach(async function() { - S3ReadStream.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(S3AccessDeniedError) + Transform.prototype.on = sinon.stub() + Transform.prototype.on.withArgs('error').yields(S3AccessDeniedError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { @@ -341,8 +351,8 @@ describe('S3PersistorTests', function() { let error, stream beforeEach(async function() { - S3ReadStream.on = sinon.stub() - S3ReadStream.on.withArgs('error').yields(genericError) + Transform.prototype.on = sinon.stub() + Transform.prototype.on.withArgs('error').yields(genericError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { @@ -485,7 +495,7 @@ describe('S3PersistorTests', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: sinon.match.instanceOf(StreamModule.Transform), + Body: sinon.match.instanceOf(Transform), ContentMD5: 'qqqqqru7u7uqqqqqu7u7uw==' }) }) @@ -560,7 +570,7 @@ describe('S3PersistorTests', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: sinon.match.instanceOf(StreamModule.Transform) + Body: sinon.match.instanceOf(Transform) }) }) }) From a1ae68f6b5c702581e9fbbb45f7013df166c6600 Mon Sep 17 00:00:00 2001 From: Simon Detheridge Date: Thu, 26 Mar 2020 16:44:46 +0000 Subject: [PATCH 4/5] Tidy up 'getReadyPipeline' --- services/filestore/app/js/PersistorHelper.js | 27 ++++++++----------- .../test/unit/js/GcsPersistorTests.js | 4 +-- .../test/unit/js/S3PersistorTests.js | 6 ++--- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/services/filestore/app/js/PersistorHelper.js b/services/filestore/app/js/PersistorHelper.js index f99dd78e45..f2d0013915 100644 --- a/services/filestore/app/js/PersistorHelper.js +++ b/services/filestore/app/js/PersistorHelper.js @@ -94,33 +94,28 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) { // resolves when a stream is 'readable', or rejects if the stream throws an error // before that happens - this lets us handle protocol-level errors before trying -// to read them - these can come from the call to pipeline or the stream itself +// to read them function getReadyPipeline(...streams) { return new Promise((resolve, reject) => { const lastStream = streams.slice(-1)[0] let resolvedOrErrored = false - const onError = function(err) { + const handler = function(err) { if (!resolvedOrErrored) { resolvedOrErrored = true - reject( - wrapError(err, 'error before stream became ready', {}, ReadError) - ) - } - } - const onStreamReady = function() { - if (!resolvedOrErrored) { - resolvedOrErrored = true - lastStream.removeListener('readable', onStreamReady) - lastStream.removeListener('error', onError) + + lastStream.removeListener('readable', handler) + if (err) { + return reject( + wrapError(err, 'error before stream became ready', {}, ReadError) + ) + } resolve(lastStream) } } - pipeline(...streams).catch(onError) - - lastStream.on('readable', onStreamReady) - lastStream.on('error', onError) + pipeline(...streams).catch(handler) + lastStream.on('readable', handler) }) } diff --git a/services/filestore/test/unit/js/GcsPersistorTests.js b/services/filestore/test/unit/js/GcsPersistorTests.js index 2df42729ec..cc13c45ce7 100644 --- a/services/filestore/test/unit/js/GcsPersistorTests.js +++ b/services/filestore/test/unit/js/GcsPersistorTests.js @@ -202,7 +202,7 @@ describe('GcsPersistorTests', function() { beforeEach(async function() { Transform.prototype.on = sinon.stub() - Transform.prototype.on.withArgs('error').yields(GcsNotFoundError) + Stream.pipeline.yields(GcsNotFoundError) try { stream = await GcsPersistor.promises.getFileStream(bucket, key) } catch (err) { @@ -232,7 +232,7 @@ describe('GcsPersistorTests', function() { beforeEach(async function() { Transform.prototype.on = sinon.stub() - Transform.prototype.on.withArgs('error').yields(genericError) + Stream.pipeline.yields(genericError) try { stream = await GcsPersistor.promises.getFileStream(bucket, key) } catch (err) { diff --git a/services/filestore/test/unit/js/S3PersistorTests.js b/services/filestore/test/unit/js/S3PersistorTests.js index 414179afd1..565e3e0bc9 100644 --- a/services/filestore/test/unit/js/S3PersistorTests.js +++ b/services/filestore/test/unit/js/S3PersistorTests.js @@ -292,7 +292,7 @@ describe('S3PersistorTests', function() { beforeEach(async function() { Transform.prototype.on = sinon.stub() - Transform.prototype.on.withArgs('error').yields(S3NotFoundError) + Stream.pipeline.yields(S3NotFoundError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { @@ -322,7 +322,7 @@ describe('S3PersistorTests', function() { beforeEach(async function() { Transform.prototype.on = sinon.stub() - Transform.prototype.on.withArgs('error').yields(S3AccessDeniedError) + Stream.pipeline.yields(S3AccessDeniedError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { @@ -352,7 +352,7 @@ describe('S3PersistorTests', function() { beforeEach(async function() { Transform.prototype.on = sinon.stub() - Transform.prototype.on.withArgs('error').yields(genericError) + Stream.pipeline.yields(genericError) try { stream = await S3Persistor.promises.getFileStream(bucket, key) } catch (err) { From 64562dffb0a201a238beda51301e1a982130db67 Mon Sep 17 00:00:00 2001 From: Simon Detheridge Date: Thu, 26 Mar 2020 22:07:37 +0000 Subject: [PATCH 5/5] eslint-disable => eslint-disable-next-line --- services/filestore/test/acceptance/js/FilestoreTests.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/filestore/test/acceptance/js/FilestoreTests.js b/services/filestore/test/acceptance/js/FilestoreTests.js index a2c710a382..7e3b197a9c 100644 --- a/services/filestore/test/acceptance/js/FilestoreTests.js +++ b/services/filestore/test/acceptance/js/FilestoreTests.js @@ -27,7 +27,7 @@ if (!process.env.AWS_ACCESS_KEY_ID) { } process.on('unhandledRejection', e => { - // eslint-disable no-console + // eslint-disable-next-line no-console console.log('** Unhandled Promise Rejection **\n', e) throw e })