From 114883a9e948d7ccd995fe68b6b93fc04f913911 Mon Sep 17 00:00:00 2001 From: Simon Detheridge Date: Thu, 2 Apr 2020 15:55:30 +0100 Subject: [PATCH] Ensure streams are always drained on close or error --- services/filestore/app/js/FileController.js | 3 - services/filestore/app/js/PersistorHelper.js | 69 ++++++++++++++------ 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/services/filestore/app/js/FileController.js b/services/filestore/app/js/FileController.js index 72f68047ab..e39afd67bb 100644 --- a/services/filestore/app/js/FileController.js +++ b/services/filestore/app/js/FileController.js @@ -61,9 +61,6 @@ function getFile(req, res, next) { } pipeline(fileStream, res, err => { - if (!fileStream.destroyed) { - fileStream.destroy() - } if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE') { res.end() } else if (err) { diff --git a/services/filestore/app/js/PersistorHelper.js b/services/filestore/app/js/PersistorHelper.js index b829e5ec45..1c2512b690 100644 --- a/services/filestore/app/js/PersistorHelper.js +++ b/services/filestore/app/js/PersistorHelper.js @@ -23,6 +23,7 @@ class ObserverStream extends Stream.Transform { if (options.hash) { this.hash = crypto.createHash(options.hash) } + if (options.metric) { const onEnd = () => { metrics.count(options.metric, this.bytes) @@ -98,35 +99,61 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) { function getReadyPipeline(...streams) { return new Promise((resolve, reject) => { const lastStream = streams.slice(-1)[0] - let resolvedOrErrored = false + // in case of error or stream close, we must ensure that we drain the + // previous stream so that it can clean up its socket (if it has one) + const drainPreviousStream = function(previousStream) { + // this stream is no longer reliable, so don't pipe anything more into it + previousStream.unpipe(this) + previousStream.resume() + } + + // handler to resolve when either: + // - an error happens, or + // - the last stream in the chain is readable + // for example, in the case of a 4xx error an error will occur and the + // streams will not become readable const handler = function(err) { - if (!resolvedOrErrored) { - resolvedOrErrored = true - - lastStream.removeListener('readable', handler) - if (err) { - reject( - wrapError(err, 'error before stream became ready', {}, ReadError) - ) - } else { - resolve(lastStream) - } + // remove handler from all streams because we don't want to do this on + // later errors + lastStream.removeListener('readable', handler) + for (const stream of streams) { + stream.removeListener('error', handler) } + + // return control to the caller if (err) { - for (const stream of streams) { - if (!stream.destroyed) { - stream.destroy() - } - } + reject( + wrapError(err, 'error before stream became ready', {}, ReadError) + ) + } else { + resolve(lastStream) } } - for (let index = 0; index < streams.length - 1; index++) { - streams[index + 1].on('close', () => streams[index].destroy()) - } - pipeline(...streams).catch(handler) + // ensure the handler fires when the last strem becomes readable lastStream.on('readable', handler) + + for (const stream of streams) { + // when a stream receives a pipe, set up the drain handler to drain the + // connection if an error occurs or the stream is closed + stream.on('pipe', previousStream => { + stream.on('error', x => { + drainPreviousStream(previousStream) + }) + stream.on('close', () => { + drainPreviousStream(previousStream) + }) + }) + // add the handler function to resolve this method on error if we can't + // set up the pipeline + stream.on('error', handler) + } + + // begin the pipeline + for (let index = 0; index < streams.length - 1; index++) { + streams[index].pipe(streams[index + 1]) + } }) }