diff --git a/libraries/object-persistor/README.md b/libraries/object-persistor/README.md index dd14d2d485..add2e2a38d 100644 --- a/libraries/object-persistor/README.md +++ b/libraries/object-persistor/README.md @@ -37,7 +37,7 @@ const { Errors } = ObjectPersistor #### sendStream ```JavaScript -async function sendStream(bucketName, key, readStream, sourceMd5 = null) +async function sendStream(bucketName, key, readStream, opts = {}) ``` Uploads a stream to the backend. @@ -45,7 +45,10 @@ Uploads a stream to the backend. - `bucketName`: The name of the bucket to upload to - `key`: The key for the uploaded object - `readStream`: The data stream to upload -- `sourceMd5` (optional): The md5 hash of the source data, if known. The uploaded data will be compared against this and the operation will fail if it does not match. If omitted, the md5 is calculated as the data is uploaded instead, and verified against the backend. +- `opts` (optional): + - `sourceMd5`: The md5 hash of the source data, if known. The uploaded data will be compared against this and the operation will fail if it does not match. If omitted, the md5 is calculated as the data is uploaded instead, and verified against the backend. + - `contentType`: The content type to write in the object metadata + - `contentEncoding`: The content encoding to write in the object metadata ##### Notes diff --git a/libraries/object-persistor/src/AbstractPersistor.js b/libraries/object-persistor/src/AbstractPersistor.js index 84d76afe85..705b03fcef 100644 --- a/libraries/object-persistor/src/AbstractPersistor.js +++ b/libraries/object-persistor/src/AbstractPersistor.js @@ -10,12 +10,12 @@ module.exports = class AbstractPersistor { }) } - async sendStream(location, target, sourceStream, sourceMd5) { + async sendStream(location, target, sourceStream, opts = {}) { throw new NotImplementedError('method not implemented in persistor', { method: 'sendStream', location, target, - sourceMd5 + opts }) } diff --git a/libraries/object-persistor/src/FSPersistor.js b/libraries/object-persistor/src/FSPersistor.js index 228fe88cee..0ff9390d80 100644 --- a/libraries/object-persistor/src/FSPersistor.js +++ b/libraries/object-persistor/src/FSPersistor.js @@ -43,8 +43,9 @@ module.exports = class FSPersistor extends AbstractPersistor { } } - async sendStream(location, target, sourceStream, sourceMd5) { + async sendStream(location, target, sourceStream, opts = {}) { const fsPath = await this._writeStream(sourceStream) + let sourceMd5 = opts.sourceMd5 if (!sourceMd5) { sourceMd5 = await FSPersistor._getFileMd5HashForPath(fsPath) } diff --git a/libraries/object-persistor/src/GcsPersistor.js b/libraries/object-persistor/src/GcsPersistor.js index 2783e3e8f2..fc9cd111fb 100644 --- a/libraries/object-persistor/src/GcsPersistor.js +++ b/libraries/object-persistor/src/GcsPersistor.js @@ -39,7 +39,7 @@ module.exports = class GcsPersistor extends AbstractPersistor { return this.sendStream(bucketName, key, fs.createReadStream(fsPath)) } - async sendStream(bucketName, key, readStream, sourceMd5) { + async sendStream(bucketName, key, readStream, opts = {}) { try { // egress from us to gcs const observeOptions = { @@ -47,6 +47,7 @@ module.exports = class GcsPersistor extends AbstractPersistor { Metrics: this.settings.Metrics } + let sourceMd5 = opts.sourceMd5 if (!sourceMd5) { // if there is no supplied md5 hash, we calculate the hash as the data passes through observeOptions.hash = 'md5' @@ -61,9 +62,16 @@ module.exports = class GcsPersistor extends AbstractPersistor { if (sourceMd5) { writeOptions.validation = 'md5' - writeOptions.metadata = { - md5Hash: PersistorHelper.hexToBase64(sourceMd5) - } + writeOptions.metadata = writeOptions.metadata || {} + writeOptions.metadata.md5Hash = PersistorHelper.hexToBase64(sourceMd5) + } + if (opts.contentType) { + writeOptions.metadata = writeOptions.metadata || {} + writeOptions.metadata.contentType = opts.contentType + } + if (opts.contentEncoding) { + writeOptions.metadata = writeOptions.metadata || {} + writeOptions.metadata.contentEncoding = opts.contentEncoding } const uploadStream = this.storage diff --git a/libraries/object-persistor/src/MigrationPersistor.js b/libraries/object-persistor/src/MigrationPersistor.js index 4f0e64ce97..15c4c756ac 100644 --- a/libraries/object-persistor/src/MigrationPersistor.js +++ b/libraries/object-persistor/src/MigrationPersistor.js @@ -172,12 +172,9 @@ module.exports = class MigrationPersistor extends AbstractPersistor { Logger.warn(err, 'error getting md5 hash from fallback persistor') } - await this.primaryPersistor.sendStream( - destBucket, - destKey, - stream, + await this.primaryPersistor.sendStream(destBucket, destKey, stream, { sourceMd5 - ) + }) } catch (err) { const error = new WriteError( 'unable to copy file to destination persistor', diff --git a/libraries/object-persistor/src/S3Persistor.js b/libraries/object-persistor/src/S3Persistor.js index e352693b65..fcf6c7b099 100644 --- a/libraries/object-persistor/src/S3Persistor.js +++ b/libraries/object-persistor/src/S3Persistor.js @@ -31,7 +31,7 @@ module.exports = class S3Persistor extends AbstractPersistor { return this.sendStream(bucketName, key, fs.createReadStream(fsPath)) } - async sendStream(bucketName, key, readStream, sourceMd5) { + async sendStream(bucketName, key, readStream, opts = {}) { try { // egress from us to S3 const observeOptions = { @@ -50,11 +50,18 @@ module.exports = class S3Persistor extends AbstractPersistor { Body: observer } + if (opts.contentType) { + uploadOptions.ContentType = opts.contentType + } + if (opts.contentEncoding) { + uploadOptions.ContentEncoding = opts.contentEncoding + } + // if we have an md5 hash, pass this to S3 to verify the upload - otherwise // we rely on the S3 client's checksum calculation to validate the upload const clientOptions = {} - if (sourceMd5) { - uploadOptions.ContentMD5 = PersistorHelper.hexToBase64(sourceMd5) + if (opts.sourceMd5) { + uploadOptions.ContentMD5 = PersistorHelper.hexToBase64(opts.sourceMd5) } else { clientOptions.computeChecksums = true } diff --git a/libraries/object-persistor/test/unit/FSPersistorTests.js b/libraries/object-persistor/test/unit/FSPersistorTests.js index 4c8594551d..f7c9e8a816 100644 --- a/libraries/object-persistor/test/unit/FSPersistorTests.js +++ b/libraries/object-persistor/test/unit/FSPersistorTests.js @@ -113,7 +113,9 @@ describe('FSPersistorTests', function () { describe('when the md5 hash does not match', function () { it('should return a write error', async function () { await expect( - FSPersistor.sendStream(location, files[0], remoteStream, '00000000') + FSPersistor.sendStream(location, files[0], remoteStream, { + sourceMd5: '00000000' + }) ) .to.eventually.be.rejected.and.be.an.instanceOf(Errors.WriteError) .and.have.property('message', 'md5 hash mismatch') @@ -121,12 +123,9 @@ describe('FSPersistorTests', function () { it('deletes the copied file', async function () { try { - await FSPersistor.sendStream( - location, - files[0], - remoteStream, - '00000000' - ) + await FSPersistor.sendStream(location, files[0], remoteStream, { + sourceMd5: '00000000' + }) } catch (_) {} expect(fs.unlink).to.have.been.calledWith( `${location}/${filteredFilenames[0]}` diff --git a/libraries/object-persistor/test/unit/GcsPersistorTests.js b/libraries/object-persistor/test/unit/GcsPersistorTests.js index fae4944197..7d98053ccd 100644 --- a/libraries/object-persistor/test/unit/GcsPersistorTests.js +++ b/libraries/object-persistor/test/unit/GcsPersistorTests.js @@ -361,12 +361,9 @@ describe('GcsPersistorTests', function () { describe('when a hash is supplied', function () { beforeEach(async function () { - return GcsPersistor.sendStream( - bucket, - key, - ReadStream, - 'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb' - ) + return GcsPersistor.sendStream(bucket, key, ReadStream, { + sourceMd5: 'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb' + }) }) it('should not calculate the md5 hash of the file', function () { @@ -388,6 +385,25 @@ describe('GcsPersistorTests', function () { }) }) + describe('when metadata is supplied', function () { + const contentType = 'text/csv' + const contentEncoding = 'gzip' + + beforeEach(async function () { + return GcsPersistor.sendStream(bucket, key, ReadStream, { + contentType, + contentEncoding + }) + }) + + it('should send the metadata to GCS', function () { + expect(GcsFile.createWriteStream).to.have.been.calledWith({ + metadata: { contentType, contentEncoding }, + resumable: false + }) + }) + }) + describe('when the upload fails', function () { let error beforeEach(async function () { diff --git a/libraries/object-persistor/test/unit/MigrationPersistorTests.js b/libraries/object-persistor/test/unit/MigrationPersistorTests.js index 137f1525b7..04293cabc1 100644 --- a/libraries/object-persistor/test/unit/MigrationPersistorTests.js +++ b/libraries/object-persistor/test/unit/MigrationPersistorTests.js @@ -195,11 +195,13 @@ describe('MigrationPersistorTests', function () { }) it('should send a stream to the primary', function () { - expect(primaryPersistor.sendStream).to.have.been.calledWithExactly( + expect( + primaryPersistor.sendStream + ).to.have.been.calledWithExactly( bucket, key, sinon.match.instanceOf(Stream.PassThrough), - md5 + { sourceMd5: md5 } ) }) @@ -474,11 +476,13 @@ describe('MigrationPersistorTests', function () { }) it('should send the file to the primary', function () { - expect(primaryPersistor.sendStream).to.have.been.calledWithExactly( + expect( + primaryPersistor.sendStream + ).to.have.been.calledWithExactly( bucket, destKey, sinon.match.instanceOf(Stream.PassThrough), - md5 + { sourceMd5: md5 } ) }) }) diff --git a/libraries/object-persistor/test/unit/S3PersistorTests.js b/libraries/object-persistor/test/unit/S3PersistorTests.js index 96c3bfac10..02a78981c5 100644 --- a/libraries/object-persistor/test/unit/S3PersistorTests.js +++ b/libraries/object-persistor/test/unit/S3PersistorTests.js @@ -448,14 +448,11 @@ describe('S3PersistorTests', function () { }) }) - describe('when a hash is supploed', function () { + describe('when a hash is supplied', function () { beforeEach(async function () { - return S3Persistor.sendStream( - bucket, - key, - ReadStream, - 'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb' - ) + return S3Persistor.sendStream(bucket, key, ReadStream, { + sourceMd5: 'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb' + }) }) it('sends the hash in base64', function () { @@ -468,6 +465,28 @@ describe('S3PersistorTests', function () { }) }) + describe('when metadata is supplied', function () { + const contentType = 'text/csv' + const contentEncoding = 'gzip' + + beforeEach(async function () { + return S3Persistor.sendStream(bucket, key, ReadStream, { + contentType, + contentEncoding + }) + }) + + it('sends the metadata to S3', function () { + expect(S3Client.upload).to.have.been.calledWith({ + Bucket: bucket, + Key: key, + Body: sinon.match.instanceOf(Transform), + ContentType: contentType, + ContentEncoding: contentEncoding + }) + }) + }) + describe('when the upload fails', function () { let error beforeEach(async function () {