Support metadata when uploading objects

Add contentType and contentEncoding options to sendStream(). These
options will set the corresponding metadata on the object.

This changes the API. The fourth argument to sendStream() used to be the
source md5. Now, it's an options object and the source md5 is a property
of that object.
This commit is contained in:
Eric Mc Sween 2020-07-07 10:17:14 -04:00
parent cd8c8b2b7f
commit 523ff9c4cd
10 changed files with 95 additions and 41 deletions

View file

@ -37,7 +37,7 @@ const { Errors } = ObjectPersistor
#### sendStream #### sendStream
```JavaScript ```JavaScript
async function sendStream(bucketName, key, readStream, sourceMd5 = null) async function sendStream(bucketName, key, readStream, opts = {})
``` ```
Uploads a stream to the backend. Uploads a stream to the backend.
@ -45,7 +45,10 @@ Uploads a stream to the backend.
- `bucketName`: The name of the bucket to upload to - `bucketName`: The name of the bucket to upload to
- `key`: The key for the uploaded object - `key`: The key for the uploaded object
- `readStream`: The data stream to upload - `readStream`: The data stream to upload
- `sourceMd5` (optional): The md5 hash of the source data, if known. The uploaded data will be compared against this and the operation will fail if it does not match. If omitted, the md5 is calculated as the data is uploaded instead, and verified against the backend. - `opts` (optional):
- `sourceMd5`: The md5 hash of the source data, if known. The uploaded data will be compared against this and the operation will fail if it does not match. If omitted, the md5 is calculated as the data is uploaded instead, and verified against the backend.
- `contentType`: The content type to write in the object metadata
- `contentEncoding`: The content encoding to write in the object metadata
##### Notes ##### Notes

View file

@ -10,12 +10,12 @@ module.exports = class AbstractPersistor {
}) })
} }
async sendStream(location, target, sourceStream, sourceMd5) { async sendStream(location, target, sourceStream, opts = {}) {
throw new NotImplementedError('method not implemented in persistor', { throw new NotImplementedError('method not implemented in persistor', {
method: 'sendStream', method: 'sendStream',
location, location,
target, target,
sourceMd5 opts
}) })
} }

View file

