Merge pull request #10259 from overleaf/em-object-persistor-tempfile

Atomic update of files in FS backend of object-persistor

GitOrigin-RevId: b57c0c1a7d6299affd00e174cb7ae75ae711c5d3
This commit is contained in:
Eric Mc Sween 2022-11-10 07:06:40 -05:00 committed by Copybot
parent a98f752b99
commit 3ee794da47
5 changed files with 247 additions and 142 deletions

View file

@ -253,7 +253,6 @@ An object with the relevant configuration should be passed to the main function
### FS-specific parameters
- `paths.uploadFolder` (required): Location for temporary files that are being uploaded
- `useSubdirectories`: If true, files will be stored in subdirectories on the filesystem. By default, the directory structure is flattened and slashes in the object keys are replaced with underscores.
#### Notes

View file

@ -27,7 +27,6 @@
"aws-sdk": "^2.718.0",
"fast-crc32c": "https://github.com/overleaf/node-fast-crc32c/archive/aae6b2a4c7a7a159395df9cc6c38dfde702d6f51.tar.gz",
"glob": "^7.1.6",
"node-uuid": "^1.4.8",
"range-parser": "^1.2.1",
"tiny-async-pool": "^1.1.0"
},

View file

@ -1,34 +1,29 @@
const fs = require('fs')
const fsPromises = require('fs/promises')
const globCallbacks = require('glob')
const uuid = require('node-uuid')
const Path = require('path')
const { pipeline } = require('stream/promises')
const { promisify } = require('util')
const AbstractPersistor = require('./AbstractPersistor')
const { NotFoundError, ReadError, WriteError } = require('./Errors')
const { ReadError, WriteError } = require('./Errors')
const PersistorHelper = require('./PersistorHelper')
const glob = promisify(globCallbacks)
module.exports = class FSPersistor extends AbstractPersistor {
constructor(settings) {
constructor(settings = {}) {
super()
this.settings = settings
this.useSubdirectories = Boolean(settings.useSubdirectories)
this.metrics = settings.Metrics
}
async sendFile(location, target, source) {
const fsPath = this._getFsPath(location, target)
// actually copy the file (instead of moving it) to maintain consistent behaviour
// between the different implementations
try {
await this._ensureDirectoryExists(fsPath)
const sourceStream = fs.createReadStream(source)
const targetStream = fs.createWriteStream(fsPath)
await pipeline(sourceStream, targetStream)
await this.sendStream(location, target, sourceStream)
} catch (err) {
throw PersistorHelper.wrapError(
err,
@ -40,27 +35,42 @@ module.exports = class FSPersistor extends AbstractPersistor {
}
async sendStream(location, target, sourceStream, opts = {}) {
const tempFilePath = await this._writeStream(sourceStream)
let sourceMd5 = opts.sourceMd5
if (!sourceMd5) {
sourceMd5 = await _getFileMd5HashForPath(tempFilePath)
}
const targetPath = this._getFsPath(location, target)
try {
await this.sendFile(location, target, tempFilePath)
const destMd5 = await this.getObjectMd5Hash(location, target)
if (sourceMd5 !== destMd5) {
const fsPath = this._getFsPath(location, target)
await this._deleteFile(fsPath)
await this._ensureDirectoryExists(targetPath)
const tempFilePath = await this._writeStreamToTempFile(
location,
sourceStream
)
try {
if (opts.sourceMd5) {
const actualMd5 = await _getFileMd5HashForPath(tempFilePath)
if (actualMd5 !== opts.sourceMd5) {
throw new WriteError('md5 hash mismatch', {
sourceMd5,
destMd5,
location,
target,
expectedMd5: opts.sourceMd5,
actualMd5,
})
}
}
await fsPromises.rename(tempFilePath, targetPath)
} finally {
await this._deleteFile(tempFilePath)
await this._cleanupTempFile(tempFilePath)
}
} catch (err) {
if (err instanceof WriteError) {
throw err
}
throw PersistorHelper.wrapError(
err,
'failed to write stream',
{ location, target },
WriteError
)
}
}
@ -122,9 +132,7 @@ module.exports = class FSPersistor extends AbstractPersistor {
try {
await this._ensureDirectoryExists(targetFsPath)
const sourceStream = fs.createReadStream(sourceFsPath)
const targetStream = fs.createWriteStream(targetFsPath)
await pipeline(sourceStream, targetStream)
await fsPromises.copyFile(sourceFsPath, targetFsPath)
} catch (err) {
throw PersistorHelper.wrapError(
err,
@ -138,19 +146,16 @@ module.exports = class FSPersistor extends AbstractPersistor {
async deleteObject(location, name) {
const fsPath = this._getFsPath(location, name)
try {
await fsPromises.unlink(fsPath)
// S3 doesn't give us a 404 when a file wasn't there to be deleted, so we
// should be consistent here as well
await fsPromises.rm(fsPath, { force: true })
} catch (err) {
const wrappedError = PersistorHelper.wrapError(
throw PersistorHelper.wrapError(
err,
'failed to delete file',
{ location, name, fsPath },
WriteError
)
if (!(wrappedError instanceof NotFoundError)) {
// S3 doesn't give us a 404 when a file wasn't there to be deleted, so we
// should be consistent here as well
throw wrappedError
}
}
}
@ -158,12 +163,12 @@ module.exports = class FSPersistor extends AbstractPersistor {
const fsPath = this._getFsPath(location, name)
try {
if (this.settings.useSubdirectories) {
if (this.useSubdirectories) {
await fsPromises.rm(fsPath, { recursive: true, force: true })
} else {
const files = await this._listDirectory(fsPath)
for (const file of files) {
await fsPromises.unlink(file)
await fsPromises.rm(file, { force: true })
}
}
} catch (err) {
@ -226,58 +231,47 @@ module.exports = class FSPersistor extends AbstractPersistor {
return size
}
_getPath(key) {
if (key == null) {
key = uuid.v1()
}
key = key.replace(/\//g, '-')
return Path.join(this.settings.paths.uploadFolder, key)
}
async _writeStreamToTempFile(location, stream) {
const tempDirPath = await fsPromises.mkdtemp(Path.join(location, 'tmp-'))
const tempFilePath = Path.join(tempDirPath, 'uploaded-file')
async _writeStream(stream, key) {
let timer
if (this.settings.Metrics) {
timer = new this.settings.Metrics.Timer('writingFile')
if (this.metrics) {
timer = new this.metrics.Timer('writingFile')
}
const fsPath = this._getPath(key)
const writeStream = fs.createWriteStream(fsPath)
const writeStream = fs.createWriteStream(tempFilePath)
try {
await pipeline(stream, writeStream)
if (timer) {
timer.done()
}
return fsPath
return tempFilePath
} catch (err) {
await this._deleteFile(fsPath)
throw new WriteError('problem writing file locally', { err, fsPath }, err)
await fsPromises.rm(tempFilePath, { force: true })
throw new WriteError(
'problem writing temp file locally',
{ err, tempFilePath },
err
)
}
}
async _deleteFile(fsPath) {
if (!fsPath) {
return
}
try {
await fsPromises.unlink(fsPath)
} catch (err) {
if (err.code !== 'ENOENT') {
throw new WriteError('failed to delete file', { fsPath }, err)
}
}
async _cleanupTempFile(tempFilePath) {
const dirPath = Path.dirname(tempFilePath)
await fsPromises.rm(dirPath, { force: true, recursive: true })
}
_getFsPath(location, key) {
key = key.replace(/\/$/, '')
if (!this.settings.useSubdirectories) {
if (!this.useSubdirectories) {
key = key.replace(/\//g, '_')
}
return Path.join(location, key)
}
async _listDirectory(path) {
if (this.settings.useSubdirectories) {
if (this.useSubdirectories) {
return await glob(Path.join(path, '**'))
} else {
return await glob(`${path}_*`)

View file

@ -11,11 +11,14 @@ const Errors = require('../../src/Errors')
const MODULE_PATH = '../../src/FSPersistor.js'
describe('FSPersistorTests', function () {
const localFilePath = '/uploads/info.txt'
const localFileContents = Buffer.from('This information is critical', {
const localFiles = {
'/uploads/info.txt': Buffer.from('This information is critical', {
encoding: 'utf-8',
})
const uploadFolder = '/tmp'
}),
'/uploads/other.txt': Buffer.from('Some other content', {
encoding: 'utf-8',
}),
}
const location = '/bucket'
const files = {
wombat: 'animals/wombat.tex',
@ -26,12 +29,12 @@ describe('FSPersistorTests', function () {
const scenarios = [
{
description: 'default settings',
settings: { paths: { uploadFolder } },
settings: {},
fsPath: key => Path.join(location, key.replaceAll('/', '_')),
},
{
description: 'with useSubdirectories = true',
settings: { paths: { uploadFolder }, useSubdirectories: true },
settings: { useSubdirectories: true },
fsPath: key => Path.join(location, key),
},
]
@ -53,8 +56,7 @@ describe('FSPersistorTests', function () {
beforeEach(function () {
mockFs({
[localFilePath]: localFileContents,
[location]: {},
...localFiles,
'/not-a-dir':
'This regular file is meant to prevent using this path as a directory',
'/directory/subdirectory': {},
@ -67,16 +69,16 @@ describe('FSPersistorTests', function () {
describe('sendFile', function () {
it('should copy the file', async function () {
await persistor.sendFile(location, files.wombat, localFilePath)
await persistor.sendFile(location, files.wombat, '/uploads/info.txt')
const contents = await fsPromises.readFile(
scenario.fsPath(files.wombat)
)
expect(contents.equals(localFileContents)).to.be.true
expect(contents.equals(localFiles['/uploads/info.txt'])).to.be.true
})
it('should return an error if the file cannot be stored', async function () {
await expect(
persistor.sendFile('/not-a-dir', files.wombat, localFilePath)
persistor.sendFile('/not-a-dir', files.wombat, '/uploads/info.txt')
).to.be.rejectedWith(Errors.WriteError)
})
})
@ -84,8 +86,9 @@ describe('FSPersistorTests', function () {
describe('sendStream', function () {
let stream
describe("when the file doesn't exist", function () {
beforeEach(function () {
stream = fs.createReadStream(localFilePath)
stream = fs.createReadStream('/uploads/info.txt')
})
it('should write the stream to disk', async function () {
@ -93,67 +96,177 @@ describe('FSPersistorTests', function () {
const contents = await fsPromises.readFile(
scenario.fsPath(files.wombat)
)
expect(contents.equals(localFileContents)).to.be.true
expect(contents.equals(localFiles['/uploads/info.txt'])).to.be.true
})
it('should delete the temporary file', async function () {
await persistor.sendStream(location, files.wombat, stream)
const tempFiles = await fsPromises.readdir(uploadFolder)
expect(tempFiles).to.be.empty
const entries = await fsPromises.readdir(location)
const tempDirs = entries.filter(dir => dir.startsWith('tmp-'))
expect(tempDirs).to.be.empty
})
it('should wrap the error from the filesystem', async function () {
describe('on error', function () {
beforeEach(async function () {
await expect(
persistor.sendStream('/not-a-dir', files.wombat, stream)
).to.be.rejectedWith(Errors.WriteError)
})
it('should not write the target file', async function () {
await expect(
fsPromises.access(scenario.fsPath(files.wombat))
).to.be.rejected
})
it('should delete the temporary file', async function () {
await persistor.sendStream(location, files.wombat, stream)
const entries = await fsPromises.readdir(location)
const tempDirs = entries.filter(dir => dir.startsWith('tmp-'))
expect(tempDirs).to.be.empty
})
})
describe('when the md5 hash matches', function () {
it('should write the stream to disk', async function () {
await persistor.sendStream(location, files.wombat, stream, {
sourceMd5: md5(localFileContents),
sourceMd5: md5(localFiles['/uploads/info.txt']),
})
const contents = await fsPromises.readFile(
scenario.fsPath(files.wombat)
)
expect(contents.equals(localFileContents)).to.be.true
expect(
contents.equals(localFiles['/uploads/info.txt'])
).to.be.true
})
})
describe('when the md5 hash does not match', function () {
let promise
beforeEach(function () {
promise = persistor.sendStream(location, files.wombat, stream, {
beforeEach(async function () {
await expect(
persistor.sendStream(location, files.wombat, stream, {
sourceMd5: md5('wrong content'),
})
).to.be.rejectedWith(Errors.WriteError, 'md5 hash mismatch')
})
it('should return a write error', async function () {
await expect(promise).to.be.rejectedWith(
Errors.WriteError,
'md5 hash mismatch'
)
})
it('deletes the copied file', async function () {
await expect(promise).to.be.rejected
it('should not write the target file', async function () {
await expect(
fsPromises.access(scenario.fsPath(files.wombat))
).to.be.rejected
})
it('should delete the temporary file', async function () {
await persistor.sendStream(location, files.wombat, stream)
const entries = await fsPromises.readdir(location)
const tempDirs = entries.filter(dir => dir.startsWith('tmp-'))
expect(tempDirs).to.be.empty
})
})
})
describe('when the file already exists', function () {
let stream
beforeEach(async function () {
await persistor.sendFile(
location,
files.wombat,
'/uploads/info.txt'
)
stream = fs.createReadStream('/uploads/other.txt')
})
it('should write the stream to disk', async function () {
await persistor.sendStream(location, files.wombat, stream)
const contents = await fsPromises.readFile(
scenario.fsPath(files.wombat)
)
expect(contents.equals(localFiles['/uploads/other.txt'])).to.be.true
})
it('should delete the temporary file', async function () {
await persistor.sendStream(location, files.wombat, stream)
const entries = await fsPromises.readdir(location)
const tempDirs = entries.filter(dir => dir.startsWith('tmp-'))
expect(tempDirs).to.be.empty
})
describe('on error', function () {
beforeEach(async function () {
await expect(
persistor.sendStream('/not-a-dir', files.wombat, stream)
).to.be.rejectedWith(Errors.WriteError)
})
it('should not update the target file', async function () {
const contents = await fsPromises.readFile(
scenario.fsPath(files.wombat)
)
expect(
contents.equals(localFiles['/uploads/info.txt'])
).to.be.true
})
it('should delete the temporary file', async function () {
await persistor.sendStream(location, files.wombat, stream)
const entries = await fsPromises.readdir(location)
const tempDirs = entries.filter(dir => dir.startsWith('tmp-'))
expect(tempDirs).to.be.empty
})
})
describe('when the md5 hash matches', function () {
it('should write the stream to disk', async function () {
await persistor.sendStream(location, files.wombat, stream, {
sourceMd5: md5(localFiles['/uploads/other.txt']),
})
const contents = await fsPromises.readFile(
scenario.fsPath(files.wombat)
)
expect(
contents.equals(localFiles['/uploads/other.txt'])
).to.be.true
})
})
describe('when the md5 hash does not match', function () {
beforeEach(async function () {
await expect(
persistor.sendStream(location, files.wombat, stream, {
sourceMd5: md5('wrong content'),
})
).to.be.rejectedWith(Errors.WriteError, 'md5 hash mismatch')
})
it('should not update the target file', async function () {
const contents = await fsPromises.readFile(
scenario.fsPath(files.wombat)
)
expect(
contents.equals(localFiles['/uploads/info.txt'])
).to.be.true
})
it('should delete the temporary file', async function () {
await persistor.sendStream(location, files.wombat, stream)
const entries = await fsPromises.readdir(location)
const tempDirs = entries.filter(dir => dir.startsWith('tmp-'))
expect(tempDirs).to.be.empty
})
})
})
})
describe('getObjectStream', function () {
beforeEach(async function () {
await persistor.sendFile(location, files.wombat, localFilePath)
await persistor.sendFile(location, files.wombat, '/uploads/info.txt')
})
it('should return a string with the object contents', async function () {
const stream = await persistor.getObjectStream(location, files.wombat)
const contents = await streamToBuffer(stream)
expect(contents.equals(localFileContents)).to.be.true
expect(contents.equals(localFiles['/uploads/info.txt'])).to.be.true
})
it('should support ranges', async function () {
@ -167,7 +280,9 @@ describe('FSPersistorTests', function () {
)
const contents = await streamToBuffer(stream)
// end is inclusive in ranges, but exclusive in slice()
expect(contents.equals(localFileContents.slice(5, 17))).to.be.true
expect(
contents.equals(localFiles['/uploads/info.txt'].slice(5, 17))
).to.be.true
})
it('should give a NotFoundError if the file does not exist', async function () {
@ -179,13 +294,13 @@ describe('FSPersistorTests', function () {
describe('getObjectSize', function () {
beforeEach(async function () {
await persistor.sendFile(location, files.wombat, localFilePath)
await persistor.sendFile(location, files.wombat, '/uploads/info.txt')
})
it('should return the file size', async function () {
expect(
await persistor.getObjectSize(location, files.wombat)
).to.equal(localFileContents.length)
).to.equal(localFiles['/uploads/info.txt'].length)
})
it('should throw a NotFoundError if the file does not exist', async function () {
@ -197,7 +312,7 @@ describe('FSPersistorTests', function () {
describe('copyObject', function () {
beforeEach(async function () {
await persistor.sendFile(location, files.wombat, localFilePath)
await persistor.sendFile(location, files.wombat, '/uploads/info.txt')
})
it('Should copy the file to the new location', async function () {
@ -205,13 +320,13 @@ describe('FSPersistorTests', function () {
const contents = await fsPromises.readFile(
scenario.fsPath(files.potato)
)
expect(contents.equals(localFileContents)).to.be.true
expect(contents.equals(localFiles['/uploads/info.txt'])).to.be.true
})
})
describe('deleteObject', function () {
beforeEach(async function () {
await persistor.sendFile(location, files.wombat, localFilePath)
await persistor.sendFile(location, files.wombat, '/uploads/info.txt')
await fsPromises.access(scenario.fsPath(files.wombat))
})
@ -230,7 +345,7 @@ describe('FSPersistorTests', function () {
describe('deleteDirectory', function () {
beforeEach(async function () {
for (const file of Object.values(files)) {
await persistor.sendFile(location, file, localFilePath)
await persistor.sendFile(location, file, '/uploads/info.txt')
await fsPromises.access(scenario.fsPath(file))
}
})
@ -258,7 +373,7 @@ describe('FSPersistorTests', function () {
describe('checkIfObjectExists', function () {
beforeEach(async function () {
await persistor.sendFile(location, files.wombat, localFilePath)
await persistor.sendFile(location, files.wombat, '/uploads/info.txt')
})
it('should return true for existing files', async function () {
@ -277,13 +392,13 @@ describe('FSPersistorTests', function () {
describe('directorySize', function () {
beforeEach(async function () {
for (const file of Object.values(files)) {
await persistor.sendFile(location, file, localFilePath)
await persistor.sendFile(location, file, '/uploads/info.txt')
}
})
it('should sum directory files size', async function () {
expect(await persistor.directorySize(location, 'animals')).to.equal(
2 * localFileContents.length
2 * localFiles['/uploads/info.txt'].length
)
})

4
package-lock.json generated
View file

@ -321,7 +321,6 @@
"aws-sdk": "^2.718.0",
"fast-crc32c": "https://github.com/overleaf/node-fast-crc32c/archive/aae6b2a4c7a7a159395df9cc6c38dfde702d6f51.tar.gz",
"glob": "^7.1.6",
"node-uuid": "^1.4.8",
"range-parser": "^1.2.1",
"tiny-async-pool": "^1.1.0"
},
@ -47378,9 +47377,8 @@
"fast-crc32c": "https://github.com/overleaf/node-fast-crc32c/archive/aae6b2a4c7a7a159395df9cc6c38dfde702d6f51.tar.gz",
"glob": "^7.1.6",
"mocha": "^8.4.0",
"mock-fs": "*",
"mock-fs": "^5.2.0",
"mongodb": "^3.5.9",
"node-uuid": "^1.4.8",
"range-parser": "^1.2.1",
"sandboxed-module": "^2.0.4",
"sinon": "^9.2.4",