Ensure streams are always drained on close or error

This commit is contained in:
Simon Detheridge 2020-04-02 15:55:30 +01:00
parent ccf5f8b9e8
commit 114883a9e9
2 changed files with 48 additions and 24 deletions

View file

@ -61,9 +61,6 @@ function getFile(req, res, next) {
} }
pipeline(fileStream, res, err => { pipeline(fileStream, res, err => {
if (!fileStream.destroyed) {
fileStream.destroy()
}
if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE') { if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
res.end() res.end()
} else if (err) { } else if (err) {

View file

@ -23,6 +23,7 @@ class ObserverStream extends Stream.Transform {
if (options.hash) { if (options.hash) {
this.hash = crypto.createHash(options.hash) this.hash = crypto.createHash(options.hash)
} }
if (options.metric) { if (options.metric) {
const onEnd = () => { const onEnd = () => {
metrics.count(options.metric, this.bytes) metrics.count(options.metric, this.bytes)
@ -98,35 +99,61 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
function getReadyPipeline(...streams) { function getReadyPipeline(...streams) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const lastStream = streams.slice(-1)[0] const lastStream = streams.slice(-1)[0]
let resolvedOrErrored = false
// in case of error or stream close, we must ensure that we drain the
// previous stream so that it can clean up its socket (if it has one)
const drainPreviousStream = function(previousStream) {
// this stream is no longer reliable, so don't pipe anything more into it
previousStream.unpipe(this)
previousStream.resume()
}
// handler to resolve when either:
// - an error happens, or
// - the last stream in the chain is readable
// for example, in the case of a 4xx error an error will occur and the
// streams will not become readable
const handler = function(err) { const handler = function(err) {
if (!resolvedOrErrored) { // remove handler from all streams because we don't want to do this on
resolvedOrErrored = true // later errors
lastStream.removeListener('readable', handler)
lastStream.removeListener('readable', handler) for (const stream of streams) {
if (err) { stream.removeListener('error', handler)
reject(
wrapError(err, 'error before stream became ready', {}, ReadError)
)
} else {
resolve(lastStream)
}
} }
// return control to the caller
if (err) { if (err) {
for (const stream of streams) { reject(
if (!stream.destroyed) { wrapError(err, 'error before stream became ready', {}, ReadError)
stream.destroy() )
} } else {
} resolve(lastStream)
} }
} }
for (let index = 0; index < streams.length - 1; index++) { // ensure the handler fires when the last strem becomes readable
streams[index + 1].on('close', () => streams[index].destroy())
}
pipeline(...streams).catch(handler)
lastStream.on('readable', handler) lastStream.on('readable', handler)
for (const stream of streams) {
// when a stream receives a pipe, set up the drain handler to drain the
// connection if an error occurs or the stream is closed
stream.on('pipe', previousStream => {
stream.on('error', x => {
drainPreviousStream(previousStream)
})
stream.on('close', () => {
drainPreviousStream(previousStream)
})
})
// add the handler function to resolve this method on error if we can't
// set up the pipeline
stream.on('error', handler)
}
// begin the pipeline
for (let index = 0; index < streams.length - 1; index++) {
streams[index].pipe(streams[index + 1])
}
}) })
} }