2020-02-17 14:04:42 +00:00
|
|
|
const crypto = require('crypto')
|
2020-03-04 16:17:36 +00:00
|
|
|
const metrics = require('metrics-sharelatex')
|
2020-02-17 14:04:42 +00:00
|
|
|
const meter = require('stream-meter')
|
|
|
|
const Stream = require('stream')
|
|
|
|
const logger = require('logger-sharelatex')
|
|
|
|
const { WriteError, ReadError, NotFoundError } = require('./Errors')
|
|
|
|
const { promisify } = require('util')
|
|
|
|
|
|
|
|
const pipeline = promisify(Stream.pipeline)
|
|
|
|
|
|
|
|
module.exports = {
|
|
|
|
calculateStreamMd5,
|
|
|
|
verifyMd5,
|
|
|
|
getMeteredStream,
|
|
|
|
waitForStreamReady,
|
2020-02-13 16:55:01 +00:00
|
|
|
wrapError,
|
|
|
|
hexToBase64,
|
|
|
|
base64ToHex
|
2020-02-17 14:04:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
 * Computes the MD5 digest of everything read from a stream.
 *
 * The stream is piped to completion into an MD5 hash object whose output
 * encoding is set to hex, so the value read back from it once the pipeline has
 * finished is the hex-encoded digest.
 *
 * @param {Stream.Readable} stream - source stream; it is fully consumed
 * @returns {Promise<string>} resolves with the hex-encoded MD5 digest, or
 *   rejects if the pipeline fails
 */
function calculateStreamMd5(stream) {
  const md5Hasher = crypto.createHash('md5')
  md5Hasher.setEncoding('hex')

  // the digest only becomes readable after the pipeline has completed
  const readDigest = () => md5Hasher.read()

  return pipeline(stream, md5Hasher).then(readDigest)
}
|
|
|
|
|
|
|
|
/**
 * Verifies the MD5 hash of a stored file against the hash of its source.
 *
 * When `destMd5` is not supplied (falsy), the hash is fetched from storage via
 * `persistor.promises.getFileMd5Hash`. If the two hashes differ, the stored
 * file is deleted on a best-effort basis (a failed delete is only logged) and
 * a WriteError is thrown.
 *
 * @param {Object} persistor - storage backend exposing
 *   `promises.getFileMd5Hash(bucket, key)` and `promises.deleteFile(bucket, key)`
 * @param {string} bucket - bucket holding the file
 * @param {string} key - key of the file within the bucket
 * @param {string} sourceMd5 - expected hash, compared with strict equality
 * @param {?string} [destMd5=null] - hash of the stored file, if already known
 * @returns {Promise<void>} resolves when the hashes match
 * @throws {WriteError} when the hashes do not match; `info` carries both
 *   hashes plus the bucket and key
 */
async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
  // only ask the persistor for the hash when the caller did not provide one
  const storedMd5 =
    destMd5 || (await persistor.promises.getFileMd5Hash(bucket, key))

  if (sourceMd5 === storedMd5) {
    return
  }

  // the upload is invalid: try to remove it, but never let a failed delete
  // mask the hash-mismatch error thrown below
  try {
    await persistor.promises.deleteFile(bucket, key)
  } catch (deleteErr) {
    logger.warn(deleteErr, 'error deleting file for invalid upload')
  }

  const info = { sourceMd5, destMd5: storedMd5, bucket, key }
  throw new WriteError({
    message: 'source and destination hashes do not match',
    info
  })
}
|
|
|
|
|
|
|
|
/**
 * Pipes a stream through a meter and returns the metered stream, which is the
 * next stream for the caller to read from.
 *
 * When the pipeline settles, the meter's `bytes` value is reported through
 * `metrics.count` under `metricName`. This happens both on success and on
 * failure, so a failed transfer still records what the meter had counted.
 *
 * @param {Stream.Readable} stream - source stream to meter
 * @param {string} metricName - name of the metric the byte count is reported under
 * @returns {Stream} the metered stream
 */
function getMeteredStream(stream, metricName) {
  const meteredStream = meter()

  const reportByteCount = () => {
    metrics.count(metricName, meteredStream.bytes)
  }

  // deliberately not awaited: metering runs alongside the caller's own use of
  // the returned stream
  pipeline(stream, meteredStream)
    .then(reportByteCount)
    // on error, just send how many bytes we received before the stream stopped
    .catch(reportByteCount)

  return meteredStream
}
|
|
|
|
|
|
|
|
/**
 * Waits for a stream to become readable.
 *
 * Resolves with the stream on its first 'readable' event, at which point both
 * listeners added here are removed. If the stream emits 'error' first, the
 * promise rejects with the error passed through `wrapError` (as a ReadError
 * unless it is recognised as not-found). This lets callers handle
 * protocol-level errors before they start reading from the stream.
 *
 * Note: on the error path the listeners are left attached.
 *
 * @param {Stream.Readable} stream - stream to wait on
 * @returns {Promise<Stream.Readable>} resolves with the same stream once readable
 */
function waitForStreamReady(stream) {
  return new Promise((resolve, reject) => {
    function handleError(err) {
      reject(wrapError(err, 'error before stream became ready', {}, ReadError))
    }

    function handleReadable() {
      // detach both listeners so the caller gets the stream back untouched
      stream.removeListener('readable', handleReadable)
      stream.removeListener('error', handleError)
      resolve(stream)
    }

    stream.on('readable', handleReadable)
    stream.on('error', handleError)
  })
}
|
|
|
|
|
|
|
|
/**
 * Wraps a low-level error in one of this module's error types, keeping the
 * original as the cause.
 *
 * The error is treated as "file not found" when it already is a NotFoundError,
 * when its `code` is one of 'NoSuchKey', 'NotFound', 404, 'AccessDenied' or
 * 'ENOENT', or when it carries a `response` whose `statusCode` is 404. In that
 * case a NotFoundError with the fixed message 'no such file' is returned and
 * `message` is ignored; otherwise an `ErrorType` is built with `message`.
 *
 * @param {Error} error - the original error
 * @param {string} message - message for the wrapping error (non-not-found case)
 * @param {Object} params - attached to the wrapping error as `info`
 * @param {Function} ErrorType - error class to construct in the non-not-found case
 * @returns {Error} a NotFoundError or an `ErrorType` instance, with `error` as cause
 */
function wrapError(error, message, params, ErrorType) {
  const notFoundCodes = ['NoSuchKey', 'NotFound', 404, 'AccessDenied', 'ENOENT']

  const isMissingFile =
    error instanceof NotFoundError ||
    notFoundCodes.includes(error.code) ||
    (error.response && error.response.statusCode === 404)

  if (isMissingFile) {
    const notFound = new NotFoundError({
      message: 'no such file',
      info: params
    })
    return notFound.withCause(error)
  }

  const wrapped = new ErrorType({
    message: message,
    info: params
  })
  return wrapped.withCause(error)
}
|
2020-02-13 16:55:01 +00:00
|
|
|
|
|
|
|
/**
 * Re-encodes a base64 string as hex.
 *
 * @param {string} base64 - base64-encoded input
 * @returns {string} the same bytes, hex-encoded
 */
function base64ToHex(base64) {
  const rawBytes = Buffer.from(base64, 'base64')
  return rawBytes.toString('hex')
}
|
|
|
|
|
|
|
|
/**
 * Re-encodes a hex string as base64.
 *
 * @param {string} hex - hex-encoded input
 * @returns {string} the same bytes, base64-encoded
 */
function hexToBase64(hex) {
  const rawBytes = Buffer.from(hex, 'hex')
  return rawBytes.toString('base64')
}
|