@ -43,8 +43,9 @@ module.exports = class FSPersistor extends AbstractPersistor {
} }
} }
async sendStream(location, target, sourceStream, sourceMd5) { async sendStream(location, target, sourceStream, opts = {}) {
const fsPath = await this._writeStream(sourceStream) const fsPath = await this._writeStream(sourceStream)
let sourceMd5 = opts.sourceMd5
if (!sourceMd5) { if (!sourceMd5) {
sourceMd5 = await FSPersistor._getFileMd5HashForPath(fsPath) sourceMd5 = await FSPersistor._getFileMd5HashForPath(fsPath)
} }

View file

@ -39,7 +39,7 @@ module.exports = class GcsPersistor extends AbstractPersistor {
return this.sendStream(bucketName, key, fs.createReadStream(fsPath)) return this.sendStream(bucketName, key, fs.createReadStream(fsPath))
} }
async sendStream(bucketName, key, readStream, sourceMd5) { async sendStream(bucketName, key, readStream, opts = {}) {
try { try {
// egress from us to gcs // egress from us to gcs
const observeOptions = { const observeOptions = {
@ -47,6 +47,7 @@ module.exports = class GcsPersistor extends AbstractPersistor {
Metrics: this.settings.Metrics Metrics: this.settings.Metrics
} }
let sourceMd5 = opts.sourceMd5
if (!sourceMd5) { if (!sourceMd5) {
// if there is no supplied md5 hash, we calculate the hash as the data passes through // if there is no supplied md5 hash, we calculate the hash as the data passes through
observeOptions.hash = 'md5' observeOptions.hash = 'md5'
@ -61,9 +62,16 @@ module.exports = class GcsPersistor extends AbstractPersistor {
if (sourceMd5) { if (sourceMd5) {
writeOptions.validation = 'md5' writeOptions.validation = 'md5'
writeOptions.metadata = { writeOptions.metadata = writeOptions.metadata || {}
md5Hash: PersistorHelper.hexToBase64(sourceMd5) writeOptions.metadata.md5Hash = PersistorHelper.hexToBase64(sourceMd5)
} }
if (opts.contentType) {
writeOptions.metadata = writeOptions.metadata || {}
writeOptions.metadata.contentType = opts.contentType
}
if (opts.contentEncoding) {
writeOptions.metadata = writeOptions.metadata || {}
writeOptions.metadata.contentEncoding = opts.contentEncoding
} }
const uploadStream = this.storage const uploadStream = this.storage

View file

@ -172,12 +172,9 @@ module.exports = class MigrationPersistor extends AbstractPersistor {
Logger.warn(err, 'error getting md5 hash from fallback persistor') Logger.warn(err, 'error getting md5 hash from fallback persistor')
} }
await this.primaryPersistor.sendStream( await this.primaryPersistor.sendStream(destBucket, destKey, stream, {
destBucket,
destKey,
stream,
sourceMd5 sourceMd5
) })
} catch (err) { } catch (err) {
const error = new WriteError( const error = new WriteError(
'unable to copy file to destination persistor', 'unable to copy file to destination persistor',

View file

@ -31,7 +31,7 @@ module.exports = class S3Persistor extends AbstractPersistor {
return this.sendStream(bucketName, key, fs.createReadStream(fsPath)) return this.sendStream(bucketName, key, fs.createReadStream(fsPath))
} }
async sendStream(bucketName, key, readStream, sourceMd5) { async sendStream(bucketName, key, readStream, opts = {}) {
try { try {
// egress from us to S3 // egress from us to S3
const observeOptions = { const observeOptions = {
@ -50,11 +50,18 @@ module.exports = class S3Persistor extends AbstractPersistor {
Body: observer Body: observer
} }
if (opts.contentType) {
uploadOptions.ContentType = opts.contentType
}
if (opts.contentEncoding) {
uploadOptions.ContentEncoding = opts.contentEncoding
}
// if we have an md5 hash, pass this to S3 to verify the upload - otherwise // if we have an md5 hash, pass this to S3 to verify the upload - otherwise
// we rely on the S3 client's checksum calculation to validate the upload // we rely on the S3 client's checksum calculation to validate the upload
const clientOptions = {} const clientOptions = {}
if (sourceMd5) { if (opts.sourceMd5) {
uploadOptions.ContentMD5 = PersistorHelper.hexToBase64(sourceMd5) uploadOptions.ContentMD5 = PersistorHelper.hexToBase64(opts.sourceMd5)
} else { } else {
clientOptions.computeChecksums = true clientOptions.computeChecksums = true
} }

View file

@ -113,7 +113,9 @@ describe('FSPersistorTests', function () {
describe('when the md5 hash does not match', function () { describe('when the md5 hash does not match', function () {
it('should return a write error', async function () { it('should return a write error', async function () {
await expect( await expect(
FSPersistor.sendStream(location, files[0], remoteStream, '00000000') FSPersistor.sendStream(location, files[0], remoteStream, {
sourceMd5: '00000000'
})
) )
.to.eventually.be.rejected.and.be.an.instanceOf(Errors.WriteError) .to.eventually.be.rejected.and.be.an.instanceOf(Errors.WriteError)
.and.have.property('message', 'md5 hash mismatch') .and.have.property('message', 'md5 hash mismatch')
@ -121,12 +123,9 @@ describe('FSPersistorTests', function () {
it('deletes the copied file', async function () { it('deletes the copied file', async function () {
try { try {
await FSPersistor.sendStream( await FSPersistor.sendStream(location, files[0], remoteStream, {
location, sourceMd5: '00000000'
files[0], })
remoteStream,
'00000000'
)
} catch (_) {} } catch (_) {}
expect(fs.unlink).to.have.been.calledWith( expect(fs.unlink).to.have.been.calledWith(
`${location}/${filteredFilenames[0]}` `${location}/${filteredFilenames[0]}`

View file

@ -361,12 +361,9 @@ describe('GcsPersistorTests', function () {
describe('when a hash is supplied', function () { describe('when a hash is supplied', function () {
beforeEach(async function () { beforeEach(async function () {
return GcsPersistor.sendStream( return GcsPersistor.sendStream(bucket, key, ReadStream, {
bucket, sourceMd5: 'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb'
key, })
ReadStream,
'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb'
)
}) })
it('should not calculate the md5 hash of the file', function () { it('should not calculate the md5 hash of the file', function () {
@ -388,6 +385,25 @@ describe('GcsPersistorTests', function () {
}) })
}) })
describe('when metadata is supplied', function () {
const contentType = 'text/csv'
const contentEncoding = 'gzip'
beforeEach(async function () {
return GcsPersistor.sendStream(bucket, key, ReadStream, {
contentType,
contentEncoding
})
})
it('should send the metadata to GCS', function () {
expect(GcsFile.createWriteStream).to.have.been.calledWith({
metadata: { contentType, contentEncoding },
resumable: false
})
})
})
describe('when the upload fails', function () { describe('when the upload fails', function () {
let error let error
beforeEach(async function () { beforeEach(async function () {

View file

@ -195,11 +195,13 @@ describe('MigrationPersistorTests', function () {
}) })
it('should send a stream to the primary', function () { it('should send a stream to the primary', function () {
expect(primaryPersistor.sendStream).to.have.been.calledWithExactly( expect(
primaryPersistor.sendStream
).to.have.been.calledWithExactly(
bucket, bucket,
key, key,
sinon.match.instanceOf(Stream.PassThrough), sinon.match.instanceOf(Stream.PassThrough),
md5 { sourceMd5: md5 }
) )
}) })
@ -474,11 +476,13 @@ describe('MigrationPersistorTests', function () {
}) })
it('should send the file to the primary', function () { it('should send the file to the primary', function () {
expect(primaryPersistor.sendStream).to.have.been.calledWithExactly( expect(
primaryPersistor.sendStream
).to.have.been.calledWithExactly(
bucket, bucket,
destKey, destKey,
sinon.match.instanceOf(Stream.PassThrough), sinon.match.instanceOf(Stream.PassThrough),
md5 { sourceMd5: md5 }
) )
}) })
}) })

View file

@ -448,14 +448,11 @@ describe('S3PersistorTests', function () {
}) })
}) })
describe('when a hash is supploed', function () { describe('when a hash is supplied', function () {
beforeEach(async function () { beforeEach(async function () {
return S3Persistor.sendStream( return S3Persistor.sendStream(bucket, key, ReadStream, {
bucket, sourceMd5: 'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb'
key, })
ReadStream,
'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb'
)
}) })
it('sends the hash in base64', function () { it('sends the hash in base64', function () {
@ -468,6 +465,28 @@ describe('S3PersistorTests', function () {
}) })
}) })
describe('when metadata is supplied', function () {
const contentType = 'text/csv'
const contentEncoding = 'gzip'
beforeEach(async function () {
return S3Persistor.sendStream(bucket, key, ReadStream, {
contentType,
contentEncoding
})
})
it('sends the metadata to S3', function () {
expect(S3Client.upload).to.have.been.calledWith({
Bucket: bucket,
Key: key,
Body: sinon.match.instanceOf(Transform),
ContentType: contentType,
ContentEncoding: contentEncoding
})
})
})
describe('when the upload fails', function () { describe('when the upload fails', function () {
let error let error
beforeEach(async function () { beforeEach(async function () {