Merge pull request #10261 from overleaf/em-object-persistor-md5

Compute the md5 hash as we receive the stream

GitOrigin-RevId: 6c0c0a31165c5e1320e7f7a6dbf59a0d3576d3c4
This commit is contained in:
Eric Mc Sween 2022-11-10 07:06:51 -05:00 committed by Copybot
parent 3ee794da47
commit be081856a8
2 changed files with 44 additions and 28 deletions

View file

@@ -1,3 +1,4 @@
const crypto = require('crypto')
const fs = require('fs') const fs = require('fs')
const fsPromises = require('fs/promises') const fsPromises = require('fs/promises')
const globCallbacks = require('glob') const globCallbacks = require('glob')
@@ -41,30 +42,16 @@ module.exports = class FSPersistor extends AbstractPersistor {
await this._ensureDirectoryExists(targetPath) await this._ensureDirectoryExists(targetPath)
const tempFilePath = await this._writeStreamToTempFile( const tempFilePath = await this._writeStreamToTempFile(
location, location,
sourceStream sourceStream,
opts
) )
try { try {
if (opts.sourceMd5) {
const actualMd5 = await _getFileMd5HashForPath(tempFilePath)
if (actualMd5 !== opts.sourceMd5) {
throw new WriteError('md5 hash mismatch', {
location,
target,
expectedMd5: opts.sourceMd5,
actualMd5,
})
}
}
await fsPromises.rename(tempFilePath, targetPath) await fsPromises.rename(tempFilePath, targetPath)
} finally { } finally {
await this._cleanupTempFile(tempFilePath) await this._cleanupTempFile(tempFilePath)
} }
} catch (err) { } catch (err) {
if (err instanceof WriteError) {
throw err
}
throw PersistorHelper.wrapError( throw PersistorHelper.wrapError(
err, err,
'failed to write stream', 'failed to write stream',
@@ -116,7 +103,9 @@ module.exports = class FSPersistor extends AbstractPersistor {
async getObjectMd5Hash(location, filename) { async getObjectMd5Hash(location, filename) {
const fsPath = this._getFsPath(location, filename) const fsPath = this._getFsPath(location, filename)
try { try {
return await _getFileMd5HashForPath(fsPath) const stream = fs.createReadStream(fsPath)
const hash = await PersistorHelper.calculateStreamMd5(stream)
return hash
} catch (err) { } catch (err) {
throw new ReadError( throw new ReadError(
'unable to get md5 hash from file', 'unable to get md5 hash from file',
@@ -231,30 +220,49 @@ module.exports = class FSPersistor extends AbstractPersistor {
return size return size
} }
async _writeStreamToTempFile(location, stream) { async _writeStreamToTempFile(location, stream, opts = {}) {
const tempDirPath = await fsPromises.mkdtemp(Path.join(location, 'tmp-')) const tempDirPath = await fsPromises.mkdtemp(Path.join(location, 'tmp-'))
const tempFilePath = Path.join(tempDirPath, 'uploaded-file') const tempFilePath = Path.join(tempDirPath, 'uploaded-file')
const transforms = []
let md5Observer
if (opts.sourceMd5) {
md5Observer = createMd5Observer()
transforms.push(md5Observer.transform)
}
let timer let timer
if (this.metrics) { if (this.metrics) {
timer = new this.metrics.Timer('writingFile') timer = new this.metrics.Timer('writingFile')
} }
const writeStream = fs.createWriteStream(tempFilePath)
try { try {
await pipeline(stream, writeStream) const writeStream = fs.createWriteStream(tempFilePath)
await pipeline(stream, ...transforms, writeStream)
if (timer) { if (timer) {
timer.done() timer.done()
} }
return tempFilePath
} catch (err) { } catch (err) {
await fsPromises.rm(tempFilePath, { force: true }) await this._cleanupTempFile(tempFilePath)
throw new WriteError( throw new WriteError(
'problem writing temp file locally', 'problem writing temp file locally',
{ err, tempFilePath }, { tempFilePath },
err err
) )
} }
if (opts.sourceMd5) {
const actualMd5 = md5Observer.hash.digest('hex')
if (actualMd5 !== opts.sourceMd5) {
await this._cleanupTempFile(tempFilePath)
throw new WriteError('md5 hash mismatch', {
expectedMd5: opts.sourceMd5,
actualMd5,
})
}
}
return tempFilePath
} }
async _cleanupTempFile(tempFilePath) { async _cleanupTempFile(tempFilePath) {
@@ -283,7 +291,15 @@ module.exports = class FSPersistor extends AbstractPersistor {
} }
} }
async function _getFileMd5HashForPath(fullPath) { function createMd5Observer() {
const stream = fs.createReadStream(fullPath) const hash = crypto.createHash('md5')
return PersistorHelper.calculateStreamMd5(stream)
async function* transform(chunks) {
for await (const chunk of chunks) {
hash.update(chunk)
yield chunk
}
}
return { hash, transform }
} }

View file

@@ -147,7 +147,7 @@ describe('FSPersistorTests', function () {
persistor.sendStream(location, files.wombat, stream, { persistor.sendStream(location, files.wombat, stream, {
sourceMd5: md5('wrong content'), sourceMd5: md5('wrong content'),
}) })
).to.be.rejectedWith(Errors.WriteError, 'md5 hash mismatch') ).to.be.rejectedWith(Errors.WriteError)
}) })
it('should not write the target file', async function () { it('should not write the target file', async function () {
@@ -236,7 +236,7 @@ describe('FSPersistorTests', function () {
persistor.sendStream(location, files.wombat, stream, { persistor.sendStream(location, files.wombat, stream, {
sourceMd5: md5('wrong content'), sourceMd5: md5('wrong content'),
}) })
).to.be.rejectedWith(Errors.WriteError, 'md5 hash mismatch') ).to.be.rejectedWith(Errors.WriteError)
}) })
it('should not update the target file', async function () { it('should not update the target file', async function () {