Merge pull request #107 from overleaf/spd-handle-fixes-hopefully

(Hopefully) fix handle leaks
This commit is contained in:
Simon Detheridge 2020-03-27 11:37:48 +00:00 committed by GitHub
commit ae62bb75ca
6 changed files with 77 additions and 40 deletions

View file

@ -126,10 +126,10 @@ async function getFileStream(bucketName, key, _opts = {}) {
const observer = new PersistorHelper.ObserverStream({ const observer = new PersistorHelper.ObserverStream({
metric: 'gcs.ingress' metric: 'gcs.ingress'
}) })
pipeline(stream, observer)
try { try {
await PersistorHelper.waitForStreamReady(stream) // wait for the pipeline to be ready, to catch non-200s
await PersistorHelper.getReadyPipeline(stream, observer)
return observer return observer
} catch (err) { } catch (err) {
throw PersistorHelper.wrapError( throw PersistorHelper.wrapError(

View file

@ -15,6 +15,7 @@ const pipeline = promisify(Stream.pipeline)
// the number of bytes transferred // the number of bytes transferred
class ObserverStream extends Stream.Transform { class ObserverStream extends Stream.Transform {
constructor(options) { constructor(options) {
options.autoDestroy = true
super(options) super(options)
this.bytes = 0 this.bytes = 0
@ -49,7 +50,7 @@ module.exports = {
ObserverStream, ObserverStream,
calculateStreamMd5, calculateStreamMd5,
verifyMd5, verifyMd5,
waitForStreamReady, getReadyPipeline,
wrapError, wrapError,
hexToBase64, hexToBase64,
base64ToHex base64ToHex
@ -94,18 +95,27 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
// resolves when a stream is 'readable', or rejects if the stream throws an error // resolves when a stream is 'readable', or rejects if the stream throws an error
// before that happens - this lets us handle protocol-level errors before trying // before that happens - this lets us handle protocol-level errors before trying
// to read them // to read them
function waitForStreamReady(stream) { function getReadyPipeline(...streams) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const onError = function(err) { const lastStream = streams.slice(-1)[0]
reject(wrapError(err, 'error before stream became ready', {}, ReadError)) let resolvedOrErrored = false
const handler = function(err) {
if (!resolvedOrErrored) {
resolvedOrErrored = true
lastStream.removeListener('readable', handler)
if (err) {
return reject(
wrapError(err, 'error before stream became ready', {}, ReadError)
)
}
resolve(lastStream)
}
} }
const onStreamReady = function() {
stream.removeListener('readable', onStreamReady) pipeline(...streams).catch(handler)
stream.removeListener('error', onError) lastStream.on('readable', handler)
resolve(stream)
}
stream.on('readable', onStreamReady)
stream.on('error', onError)
}) })
} }

View file

@ -131,10 +131,10 @@ async function getFileStream(bucketName, key, opts) {
// ingress from S3 to us // ingress from S3 to us
const observer = new PersistorHelper.ObserverStream({ metric: 's3.ingress' }) const observer = new PersistorHelper.ObserverStream({ metric: 's3.ingress' })
pipeline(stream, observer)
try { try {
await PersistorHelper.waitForStreamReady(stream) // wait for the pipeline to be ready, to catch non-200s
await PersistorHelper.getReadyPipeline(stream, observer)
return observer return observer
} catch (err) { } catch (err) {
throw PersistorHelper.wrapError( throw PersistorHelper.wrapError(

View file

@ -26,6 +26,12 @@ if (!process.env.AWS_ACCESS_KEY_ID) {
throw new Error('please provide credentials for the AWS S3 test server') throw new Error('please provide credentials for the AWS S3 test server')
} }
process.on('unhandledRejection', e => {
// eslint-disable-next-line no-console
console.log('** Unhandled Promise Rejection **\n', e)
throw e
})
// store settings for multiple backends, so that we can test each one. // store settings for multiple backends, so that we can test each one.
// fs will always be available - add others if they are configured // fs will always be available - add others if they are configured
const BackendSettings = require('./TestConfig') const BackendSettings = require('./TestConfig')

View file

@ -5,7 +5,6 @@ const modulePath = '../../../app/js/GcsPersistor.js'
const SandboxedModule = require('sandboxed-module') const SandboxedModule = require('sandboxed-module')
const { ObjectId } = require('mongodb') const { ObjectId } = require('mongodb')
const asyncPool = require('tiny-async-pool') const asyncPool = require('tiny-async-pool')
const StreamModule = require('stream')
const Errors = require('../../../app/js/Errors') const Errors = require('../../../app/js/Errors')
@ -21,6 +20,7 @@ describe('GcsPersistorTests', function() {
let Metrics, let Metrics,
Logger, Logger,
Transform,
Storage, Storage,
Fs, Fs,
GcsNotFoundError, GcsNotFoundError,
@ -68,9 +68,20 @@ describe('GcsPersistorTests', function() {
removeListener: sinon.stub() removeListener: sinon.stub()
} }
Transform = class {
on(event, callback) {
if (event === 'readable') {
callback()
}
}
once() {}
removeListener() {}
}
Stream = { Stream = {
pipeline: sinon.stub().yields(), pipeline: sinon.stub().yields(),
Transform: StreamModule.Transform Transform: Transform
} }
Metrics = { Metrics = {
@ -147,7 +158,7 @@ describe('GcsPersistorTests', function() {
}) })
it('returns a metered stream', function() { it('returns a metered stream', function() {
expect(stream).to.be.instanceOf(StreamModule.Transform) expect(stream).to.be.instanceOf(Transform)
}) })
it('fetches the right key from the right bucket', function() { it('fetches the right key from the right bucket', function() {
@ -159,7 +170,7 @@ describe('GcsPersistorTests', function() {
it('pipes the stream through the meter', function() { it('pipes the stream through the meter', function() {
expect(Stream.pipeline).to.have.been.calledWith( expect(Stream.pipeline).to.have.been.calledWith(
ReadStream, ReadStream,
sinon.match.instanceOf(StreamModule.Transform) sinon.match.instanceOf(Transform)
) )
}) })
}) })
@ -175,7 +186,7 @@ describe('GcsPersistorTests', function() {
}) })
it('returns a metered stream', function() { it('returns a metered stream', function() {
expect(stream).to.be.instanceOf(StreamModule.Transform) expect(stream).to.be.instanceOf(Transform)
}) })
it('passes the byte range on to GCS', function() { it('passes the byte range on to GCS', function() {
@ -190,8 +201,8 @@ describe('GcsPersistorTests', function() {
let error, stream let error, stream
beforeEach(async function() { beforeEach(async function() {
ReadStream.on = sinon.stub() Transform.prototype.on = sinon.stub()
ReadStream.on.withArgs('error').yields(GcsNotFoundError) Stream.pipeline.yields(GcsNotFoundError)
try { try {
stream = await GcsPersistor.promises.getFileStream(bucket, key) stream = await GcsPersistor.promises.getFileStream(bucket, key)
} catch (err) { } catch (err) {
@ -220,8 +231,8 @@ describe('GcsPersistorTests', function() {
let error, stream let error, stream
beforeEach(async function() { beforeEach(async function() {
ReadStream.on = sinon.stub() Transform.prototype.on = sinon.stub()
ReadStream.on.withArgs('error').yields(genericError) Stream.pipeline.yields(genericError)
try { try {
stream = await GcsPersistor.promises.getFileStream(bucket, key) stream = await GcsPersistor.promises.getFileStream(bucket, key)
} catch (err) { } catch (err) {
@ -330,7 +341,7 @@ describe('GcsPersistorTests', function() {
it('should meter the stream and pass it to GCS', function() { it('should meter the stream and pass it to GCS', function() {
expect(Stream.pipeline).to.have.been.calledWith( expect(Stream.pipeline).to.have.been.calledWith(
ReadStream, ReadStream,
sinon.match.instanceOf(StreamModule.Transform), sinon.match.instanceOf(Transform),
WriteStream WriteStream
) )
}) })
@ -375,7 +386,7 @@ describe('GcsPersistorTests', function() {
Stream.pipeline Stream.pipeline
.withArgs( .withArgs(
ReadStream, ReadStream,
sinon.match.instanceOf(StreamModule.Transform), sinon.match.instanceOf(Transform),
WriteStream, WriteStream,
sinon.match.any sinon.match.any
) )
@ -416,7 +427,7 @@ describe('GcsPersistorTests', function() {
it('should upload the stream via the meter', function() { it('should upload the stream via the meter', function() {
expect(Stream.pipeline).to.have.been.calledWith( expect(Stream.pipeline).to.have.been.calledWith(
ReadStream, ReadStream,
sinon.match.instanceOf(StreamModule.Transform), sinon.match.instanceOf(Transform),
WriteStream WriteStream
) )
}) })

View file

@ -3,7 +3,6 @@ const chai = require('chai')
const { expect } = chai const { expect } = chai
const modulePath = '../../../app/js/S3Persistor.js' const modulePath = '../../../app/js/S3Persistor.js'
const SandboxedModule = require('sandboxed-module') const SandboxedModule = require('sandboxed-module')
const StreamModule = require('stream')
const Errors = require('../../../app/js/Errors') const Errors = require('../../../app/js/Errors')
@ -31,6 +30,7 @@ describe('S3PersistorTests', function() {
let Metrics, let Metrics,
Logger, Logger,
Transform,
S3, S3,
Fs, Fs,
ReadStream, ReadStream,
@ -61,9 +61,20 @@ describe('S3PersistorTests', function() {
} }
} }
Transform = class {
on(event, callback) {
if (event === 'readable') {
callback()
}
}
once() {}
removeListener() {}
}
Stream = { Stream = {
pipeline: sinon.stub().yields(), pipeline: sinon.stub().yields(),
Transform: StreamModule.Transform Transform: Transform
} }
EmptyPromise = { EmptyPromise = {
@ -100,7 +111,6 @@ describe('S3PersistorTests', function() {
pipe: sinon.stub(), pipe: sinon.stub(),
removeListener: sinon.stub() removeListener: sinon.stub()
} }
S3ReadStream.on.withArgs('readable').yields()
S3Client = { S3Client = {
getObject: sinon.stub().returns({ getObject: sinon.stub().returns({
createReadStream: sinon.stub().returns(S3ReadStream) createReadStream: sinon.stub().returns(S3ReadStream)
@ -163,7 +173,7 @@ describe('S3PersistorTests', function() {
}) })
it('returns a metered stream', function() { it('returns a metered stream', function() {
expect(stream).to.be.instanceOf(StreamModule.Transform) expect(stream).to.be.instanceOf(Transform)
}) })
it('sets the AWS client up with credentials from settings', function() { it('sets the AWS client up with credentials from settings', function() {
@ -180,7 +190,7 @@ describe('S3PersistorTests', function() {
it('pipes the stream through the meter', function() { it('pipes the stream through the meter', function() {
expect(Stream.pipeline).to.have.been.calledWith( expect(Stream.pipeline).to.have.been.calledWith(
S3ReadStream, S3ReadStream,
sinon.match.instanceOf(StreamModule.Transform) sinon.match.instanceOf(Transform)
) )
}) })
}) })
@ -281,8 +291,8 @@ describe('S3PersistorTests', function() {
let error, stream let error, stream
beforeEach(async function() { beforeEach(async function() {
S3ReadStream.on = sinon.stub() Transform.prototype.on = sinon.stub()
S3ReadStream.on.withArgs('error').yields(S3NotFoundError) Stream.pipeline.yields(S3NotFoundError)
try { try {
stream = await S3Persistor.promises.getFileStream(bucket, key) stream = await S3Persistor.promises.getFileStream(bucket, key)
} catch (err) { } catch (err) {
@ -311,8 +321,8 @@ describe('S3PersistorTests', function() {
let error, stream let error, stream
beforeEach(async function() { beforeEach(async function() {
S3ReadStream.on = sinon.stub() Transform.prototype.on = sinon.stub()
S3ReadStream.on.withArgs('error').yields(S3AccessDeniedError) Stream.pipeline.yields(S3AccessDeniedError)
try { try {
stream = await S3Persistor.promises.getFileStream(bucket, key) stream = await S3Persistor.promises.getFileStream(bucket, key)
} catch (err) { } catch (err) {
@ -341,8 +351,8 @@ describe('S3PersistorTests', function() {
let error, stream let error, stream
beforeEach(async function() { beforeEach(async function() {
S3ReadStream.on = sinon.stub() Transform.prototype.on = sinon.stub()
S3ReadStream.on.withArgs('error').yields(genericError) Stream.pipeline.yields(genericError)
try { try {
stream = await S3Persistor.promises.getFileStream(bucket, key) stream = await S3Persistor.promises.getFileStream(bucket, key)
} catch (err) { } catch (err) {
@ -485,7 +495,7 @@ describe('S3PersistorTests', function() {
expect(S3Client.upload).to.have.been.calledWith({ expect(S3Client.upload).to.have.been.calledWith({
Bucket: bucket, Bucket: bucket,
Key: key, Key: key,
Body: sinon.match.instanceOf(StreamModule.Transform), Body: sinon.match.instanceOf(Transform),
ContentMD5: 'qqqqqru7u7uqqqqqu7u7uw==' ContentMD5: 'qqqqqru7u7uqqqqqu7u7uw=='
}) })
}) })
@ -560,7 +570,7 @@ describe('S3PersistorTests', function() {
expect(S3Client.upload).to.have.been.calledWith({ expect(S3Client.upload).to.have.been.calledWith({
Bucket: bucket, Bucket: bucket,
Key: key, Key: key,
Body: sinon.match.instanceOf(StreamModule.Transform) Body: sinon.match.instanceOf(Transform)
}) })
}) })
}) })