Refactor persistors to use a helper for common things

This commit is contained in:
Simon Detheridge 2020-01-29 12:23:31 +00:00
parent 304fdfd35c
commit 93cd55fb79
5 changed files with 203 additions and 126 deletions

View file

@ -8,6 +8,7 @@ const { promisify, callbackify } = require('util')
const LocalFileWriter = require('./LocalFileWriter').promises
const { NotFoundError, ReadError, WriteError } = require('./Errors')
const PersistorHelper = require('./PersistorHelper')
const pipeline = promisify(Stream.pipeline)
const fsUnlink = promisify(fs.unlink)
@ -28,7 +29,7 @@ async function sendFile(location, target, source) {
const targetStream = fs.createWriteStream(`${location}/${filteredTarget}`)
await pipeline(sourceStream, targetStream)
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to copy the specified file',
{ location, target, source },
@ -65,7 +66,7 @@ async function getFileStream(location, name, opts) {
try {
opts.fd = await fsOpen(`${location}/${filteredName}`, 'r')
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to open file for streaming',
{ location, filteredName, opts },
@ -83,7 +84,7 @@ async function getFileSize(location, filename) {
const stat = await fsStat(fullPath)
return stat.size
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to stat file',
{ location, filename },
@ -126,7 +127,7 @@ async function copyFile(location, fromName, toName) {
const targetStream = fs.createWriteStream(`${location}/${filteredToName}`)
await pipeline(sourceStream, targetStream)
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to copy file',
{ location, filteredFromName, filteredToName },
@ -140,7 +141,7 @@ async function deleteFile(location, name) {
try {
await fsUnlink(`${location}/${filteredName}`)
} catch (err) {
const wrappedError = _wrapError(
const wrappedError = PersistorHelper.wrapError(
err,
'failed to delete file',
{ location, filteredName },
@ -161,7 +162,7 @@ async function deleteDirectory(location, name) {
try {
await rmrf(`${location}/${filteredName}`)
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to delete directory',
{ location, filteredName },
@ -179,7 +180,7 @@ async function checkIfFileExists(location, name) {
if (err.code === 'ENOENT') {
return false
}
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to stat file',
{ location, filteredName },
@ -209,7 +210,7 @@ async function directorySize(location, name) {
}
}
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to get directory size',
{ location, name },
@ -220,20 +221,6 @@ async function directorySize(location, name) {
return size
}
// Translates a raw fs error into one of our typed errors: ENOENT (the file
// or directory does not exist) becomes a NotFoundError; anything else is
// wrapped in the supplied ErrorType. The raw error is kept as the cause.
function _wrapError(error, message, params, ErrorType) {
  if (error.code === 'ENOENT') {
    return new NotFoundError({
      message: 'no such file or directory',
      info: params
    }).withCause(error)
  }
  return new ErrorType({ message, info: params }).withCause(error)
}
module.exports = {
sendFile: callbackify(sendFile),
sendStream: callbackify(sendStream),

View file

@ -0,0 +1,114 @@
const crypto = require('crypto')
const meter = require('stream-meter')
const Stream = require('stream')
const logger = require('logger-sharelatex')
const { WriteError, ReadError, NotFoundError } = require('./Errors')
const { promisify } = require('util')
const pipeline = promisify(Stream.pipeline)
// shared helpers used by the individual persistor implementations
module.exports = {
  calculateStreamMd5,
  verifyMd5,
  getMeteredStream,
  waitForStreamReady,
  wrapError
}
// Computes the hex-encoded md5 hash of everything read from `stream`.
// Returns a promise which resolves with the hash once the stream ends, or
// rejects if the stream emits an error.
async function calculateStreamMd5(stream) {
  const hash = crypto.createHash('md5')
  hash.setEncoding('hex')
  // pipeline ends the hash stream for us and propagates source errors, so
  // the explicit Promise wrapper and the manual hash.end() call in the
  // previous version were redundant
  await pipeline(stream, hash)
  return hash.read()
}
// Verifies the md5 hash of an uploaded file against `sourceMd5`. If
// `destMd5` is not supplied, the stored hash is fetched from the persistor.
// On a mismatch the (corrupt) destination file is deleted and a WriteError
// is thrown.
async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
  const actualMd5 =
    destMd5 || (await persistor.promises.getFileMd5Hash(bucket, key))

  if (sourceMd5 === actualMd5) {
    return
  }

  // don't leave a bad upload in place; a failure to delete is only logged
  try {
    await persistor.promises.deleteFile(bucket, key)
  } catch (err) {
    logger.warn(err, 'error deleting file for invalid upload')
  }

  throw new WriteError({
    message: 'source and destination hashes do not match',
    info: { sourceMd5, destMd5: actualMd5, bucket, key }
  })
}
// Pipes `stream` through a byte-counting meter and returns the metered
// stream (the next stage in the pipeline). When the pipeline finishes or
// fails, `callback` is invoked with (error, byteCount).
function getMeteredStream(stream, callback) {
  const meteredStream = meter()

  pipeline(stream, meteredStream)
    .then(() => callback(null, meteredStream.bytes))
    .catch((err) => {
      // on error, report however many bytes arrived before the failure
      callback(err, meteredStream.bytes)
    })

  return meteredStream
}
// Resolves with the stream once it emits 'readable', or rejects with a
// wrapped ReadError if the stream errors first - this lets us surface
// protocol-level failures before anything tries to read from the stream.
function waitForStreamReady(stream) {
  return new Promise((resolve, reject) => {
    function cleanup() {
      stream.removeListener('readable', onReady)
      stream.removeListener('error', onError)
    }
    function onReady() {
      cleanup()
      resolve(stream)
    }
    function onError(err) {
      reject(wrapError(err, 'error before stream became ready', {}, ReadError))
    }
    stream.on('readable', onReady)
    stream.on('error', onError)
  })
}
// Wraps a low-level error in one of our typed errors. Anything the backends
// report for a missing object (AWS uses 'NoSuchKey', 'NotFound' or a numeric
// 404 depending on the endpoint; the filesystem uses 'ENOENT') - or an error
// that is already a NotFoundError - becomes a NotFoundError; everything else
// becomes the supplied ErrorType. The original error is kept as the cause.
function wrapError(error, message, params, ErrorType) {
  const notFoundCodes = ['NoSuchKey', 'NotFound', 404, 'AccessDenied', 'ENOENT']
  const isNotFound =
    error instanceof NotFoundError || notFoundCodes.includes(error.code)

  if (isNotFound) {
    return new NotFoundError({
      message: 'no such file',
      info: params
    }).withCause(error)
  }

  return new ErrorType({ message, info: params }).withCause(error)
}

View file

@ -5,11 +5,11 @@ https.globalAgent.maxSockets = 300
const settings = require('settings-sharelatex')
const metrics = require('metrics-sharelatex')
const logger = require('logger-sharelatex')
const PersistorHelper = require('./PersistorHelper')
const meter = require('stream-meter')
const Stream = require('stream')
const crypto = require('crypto')
const fs = require('fs')
const S3 = require('aws-sdk/clients/s3')
const { URL } = require('url')
@ -21,7 +21,7 @@ const {
SettingsError
} = require('./Errors')
module.exports = {
const S3Persistor = {
sendFile: callbackify(sendFile),
sendStream: callbackify(sendStream),
getFileStream: callbackify(getFileStream),
@ -46,6 +46,8 @@ module.exports = {
}
}
module.exports = S3Persistor
const pipeline = promisify(Stream.pipeline)
function hexToBase64(hex) {
@ -57,7 +59,7 @@ async function sendFile(bucketName, key, fsPath) {
try {
readStream = fs.createReadStream(fsPath)
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'error reading file from disk',
{ bucketName, key, fsPath },
@ -76,27 +78,14 @@ async function sendStream(bucketName, key, readStream, sourceMd5) {
if (sourceMd5) {
b64Hash = hexToBase64(sourceMd5)
} else {
const hash = crypto.createHash('md5')
hash.setEncoding('hex')
pipeline(readStream, hash)
hashPromise = new Promise((resolve, reject) => {
readStream.on('end', () => {
hash.end()
resolve(hash.read())
})
readStream.on('error', err => {
reject(err)
})
})
hashPromise = PersistorHelper.calculateStreamMd5(readStream)
}
const meteredStream = meter()
meteredStream.on('finish', () => {
metrics.count('s3.egress', meteredStream.bytes)
const meteredStream = PersistorHelper.getMeteredStream(readStream, (_, byteCount) => {
// ignore the error parameter and just log the byte count
metrics.count('s3.egress', byteCount)
})
pipeline(readStream, meteredStream)
// if we have an md5 hash, pass this to S3 to verify the upload
const uploadOptions = {
Bucket: bucketName,
@ -112,30 +101,21 @@ async function sendStream(bucketName, key, readStream, sourceMd5) {
.promise()
const destMd5 = _md5FromResponse(response)
// if we didn't have an md5 hash, compare our computed one with S3's
// if we didn't have an md5 hash, we should compare our computed one with S3's
// as we couldn't tell S3 about it beforehand
if (hashPromise) {
sourceMd5 = await hashPromise
if (sourceMd5 !== destMd5) {
try {
await deleteFile(bucketName, key)
} catch (err) {
logger.warn(err, 'error deleting file for invalid upload')
}
throw new WriteError({
message: 'source and destination hashes do not match',
info: {
sourceMd5,
destMd5,
bucketName,
key
}
})
}
// throws on mismatch
await PersistorHelper.verifyMd5(
S3Persistor,
bucketName,
key,
sourceMd5,
destMd5
)
}
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'upload to S3 failed',
{ bucketName, key },
@ -155,25 +135,29 @@ async function getFileStream(bucketName, key, opts) {
params.Range = `bytes=${opts.start}-${opts.end}`
}
return new Promise((resolve, reject) => {
const stream = _getClientForBucket(bucketName)
.getObject(params)
.createReadStream()
const stream = _getClientForBucket(bucketName)
.getObject(params)
.createReadStream()
const meteredStream = meter()
meteredStream.on('finish', () => {
metrics.count('s3.ingress', meteredStream.bytes)
})
const onStreamReady = function() {
stream.removeListener('readable', onStreamReady)
resolve(stream.pipe(meteredStream))
const meteredStream = PersistorHelper.getMeteredStream(
stream,
(_, byteCount) => {
// ignore the error parameter and just log the byte count
metrics.count('s3.ingress', byteCount)
}
stream.on('readable', onStreamReady)
stream.on('error', err => {
reject(_wrapError(err, 'error reading from S3', params, ReadError))
})
})
)
try {
await PersistorHelper.waitForStreamReady(stream)
return meteredStream
} catch (err) {
throw PersistorHelper.wrapError(
err,
'error reading file from S3',
{ bucketName, key, opts },
ReadError
)
}
}
async function deleteDirectory(bucketName, key) {
@ -184,7 +168,7 @@ async function deleteDirectory(bucketName, key) {
.listObjects({ Bucket: bucketName, Prefix: key })
.promise()
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to list objects in S3',
{ bucketName, key },
@ -205,7 +189,7 @@ async function deleteDirectory(bucketName, key) {
})
.promise()
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to delete objects in S3',
{ bucketName, key },
@ -222,7 +206,7 @@ async function getFileSize(bucketName, key) {
.promise()
return response.ContentLength
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'error getting size of s3 object',
{ bucketName, key },
@ -239,7 +223,7 @@ async function getFileMd5Hash(bucketName, key) {
const md5 = _md5FromResponse(response)
return md5
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'error getting hash of s3 object',
{ bucketName, key },
@ -255,7 +239,7 @@ async function deleteFile(bucketName, key) {
.promise()
} catch (err) {
// s3 does not give us a NotFoundError here
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'failed to delete file in S3',
{ bucketName, key },
@ -275,7 +259,12 @@ async function copyFile(bucketName, sourceKey, destKey) {
.copyObject(params)
.promise()
} catch (err) {
throw _wrapError(err, 'failed to copy file in S3', params, WriteError)
throw PersistorHelper.wrapError(
err,
'failed to copy file in S3',
params,
WriteError
)
}
}
@ -287,7 +276,7 @@ async function checkIfFileExists(bucketName, key) {
if (err instanceof NotFoundError) {
return false
}
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'error checking whether S3 object exists',
{ bucketName, key },
@ -304,7 +293,7 @@ async function directorySize(bucketName, key) {
return response.Contents.reduce((acc, item) => item.Size + acc, 0)
} catch (err) {
throw _wrapError(
throw PersistorHelper.wrapError(
err,
'error getting directory size in S3',
{ bucketName, key },
@ -313,26 +302,6 @@ async function directorySize(bucketName, key) {
}
}
// Converts an S3 client error into one of our typed errors. The AWS client
// can return 'NoSuchKey', 'NotFound' or 404 (integer) when something is not
// found, depending on the endpoint; those (plus 'AccessDenied' and 'ENOENT')
// become a NotFoundError, anything else becomes the supplied ErrorType.
function _wrapError(error, message, params, ErrorType) {
  const notFoundCodes = ['NoSuchKey', 'NotFound', 404, 'AccessDenied', 'ENOENT']
  if (notFoundCodes.includes(error.code)) {
    return new NotFoundError({
      message: 'no such file',
      info: params
    }).withCause(error)
  }
  return new ErrorType({ message, info: params }).withCause(error)
}
const _clients = new Map()
let _defaultClient

View file

@ -70,7 +70,10 @@ describe('FSPersistorTests', function() {
glob,
rimraf,
stream,
crypto
crypto,
// imported by PersistorHelper but otherwise unused here
'stream-meter': {},
'logger-sharelatex': {}
},
globals: { console }
})

View file

@ -89,6 +89,7 @@ describe('S3PersistorTests', function() {
}
MeteredStream = {
type: 'metered',
on: sinon.stub(),
bytes: objectSize
}
@ -103,7 +104,7 @@ describe('S3PersistorTests', function() {
S3ReadStream = {
on: sinon.stub(),
pipe: sinon.stub().returns('s3Stream'),
pipe: sinon.stub(),
removeListener: sinon.stub()
}
S3ReadStream.on.withArgs('readable').yields()
@ -168,8 +169,8 @@ describe('S3PersistorTests', function() {
stream = await S3Persistor.promises.getFileStream(bucket, key)
})
it('returns a stream', function() {
expect(stream).to.equal('s3Stream')
it('returns a metered stream', function() {
expect(stream).to.equal(MeteredStream)
})
it('sets the AWS client up with credentials from settings', function() {
@ -184,7 +185,10 @@ describe('S3PersistorTests', function() {
})
it('pipes the stream through the meter', function() {
expect(S3ReadStream.pipe).to.have.been.calledWith(MeteredStream)
expect(Stream.pipeline).to.have.been.calledWith(
S3ReadStream,
MeteredStream
)
})
it('records an ingress metric', function() {
@ -202,8 +206,8 @@ describe('S3PersistorTests', function() {
})
})
it('returns a stream', function() {
expect(stream).to.equal('s3Stream')
it('returns a metered stream', function() {
expect(stream).to.equal(MeteredStream)
})
it('passes the byte range on to S3', function() {
@ -236,8 +240,8 @@ describe('S3PersistorTests', function() {
stream = await S3Persistor.promises.getFileStream(bucket, key)
})
it('returns a stream', function() {
expect(stream).to.equal('s3Stream')
it('returns a metered stream', function() {
expect(stream).to.equal(MeteredStream)
})
it('sets the AWS client up with the alternative credentials', function() {
@ -305,12 +309,12 @@ describe('S3PersistorTests', function() {
expect(error).to.be.an.instanceOf(Errors.NotFoundError)
})
it('wraps the error from S3', function() {
expect(error.cause).to.equal(S3NotFoundError)
it('wraps the error', function() {
expect(error.cause).to.exist
})
it('stores the bucket and key in the error', function() {
expect(error.info).to.deep.equal({ Bucket: bucket, Key: key })
expect(error.info).to.include({ bucketName: bucket, key: key })
})
})
@ -335,12 +339,12 @@ describe('S3PersistorTests', function() {
expect(error).to.be.an.instanceOf(Errors.NotFoundError)
})
it('wraps the error from S3', function() {
expect(error.cause).to.equal(S3AccessDeniedError)
it('wraps the error', function() {
expect(error.cause).to.exist
})
it('stores the bucket and key in the error', function() {
expect(error.info).to.deep.equal({ Bucket: bucket, Key: key })
expect(error.info).to.include({ bucketName: bucket, key: key })
})
})
@ -365,12 +369,12 @@ describe('S3PersistorTests', function() {
expect(error).to.be.an.instanceOf(Errors.ReadError)
})
it('wraps the error from S3', function() {
expect(error.cause).to.equal(genericError)
it('wraps the error', function() {
expect(error.cause).to.exist
})
it('stores the bucket and key in the error', function() {
expect(error.info).to.deep.equal({ Bucket: bucket, Key: key })
expect(error.info).to.include({ bucketName: bucket, key: key })
})
})
})