diff --git a/services/filestore/app/js/FileController.js b/services/filestore/app/js/FileController.js
index 72f68047ab..e39afd67bb 100644
--- a/services/filestore/app/js/FileController.js
+++ b/services/filestore/app/js/FileController.js
@@ -61,9 +61,6 @@ function getFile(req, res, next) {
   }

   pipeline(fileStream, res, err => {
-    if (!fileStream.destroyed) {
-      fileStream.destroy()
-    }
     if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
       res.end()
     } else if (err) {
diff --git a/services/filestore/app/js/PersistorHelper.js b/services/filestore/app/js/PersistorHelper.js
index b829e5ec45..1c2512b690 100644
--- a/services/filestore/app/js/PersistorHelper.js
+++ b/services/filestore/app/js/PersistorHelper.js
@@ -23,6 +23,7 @@ class ObserverStream extends Stream.Transform {
     if (options.hash) {
       this.hash = crypto.createHash(options.hash)
     }
+
     if (options.metric) {
       const onEnd = () => {
         metrics.count(options.metric, this.bytes)
@@ -98,35 +99,61 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {

 function getReadyPipeline(...streams) {
   return new Promise((resolve, reject) => {
     const lastStream = streams.slice(-1)[0]
-    let resolvedOrErrored = false
+    // in case of error or stream close, we must ensure that we drain the
+    // previous stream so that it can clean up its socket (if it has one)
+    const drainPreviousStream = function(previousStream, stream) {
+      // this stream is no longer reliable, so don't pipe anything more into it
+      previousStream.unpipe(stream)
+      previousStream.resume()
+    }
+
+    // handler to resolve when either:
+    // - an error happens, or
+    // - the last stream in the chain is readable
+    // for example, in the case of a 4xx error an error will occur and the
+    // streams will not become readable
     const handler = function(err) {
-      if (!resolvedOrErrored) {
-        resolvedOrErrored = true
-
-        lastStream.removeListener('readable', handler)
-        if (err) {
-          reject(
-            wrapError(err, 'error before stream became ready', {}, ReadError)
-          )
-        } else {
-          resolve(lastStream)
-        }
+      // remove handler from all streams because we don't want to do this on
+      // later errors
+      lastStream.removeListener('readable', handler)
+      for (const stream of streams) {
+        stream.removeListener('error', handler)
       }
+
+      // return control to the caller
       if (err) {
-        for (const stream of streams) {
-          if (!stream.destroyed) {
-            stream.destroy()
-          }
-        }
+        reject(
+          wrapError(err, 'error before stream became ready', {}, ReadError)
+        )
+      } else {
+        resolve(lastStream)
       }
     }

-    for (let index = 0; index < streams.length - 1; index++) {
-      streams[index + 1].on('close', () => streams[index].destroy())
-    }
-    pipeline(...streams).catch(handler)
+    // ensure the handler fires when the last stream becomes readable
     lastStream.on('readable', handler)
+
+    for (const stream of streams) {
+      // when a stream receives a pipe, set up the drain handler to drain the
+      // connection if an error occurs or the stream is closed
+      stream.on('pipe', previousStream => {
+        stream.on('error', () => {
+          drainPreviousStream(previousStream, stream)
+        })
+        stream.on('close', () => {
+          drainPreviousStream(previousStream, stream)
+        })
+      })
+      // add the handler so the promise rejects if an error occurs before the
+      // pipeline is ready
+      stream.on('error', handler)
+    }
+
+    // begin the pipeline
+    for (let index = 0; index < streams.length - 1; index++) {
+      streams[index].pipe(streams[index + 1])
+    }
   })
 }
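Note on the approach above: `Stream.pipeline` destroys every stream on failure, and destroying an S3/GCS response stream mid-flight can leave its TCP socket holding unread data. The rewrite pipes manually and, on error or close, unpipes and resumes the upstream so it drains and can release its socket. A standalone sketch of that drain idea (illustrative only; the streams here are invented stand-ins, not part of the patch):

```js
const { PassThrough } = require('stream')

const source = new PassThrough()
const dest = new PassThrough()

// mirror the patch: when dest is piped into, arrange to drain the source
// if dest fails, so the source's underlying socket could be cleaned up
dest.on('pipe', previousStream => {
  dest.on('error', () => {
    previousStream.unpipe(dest) // stop feeding the broken destination
    previousStream.resume() // discard buffered data so 'end' can still fire
  })
})

source.pipe(dest)
source.write('some data')
source.end()
source.on('end', () => console.log('source fully drained'))

// simulate a downstream failure (e.g. a client socket error)
dest.destroy(new Error('downstream failed'))
```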
diff --git a/services/filestore/test/acceptance/js/FilestoreTests.js b/services/filestore/test/acceptance/js/FilestoreTests.js
index 7e3b197a9c..8382a48de5 100644
--- a/services/filestore/test/acceptance/js/FilestoreTests.js
+++ b/services/filestore/test/acceptance/js/FilestoreTests.js
@@ -17,10 +17,13 @@ const streamifier = require('streamifier')
 chai.use(require('chai-as-promised'))
 const { ObjectId } = require('mongodb')
 const tk = require('timekeeper')
+const ChildProcess = require('child_process')

 const fsWriteFile = promisify(fs.writeFile)
 const fsStat = promisify(fs.stat)
 const pipeline = promisify(Stream.pipeline)
+const exec = promisify(ChildProcess.exec)
+const msleep = promisify(setTimeout)

 if (!process.env.AWS_ACCESS_KEY_ID) {
   throw new Error('please provide credentials for the AWS S3 test server')
@@ -40,6 +43,41 @@ describe('Filestore', function() {
   this.timeout(1000 * 10)
   const filestoreUrl = `http://localhost:${Settings.internal.filestore.port}`

+  const seenSockets = []
+  async function expectNoSockets() {
+    try {
+      await msleep(1000)
+      const { stdout } = await exec('ss -tnH')
+
+      const badSockets = []
+      for (const socket of stdout.split('\n')) {
+        const fields = socket.split(' ').filter(part => part !== '')
+        if (
+          fields.length > 2 &&
+          parseInt(fields[1]) &&
+          !seenSockets.includes(socket)
+        ) {
+          badSockets.push(socket)
+          seenSockets.push(socket)
+        }
+      }
+
+      if (badSockets.length) {
+        // eslint-disable-next-line no-console
+        console.error(
+          'ERR: Sockets still have receive buffer after connection closed'
+        )
+        for (const socket of badSockets) {
+          // eslint-disable-next-line no-console
+          console.error(socket)
+        }
+        throw new Error('Sockets still open after connection closed')
+      }
+    } catch (err) {
+      expect(err).not.to.exist
+    }
+  }
+
   // redefine the test suite for every available backend
   Object.keys(BackendSettings).forEach(backend => {
     describe(backend, function() {
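For reference, `ss -tnH` lists TCP sockets one per line with no header; the second field is Recv-Q, the bytes the kernel has received that the process has not yet read. A non-zero Recv-Q after all requests have completed is the leak signature the helper above looks for. A trimmed standalone version of the same check (hypothetical helper name):

```js
const { promisify } = require('util')
const exec = promisify(require('child_process').exec)

// hypothetical helper: return `ss` lines for sockets holding unread bytes
async function socketsWithUnreadData() {
  const { stdout } = await exec('ss -tnH')
  return stdout
    .split('\n')
    .map(line => line.trim().split(/\s+/))
    .filter(fields => fields.length > 2 && parseInt(fields[1], 10) > 0)
    .map(fields => fields.join(' '))
}
```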
@@ -71,7 +109,8 @@ describe('Filestore', function() {
       }

       after(async function() {
-        return app.stop()
+        await msleep(3000)
+        await app.stop()
       })

       beforeEach(async function() {
@@ -156,6 +195,11 @@ describe('Filestore', function() {
          expect(res.body).to.equal(constantFileContent)
        })

+        it('should not leak a socket', async function() {
+          await rp.get(fileUrl)
+          await expectNoSockets()
+        })
+
        it('should be able to get back the first 9 bytes of the file', async function() {
          const options = {
            uri: fileUrl,
@@ -378,6 +422,30 @@ describe('Filestore', function() {
        it('should not throw an error', function() {
          expect(error).not.to.exist
        })
+
+        it('should not leak a socket', async function() {
+          await rp.get(fileUrl)
+          await expectNoSockets()
+        })
+
+        it('should not leak a socket if the connection is aborted', async function() {
+          this.timeout(20000)
+          // the test is not 100% reliable, so repeat it a few times
+          for (let i = 0; i < 5; i++) {
+            // create a new connection and have it time out before reading any data
+            await new Promise(resolve => {
+              const streamThatHangs = new Stream.PassThrough()
+              const stream = request({ url: fileUrl, timeout: 1000 })
+              stream.pipe(streamThatHangs)
+              stream.on('error', () => {
+                stream.destroy()
+                streamThatHangs.destroy()
+                resolve()
+              })
+            })
+            await expectNoSockets()
+          }
+        })
      })

      if (backend === 'S3Persistor' || backend === 'FallbackGcsToS3Persistor') {
@@ -554,7 +622,7 @@ describe('Filestore', function() {
        it('copies the file to the primary', async function() {
          await rp.get(fileUrl)
          // wait for the file to copy in the background
-          await promisify(setTimeout)(1000)
+          await msleep(1000)

          await TestHelper.expectPersistorToHaveFile(
            app.persistor.primaryPersistor,
@@ -622,7 +690,7 @@ describe('Filestore', function() {
        it('should not copy the old file to the primary with the old key', async function() {
          // wait for the file to copy in the background
-          await promisify(setTimeout)(1000)
+          await msleep(1000)

          await TestHelper.expectPersistorNotToHaveFile(
            app.persistor.primaryPersistor,
@@ -668,7 +736,7 @@ describe('Filestore', function() {
        it('should copy the old file to the primary with the old key', async function() {
          // wait for the file to copy in the background
-          await promisify(setTimeout)(1000)
+          await msleep(1000)

          await TestHelper.expectPersistorToHaveFile(
            app.persistor.primaryPersistor,
diff --git a/services/filestore/test/unit/js/GcsPersistorTests.js b/services/filestore/test/unit/js/GcsPersistorTests.js
index cc13c45ce7..3c386e5002 100644
--- a/services/filestore/test/unit/js/GcsPersistorTests.js
+++ b/services/filestore/test/unit/js/GcsPersistorTests.js
@@ -61,12 +61,15 @@ describe('GcsPersistorTests', function() {

     ReadStream = {
       pipe: sinon.stub().returns('readStream'),
-      on: sinon
-        .stub()
-        .withArgs('end')
-        .yields(),
+      on: sinon.stub(),
       removeListener: sinon.stub()
     }
+    ReadStream.on.withArgs('end').yields()
+    ReadStream.on.withArgs('pipe').yields({
+      unpipe: sinon.stub(),
+      resume: sinon.stub(),
+      on: sinon.stub()
+    })

     Transform = class {
       on(event, callback) {
@@ -168,8 +171,7 @@ describe('GcsPersistorTests', function() {
     })

     it('pipes the stream through the meter', function() {
-      expect(Stream.pipeline).to.have.been.calledWith(
-        ReadStream,
+      expect(ReadStream.pipe).to.have.been.calledWith(
         sinon.match.instanceOf(Transform)
       )
     })
@@ -202,11 +204,11 @@ describe('GcsPersistorTests', function() {
     beforeEach(async function() {
       Transform.prototype.on = sinon.stub()
-      Stream.pipeline.yields(GcsNotFoundError)
+      ReadStream.on.withArgs('error').yields(GcsNotFoundError)

       try {
         stream = await GcsPersistor.promises.getFileStream(bucket, key)
-      } catch (err) {
-        error = err
+      } catch (e) {
+        error = e
       }
     })
@@ -232,7 +234,7 @@ describe('GcsPersistorTests', function() {
     beforeEach(async function() {
       Transform.prototype.on = sinon.stub()
-      Stream.pipeline.yields(genericError)
+      ReadStream.on.withArgs('error').yields(genericError)

       try {
         stream = await GcsPersistor.promises.getFileStream(bucket, key)
       } catch (err) {
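The unit-test stubs above replace the old `Stream.pipeline` expectations: `on` is now a plain stub taught per-event behaviour, so the code under test immediately sees `'end'` and a `'pipe'` event carrying a fake upstream with `unpipe`/`resume`. A small illustration of the sinon pattern (names invented):

```js
const sinon = require('sinon')

// a fake stream whose `on` stub responds differently per event name
const fakeStream = { pipe: sinon.stub(), on: sinon.stub() }
fakeStream.on.withArgs('end').yields() // 'end' listeners run immediately
fakeStream.on.withArgs('pipe').yields({
  unpipe: sinon.stub(), // fake upstream the drain logic will unpipe
  resume: sinon.stub() // ...and resume, mirroring drainPreviousStream
})

// code under test registers listeners; the stubs invoke them synchronously
fakeStream.on('end', () => console.log('end fired'))
fakeStream.on('pipe', upstream => upstream.resume())
```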
diff --git a/services/filestore/test/unit/js/S3PersistorTests.js b/services/filestore/test/unit/js/S3PersistorTests.js
index 565e3e0bc9..c236de25ef 100644
--- a/services/filestore/test/unit/js/S3PersistorTests.js
+++ b/services/filestore/test/unit/js/S3PersistorTests.js
@@ -87,11 +87,14 @@ describe('S3PersistorTests', function() {

     ReadStream = {
       pipe: sinon.stub().returns('readStream'),
-      on: sinon
-        .stub()
-        .withArgs('end')
-        .yields()
+      on: sinon.stub(),
+      removeListener: sinon.stub()
     }
+    ReadStream.on.withArgs('end').yields()
+    ReadStream.on.withArgs('pipe').yields({
+      unpipe: sinon.stub(),
+      resume: sinon.stub()
+    })

     FileNotFoundError = new Error('File not found')
     FileNotFoundError.code = 'ENOENT'
@@ -111,6 +114,11 @@ describe('S3PersistorTests', function() {
       pipe: sinon.stub(),
       removeListener: sinon.stub()
     }
+    S3ReadStream.on.withArgs('end').yields()
+    S3ReadStream.on.withArgs('pipe').yields({
+      unpipe: sinon.stub(),
+      resume: sinon.stub()
+    })
     S3Client = {
       getObject: sinon.stub().returns({
         createReadStream: sinon.stub().returns(S3ReadStream)
@@ -187,9 +195,8 @@ describe('S3PersistorTests', function() {
       })
     })

-    it('pipes the stream through the meter', function() {
-      expect(Stream.pipeline).to.have.been.calledWith(
-        S3ReadStream,
+    it('pipes the stream through the meter', async function() {
+      expect(S3ReadStream.pipe).to.have.been.calledWith(
         sinon.match.instanceOf(Transform)
       )
     })
@@ -292,7 +299,7 @@ describe('S3PersistorTests', function() {
     beforeEach(async function() {
       Transform.prototype.on = sinon.stub()
-      Stream.pipeline.yields(S3NotFoundError)
+      S3ReadStream.on.withArgs('error').yields(S3NotFoundError)

       try {
         stream = await S3Persistor.promises.getFileStream(bucket, key)
       } catch (err) {
@@ -322,7 +329,7 @@ describe('S3PersistorTests', function() {
     beforeEach(async function() {
       Transform.prototype.on = sinon.stub()
-      Stream.pipeline.yields(S3AccessDeniedError)
+      S3ReadStream.on.withArgs('error').yields(S3AccessDeniedError)

       try {
         stream = await S3Persistor.promises.getFileStream(bucket, key)
       } catch (err) {
@@ -352,7 +359,7 @@ describe('S3PersistorTests', function() {
     beforeEach(async function() {
       Transform.prototype.on = sinon.stub()
-      Stream.pipeline.yields(genericError)
+      S3ReadStream.on.withArgs('error').yields(genericError)

       try {
         stream = await S3Persistor.promises.getFileStream(bucket, key)
       } catch (err) {
@@ -544,8 +551,7 @@ describe('S3PersistorTests', function() {
     })

     it('should meter the download', function() {
-      expect(Stream.pipeline).to.have.been.calledWith(
-        S3ReadStream,
+      expect(S3ReadStream.pipe).to.have.been.calledWith(
         sinon.match.instanceOf(Stream.Transform)
       )
     })
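Taken together, a persistor's `getFileStream` is expected to compose these helpers roughly as follows (a sketch based on names in this diff, not the actual persistor code; the import path, exports, and metric name are assumptions):

```js
// assumed: PersistorHelper exports these helpers, and `sourceStream` is the
// raw S3/GCS read stream from the backing client
const { ObserverStream, getReadyPipeline } = require('./PersistorHelper') // hypothetical import

async function getFileStream(sourceStream) {
  // meter bytes flowing out of the backing store; 'egress' is an invented
  // metric name for illustration
  const observer = new ObserverStream({ metric: 'egress' })
  // resolves with the observer stream once it is readable, or rejects with
  // a ReadError if any stream in the chain errors before becoming ready
  return getReadyPipeline(sourceStream, observer)
}
```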