Merge pull request #113 from overleaf/spd-stream-debugging

Ensure streams are drained when a pipeline fails
Simon Detheridge 2020-04-03 14:06:55 +01:00 committed by GitHub
commit 63907e0170
5 changed files with 150 additions and 50 deletions
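
Background for the change: when a pipeline into a response fails, or the client disconnects mid-transfer, the upstream source stream can be left paused with unread data in its buffers, so its underlying socket is never released. The fix below unpipes the failed destination and resumes the source so it drains. A minimal standalone sketch of that technique (the function and stream names are illustrative, not part of this commit):

const { PassThrough } = require('stream')

// When the destination breaks, stop feeding it and let the source flush
// its buffered data into the void so any underlying socket can close.
function drainOnFailure(source, destination) {
  destination.on('error', () => {
    source.unpipe(destination) // the destination is no longer reliable
    source.resume() // discard remaining data instead of stalling while paused
  })
  source.pipe(destination)
}

// usage sketch: destroying the destination with an error triggers the drain
const source = new PassThrough()
const destination = new PassThrough()
drainOnFailure(source, destination)
destination.destroy(new Error('boom'))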

View file

@@ -61,9 +61,6 @@ function getFile(req, res, next) {
   }
   pipeline(fileStream, res, err => {
-    if (!fileStream.destroyed) {
-      fileStream.destroy()
-    }
     if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
       res.end()
     } else if (err) {
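
Rationale for the deletion: Node's stream.pipeline already destroys every stream in the chain when it fails, so the manual fileStream.destroy() was redundant. A hedged sketch of the resulting shape (the next(err) branch is an assumption about the context truncated below the hunk):

const { pipeline } = require('stream')

// pipeline() forwards errors and tears down all of its streams on failure,
// so the callback only has to decide how to answer the HTTP request.
pipeline(fileStream, res, err => {
  if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
    res.end() // the client hung up first; nothing left to send
  } else if (err) {
    next(err) // assumption: remaining errors go to the error middleware
  }
})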

View file

@@ -23,6 +23,7 @@ class ObserverStream extends Stream.Transform {
     if (options.hash) {
       this.hash = crypto.createHash(options.hash)
     }
+
     if (options.metric) {
       const onEnd = () => {
         metrics.count(options.metric, this.bytes)
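
For orientation, ObserverStream's metric option counts the bytes flowing through the transform and emits a counter once the stream finishes. A self-contained sketch of the same idea, with a stand-in metrics object (everything except the options.metric and metrics.count names is illustrative):

const { Transform } = require('stream')

const metrics = { count: (name, n) => console.log(name, n) } // stand-in

// Pass chunks through untouched while tallying their size; report the
// total once, whether the stream ends cleanly or errors.
class ByteCounter extends Transform {
  constructor(options = {}) {
    super()
    this.bytes = 0
    if (options.metric) {
      const onEnd = () => metrics.count(options.metric, this.bytes)
      this.once('end', onEnd)
      this.once('error', onEnd)
    }
  }

  _transform(chunk, encoding, callback) {
    this.bytes += chunk.length
    callback(null, chunk)
  }
}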
@@ -98,35 +99,61 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
 function getReadyPipeline(...streams) {
   return new Promise((resolve, reject) => {
     const lastStream = streams.slice(-1)[0]
-    let resolvedOrErrored = false
+
+    // in case of error or stream close, we must ensure that we drain the
+    // previous stream so that it can clean up its socket (if it has one)
+    const drainPreviousStream = function(previousStream) {
+      // this stream is no longer reliable, so don't pipe anything more into it
+      previousStream.unpipe(this)
+      previousStream.resume()
+    }
+
+    // handler to resolve when either:
+    // - an error happens, or
+    // - the last stream in the chain is readable
+    // for example, in the case of a 4xx error an error will occur and the
+    // streams will not become readable
     const handler = function(err) {
-      if (!resolvedOrErrored) {
-        resolvedOrErrored = true
-        lastStream.removeListener('readable', handler)
-        if (err) {
-          reject(
-            wrapError(err, 'error before stream became ready', {}, ReadError)
-          )
-        } else {
-          resolve(lastStream)
-        }
-      }
-      if (err) {
-        for (const stream of streams) {
-          if (!stream.destroyed) {
-            stream.destroy()
-          }
-        }
-      }
-    }
-    for (let index = 0; index < streams.length - 1; index++) {
-      streams[index + 1].on('close', () => streams[index].destroy())
-    }
-    pipeline(...streams).catch(handler)
+      // remove handler from all streams because we don't want to do this on
+      // later errors
+      lastStream.removeListener('readable', handler)
+      for (const stream of streams) {
+        stream.removeListener('error', handler)
+      }
+
+      // return control to the caller
+      if (err) {
+        reject(
+          wrapError(err, 'error before stream became ready', {}, ReadError)
+        )
+      } else {
+        resolve(lastStream)
+      }
+    }
+
+    // ensure the handler fires when the last stream becomes readable
     lastStream.on('readable', handler)
+
+    for (const stream of streams) {
+      // when a stream receives a pipe, set up the drain handler to drain the
+      // connection if an error occurs or the stream is closed
+      stream.on('pipe', previousStream => {
+        stream.on('error', x => {
+          drainPreviousStream(previousStream)
+        })
+        stream.on('close', () => {
+          drainPreviousStream(previousStream)
+        })
+      })
+      // add the handler function to resolve this method on error if we can't
+      // set up the pipeline
+      stream.on('error', handler)
+    }
+
+    // begin the pipeline
+    for (let index = 0; index < streams.length - 1; index++) {
+      streams[index].pipe(streams[index + 1])
+    }
   })
 }
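
The net effect of the rewrite: instead of delegating to pipeline() and destroying everything on error, the helper now wires the streams together itself so each one can hook 'pipe' and drain its upstream on error or close; destroying a source outright can leave data unread on its socket, which is exactly what the acceptance tests below check for. Callers see the same contract as before. A hedged usage sketch (remoteStream and meter are illustrative names, standing in for a backend response body and an ObserverStream):

const readable = await getReadyPipeline(remoteStream, meter)
// `readable` is the last stream in the chain, ready to pipe to the client.
// If anything errors first (e.g. a 4xx from the backend) the promise
// rejects with a ReadError, and drainPreviousStream has already unpiped
// and resumed the upstream so its socket can be released.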

View file

@@ -17,10 +17,13 @@ const streamifier = require('streamifier')
 chai.use(require('chai-as-promised'))
 const { ObjectId } = require('mongodb')
 const tk = require('timekeeper')
+const ChildProcess = require('child_process')
 
 const fsWriteFile = promisify(fs.writeFile)
 const fsStat = promisify(fs.stat)
 const pipeline = promisify(Stream.pipeline)
+const exec = promisify(ChildProcess.exec)
+const msleep = promisify(setTimeout)
 
 if (!process.env.AWS_ACCESS_KEY_ID) {
   throw new Error('please provide credentials for the AWS S3 test server')
@@ -40,6 +43,41 @@ describe('Filestore', function() {
   this.timeout(1000 * 10)
   const filestoreUrl = `http://localhost:${Settings.internal.filestore.port}`
 
+  const seenSockets = []
+  async function expectNoSockets() {
+    try {
+      await msleep(1000)
+      const { stdout } = await exec('ss -tnH')
+
+      const badSockets = []
+      for (const socket of stdout.split('\n')) {
+        const fields = socket.split(' ').filter(part => part !== '')
+        if (
+          fields.length > 2 &&
+          parseInt(fields[1]) &&
+          !seenSockets.includes(socket)
+        ) {
+          badSockets.push(socket)
+          seenSockets.push(socket)
+        }
+      }
+
+      if (badSockets.length) {
+        // eslint-disable-next-line no-console
+        console.error(
+          'ERR: Sockets still have receive buffer after connection closed'
+        )
+        for (const socket of badSockets) {
+          // eslint-disable-next-line no-console
+          console.error(socket)
+        }
+        throw new Error('Sockets still open after connection closed')
+      }
+    } catch (err) {
+      expect(err).not.to.exist
+    }
+  }
+
   // redefine the test suite for every available backend
   Object.keys(BackendSettings).forEach(backend => {
     describe(backend, function() {
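
How the check works: ss -tnH lists TCP sockets (-t) numerically (-n) with the header row suppressed (-H); each line has the shape State, Recv-Q, Send-Q, Local:Port, Peer:Port, so fields[1] is the receive queue. Bytes the process received but never read show up as a non-zero Recv-Q even after the HTTP request completes, which is the leak signature this PR hunts. An annotated example line (the values are illustrative):

// Example `ss -tnH` line, split the same way expectNoSockets does:
//
//   ESTAB 4096 0 127.0.0.1:45678 127.0.0.1:9000
//
// fields[0] = 'ESTAB'   socket state
// fields[1] = '4096'    Recv-Q: bytes received but never read by the app
// fields[2] = '0'       Send-Q
//
// parseInt(fields[1]) is truthy only for a non-zero Recv-Q, and seenSockets
// remembers lines already flagged so one leaked socket does not fail every
// later test in the run.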
@@ -71,7 +109,8 @@
       }
 
       after(async function() {
-        return app.stop()
+        await msleep(3000)
+        await app.stop()
       })
 
       beforeEach(async function() {
@@ -156,6 +195,11 @@
         expect(res.body).to.equal(constantFileContent)
       })
 
+      it('should not leak a socket', async function() {
+        await rp.get(fileUrl)
+        await expectNoSockets()
+      })
+
       it('should be able to get back the first 9 bytes of the file', async function() {
         const options = {
           uri: fileUrl,
@@ -378,6 +422,30 @@
       it('should not throw an error', function() {
         expect(error).not.to.exist
       })
 
+      it('should not leak a socket', async function() {
+        await rp.get(fileUrl)
+        await expectNoSockets()
+      })
+
+      it('should not leak a socket if the connection is aborted', async function() {
+        this.timeout(20000)
+        for (let i = 0; i < 5; i++) {
+          // test is not 100% reliable, so repeat
+          // create a new connection and have it time out before reading any data
+          await new Promise(resolve => {
+            const streamThatHangs = new Stream.PassThrough()
+            const stream = request({ url: fileUrl, timeout: 1000 })
+            stream.pipe(streamThatHangs)
+            stream.on('error', () => {
+              stream.destroy()
+              streamThatHangs.destroy()
+              resolve()
+            })
+          })
+          await expectNoSockets()
+        }
+      })
     })
 
     if (backend === 'S3Persistor' || backend === 'FallbackGcsToS3Persistor') {
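
Why the hanging PassThrough matters: a PassThrough that nobody reads accepts data only until its internal buffer reaches highWaterMark, then back-pressures the pipe, so once the client times out there are unread bytes sitting on the server's side of the socket unless the server drains them. A tiny demonstration of that backpressure (buffer sizes are illustrative):

const { PassThrough } = require('stream')

// A sink nobody reads: write() returns false as soon as the internal
// buffer exceeds highWaterMark, signalling the pipe to stop sending.
const sink = new PassThrough({ highWaterMark: 16 })
console.log(sink.write(Buffer.alloc(64))) // false: buffer is already full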
@@ -554,7 +622,7 @@
       it('copies the file to the primary', async function() {
         await rp.get(fileUrl)
         // wait for the file to copy in the background
-        await promisify(setTimeout)(1000)
+        await msleep(1000)
 
         await TestHelper.expectPersistorToHaveFile(
           app.persistor.primaryPersistor,
@@ -622,7 +690,7 @@
       it('should not copy the old file to the primary with the old key', async function() {
         // wait for the file to copy in the background
-        await promisify(setTimeout)(1000)
+        await msleep(1000)
 
         await TestHelper.expectPersistorNotToHaveFile(
           app.persistor.primaryPersistor,
@@ -668,7 +736,7 @@
       it('should copy the old file to the primary with the old key', async function() {
         // wait for the file to copy in the background
-        await promisify(setTimeout)(1000)
+        await msleep(1000)
 
         await TestHelper.expectPersistorToHaveFile(
           app.persistor.primaryPersistor,

View file

@@ -61,12 +61,15 @@ describe('GcsPersistorTests', function() {
     ReadStream = {
       pipe: sinon.stub().returns('readStream'),
-      on: sinon
-        .stub()
-        .withArgs('end')
-        .yields(),
+      on: sinon.stub(),
       removeListener: sinon.stub()
     }
+    ReadStream.on.withArgs('end').yields()
+    ReadStream.on.withArgs('pipe').yields({
+      unpipe: sinon.stub(),
+      resume: sinon.stub(),
+      on: sinon.stub()
+    })
 
     Transform = class {
       on(event, callback) {
@@ -168,8 +171,7 @@
     })
 
     it('pipes the stream through the meter', function() {
-      expect(Stream.pipeline).to.have.been.calledWith(
-        ReadStream,
+      expect(ReadStream.pipe).to.have.been.calledWith(
         sinon.match.instanceOf(Transform)
       )
     })
@@ -202,11 +204,11 @@
       beforeEach(async function() {
         Transform.prototype.on = sinon.stub()
-        Stream.pipeline.yields(GcsNotFoundError)
+        ReadStream.on.withArgs('error').yields(GcsNotFoundError)
 
         try {
           stream = await GcsPersistor.promises.getFileStream(bucket, key)
-        } catch (err) {
-          error = err
+        } catch (e) {
+          error = e
         }
       })
@@ -232,7 +234,7 @@
       beforeEach(async function() {
         Transform.prototype.on = sinon.stub()
-        Stream.pipeline.yields(genericError)
+        ReadStream.on.withArgs('error').yields(genericError)
 
         try {
           stream = await GcsPersistor.promises.getFileStream(bucket, key)
         } catch (err) {
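
A note on the stub mechanics these unit tests now lean on: sinon's withArgs(...).yields(...) makes a stub invoke its callback argument whenever it is called with the matching first argument, which is how the fake streams answer the 'end', 'error', and 'pipe' listeners that getReadyPipeline attaches. A minimal illustration (the variable names are illustrative):

const sinon = require('sinon')

const on = sinon.stub()
on.withArgs('end').yields() // any on('end', cb) call now invokes cb()
on.withArgs('pipe').yields({ unpipe: sinon.stub(), resume: sinon.stub() })

// code under test doing stream.on('pipe', prev => prev.resume()) receives
// the fake previous stream above, with resume() and unpipe() recorded.
on('pipe', prev => prev.resume())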

View file

@@ -87,11 +87,14 @@ describe('S3PersistorTests', function() {
     ReadStream = {
       pipe: sinon.stub().returns('readStream'),
-      on: sinon
-        .stub()
-        .withArgs('end')
-        .yields()
+      on: sinon.stub(),
+      removeListener: sinon.stub()
     }
+    ReadStream.on.withArgs('end').yields()
+    ReadStream.on.withArgs('pipe').yields({
+      unpipe: sinon.stub(),
+      resume: sinon.stub()
+    })
 
     FileNotFoundError = new Error('File not found')
     FileNotFoundError.code = 'ENOENT'
@@ -111,6 +114,11 @@
       pipe: sinon.stub(),
       removeListener: sinon.stub()
     }
+    S3ReadStream.on.withArgs('end').yields()
+    S3ReadStream.on.withArgs('pipe').yields({
+      unpipe: sinon.stub(),
+      resume: sinon.stub()
+    })
 
     S3Client = {
       getObject: sinon.stub().returns({
         createReadStream: sinon.stub().returns(S3ReadStream)
@@ -187,9 +195,8 @@
       })
     })
 
-    it('pipes the stream through the meter', function() {
-      expect(Stream.pipeline).to.have.been.calledWith(
-        S3ReadStream,
+    it('pipes the stream through the meter', async function() {
+      expect(S3ReadStream.pipe).to.have.been.calledWith(
         sinon.match.instanceOf(Transform)
       )
     })
@@ -292,7 +299,7 @@
       beforeEach(async function() {
         Transform.prototype.on = sinon.stub()
-        Stream.pipeline.yields(S3NotFoundError)
+        S3ReadStream.on.withArgs('error').yields(S3NotFoundError)
 
         try {
           stream = await S3Persistor.promises.getFileStream(bucket, key)
         } catch (err) {
@@ -322,7 +329,7 @@
       beforeEach(async function() {
         Transform.prototype.on = sinon.stub()
-        Stream.pipeline.yields(S3AccessDeniedError)
+        S3ReadStream.on.withArgs('error').yields(S3AccessDeniedError)
 
         try {
           stream = await S3Persistor.promises.getFileStream(bucket, key)
         } catch (err) {
@@ -352,7 +359,7 @@
       beforeEach(async function() {
         Transform.prototype.on = sinon.stub()
-        Stream.pipeline.yields(genericError)
+        S3ReadStream.on.withArgs('error').yields(genericError)
 
         try {
           stream = await S3Persistor.promises.getFileStream(bucket, key)
         } catch (err) {
@@ -544,8 +551,7 @@
       })
 
       it('should meter the download', function() {
-        expect(Stream.pipeline).to.have.been.calledWith(
-          S3ReadStream,
+        expect(S3ReadStream.pipe).to.have.been.calledWith(
           sinon.match.instanceOf(Stream.Transform)
         )
       })