mirror of
https://github.com/overleaf/overleaf.git
synced 2025-04-10 17:17:59 +00:00
Catch errors from pipeline and stream when waiting for streams to be readable
This commit is contained in:
parent
238d96ed44
commit
1f037ef653
5 changed files with 73 additions and 38 deletions
|
@ -126,10 +126,10 @@ async function getFileStream(bucketName, key, _opts = {}) {
|
|||
const observer = new PersistorHelper.ObserverStream({
|
||||
metric: 'gcs.ingress'
|
||||
})
|
||||
pipeline(stream, observer)
|
||||
|
||||
try {
|
||||
await PersistorHelper.waitForStreamReady(stream)
|
||||
// wait for the pipeline to be ready, to catch non-200s
|
||||
await PersistorHelper.getReadyPipeline(stream, observer)
|
||||
return observer
|
||||
} catch (err) {
|
||||
throw PersistorHelper.wrapError(
|
||||
|
|
|
@ -50,7 +50,7 @@ module.exports = {
|
|||
ObserverStream,
|
||||
calculateStreamMd5,
|
||||
verifyMd5,
|
||||
waitForStreamReady,
|
||||
getReadyPipeline,
|
||||
wrapError,
|
||||
hexToBase64,
|
||||
base64ToHex
|
||||
|
@ -94,19 +94,33 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
|
|||
|
||||
// resolves when a stream is 'readable', or rejects if the stream throws an error
|
||||
// before that happens - this lets us handle protocol-level errors before trying
|
||||
// to read them
|
||||
function waitForStreamReady(stream) {
|
||||
// to read them - these can come from the call to pipeline or the stream itself
|
||||
function getReadyPipeline(...streams) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const lastStream = streams.slice(-1)[0]
|
||||
let resolvedOrErrored = false
|
||||
|
||||
const onError = function(err) {
|
||||
reject(wrapError(err, 'error before stream became ready', {}, ReadError))
|
||||
if (!resolvedOrErrored) {
|
||||
resolvedOrErrored = true
|
||||
reject(
|
||||
wrapError(err, 'error before stream became ready', {}, ReadError)
|
||||
)
|
||||
}
|
||||
}
|
||||
const onStreamReady = function() {
|
||||
stream.removeListener('readable', onStreamReady)
|
||||
stream.removeListener('error', onError)
|
||||
resolve(stream)
|
||||
if (!resolvedOrErrored) {
|
||||
resolvedOrErrored = true
|
||||
lastStream.removeListener('readable', onStreamReady)
|
||||
lastStream.removeListener('error', onError)
|
||||
resolve(lastStream)
|
||||
}
|
||||
}
|
||||
stream.on('readable', onStreamReady)
|
||||
stream.on('error', onError)
|
||||
|
||||
pipeline(...streams).catch(onError)
|
||||
|
||||
lastStream.on('readable', onStreamReady)
|
||||
lastStream.on('error', onError)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -131,10 +131,10 @@ async function getFileStream(bucketName, key, opts) {
|
|||
|
||||
// ingress from S3 to us
|
||||
const observer = new PersistorHelper.ObserverStream({ metric: 's3.ingress' })
|
||||
pipeline(stream, observer)
|
||||
|
||||
try {
|
||||
await PersistorHelper.waitForStreamReady(stream)
|
||||
// wait for the pipeline to be ready, to catch non-200s
|
||||
await PersistorHelper.getReadyPipeline(stream, observer)
|
||||
return observer
|
||||
} catch (err) {
|
||||
throw PersistorHelper.wrapError(
|
||||
|
|
|
@ -5,7 +5,6 @@ const modulePath = '../../../app/js/GcsPersistor.js'
|
|||
const SandboxedModule = require('sandboxed-module')
|
||||
const { ObjectId } = require('mongodb')
|
||||
const asyncPool = require('tiny-async-pool')
|
||||
const StreamModule = require('stream')
|
||||
|
||||
const Errors = require('../../../app/js/Errors')
|
||||
|
||||
|
@ -21,6 +20,7 @@ describe('GcsPersistorTests', function() {
|
|||
|
||||
let Metrics,
|
||||
Logger,
|
||||
Transform,
|
||||
Storage,
|
||||
Fs,
|
||||
GcsNotFoundError,
|
||||
|
@ -68,9 +68,20 @@ describe('GcsPersistorTests', function() {
|
|||
removeListener: sinon.stub()
|
||||
}
|
||||
|
||||
Transform = class {
|
||||
on(event, callback) {
|
||||
if (event === 'readable') {
|
||||
callback()
|
||||
}
|
||||
}
|
||||
|
||||
once() {}
|
||||
removeListener() {}
|
||||
}
|
||||
|
||||
Stream = {
|
||||
pipeline: sinon.stub().yields(),
|
||||
Transform: StreamModule.Transform
|
||||
Transform: Transform
|
||||
}
|
||||
|
||||
Metrics = {
|
||||
|
@ -147,7 +158,7 @@ describe('GcsPersistorTests', function() {
|
|||
})
|
||||
|
||||
it('returns a metered stream', function() {
|
||||
expect(stream).to.be.instanceOf(StreamModule.Transform)
|
||||
expect(stream).to.be.instanceOf(Transform)
|
||||
})
|
||||
|
||||
it('fetches the right key from the right bucket', function() {
|
||||
|
@ -159,7 +170,7 @@ describe('GcsPersistorTests', function() {
|
|||
it('pipes the stream through the meter', function() {
|
||||
expect(Stream.pipeline).to.have.been.calledWith(
|
||||
ReadStream,
|
||||
sinon.match.instanceOf(StreamModule.Transform)
|
||||
sinon.match.instanceOf(Transform)
|
||||
)
|
||||
})
|
||||
})
|
||||
|
@ -175,7 +186,7 @@ describe('GcsPersistorTests', function() {
|
|||
})
|
||||
|
||||
it('returns a metered stream', function() {
|
||||
expect(stream).to.be.instanceOf(StreamModule.Transform)
|
||||
expect(stream).to.be.instanceOf(Transform)
|
||||
})
|
||||
|
||||
it('passes the byte range on to GCS', function() {
|
||||
|
@ -190,8 +201,8 @@ describe('GcsPersistorTests', function() {
|
|||
let error, stream
|
||||
|
||||
beforeEach(async function() {
|
||||
ReadStream.on = sinon.stub()
|
||||
ReadStream.on.withArgs('error').yields(GcsNotFoundError)
|
||||
Transform.prototype.on = sinon.stub()
|
||||
Transform.prototype.on.withArgs('error').yields(GcsNotFoundError)
|
||||
try {
|
||||
stream = await GcsPersistor.promises.getFileStream(bucket, key)
|
||||
} catch (err) {
|
||||
|
@ -220,8 +231,8 @@ describe('GcsPersistorTests', function() {
|
|||
let error, stream
|
||||
|
||||
beforeEach(async function() {
|
||||
ReadStream.on = sinon.stub()
|
||||
ReadStream.on.withArgs('error').yields(genericError)
|
||||
Transform.prototype.on = sinon.stub()
|
||||
Transform.prototype.on.withArgs('error').yields(genericError)
|
||||
try {
|
||||
stream = await GcsPersistor.promises.getFileStream(bucket, key)
|
||||
} catch (err) {
|
||||
|
@ -330,7 +341,7 @@ describe('GcsPersistorTests', function() {
|
|||
it('should meter the stream and pass it to GCS', function() {
|
||||
expect(Stream.pipeline).to.have.been.calledWith(
|
||||
ReadStream,
|
||||
sinon.match.instanceOf(StreamModule.Transform),
|
||||
sinon.match.instanceOf(Transform),
|
||||
WriteStream
|
||||
)
|
||||
})
|
||||
|
@ -375,7 +386,7 @@ describe('GcsPersistorTests', function() {
|
|||
Stream.pipeline
|
||||
.withArgs(
|
||||
ReadStream,
|
||||
sinon.match.instanceOf(StreamModule.Transform),
|
||||
sinon.match.instanceOf(Transform),
|
||||
WriteStream,
|
||||
sinon.match.any
|
||||
)
|
||||
|
@ -416,7 +427,7 @@ describe('GcsPersistorTests', function() {
|
|||
it('should upload the stream via the meter', function() {
|
||||
expect(Stream.pipeline).to.have.been.calledWith(
|
||||
ReadStream,
|
||||
sinon.match.instanceOf(StreamModule.Transform),
|
||||
sinon.match.instanceOf(Transform),
|
||||
WriteStream
|
||||
)
|
||||
})
|
||||
|
|
|
@ -3,7 +3,6 @@ const chai = require('chai')
|
|||
const { expect } = chai
|
||||
const modulePath = '../../../app/js/S3Persistor.js'
|
||||
const SandboxedModule = require('sandboxed-module')
|
||||
const StreamModule = require('stream')
|
||||
|
||||
const Errors = require('../../../app/js/Errors')
|
||||
|
||||
|
@ -31,6 +30,7 @@ describe('S3PersistorTests', function() {
|
|||
|
||||
let Metrics,
|
||||
Logger,
|
||||
Transform,
|
||||
S3,
|
||||
Fs,
|
||||
ReadStream,
|
||||
|
@ -61,9 +61,20 @@ describe('S3PersistorTests', function() {
|
|||
}
|
||||
}
|
||||
|
||||
Transform = class {
|
||||
on(event, callback) {
|
||||
if (event === 'readable') {
|
||||
callback()
|
||||
}
|
||||
}
|
||||
|
||||
once() {}
|
||||
removeListener() {}
|
||||
}
|
||||
|
||||
Stream = {
|
||||
pipeline: sinon.stub().yields(),
|
||||
Transform: StreamModule.Transform
|
||||
Transform: Transform
|
||||
}
|
||||
|
||||
EmptyPromise = {
|
||||
|
@ -100,7 +111,6 @@ describe('S3PersistorTests', function() {
|
|||
pipe: sinon.stub(),
|
||||
removeListener: sinon.stub()
|
||||
}
|
||||
S3ReadStream.on.withArgs('readable').yields()
|
||||
S3Client = {
|
||||
getObject: sinon.stub().returns({
|
||||
createReadStream: sinon.stub().returns(S3ReadStream)
|
||||
|
@ -163,7 +173,7 @@ describe('S3PersistorTests', function() {
|
|||
})
|
||||
|
||||
it('returns a metered stream', function() {
|
||||
expect(stream).to.be.instanceOf(StreamModule.Transform)
|
||||
expect(stream).to.be.instanceOf(Transform)
|
||||
})
|
||||
|
||||
it('sets the AWS client up with credentials from settings', function() {
|
||||
|
@ -180,7 +190,7 @@ describe('S3PersistorTests', function() {
|
|||
it('pipes the stream through the meter', function() {
|
||||
expect(Stream.pipeline).to.have.been.calledWith(
|
||||
S3ReadStream,
|
||||
sinon.match.instanceOf(StreamModule.Transform)
|
||||
sinon.match.instanceOf(Transform)
|
||||
)
|
||||
})
|
||||
})
|
||||
|
@ -281,8 +291,8 @@ describe('S3PersistorTests', function() {
|
|||
let error, stream
|
||||
|
||||
beforeEach(async function() {
|
||||
S3ReadStream.on = sinon.stub()
|
||||
S3ReadStream.on.withArgs('error').yields(S3NotFoundError)
|
||||
Transform.prototype.on = sinon.stub()
|
||||
Transform.prototype.on.withArgs('error').yields(S3NotFoundError)
|
||||
try {
|
||||
stream = await S3Persistor.promises.getFileStream(bucket, key)
|
||||
} catch (err) {
|
||||
|
@ -311,8 +321,8 @@ describe('S3PersistorTests', function() {
|
|||
let error, stream
|
||||
|
||||
beforeEach(async function() {
|
||||
S3ReadStream.on = sinon.stub()
|
||||
S3ReadStream.on.withArgs('error').yields(S3AccessDeniedError)
|
||||
Transform.prototype.on = sinon.stub()
|
||||
Transform.prototype.on.withArgs('error').yields(S3AccessDeniedError)
|
||||
try {
|
||||
stream = await S3Persistor.promises.getFileStream(bucket, key)
|
||||
} catch (err) {
|
||||
|
@ -341,8 +351,8 @@ describe('S3PersistorTests', function() {
|
|||
let error, stream
|
||||
|
||||
beforeEach(async function() {
|
||||
S3ReadStream.on = sinon.stub()
|
||||
S3ReadStream.on.withArgs('error').yields(genericError)
|
||||
Transform.prototype.on = sinon.stub()
|
||||
Transform.prototype.on.withArgs('error').yields(genericError)
|
||||
try {
|
||||
stream = await S3Persistor.promises.getFileStream(bucket, key)
|
||||
} catch (err) {
|
||||
|
@ -485,7 +495,7 @@ describe('S3PersistorTests', function() {
|
|||
expect(S3Client.upload).to.have.been.calledWith({
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
Body: sinon.match.instanceOf(StreamModule.Transform),
|
||||
Body: sinon.match.instanceOf(Transform),
|
||||
ContentMD5: 'qqqqqru7u7uqqqqqu7u7uw=='
|
||||
})
|
||||
})
|
||||
|
@ -560,7 +570,7 @@ describe('S3PersistorTests', function() {
|
|||
expect(S3Client.upload).to.have.been.calledWith({
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
Body: sinon.match.instanceOf(StreamModule.Transform)
|
||||
Body: sinon.match.instanceOf(Transform)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
Loading…
Add table
Reference in a new issue