Cleanup and refactor S3PersistorManager to use aws-sdk only

This commit is contained in:
Simon Detheridge 2020-01-03 18:22:08 +00:00
parent 473aea4e60
commit 2ca74fdf15
4 changed files with 896 additions and 879 deletions

View file

@ -1,376 +1,258 @@
/* eslint-disable
handle-callback-err,
new-cap,
no-return-assign,
no-unused-vars,
node/no-deprecated-api,
standard/no-callback-literal,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
// This module is the one which is used in production. It needs to be migrated
// to use aws-sdk throughout, see the comments in AWSSDKPersistorManager for
// details. The knox library is unmaintained and has bugs.
const http = require('http')
http.globalAgent.maxSockets = 300
const https = require('https')
http.globalAgent.maxSockets = 300
https.globalAgent.maxSockets = 300
const settings = require('settings-sharelatex')
const request = require('request')
const logger = require('logger-sharelatex')
const metrics = require('metrics-sharelatex')
const meter = require('stream-meter')
const fs = require('fs')
const knox = require('knox')
const path = require('path')
const LocalFileWriter = require('./LocalFileWriter')
const Errors = require('./Errors')
const _ = require('underscore')
const awsS3 = require('aws-sdk/clients/s3')
const URL = require('url')
const S3 = require('aws-sdk/clients/s3')
const { URL } = require('url')
const { callbackify } = require('util')
const { WriteError, ReadError, NotFoundError } = require('./Errors')
const thirtySeconds = 30 * 1000
const buildDefaultOptions = function(bucketName, method, key) {
let endpoint
if (settings.filestore.s3.endpoint) {
endpoint = `${settings.filestore.s3.endpoint}/${bucketName}`
} else {
endpoint = `https://${bucketName}.s3.amazonaws.com`
}
return {
aws: {
key: settings.filestore.s3.key,
secret: settings.filestore.s3.secret,
bucket: bucketName
},
method,
timeout: thirtySeconds,
uri: `${endpoint}/${key}`
module.exports = {
sendFile: callbackify(sendFile),
sendStream: callbackify(sendStream),
getFileStream: callbackify(getFileStream),
deleteDirectory: callbackify(deleteDirectory),
getFileSize: callbackify(getFileSize),
deleteFile: callbackify(deleteFile),
copyFile: callbackify(copyFile),
checkIfFileExists: callbackify(checkIfFileExists),
getDirectorySize: callbackify(getDirectorySize),
promises: {
sendFile,
sendStream,
getFileStream,
deleteDirectory,
getFileSize,
deleteFile,
copyFile,
checkIfFileExists,
getDirectorySize
}
}
const getS3Options = function(credentials) {
const _client = new S3(_defaultOptions())
async function sendFile(bucketName, key, fsPath) {
let readStream
try {
readStream = fs.createReadStream(fsPath)
} catch (err) {
throw _wrapError(
err,
'error reading file from disk',
{ bucketName, key, fsPath },
ReadError
)
}
return sendStream(bucketName, key, readStream)
}
async function sendStream(bucketName, key, readStream) {
try {
const meteredStream = meter()
meteredStream.on('finish', () => {
metrics.count('s3.egress', meteredStream.bytes)
})
const response = await _client
.upload({
Bucket: bucketName,
Key: key,
Body: readStream.pipe(meteredStream)
})
.promise()
logger.log({ response, bucketName, key }, 'data uploaded to s3')
} catch (err) {
throw _wrapError(
err,
'upload to S3 failed',
{ bucketName, key },
WriteError
)
}
}
async function getFileStream(bucketName, key, opts) {
opts = opts || {}
const params = {
Bucket: bucketName,
Key: key
}
if (opts.start != null && opts.end != null) {
params.Range = `bytes=${opts.start}-${opts.end}`
}
return new Promise((resolve, reject) => {
const stream = _client.getObject(params).createReadStream()
const meteredStream = meter()
meteredStream.on('finish', () => {
metrics.count('s3.ingress', meteredStream.bytes)
})
const onStreamReady = function() {
stream.removeListener('readable', onStreamReady)
resolve(stream.pipe(meteredStream))
}
stream.on('readable', onStreamReady)
stream.on('error', err => {
reject(_wrapError(err, 'error reading from S3', params, ReadError))
})
})
}
async function deleteDirectory(bucketName, key) {
logger.log({ key, bucketName }, 'deleting directory')
let response
try {
response = await _client
.listObjects({ Bucket: bucketName, Prefix: key })
.promise()
} catch (err) {
throw _wrapError(
err,
'failed to list objects in S3',
{ bucketName, key },
ReadError
)
}
const objects = response.Contents.map(item => ({ Key: item.Key }))
if (objects.length) {
try {
await _client
.deleteObjects({
Bucket: bucketName,
Delete: {
Objects: objects,
Quiet: true
}
})
.promise()
} catch (err) {
throw _wrapError(
err,
'failed to delete objects in S3',
{ bucketName, key },
WriteError
)
}
}
}
async function getFileSize(bucketName, key) {
try {
const response = await _client
.headObject({ Bucket: bucketName, Key: key })
.promise()
return response.ContentLength
} catch (err) {
throw _wrapError(
err,
'error getting size of s3 object',
{ bucketName, key },
ReadError
)
}
}
async function deleteFile(bucketName, key) {
try {
await _client.deleteObject({ Bucket: bucketName, Key: key }).promise()
} catch (err) {
throw _wrapError(
err,
'failed to delete file in S3',
{ bucketName, key },
WriteError
)
}
}
async function copyFile(bucketName, sourceKey, destKey) {
const params = {
Bucket: bucketName,
Key: destKey,
CopySource: `${bucketName}/${sourceKey}`
}
try {
await _client.copyObject(params).promise()
} catch (err) {
throw _wrapError(err, 'failed to copy file in S3', params, WriteError)
}
}
async function checkIfFileExists(bucketName, key) {
try {
await getFileSize(bucketName, key)
return true
} catch (err) {
if (err instanceof NotFoundError) {
return false
}
throw _wrapError(
err,
'error checking whether S3 object exists',
{ bucketName, key },
ReadError
)
}
}
async function getDirectorySize(bucketName, key) {
try {
const response = await _client
.listObjects({ Bucket: bucketName, Prefix: key })
.promise()
return response.Contents.reduce((acc, item) => item.Size + acc, 0)
} catch (err) {
throw _wrapError(
err,
'error getting directory size in S3',
{ bucketName, key },
ReadError
)
}
}
function _wrapError(error, message, params, ErrorType) {
if (['NoSuchKey', 'NotFound', 'ENOENT'].includes(error.code)) {
return new NotFoundError({
message: 'no such file',
info: params
}).withCause(error)
} else {
return new ErrorType({
message: message,
info: params
}).withCause(error)
}
}
function _defaultOptions() {
const options = {
credentials: {
accessKeyId: credentials.auth_key,
secretAccessKey: credentials.auth_secret
accessKeyId: settings.filestore.s3.key,
secretAccessKey: settings.filestore.s3.secret
}
}
if (settings.filestore.s3.endpoint) {
const endpoint = URL.parse(settings.filestore.s3.endpoint)
const endpoint = new URL(settings.filestore.s3.endpoint)
options.endpoint = settings.filestore.s3.endpoint
options.sslEnabled = endpoint.protocol === 'https'
}
return options
}
const defaultS3Client = new awsS3(
getS3Options({
auth_key: settings.filestore.s3.key,
auth_secret: settings.filestore.s3.secret
})
)
const getS3Client = function(credentials) {
if (credentials != null) {
return new awsS3(getS3Options(credentials))
} else {
return defaultS3Client
}
}
const getKnoxClient = bucketName => {
const options = {
key: settings.filestore.s3.key,
secret: settings.filestore.s3.secret,
bucket: bucketName
}
if (settings.filestore.s3.endpoint) {
const endpoint = URL.parse(settings.filestore.s3.endpoint)
options.endpoint = endpoint.hostname
options.port = endpoint.port
}
return knox.createClient(options)
}
module.exports = {
sendFile(bucketName, key, fsPath, callback) {
const s3Client = getKnoxClient(bucketName)
let uploaded = 0
const putEventEmiter = s3Client.putFile(fsPath, key, function(err, res) {
metrics.count('s3.egress', uploaded)
if (err != null) {
logger.err(
{ err, bucketName, key, fsPath },
'something went wrong uploading file to s3'
)
return callback(err)
}
if (res == null) {
logger.err(
{ err, res, bucketName, key, fsPath },
'no response from s3 put file'
)
return callback('no response from put file')
}
if (res.statusCode !== 200) {
logger.err(
{ bucketName, key, fsPath },
'non 200 response from s3 putting file'
)
return callback('non 200 response from s3 on put file')
}
logger.log({ res, bucketName, key, fsPath }, 'file uploaded to s3')
return callback(err)
})
putEventEmiter.on('error', function(err) {
logger.err(
{ err, bucketName, key, fsPath },
'error emmited on put of file'
)
return callback(err)
})
return putEventEmiter.on(
'progress',
progress => (uploaded = progress.written)
)
},
sendStream(bucketName, key, readStream, callback) {
logger.log({ bucketName, key }, 'sending file to s3')
readStream.on('error', err =>
logger.err({ bucketName, key }, 'error on stream to send to s3')
)
return LocalFileWriter.writeStream(readStream, null, (err, fsPath) => {
if (err != null) {
logger.err(
{ bucketName, key, fsPath, err },
'something went wrong writing stream to disk'
)
return callback(err)
}
return this.sendFile(bucketName, key, fsPath, (
err // delete the temporary file created above and return the original error
) => LocalFileWriter.deleteFile(fsPath, () => callback(err)))
})
},
// opts may be {start: Number, end: Number}
getFileStream(bucketName, key, opts, callback) {
if (callback == null) {
callback = function(err, res) {}
}
opts = opts || {}
callback = _.once(callback)
logger.log({ bucketName, key }, 'getting file from s3')
const s3 = getS3Client(opts.credentials)
const s3Params = {
Bucket: bucketName,
Key: key
}
if (opts.start != null && opts.end != null) {
s3Params.Range = `bytes=${opts.start}-${opts.end}`
}
const s3Request = s3.getObject(s3Params)
s3Request.on(
'httpHeaders',
(statusCode, headers, response, statusMessage) => {
if ([403, 404].includes(statusCode)) {
// S3 returns a 403 instead of a 404 when the user doesn't have
// permission to list the bucket contents.
logger.log({ bucketName, key }, 'file not found in s3')
return callback(
new Errors.NotFoundError(
`File not found in S3: ${bucketName}:${key}`
),
null
)
}
if (![200, 206].includes(statusCode)) {
logger.log(
{ bucketName, key },
`error getting file from s3: ${statusCode}`
)
return callback(
new Error(
`Got non-200 response from S3: ${statusCode} ${statusMessage}`
),
null
)
}
const stream = response.httpResponse.createUnbufferedStream()
stream.on('data', data => metrics.count('s3.ingress', data.byteLength))
return callback(null, stream)
}
)
s3Request.on('error', err => {
logger.err({ err, bucketName, key }, 'error getting file stream from s3')
return callback(err)
})
return s3Request.send()
},
getFileSize(bucketName, key, callback) {
logger.log({ bucketName, key }, 'getting file size from S3')
const s3 = getS3Client()
return s3.headObject({ Bucket: bucketName, Key: key }, function(err, data) {
if (err != null) {
if ([403, 404].includes(err.statusCode)) {
// S3 returns a 403 instead of a 404 when the user doesn't have
// permission to list the bucket contents.
logger.log(
{
bucketName,
key
},
'file not found in s3'
)
callback(
new Errors.NotFoundError(
`File not found in S3: ${bucketName}:${key}`
)
)
} else {
logger.err(
{
bucketName,
key,
err
},
'error performing S3 HeadObject'
)
callback(err)
}
return
}
return callback(null, data.ContentLength)
})
},
copyFile(bucketName, sourceKey, destKey, callback) {
logger.log({ bucketName, sourceKey, destKey }, 'copying file in s3')
const source = bucketName + '/' + sourceKey
// use the AWS SDK instead of knox due to problems with error handling (https://github.com/Automattic/knox/issues/114)
const s3 = getS3Client()
return s3.copyObject(
{ Bucket: bucketName, Key: destKey, CopySource: source },
function(err) {
if (err != null) {
if (err.code === 'NoSuchKey') {
logger.err(
{ bucketName, sourceKey },
'original file not found in s3 when copying'
)
return callback(
new Errors.NotFoundError(
'original file not found in S3 when copying'
)
)
} else {
logger.err(
{ err, bucketName, sourceKey, destKey },
'something went wrong copying file in aws'
)
return callback(err)
}
} else {
return callback()
}
}
)
},
deleteFile(bucketName, key, callback) {
logger.log({ bucketName, key }, 'delete file in s3')
const options = buildDefaultOptions(bucketName, 'delete', key)
return request(options, function(err, res) {
if (err != null) {
logger.err(
{ err, res, bucketName, key },
'something went wrong deleting file in aws'
)
}
return callback(err)
})
},
deleteDirectory(bucketName, key, _callback) {
// deleteMultiple can call the callback multiple times so protect against this.
const callback = function(...args) {
_callback(...Array.from(args || []))
return (_callback = function() {})
}
logger.log({ key, bucketName }, 'deleting directory')
const s3Client = getKnoxClient(bucketName)
return s3Client.list({ prefix: key }, function(err, data) {
if (err != null) {
logger.err(
{ err, bucketName, key },
'something went wrong listing prefix in aws'
)
return callback(err)
}
const keys = _.map(data.Contents, entry => entry.Key)
return s3Client.deleteMultiple(keys, callback)
})
},
checkIfFileExists(bucketName, key, callback) {
logger.log({ bucketName, key }, 'checking if file exists in s3')
const options = buildDefaultOptions(bucketName, 'head', key)
return request(options, function(err, res) {
if (err != null) {
logger.err(
{ err, res, bucketName, key },
'something went wrong checking file in aws'
)
return callback(err)
}
if (res == null) {
logger.err(
{ err, res, bucketName, key },
'no response object returned when checking if file exists'
)
err = new Error(`no response from s3 ${bucketName} ${key}`)
return callback(err)
}
const exists = res.statusCode === 200
logger.log({ bucketName, key, exists }, 'checked if file exsists in s3')
return callback(err, exists)
})
},
directorySize(bucketName, key, callback) {
logger.log({ bucketName, key }, 'get project size in s3')
const s3Client = getKnoxClient(bucketName)
return s3Client.list({ prefix: key }, function(err, data) {
if (err != null) {
logger.err(
{ err, bucketName, key },
'something went wrong listing prefix in aws'
)
return callback(err)
}
let totalSize = 0
_.each(data.Contents, entry => (totalSize += entry.Size))
logger.log({ totalSize }, 'total size')
return callback(null, totalSize)
})
}
}

View file

@ -5018,6 +5018,38 @@
"resolved": "https://registry.npmjs.org/stream-counter/-/stream-counter-1.0.0.tgz",
"integrity": "sha1-kc8lac5NxQYf6816yyY5SloRR1E="
},
"stream-meter": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/stream-meter/-/stream-meter-1.0.4.tgz",
"integrity": "sha1-Uq+Vql6nYKJJFxZwTb/5D3Ov3R0=",
"requires": {
"readable-stream": "^2.1.4"
},
"dependencies": {
"readable-stream": {
"version": "2.3.6",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.6.tgz",
"integrity": "sha512-tQtKA9WIAhBF3+VLAseyMqZeBjW0AHJoxOtYqSUZNJxauErmLbVm2FW1y+J/YA9dUrAC39ITejlZWhVIwawkKw==",
"requires": {
"core-util-is": "~1.0.0",
"inherits": "~2.0.3",
"isarray": "~1.0.0",
"process-nextick-args": "~2.0.0",
"safe-buffer": "~5.1.1",
"string_decoder": "~1.1.1",
"util-deprecate": "~1.0.1"
}
},
"string_decoder": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz",
"integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==",
"requires": {
"safe-buffer": "~5.1.0"
}
}
}
},
"stream-shift": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.0.tgz",
@ -5531,7 +5563,7 @@
"xml2js": {
"version": "0.4.19",
"resolved": "https://registry.npmjs.org/xml2js/-/xml2js-0.4.19.tgz",
"integrity": "sha1-aGwg8hMgnpSr8NG88e+qKRx4J6c=",
"integrity": "sha512-esZnJZJOiJR9wWKMyuvSE1y6Dq5LCuJanqhxslH2bxM6duahNZ+HMpCLhBQGZkbX6xRf8x1Y2eJlgt2q3qo49Q==",
"requires": {
"sax": ">=0.6.0",
"xmlbuilder": "~9.0.1"

View file

@ -44,6 +44,7 @@
"settings-sharelatex": "^1.1.0",
"stream-browserify": "^2.0.1",
"stream-buffers": "~0.2.5",
"stream-meter": "^1.0.4",
"underscore": "~1.5.2"
},
"devDependencies": {

File diff suppressed because it is too large Load diff