Use single pipeline when calculating md5

This commit is contained in:
Simon Detheridge 2020-03-25 16:59:51 +00:00
parent 63b4b3e9a2
commit d073fe75ca
8 changed files with 111 additions and 140 deletions

View file

@ -65,17 +65,15 @@ async function sendFile(bucketName, key, fsPath) {
async function sendStream(bucketName, key, readStream, sourceMd5) {
try {
let hashPromise
// egress from us to gcs
const observeOptions = { metric: 'gcs.egress' }
// if there is no supplied md5 hash, we calculate the hash as the data passes through
if (!sourceMd5) {
hashPromise = PersistorHelper.calculateStreamMd5(readStream)
// if there is no supplied md5 hash, we calculate the hash as the data passes through
observeOptions.hash = 'md5'
}
const meteredStream = PersistorHelper.getMeteredStream(
readStream,
'gcs.egress' // egress from us to gcs
)
const observer = new PersistorHelper.ObserverStream(observeOptions)
const writeOptions = {
// disabling of resumable uploads is recommended by Google:
@ -94,12 +92,12 @@ async function sendStream(bucketName, key, readStream, sourceMd5) {
.file(key)
.createWriteStream(writeOptions)
await pipeline(meteredStream, uploadStream)
await pipeline(readStream, observer, uploadStream)
// if we didn't have an md5 hash, we should compare our computed one with Google's
// as we couldn't tell GCS about it beforehand
if (hashPromise) {
sourceMd5 = await hashPromise
if (!sourceMd5) {
sourceMd5 = observer.getHash()
// throws on mismatch
await PersistorHelper.verifyMd5(GcsPersistor, bucketName, key, sourceMd5)
}
@ -124,14 +122,15 @@ async function getFileStream(bucketName, key, _opts = {}) {
.file(key)
.createReadStream(opts)
const meteredStream = PersistorHelper.getMeteredStream(
stream,
'gcs.ingress' // ingress to us from gcs
)
// ingress to us from gcs
const observer = new PersistorHelper.ObserverStream({
metric: 'gcs.ingress'
})
pipeline(stream, observer)
try {
await PersistorHelper.waitForStreamReady(stream)
return meteredStream
return observer
} catch (err) {
throw PersistorHelper.wrapError(
err,

View file

@ -1,17 +1,54 @@
const crypto = require('crypto')
const metrics = require('metrics-sharelatex')
const meter = require('stream-meter')
const Stream = require('stream')
const logger = require('logger-sharelatex')
const metrics = require('metrics-sharelatex')
const { WriteError, ReadError, NotFoundError } = require('./Errors')
const { promisify } = require('util')
const pipeline = promisify(Stream.pipeline)
// Observes data that passes through and computes some metadata for it
// - specifically, it computes the number of bytes transferred, and optionally
// computes a cryptographic hash based on the 'hash' option. e.g., pass
// { hash: 'md5' } to compute the md5 hash of the stream
// - if 'metric' is supplied as an option, this metric will be incremented by
// the number of bytes transferred
class ObserverStream extends Stream.Transform {
constructor(options) {
super(options)
this.bytes = 0
if (options.hash) {
this.hash = crypto.createHash(options.hash)
}
if (options.metric) {
const onEnd = () => {
metrics.count(options.metric, this.bytes)
}
this.once('error', onEnd)
this.once('end', onEnd)
}
}
_transform(chunk, encoding, done) {
if (this.hash) {
this.hash.update(chunk)
}
this.bytes += chunk.length
this.push(chunk)
done()
}
getHash() {
return this.hash && this.hash.digest('hex')
}
}
module.exports = {
ObserverStream,
calculateStreamMd5,
verifyMd5,
getMeteredStream,
waitForStreamReady,
wrapError,
hexToBase64,
@ -19,6 +56,7 @@ module.exports = {
}
// returns a promise which resolves with the md5 hash of the stream
// - consumes the stream
function calculateStreamMd5(stream) {
const hash = crypto.createHash('md5')
hash.setEncoding('hex')
@ -53,23 +91,6 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
}
}
// returns the next stream in the pipeline, and calls the callback with the byte count
// when the stream finishes or receives an error
function getMeteredStream(stream, metricName) {
const meteredStream = meter()
pipeline(stream, meteredStream)
.then(() => {
metrics.count(metricName, meteredStream.bytes)
})
.catch(() => {
// on error, just send how many bytes we received before the stream stopped
metrics.count(metricName, meteredStream.bytes)
})
return meteredStream
}
// resolves when a stream is 'readable', or rejects if the stream throws an error
// before that happens - this lets us handle protocol-level errors before trying
// to read them

View file

@ -10,13 +10,15 @@ const PersistorHelper = require('./PersistorHelper')
const fs = require('fs')
const S3 = require('aws-sdk/clients/s3')
const { URL } = require('url')
const { callbackify } = require('util')
const Stream = require('stream')
const { promisify, callbackify } = require('util')
const {
WriteError,
ReadError,
NotFoundError,
SettingsError
} = require('./Errors')
const pipeline = promisify(Stream.pipeline)
const S3Persistor = {
sendFile: callbackify(sendFile),
@ -51,26 +53,25 @@ async function sendFile(bucketName, key, fsPath) {
async function sendStream(bucketName, key, readStream, sourceMd5) {
try {
// if there is no supplied md5 hash, we calculate the hash as the data passes through
let hashPromise
// egress from us to S3
const observeOptions = { metric: 's3.egress' }
let b64Hash
if (sourceMd5) {
b64Hash = PersistorHelper.hexToBase64(sourceMd5)
} else {
hashPromise = PersistorHelper.calculateStreamMd5(readStream)
// if there is no supplied md5 hash, we calculate the hash as the data passes through
observeOptions.hash = 'md5'
}
const meteredStream = PersistorHelper.getMeteredStream(
readStream,
's3.egress' // egress from us to s3
)
const observer = new PersistorHelper.ObserverStream(observeOptions)
pipeline(readStream, observer)
// if we have an md5 hash, pass this to S3 to verify the upload
const uploadOptions = {
Bucket: bucketName,
Key: key,
Body: meteredStream
Body: observer
}
if (b64Hash) {
uploadOptions.ContentMD5 = b64Hash
@ -92,8 +93,8 @@ async function sendStream(bucketName, key, readStream, sourceMd5) {
// if we didn't have an md5 hash, we should compare our computed one with S3's
// as we couldn't tell S3 about it beforehand
if (hashPromise) {
sourceMd5 = await hashPromise
if (!sourceMd5) {
sourceMd5 = observer.getHash()
// throws on mismatch
await PersistorHelper.verifyMd5(
S3Persistor,
@ -128,14 +129,13 @@ async function getFileStream(bucketName, key, opts) {
.getObject(params)
.createReadStream()
const meteredStream = PersistorHelper.getMeteredStream(
stream,
's3.ingress' // ingress to us from s3
)
// ingress from S3 to us
const observer = new PersistorHelper.ObserverStream({ metric: 's3.ingress' })
pipeline(stream, observer)
try {
await PersistorHelper.waitForStreamReady(stream)
return meteredStream
return observer
} catch (err) {
throw PersistorHelper.wrapError(
err,

View file

@ -5382,14 +5382,6 @@
"stubs": "^3.0.0"
}
},
"stream-meter": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/stream-meter/-/stream-meter-1.0.4.tgz",
"integrity": "sha512-4sOEtrbgFotXwnEuzzsQBYEV1elAeFSO8rSGeTwabuX1RRn/kEq9JVH7I0MRBhKVRR0sJkr0M0QCH7yOLf9fhQ==",
"requires": {
"readable-stream": "^2.1.4"
}
},
"stream-shift": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.1.tgz",

View file

@ -35,7 +35,6 @@
"request-promise-native": "^1.0.8",
"settings-sharelatex": "^1.1.0",
"stream-buffers": "~0.2.5",
"stream-meter": "^1.0.4",
"tiny-async-pool": "^1.1.0"
},
"devDependencies": {

View file

@ -3,6 +3,7 @@ const chai = require('chai')
const { expect } = chai
const SandboxedModule = require('sandboxed-module')
const Errors = require('../../../app/js/Errors')
const StreamModule = require('stream')
chai.use(require('sinon-chai'))
chai.use(require('chai-as-promised'))
@ -38,7 +39,10 @@ describe('FSPersistorTests', function() {
stat: sinon.stub().yields(null, stat)
}
glob = sinon.stub().yields(null, globs)
stream = { pipeline: sinon.stub().yields() }
stream = {
pipeline: sinon.stub().yields(),
Transform: StreamModule.Transform
}
LocalFileWriter = {
promises: {
writeStream: sinon.stub().resolves(tempFile),
@ -48,6 +52,7 @@ describe('FSPersistorTests', function() {
Hash = {
end: sinon.stub(),
read: sinon.stub().returns(md5),
digest: sinon.stub().returns(md5),
setEncoding: sinon.stub()
}
crypto = {
@ -62,7 +67,6 @@ describe('FSPersistorTests', function() {
stream,
crypto,
// imported by PersistorHelper but otherwise unused here
'stream-meter': {},
'logger-sharelatex': {},
'metrics-sharelatex': {}
},

View file

@ -5,6 +5,7 @@ const modulePath = '../../../app/js/GcsPersistor.js'
const SandboxedModule = require('sandboxed-module')
const { ObjectId } = require('mongodb')
const asyncPool = require('tiny-async-pool')
const StreamModule = require('stream')
const Errors = require('../../../app/js/Errors')
@ -13,7 +14,6 @@ describe('GcsPersistorTests', function() {
const bucket = 'womBucket'
const key = 'monKey'
const destKey = 'donKey'
const objectSize = 5555
const genericError = new Error('guru meditation error')
const filesSize = 33
const md5 = 'ffffffff00000000ffffffff00000000'
@ -24,8 +24,6 @@ describe('GcsPersistorTests', function() {
Storage,
Fs,
GcsNotFoundError,
Meter,
MeteredStream,
ReadStream,
Stream,
GcsBucket,
@ -71,7 +69,8 @@ describe('GcsPersistorTests', function() {
}
Stream = {
pipeline: sinon.stub().yields()
pipeline: sinon.stub().yields(),
Transform: StreamModule.Transform
}
Metrics = {
@ -109,18 +108,10 @@ describe('GcsPersistorTests', function() {
FileNotFoundError = new Error('File not found')
FileNotFoundError.code = 'ENOENT'
MeteredStream = {
type: 'metered',
on: sinon.stub(),
bytes: objectSize
}
MeteredStream.on.withArgs('finish').yields()
MeteredStream.on.withArgs('readable').yields()
Meter = sinon.stub().returns(MeteredStream)
Hash = {
end: sinon.stub(),
read: sinon.stub().returns(md5),
digest: sinon.stub().returns(md5),
setEncoding: sinon.stub()
}
crypto = {
@ -139,7 +130,6 @@ describe('GcsPersistorTests', function() {
'tiny-async-pool': asyncPool,
'./Errors': Errors,
fs: Fs,
'stream-meter': Meter,
stream: Stream,
'metrics-sharelatex': Metrics,
crypto
@ -157,7 +147,7 @@ describe('GcsPersistorTests', function() {
})
it('returns a metered stream', function() {
expect(stream).to.equal(MeteredStream)
expect(stream).to.be.instanceOf(StreamModule.Transform)
})
it('fetches the right key from the right bucket', function() {
@ -169,13 +159,9 @@ describe('GcsPersistorTests', function() {
it('pipes the stream through the meter', function() {
expect(Stream.pipeline).to.have.been.calledWith(
ReadStream,
MeteredStream
sinon.match.instanceOf(StreamModule.Transform)
)
})
it('records an ingress metric', function() {
expect(Metrics.count).to.have.been.calledWith('gcs.ingress', objectSize)
})
})
describe('when called with a byte range', function() {
@ -189,7 +175,7 @@ describe('GcsPersistorTests', function() {
})
it('returns a metered stream', function() {
expect(stream).to.equal(MeteredStream)
expect(stream).to.be.instanceOf(StreamModule.Transform)
})
it('passes the byte range on to GCS', function() {
@ -341,26 +327,16 @@ describe('GcsPersistorTests', function() {
})
})
it('should meter the stream', function() {
it('should meter the stream and pass it to GCS', function() {
expect(Stream.pipeline).to.have.been.calledWith(
ReadStream,
MeteredStream
)
})
it('should pipe the metered stream to GCS', function() {
expect(Stream.pipeline).to.have.been.calledWith(
MeteredStream,
sinon.match.instanceOf(StreamModule.Transform),
WriteStream
)
})
it('should record an egress metric', function() {
expect(Metrics.count).to.have.been.calledWith('gcs.egress', objectSize)
})
it('calculates the md5 hash of the file', function() {
expect(Stream.pipeline).to.have.been.calledWith(ReadStream, Hash)
expect(Hash.digest).to.have.been.called
})
})
@ -375,10 +351,7 @@ describe('GcsPersistorTests', function() {
})
it('should not calculate the md5 hash of the file', function() {
expect(Stream.pipeline).not.to.have.been.calledWith(
sinon.match.any,
Hash
)
expect(Hash.digest).not.to.have.been.called
})
it('sends the hash in base64', function() {
@ -400,7 +373,12 @@ describe('GcsPersistorTests', function() {
let error
beforeEach(async function() {
Stream.pipeline
.withArgs(MeteredStream, WriteStream, sinon.match.any)
.withArgs(
ReadStream,
sinon.match.instanceOf(StreamModule.Transform),
WriteStream,
sinon.match.any
)
.yields(genericError)
try {
await GcsPersistor.promises.sendStream(bucket, key, ReadStream)
@ -438,10 +416,7 @@ describe('GcsPersistorTests', function() {
it('should upload the stream via the meter', function() {
expect(Stream.pipeline).to.have.been.calledWith(
ReadStream,
MeteredStream
)
expect(Stream.pipeline).to.have.been.calledWith(
MeteredStream,
sinon.match.instanceOf(StreamModule.Transform),
WriteStream
)
})

View file

@ -3,6 +3,7 @@ const chai = require('chai')
const { expect } = chai
const modulePath = '../../../app/js/S3Persistor.js'
const SandboxedModule = require('sandboxed-module')
const StreamModule = require('stream')
const Errors = require('../../../app/js/Errors')
@ -32,8 +33,6 @@ describe('S3PersistorTests', function() {
Logger,
S3,
Fs,
Meter,
MeteredStream,
ReadStream,
Stream,
S3Persistor,
@ -63,7 +62,8 @@ describe('S3PersistorTests', function() {
}
Stream = {
pipeline: sinon.stub().yields()
pipeline: sinon.stub().yields(),
Transform: StreamModule.Transform
}
EmptyPromise = {
@ -89,14 +89,6 @@ describe('S3PersistorTests', function() {
createReadStream: sinon.stub().returns(ReadStream)
}
MeteredStream = {
type: 'metered',
on: sinon.stub(),
bytes: objectSize
}
MeteredStream.on.withArgs('finish').yields()
Meter = sinon.stub().returns(MeteredStream)
S3NotFoundError = new Error('not found')
S3NotFoundError.code = 'NoSuchKey'
@ -136,6 +128,7 @@ describe('S3PersistorTests', function() {
Hash = {
end: sinon.stub(),
read: sinon.stub().returns(md5),
digest: sinon.stub().returns(md5),
setEncoding: sinon.stub()
}
crypto = {
@ -153,7 +146,6 @@ describe('S3PersistorTests', function() {
'logger-sharelatex': Logger,
'./Errors': Errors,
fs: Fs,
'stream-meter': Meter,
stream: Stream,
'metrics-sharelatex': Metrics,
crypto
@ -171,7 +163,7 @@ describe('S3PersistorTests', function() {
})
it('returns a metered stream', function() {
expect(stream).to.equal(MeteredStream)
expect(stream).to.be.instanceOf(StreamModule.Transform)
})
it('sets the AWS client up with credentials from settings', function() {
@ -188,13 +180,9 @@ describe('S3PersistorTests', function() {
it('pipes the stream through the meter', function() {
expect(Stream.pipeline).to.have.been.calledWith(
S3ReadStream,
MeteredStream
sinon.match.instanceOf(StreamModule.Transform)
)
})
it('records an ingress metric', function() {
expect(Metrics.count).to.have.been.calledWith('s3.ingress', objectSize)
})
})
describe('when called with a byte range', function() {
@ -208,7 +196,7 @@ describe('S3PersistorTests', function() {
})
it('returns a metered stream', function() {
expect(stream).to.equal(MeteredStream)
expect(stream).to.be.instanceOf(Stream.Transform)
})
it('passes the byte range on to S3', function() {
@ -242,7 +230,7 @@ describe('S3PersistorTests', function() {
})
it('returns a metered stream', function() {
expect(stream).to.equal(MeteredStream)
expect(stream).to.be.instanceOf(Stream.Transform)
})
it('sets the AWS client up with the alternative credentials', function() {
@ -457,7 +445,7 @@ describe('S3PersistorTests', function() {
expect(S3Client.upload).to.have.been.calledWith({
Bucket: bucket,
Key: key,
Body: MeteredStream
Body: sinon.match.instanceOf(Stream.Transform)
})
})
@ -470,16 +458,12 @@ describe('S3PersistorTests', function() {
it('should meter the stream', function() {
expect(Stream.pipeline).to.have.been.calledWith(
ReadStream,
MeteredStream
sinon.match.instanceOf(Stream.Transform)
)
})
it('should record an egress metric', function() {
expect(Metrics.count).to.have.been.calledWith('s3.egress', objectSize)
})
it('calculates the md5 hash of the file', function() {
expect(Stream.pipeline).to.have.been.calledWith(ReadStream, Hash)
expect(Hash.digest).to.have.been.called
})
})
@ -494,17 +478,14 @@ describe('S3PersistorTests', function() {
})
it('should not calculate the md5 hash of the file', function() {
expect(Stream.pipeline).not.to.have.been.calledWith(
sinon.match.any,
Hash
)
expect(Hash.digest).not.to.have.been.called
})
it('sends the hash in base64', function() {
expect(S3Client.upload).to.have.been.calledWith({
Bucket: bucket,
Key: key,
Body: MeteredStream,
Body: sinon.match.instanceOf(StreamModule.Transform),
ContentMD5: 'qqqqqru7u7uqqqqqu7u7uw=='
})
})
@ -555,12 +536,12 @@ describe('S3PersistorTests', function() {
it('should meter the download', function() {
expect(Stream.pipeline).to.have.been.calledWith(
S3ReadStream,
MeteredStream
sinon.match.instanceOf(Stream.Transform)
)
})
it('should calculate the md5 hash from the file', function() {
expect(Stream.pipeline).to.have.been.calledWith(MeteredStream, Hash)
expect(Hash.digest).to.have.been.called
})
})
})
@ -579,7 +560,7 @@ describe('S3PersistorTests', function() {
expect(S3Client.upload).to.have.been.calledWith({
Bucket: bucket,
Key: key,
Body: MeteredStream
Body: sinon.match.instanceOf(StreamModule.Transform)
})
})
})