diff --git a/services/filestore/app/js/GcsPersistor.js b/services/filestore/app/js/GcsPersistor.js index 799ee65905..ae1c2dd53a 100644 --- a/services/filestore/app/js/GcsPersistor.js +++ b/services/filestore/app/js/GcsPersistor.js @@ -65,17 +65,15 @@ async function sendFile(bucketName, key, fsPath) { async function sendStream(bucketName, key, readStream, sourceMd5) { try { - let hashPromise + // egress from us to gcs + const observeOptions = { metric: 'gcs.egress' } - // if there is no supplied md5 hash, we calculate the hash as the data passes through if (!sourceMd5) { - hashPromise = PersistorHelper.calculateStreamMd5(readStream) + // if there is no supplied md5 hash, we calculate the hash as the data passes through + observeOptions.hash = 'md5' } - const meteredStream = PersistorHelper.getMeteredStream( - readStream, - 'gcs.egress' // egress from us to gcs - ) + const observer = new PersistorHelper.ObserverStream(observeOptions) const writeOptions = { // disabling of resumable uploads is recommended by Google: @@ -94,12 +92,12 @@ async function sendStream(bucketName, key, readStream, sourceMd5) { .file(key) .createWriteStream(writeOptions) - await pipeline(meteredStream, uploadStream) + await pipeline(readStream, observer, uploadStream) // if we didn't have an md5 hash, we should compare our computed one with Google's // as we couldn't tell GCS about it beforehand - if (hashPromise) { - sourceMd5 = await hashPromise + if (!sourceMd5) { + sourceMd5 = observer.getHash() // throws on mismatch await PersistorHelper.verifyMd5(GcsPersistor, bucketName, key, sourceMd5) } @@ -124,14 +122,15 @@ async function getFileStream(bucketName, key, _opts = {}) { .file(key) .createReadStream(opts) - const meteredStream = PersistorHelper.getMeteredStream( - stream, - 'gcs.ingress' // ingress to us from gcs - ) + // ingress to us from gcs + const observer = new PersistorHelper.ObserverStream({ + metric: 'gcs.ingress' + }) + pipeline(stream, observer) try { await PersistorHelper.waitForStreamReady(stream) - return meteredStream + return observer } catch (err) { throw PersistorHelper.wrapError( err, diff --git a/services/filestore/app/js/PersistorHelper.js b/services/filestore/app/js/PersistorHelper.js index a58f024bb4..ad5152374f 100644 --- a/services/filestore/app/js/PersistorHelper.js +++ b/services/filestore/app/js/PersistorHelper.js @@ -1,17 +1,54 @@ const crypto = require('crypto') -const metrics = require('metrics-sharelatex') -const meter = require('stream-meter') const Stream = require('stream') const logger = require('logger-sharelatex') +const metrics = require('metrics-sharelatex') const { WriteError, ReadError, NotFoundError } = require('./Errors') const { promisify } = require('util') const pipeline = promisify(Stream.pipeline) +// Observes data that passes through and computes some metadata for it +// - specifically, it computes the number of bytes transferred, and optionally +// computes a cryptographic hash based on the 'hash' option. e.g., pass +// { hash: 'md5' } to compute the md5 hash of the stream +// - if 'metric' is supplied as an option, this metric will be incremented by +// the number of bytes transferred +class ObserverStream extends Stream.Transform { + constructor(options) { + super(options) + + this.bytes = 0 + + if (options.hash) { + this.hash = crypto.createHash(options.hash) + } + if (options.metric) { + const onEnd = () => { + metrics.count(options.metric, this.bytes) + } + this.once('error', onEnd) + this.once('end', onEnd) + } + } + + _transform(chunk, encoding, done) { + if (this.hash) { + this.hash.update(chunk) + } + this.bytes += chunk.length + this.push(chunk) + done() + } + + getHash() { + return this.hash && this.hash.digest('hex') + } +} + module.exports = { + ObserverStream, calculateStreamMd5, verifyMd5, - getMeteredStream, waitForStreamReady, wrapError, hexToBase64, @@ -19,6 +56,7 @@ module.exports = { } // returns a promise which resolves with the md5 hash of the stream +// - consumes the stream function calculateStreamMd5(stream) { const hash = crypto.createHash('md5') hash.setEncoding('hex') @@ -53,23 +91,6 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) { } } -// returns the next stream in the pipeline, and calls the callback with the byte count -// when the stream finishes or receives an error -function getMeteredStream(stream, metricName) { - const meteredStream = meter() - - pipeline(stream, meteredStream) - .then(() => { - metrics.count(metricName, meteredStream.bytes) - }) - .catch(() => { - // on error, just send how many bytes we received before the stream stopped - metrics.count(metricName, meteredStream.bytes) - }) - - return meteredStream -} - // resolves when a stream is 'readable', or rejects if the stream throws an error // before that happens - this lets us handle protocol-level errors before trying // to read them diff --git a/services/filestore/app/js/S3Persistor.js b/services/filestore/app/js/S3Persistor.js index 1b92a61ae6..ba82db31e2 100644 --- a/services/filestore/app/js/S3Persistor.js +++ b/services/filestore/app/js/S3Persistor.js @@ -10,13 +10,15 @@ const PersistorHelper = require('./PersistorHelper') const fs = require('fs') const S3 = require('aws-sdk/clients/s3') const { URL } = require('url') -const { callbackify } = require('util') +const Stream = require('stream') +const { promisify, callbackify } = require('util') const { WriteError, ReadError, NotFoundError, SettingsError } = require('./Errors') +const pipeline = promisify(Stream.pipeline) const S3Persistor = { sendFile: callbackify(sendFile), @@ -51,26 +53,25 @@ async function sendFile(bucketName, key, fsPath) { async function sendStream(bucketName, key, readStream, sourceMd5) { try { - // if there is no supplied md5 hash, we calculate the hash as the data passes through - let hashPromise + // egress from us to S3 + const observeOptions = { metric: 's3.egress' } let b64Hash if (sourceMd5) { b64Hash = PersistorHelper.hexToBase64(sourceMd5) } else { - hashPromise = PersistorHelper.calculateStreamMd5(readStream) + // if there is no supplied md5 hash, we calculate the hash as the data passes through + observeOptions.hash = 'md5' } - const meteredStream = PersistorHelper.getMeteredStream( - readStream, - 's3.egress' // egress from us to s3 - ) + const observer = new PersistorHelper.ObserverStream(observeOptions) + pipeline(readStream, observer) // if we have an md5 hash, pass this to S3 to verify the upload const uploadOptions = { Bucket: bucketName, Key: key, - Body: meteredStream + Body: observer } if (b64Hash) { uploadOptions.ContentMD5 = b64Hash @@ -92,8 +93,8 @@ async function sendStream(bucketName, key, readStream, sourceMd5) { // if we didn't have an md5 hash, we should compare our computed one with S3's // as we couldn't tell S3 about it beforehand - if (hashPromise) { - sourceMd5 = await hashPromise + if (!sourceMd5) { + sourceMd5 = observer.getHash() // throws on mismatch await PersistorHelper.verifyMd5( S3Persistor, @@ -128,14 +129,13 @@ async function getFileStream(bucketName, key, opts) { .getObject(params) .createReadStream() - const meteredStream = PersistorHelper.getMeteredStream( - stream, - 's3.ingress' // ingress to us from s3 - ) + // ingress from S3 to us + const observer = new PersistorHelper.ObserverStream({ metric: 's3.ingress' }) + pipeline(stream, observer) try { await PersistorHelper.waitForStreamReady(stream) - return meteredStream + return observer } catch (err) { throw PersistorHelper.wrapError( err, diff --git a/services/filestore/package-lock.json b/services/filestore/package-lock.json index 90f5698668..f50ef5f552 100644 --- a/services/filestore/package-lock.json +++ b/services/filestore/package-lock.json @@ -5382,14 +5382,6 @@ "stubs": "^3.0.0" } }, - "stream-meter": { - "version": "1.0.4", - "resolved": "https://registry.npmjs.org/stream-meter/-/stream-meter-1.0.4.tgz", - "integrity": "sha512-4sOEtrbgFotXwnEuzzsQBYEV1elAeFSO8rSGeTwabuX1RRn/kEq9JVH7I0MRBhKVRR0sJkr0M0QCH7yOLf9fhQ==", - "requires": { - "readable-stream": "^2.1.4" - } - }, "stream-shift": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.1.tgz", diff --git a/services/filestore/package.json b/services/filestore/package.json index c4b8f16b15..53d7e24c6c 100644 --- a/services/filestore/package.json +++ b/services/filestore/package.json @@ -35,7 +35,6 @@ "request-promise-native": "^1.0.8", "settings-sharelatex": "^1.1.0", "stream-buffers": "~0.2.5", - "stream-meter": "^1.0.4", "tiny-async-pool": "^1.1.0" }, "devDependencies": { diff --git a/services/filestore/test/unit/js/FSPersistorTests.js b/services/filestore/test/unit/js/FSPersistorTests.js index 4777de502a..c8255987ba 100644 --- a/services/filestore/test/unit/js/FSPersistorTests.js +++ b/services/filestore/test/unit/js/FSPersistorTests.js @@ -3,6 +3,7 @@ const chai = require('chai') const { expect } = chai const SandboxedModule = require('sandboxed-module') const Errors = require('../../../app/js/Errors') +const StreamModule = require('stream') chai.use(require('sinon-chai')) chai.use(require('chai-as-promised')) @@ -38,7 +39,10 @@ describe('FSPersistorTests', function() { stat: sinon.stub().yields(null, stat) } glob = sinon.stub().yields(null, globs) - stream = { pipeline: sinon.stub().yields() } + stream = { + pipeline: sinon.stub().yields(), + Transform: StreamModule.Transform + } LocalFileWriter = { promises: { writeStream: sinon.stub().resolves(tempFile), @@ -48,6 +52,7 @@ describe('FSPersistorTests', function() { Hash = { end: sinon.stub(), read: sinon.stub().returns(md5), + digest: sinon.stub().returns(md5), setEncoding: sinon.stub() } crypto = { @@ -62,7 +67,6 @@ describe('FSPersistorTests', function() { stream, crypto, // imported by PersistorHelper but otherwise unused here - 'stream-meter': {}, 'logger-sharelatex': {}, 'metrics-sharelatex': {} }, diff --git a/services/filestore/test/unit/js/GcsPersistorTests.js b/services/filestore/test/unit/js/GcsPersistorTests.js index cd95bf1e20..0ca0f39d0f 100644 --- a/services/filestore/test/unit/js/GcsPersistorTests.js +++ b/services/filestore/test/unit/js/GcsPersistorTests.js @@ -5,6 +5,7 @@ const modulePath = '../../../app/js/GcsPersistor.js' const SandboxedModule = require('sandboxed-module') const { ObjectId } = require('mongodb') const asyncPool = require('tiny-async-pool') +const StreamModule = require('stream') const Errors = require('../../../app/js/Errors') @@ -13,7 +14,6 @@ describe('GcsPersistorTests', function() { const bucket = 'womBucket' const key = 'monKey' const destKey = 'donKey' - const objectSize = 5555 const genericError = new Error('guru meditation error') const filesSize = 33 const md5 = 'ffffffff00000000ffffffff00000000' @@ -24,8 +24,6 @@ describe('GcsPersistorTests', function() { Storage, Fs, GcsNotFoundError, - Meter, - MeteredStream, ReadStream, Stream, GcsBucket, @@ -71,7 +69,8 @@ describe('GcsPersistorTests', function() { } Stream = { - pipeline: sinon.stub().yields() + pipeline: sinon.stub().yields(), + Transform: StreamModule.Transform } Metrics = { @@ -109,18 +108,10 @@ describe('GcsPersistorTests', function() { FileNotFoundError = new Error('File not found') FileNotFoundError.code = 'ENOENT' - MeteredStream = { - type: 'metered', - on: sinon.stub(), - bytes: objectSize - } - MeteredStream.on.withArgs('finish').yields() - MeteredStream.on.withArgs('readable').yields() - Meter = sinon.stub().returns(MeteredStream) - Hash = { end: sinon.stub(), read: sinon.stub().returns(md5), + digest: sinon.stub().returns(md5), setEncoding: sinon.stub() } crypto = { @@ -139,7 +130,6 @@ describe('GcsPersistorTests', function() { 'tiny-async-pool': asyncPool, './Errors': Errors, fs: Fs, - 'stream-meter': Meter, stream: Stream, 'metrics-sharelatex': Metrics, crypto @@ -157,7 +147,7 @@ describe('GcsPersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.equal(MeteredStream) + expect(stream).to.be.instanceOf(StreamModule.Transform) }) it('fetches the right key from the right bucket', function() { @@ -169,13 +159,9 @@ describe('GcsPersistorTests', function() { it('pipes the stream through the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - MeteredStream + sinon.match.instanceOf(StreamModule.Transform) ) }) - - it('records an ingress metric', function() { - expect(Metrics.count).to.have.been.calledWith('gcs.ingress', objectSize) - }) }) describe('when called with a byte range', function() { @@ -189,7 +175,7 @@ describe('GcsPersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.equal(MeteredStream) + expect(stream).to.be.instanceOf(StreamModule.Transform) }) it('passes the byte range on to GCS', function() { @@ -341,26 +327,16 @@ describe('GcsPersistorTests', function() { }) }) - it('should meter the stream', function() { + it('should meter the stream and pass it to GCS', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - MeteredStream - ) - }) - - it('should pipe the metered stream to GCS', function() { - expect(Stream.pipeline).to.have.been.calledWith( - MeteredStream, + sinon.match.instanceOf(StreamModule.Transform), WriteStream ) }) - it('should record an egress metric', function() { - expect(Metrics.count).to.have.been.calledWith('gcs.egress', objectSize) - }) - it('calculates the md5 hash of the file', function() { - expect(Stream.pipeline).to.have.been.calledWith(ReadStream, Hash) + expect(Hash.digest).to.have.been.called }) }) @@ -375,10 +351,7 @@ describe('GcsPersistorTests', function() { }) it('should not calculate the md5 hash of the file', function() { - expect(Stream.pipeline).not.to.have.been.calledWith( - sinon.match.any, - Hash - ) + expect(Hash.digest).not.to.have.been.called }) it('sends the hash in base64', function() { @@ -400,7 +373,12 @@ describe('GcsPersistorTests', function() { let error beforeEach(async function() { Stream.pipeline - .withArgs(MeteredStream, WriteStream, sinon.match.any) + .withArgs( + ReadStream, + sinon.match.instanceOf(StreamModule.Transform), + WriteStream, + sinon.match.any + ) .yields(genericError) try { await GcsPersistor.promises.sendStream(bucket, key, ReadStream) @@ -438,10 +416,7 @@ describe('GcsPersistorTests', function() { it('should upload the stream via the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - MeteredStream - ) - expect(Stream.pipeline).to.have.been.calledWith( - MeteredStream, + sinon.match.instanceOf(StreamModule.Transform), WriteStream ) }) diff --git a/services/filestore/test/unit/js/S3PersistorTests.js b/services/filestore/test/unit/js/S3PersistorTests.js index 484a0209a8..2117164d74 100644 --- a/services/filestore/test/unit/js/S3PersistorTests.js +++ b/services/filestore/test/unit/js/S3PersistorTests.js @@ -3,6 +3,7 @@ const chai = require('chai') const { expect } = chai const modulePath = '../../../app/js/S3Persistor.js' const SandboxedModule = require('sandboxed-module') +const StreamModule = require('stream') const Errors = require('../../../app/js/Errors') @@ -32,8 +33,6 @@ describe('S3PersistorTests', function() { Logger, S3, Fs, - Meter, - MeteredStream, ReadStream, Stream, S3Persistor, @@ -63,7 +62,8 @@ describe('S3PersistorTests', function() { } Stream = { - pipeline: sinon.stub().yields() + pipeline: sinon.stub().yields(), + Transform: StreamModule.Transform } EmptyPromise = { @@ -89,14 +89,6 @@ describe('S3PersistorTests', function() { createReadStream: sinon.stub().returns(ReadStream) } - MeteredStream = { - type: 'metered', - on: sinon.stub(), - bytes: objectSize - } - MeteredStream.on.withArgs('finish').yields() - Meter = sinon.stub().returns(MeteredStream) - S3NotFoundError = new Error('not found') S3NotFoundError.code = 'NoSuchKey' @@ -136,6 +128,7 @@ describe('S3PersistorTests', function() { Hash = { end: sinon.stub(), read: sinon.stub().returns(md5), + digest: sinon.stub().returns(md5), setEncoding: sinon.stub() } crypto = { @@ -153,7 +146,6 @@ describe('S3PersistorTests', function() { 'logger-sharelatex': Logger, './Errors': Errors, fs: Fs, - 'stream-meter': Meter, stream: Stream, 'metrics-sharelatex': Metrics, crypto @@ -171,7 +163,7 @@ describe('S3PersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.equal(MeteredStream) + expect(stream).to.be.instanceOf(StreamModule.Transform) }) it('sets the AWS client up with credentials from settings', function() { @@ -188,13 +180,9 @@ describe('S3PersistorTests', function() { it('pipes the stream through the meter', function() { expect(Stream.pipeline).to.have.been.calledWith( S3ReadStream, - MeteredStream + sinon.match.instanceOf(StreamModule.Transform) ) }) - - it('records an ingress metric', function() { - expect(Metrics.count).to.have.been.calledWith('s3.ingress', objectSize) - }) }) describe('when called with a byte range', function() { @@ -208,7 +196,7 @@ describe('S3PersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.equal(MeteredStream) + expect(stream).to.be.instanceOf(Stream.Transform) }) it('passes the byte range on to S3', function() { @@ -242,7 +230,7 @@ describe('S3PersistorTests', function() { }) it('returns a metered stream', function() { - expect(stream).to.equal(MeteredStream) + expect(stream).to.be.instanceOf(Stream.Transform) }) it('sets the AWS client up with the alternative credentials', function() { @@ -457,7 +445,7 @@ describe('S3PersistorTests', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: MeteredStream + Body: sinon.match.instanceOf(Stream.Transform) }) }) @@ -470,16 +458,12 @@ describe('S3PersistorTests', function() { it('should meter the stream', function() { expect(Stream.pipeline).to.have.been.calledWith( ReadStream, - MeteredStream + sinon.match.instanceOf(Stream.Transform) ) }) - it('should record an egress metric', function() { - expect(Metrics.count).to.have.been.calledWith('s3.egress', objectSize) - }) - it('calculates the md5 hash of the file', function() { - expect(Stream.pipeline).to.have.been.calledWith(ReadStream, Hash) + expect(Hash.digest).to.have.been.called }) }) @@ -494,17 +478,14 @@ describe('S3PersistorTests', function() { }) it('should not calculate the md5 hash of the file', function() { - expect(Stream.pipeline).not.to.have.been.calledWith( - sinon.match.any, - Hash - ) + expect(Hash.digest).not.to.have.been.called }) it('sends the hash in base64', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: MeteredStream, + Body: sinon.match.instanceOf(StreamModule.Transform), ContentMD5: 'qqqqqru7u7uqqqqqu7u7uw==' }) }) @@ -555,12 +536,12 @@ describe('S3PersistorTests', function() { it('should meter the download', function() { expect(Stream.pipeline).to.have.been.calledWith( S3ReadStream, - MeteredStream + sinon.match.instanceOf(Stream.Transform) ) }) it('should calculate the md5 hash from the file', function() { - expect(Stream.pipeline).to.have.been.calledWith(MeteredStream, Hash) + expect(Hash.digest).to.have.been.called }) }) }) @@ -579,7 +560,7 @@ describe('S3PersistorTests', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: MeteredStream + Body: sinon.match.instanceOf(StreamModule.Transform) }) }) })