const http = require('http')
const https = require('https')
if (http.globalAgent.maxSockets < 300) {
  http.globalAgent.maxSockets = 300
}
if (https.globalAgent.maxSockets < 300) {
  https.globalAgent.maxSockets = 300
}

const AbstractPersistor = require('./AbstractPersistor')
const PersistorHelper = require('./PersistorHelper')

const fs = require('fs')
const S3 = require('aws-sdk/clients/s3')
const { URL } = require('url')
const {
  WriteError,
  ReadError,
  NotFoundError,
  SettingsError,
} = require('./Errors')

module.exports = class S3Persistor extends AbstractPersistor {
  constructor(settings = {}) {
    super()
    this.settings = settings
  }

  async sendFile(bucketName, key, fsPath) {
    return this.sendStream(bucketName, key, fs.createReadStream(fsPath))
  }

  async sendStream(bucketName, key, readStream, opts = {}) {
    try {
      // egress from us to S3
      const observeOptions = {
        metric: 's3.egress',
        Metrics: this.settings.Metrics,
      }
      const observer = new PersistorHelper.ObserverStream(observeOptions)
      // observer will catch errors, clean up and log a warning
      readStream.pipe(observer)

      const uploadOptions = {
        Bucket: bucketName,
        Key: key,
        Body: observer,
      }
      if (opts.contentType) {
        uploadOptions.ContentType = opts.contentType
      }
      if (opts.contentEncoding) {
        uploadOptions.ContentEncoding = opts.contentEncoding
      }

      // if we have an md5 hash, pass this to S3 to verify the upload - otherwise
      // we rely on the S3 client's checksum calculation to validate the upload
      const clientOptions = {}
      if (opts.sourceMd5) {
        uploadOptions.ContentMD5 = PersistorHelper.hexToBase64(opts.sourceMd5)
      } else {
        clientOptions.computeChecksums = true
      }

      await this._getClientForBucket(bucketName, clientOptions)
        .upload(uploadOptions, { partSize: this.settings.partSize })
        .promise()
    } catch (err) {
      throw PersistorHelper.wrapError(
        err,
        'upload to S3 failed',
        { bucketName, key },
        WriteError
      )
    }
  }

  async getObjectStream(bucketName, key, opts) {
    opts = opts || {}

    const params = {
      Bucket: bucketName,
      Key: key,
    }
    if (opts.start != null && opts.end != null) {
      params.Range = `bytes=${opts.start}-${opts.end}`
    }

    const stream = this._getClientForBucket(bucketName)
      .getObject(params)
      .createReadStream()

    // ingress from S3 to us
    const observer = new PersistorHelper.ObserverStream({
      metric: 's3.ingress',
      Metrics: this.settings.Metrics,
    })

    try {
      // wait for the pipeline to be ready, to catch non-200s
      await PersistorHelper.getReadyPipeline(stream, observer)
      return observer
    } catch (err) {
      throw PersistorHelper.wrapError(
        err,
        'error reading file from S3',
        { bucketName, key, opts },
        ReadError
      )
    }
  }
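
  // Usage sketch for ranged reads (illustrative bucket/key; `persistor` is
  // assumed to be a configured instance of this class). `start`/`end` map to
  // an inclusive HTTP Range header, mirroring the check above:
  //
  //   const stream = await persistor.getObjectStream('my-bucket', 'some/key', {
  //     start: 0,
  //     end: 1023, // first KiB: bytes 0-1023 inclusive
  //   })
  //   stream.pipe(process.stdout)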

  async getRedirectUrl(bucketName, key) {
    const expiresSeconds = Math.round(this.settings.signedUrlExpiryInMs / 1000)
    try {
      const url = await this._getClientForBucket(
        bucketName
      ).getSignedUrlPromise('getObject', {
        Bucket: bucketName,
        Key: key,
        Expires: expiresSeconds,
      })
      return url
    } catch (err) {
      throw PersistorHelper.wrapError(
        err,
        'error generating signed url for S3 file',
        { bucketName, key },
        ReadError
      )
    }
  }

  async deleteDirectory(bucketName, key, continuationToken) {
    let response
    const options = { Bucket: bucketName, Prefix: key }
    if (continuationToken) {
      options.ContinuationToken = continuationToken
    }

    try {
      response = await this._getClientForBucket(bucketName)
        .listObjectsV2(options)
        .promise()
    } catch (err) {
      throw PersistorHelper.wrapError(
        err,
        'failed to list objects in S3',
        { bucketName, key },
        ReadError
      )
    }

    const objects = response.Contents.map(item => ({ Key: item.Key }))
    if (objects.length) {
      try {
        await this._getClientForBucket(bucketName)
          .deleteObjects({
            Bucket: bucketName,
            Delete: {
              Objects: objects,
              Quiet: true,
            },
          })
          .promise()
      } catch (err) {
        throw PersistorHelper.wrapError(
          err,
          'failed to delete objects in S3',
          { bucketName, key },
          WriteError
        )
      }
    }

    if (response.IsTruncated) {
      await this.deleteDirectory(
        bucketName,
        key,
        response.NextContinuationToken
      )
    }
  }

  async getObjectSize(bucketName, key) {
    try {
      const response = await this._getClientForBucket(bucketName)
        .headObject({ Bucket: bucketName, Key: key })
        .promise()
      return response.ContentLength
    } catch (err) {
      throw PersistorHelper.wrapError(
        err,
        'error getting size of s3 object',
        { bucketName, key },
        ReadError
      )
    }
  }

  async getObjectMd5Hash(bucketName, key) {
    try {
      const response = await this._getClientForBucket(bucketName)
        .headObject({ Bucket: bucketName, Key: key })
        .promise()
      const md5 = S3Persistor._md5FromResponse(response)
      if (md5) {
        return md5
      }
      // etag is not in md5 format
      if (this.settings.Metrics) {
        this.settings.Metrics.inc('s3.md5Download')
      }
      return PersistorHelper.calculateStreamMd5(
        await this.getObjectStream(bucketName, key)
      )
    } catch (err) {
      throw PersistorHelper.wrapError(
        err,
        'error getting hash of s3 object',
        { bucketName, key },
        ReadError
      )
    }
  }

  async deleteObject(bucketName, key) {
    try {
      await this._getClientForBucket(bucketName)
        .deleteObject({ Bucket: bucketName, Key: key })
        .promise()
    } catch (err) {
      // s3 does not give us a NotFoundError here
      throw PersistorHelper.wrapError(
        err,
        'failed to delete file in S3',
        { bucketName, key },
        WriteError
      )
    }
  }

  async copyObject(bucketName, sourceKey, destKey) {
    const params = {
      Bucket: bucketName,
      Key: destKey,
      CopySource: `${bucketName}/${sourceKey}`,
    }
    try {
      await this._getClientForBucket(bucketName).copyObject(params).promise()
    } catch (err) {
      throw PersistorHelper.wrapError(
        err,
        'failed to copy file in S3',
        params,
        WriteError
      )
    }
  }

  async checkIfObjectExists(bucketName, key) {
    try {
      await this.getObjectSize(bucketName, key)
      return true
    } catch (err) {
      if (err instanceof NotFoundError) {
        return false
      }
      throw PersistorHelper.wrapError(
        err,
        'error checking whether S3 object exists',
        { bucketName, key },
        ReadError
      )
    }
  }

  async directorySize(bucketName, key, continuationToken) {
    try {
      const options = {
        Bucket: bucketName,
        Prefix: key,
      }
      if (continuationToken) {
        options.ContinuationToken = continuationToken
      }
      const response = await this._getClientForBucket(bucketName)
        .listObjectsV2(options)
        .promise()

      const size = response.Contents.reduce((acc, item) => item.Size + acc, 0)
      if (response.IsTruncated) {
        return (
          size +
          (await this.directorySize(
            bucketName,
            key,
            response.NextContinuationToken
          ))
        )
      }
      return size
    } catch (err) {
      throw PersistorHelper.wrapError(
        err,
        'error getting directory size in S3',
        { bucketName, key },
        ReadError
      )
    }
  }
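
  // Usage sketch for the prefix-based helpers above (illustrative names;
  // `persistor` is assumed to be a configured instance). Both methods page
  // through listObjectsV2 results via NextContinuationToken, so they handle
  // prefixes with more objects than a single list response returns:
  //
  //   const bytes = await persistor.directorySize('my-bucket', 'project-123/')
  //   await persistor.deleteDirectory('my-bucket', 'project-123/')
  //   const gone = !(await persistor.checkIfObjectExists(
  //     'my-bucket',
  //     'project-123/main.tex'
  //   ))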

  _getClientForBucket(bucket, clientOptions) {
    if (this.settings.bucketCreds && this.settings.bucketCreds[bucket]) {
      return new S3(
        this._buildClientOptions(
          this.settings.bucketCreds[bucket],
          clientOptions
        )
      )
    }

    // no specific credentials for the bucket
    if (this.settings.key) {
      return new S3(this._buildClientOptions(null, clientOptions))
    }

    throw new SettingsError(
      'no bucket-specific or default credentials provided',
      { bucket }
    )
  }

  _buildClientOptions(bucketCredentials, clientOptions) {
    const options = clientOptions || {}

    if (bucketCredentials) {
      options.credentials = {
        accessKeyId: bucketCredentials.auth_key,
        secretAccessKey: bucketCredentials.auth_secret,
      }
    } else {
      options.credentials = {
        accessKeyId: this.settings.key,
        secretAccessKey: this.settings.secret,
      }
    }

    if (this.settings.endpoint) {
      const endpoint = new URL(this.settings.endpoint)
      options.endpoint = this.settings.endpoint
      // URL#protocol includes the trailing colon, so compare against 'https:'
      options.sslEnabled = endpoint.protocol === 'https:'
    }

    // path-style access is only used for acceptance tests
    if (this.settings.pathStyle) {
      options.s3ForcePathStyle = true
    }

    for (const opt of ['httpOptions', 'maxRetries']) {
      if (this.settings[opt]) {
        options[opt] = this.settings[opt]
      }
    }

    return options
  }

  static _md5FromResponse(response) {
    const md5 = (response.ETag || '').replace(/[ "]/g, '')
    if (!md5.match(/^[a-f0-9]{32}$/)) {
      return null
    }
    return md5
  }
}
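
// Instantiation sketch (settings shape inferred from the reads above; all
// values are illustrative). `key`/`secret` are the default credentials,
// `bucketCreds` optionally overrides them per bucket, and `endpoint` points
// the client at an S3-compatible store:
//
//   const S3Persistor = require('./S3Persistor')
//   const persistor = new S3Persistor({
//     key: 'AKIA...',
//     secret: '...',
//     partSize: 100 * 1024 * 1024,
//     signedUrlExpiryInMs: 60 * 1000,
//     endpoint: 'https://storage.example.com',
//   })
//   await persistor.sendFile('my-bucket', 'some/key', '/tmp/file.txt')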