Merge pull request #10245 from overleaf/em-object-persistor-promises-api

Use Node 16 promises APIs in object-persistor

GitOrigin-RevId: 8520da0c0678c17b22a9164d15d61c2b57af8f4e
Eric Mc Sween 2022-11-10 07:06:08 -05:00 committed by Copybot
parent ff944917a6
commit 8fb3edbecd
9 changed files with 108 additions and 76 deletions
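The change follows one mechanical pattern across all nine files: wrappers built by hand with util.promisify (around fs.unlink, fs.open, fs.stat and Stream.pipeline) are dropped in favour of the promise-based fs/promises and stream/promises modules that ship with Node 16. A minimal sketch of that pattern, using a hypothetical copyThenDelete helper rather than code taken from the diff:

const fs = require('fs')
const fsPromises = require('fs/promises')
const { pipeline } = require('stream/promises')

// Before this commit, the equivalent code built its own promise wrappers:
//   const { promisify } = require('util')
//   const pipeline = promisify(require('stream').pipeline)
//   const fsUnlink = promisify(fs.unlink)
async function copyThenDelete(src, dest) {
  await pipeline(fs.createReadStream(src), fs.createWriteStream(dest))
  await fsPromises.unlink(src)
}

module.exports = copyThenDelete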

View file

@@ -1,19 +1,16 @@
const fs = require('fs')
const glob = require('glob')
const fsPromises = require('fs/promises')
const globCallbacks = require('glob')
const uuid = require('node-uuid')
const path = require('path')
const Stream = require('stream')
const { pipeline } = require('stream/promises')
const { promisify } = require('util')
const AbstractPersistor = require('./AbstractPersistor')
const { NotFoundError, ReadError, WriteError } = require('./Errors')
const PersistorHelper = require('./PersistorHelper')
const pipeline = promisify(Stream.pipeline)
const fsUnlink = promisify(fs.unlink)
const fsOpen = promisify(fs.open)
const fsStat = promisify(fs.stat)
const fsGlob = promisify(glob)
const glob = promisify(globCallbacks)
const filterName = key => key.replace(/\//g, '_')
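One dependency deliberately keeps util.promisify: the glob package used here only offers a callback API, so it is imported as globCallbacks and wrapped by hand while fs and stream switch to their native promise modules. A small sketch of that remaining wrapper (listUnderPrefix is an illustrative name, not a function from the diff):

const { promisify } = require('util')
const globCallbacks = require('glob')

// glob(pattern, callback) becomes glob(pattern) returning Promise<string[]>
const glob = promisify(globCallbacks)

// e.g. match everything stored under a given key prefix
async function listUnderPrefix(location, filteredName) {
  return glob(`${location}/${filteredName}_*`)
}

module.exports = listUnderPrefix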
@@ -72,7 +69,7 @@ module.exports = class FSPersistor extends AbstractPersistor {
const filteredName = filterName(name)
try {
opts.fd = await fsOpen(`${location}/${filteredName}`, 'r')
opts.fd = await fsPromises.open(`${location}/${filteredName}`, 'r')
} catch (err) {
throw PersistorHelper.wrapError(
err,
@@ -94,7 +91,7 @@ module.exports = class FSPersistor extends AbstractPersistor {
const fullPath = path.join(location, filterName(filename))
try {
const stat = await fsStat(fullPath)
const stat = await fsPromises.stat(fullPath)
return stat.size
} catch (err) {
throw PersistorHelper.wrapError(
@@ -142,7 +139,7 @@ module.exports = class FSPersistor extends AbstractPersistor {
async deleteObject(location, name) {
const filteredName = filterName(name)
try {
await fsUnlink(`${location}/${filteredName}`)
await fsPromises.unlink(`${location}/${filteredName}`)
} catch (err) {
const wrappedError = PersistorHelper.wrapError(
err,
@@ -164,8 +161,8 @@ module.exports = class FSPersistor extends AbstractPersistor {
try {
await Promise.all(
(
await fsGlob(`${location}/${filteredName}_*`)
).map(file => fsUnlink(file))
await glob(`${location}/${filteredName}_*`)
).map(file => fsPromises.unlink(file))
)
} catch (err) {
throw PersistorHelper.wrapError(
@@ -180,7 +177,7 @@ module.exports = class FSPersistor extends AbstractPersistor {
async checkIfObjectExists(location, name) {
const filteredName = filterName(name)
try {
const stat = await fsStat(`${location}/${filteredName}`)
const stat = await fsPromises.stat(`${location}/${filteredName}`)
return !!stat
} catch (err) {
if (err.code === 'ENOENT') {
@@ -201,10 +198,10 @@ module.exports = class FSPersistor extends AbstractPersistor {
let size = 0
try {
const files = await fsGlob(`${location}/${filteredName}_*`)
const files = await glob(`${location}/${filteredName}_*`)
for (const file of files) {
try {
const stat = await fsStat(file)
const stat = await fsPromises.stat(file)
if (stat.isFile()) {
size += stat.size
}
@@ -261,7 +258,7 @@ module.exports = class FSPersistor extends AbstractPersistor {
return
}
try {
await fsUnlink(fsPath)
await fsPromises.unlink(fsPath)
} catch (err) {
if (err.code !== 'ENOENT') {
throw new WriteError('failed to delete file', { fsPath }, err)

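A behavioural detail worth keeping in mind when reading the open() change above: promisify(fs.open) resolves to a plain integer file descriptor, while fsPromises.open resolves to a FileHandle object, so whatever consumes the resolved value must be able to accept a FileHandle (in Node 16, fs.createReadStream's fd option accepts either form). A standalone sketch of the difference, with compareOpenStyles as an illustrative name:

const fs = require('fs')
const fsPromises = require('fs/promises')
const { promisify } = require('util')

async function compareOpenStyles(path) {
  // Old style: resolves to a plain integer file descriptor
  const fd = await promisify(fs.open)(path, 'r')
  fs.closeSync(fd)

  // New style: resolves to a FileHandle with its own methods
  const handle = await fsPromises.open(path, 'r')
  const { size } = await handle.stat()
  await handle.close()
  return size
}

module.exports = compareOpenStyles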
View file

@@ -1,14 +1,11 @@
const fs = require('fs')
const { promisify } = require('util')
const Stream = require('stream')
const { pipeline } = require('stream/promises')
const { Storage } = require('@google-cloud/storage')
const { WriteError, ReadError, NotFoundError } = require('./Errors')
const asyncPool = require('tiny-async-pool')
const AbstractPersistor = require('./AbstractPersistor')
const PersistorHelper = require('./PersistorHelper')
const pipeline = promisify(Stream.pipeline)
module.exports = class GcsPersistor extends AbstractPersistor {
constructor(settings) {
super()

View file

@@ -1,11 +1,9 @@
const AbstractPersistor = require('./AbstractPersistor')
const Logger = require('@overleaf/logger')
const Stream = require('stream')
const { promisify } = require('util')
const { pipeline } = require('stream/promises')
const { NotFoundError, WriteError } = require('./Errors')
const pipeline = promisify(Stream.pipeline)
// Persistor that wraps two other persistors. Talks to the 'primary' by default,
// but will fall back to an older persistor in the case of a not-found error.
// If `Settings.fallback.copyOnMiss` is set, this will copy files from the fallback

View file

@@ -1,10 +1,8 @@
const Crypto = require('crypto')
const Stream = require('stream')
const { pipeline } = require('stream/promises')
const Logger = require('@overleaf/logger')
const { WriteError, ReadError, NotFoundError } = require('./Errors')
const { promisify } = require('util')
const pipeline = promisify(Stream.pipeline)
// Observes data that passes through and computes some metadata for it
// - specifically, it computes the number of bytes transferred, and optionally

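The comment above describes a pass-through Transform that gets inserted into a pipeline so byte counts (and optionally a checksum) can be collected while the data flows to its real destination. A minimal sketch of that idea using only core Node APIs (ByteCounter and meteredCopy are illustrative names, not the class this file defines):

const Stream = require('stream')
const { pipeline } = require('stream/promises')

class ByteCounter extends Stream.Transform {
  constructor(options) {
    super(options)
    this.bytes = 0
  }

  _transform(chunk, encoding, callback) {
    this.bytes += chunk.length // count what flows through
    callback(null, chunk) // and pass it along unchanged
  }
}

async function meteredCopy(source, destination) {
  const counter = new ByteCounter()
  await pipeline(source, counter, destination)
  return counter.bytes
}

module.exports = { ByteCounter, meteredCopy }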
View file

@@ -5,7 +5,7 @@ const SandboxedModule = require('sandboxed-module')
const Errors = require('../../src/Errors')
const StreamModule = require('stream')
const modulePath = '../../src/FSPersistor.js'
const MODULE_PATH = '../../src/FSPersistor.js'
describe('FSPersistorTests', function () {
const stat = { size: 4, isFile: sinon.stub().returns(true) }
@@ -19,7 +19,17 @@ describe('FSPersistorTests', function () {
const files = ['animals/wombat.tex', 'vegetables/potato.tex']
const globs = [`${location}/${files[0]}`, `${location}/${files[1]}`]
const filteredFilenames = ['animals_wombat.tex', 'vegetables_potato.tex']
let fs, stream, FSPersistor, glob, readStream, crypto, Hash, uuid, tempFile
let fs,
fsPromises,
Stream,
StreamPromises,
FSPersistor,
glob,
readStream,
crypto,
Hash,
uuid,
tempFile
beforeEach(function () {
const randomNumber = Math.random().toString()
@@ -35,15 +45,19 @@ describe('FSPersistorTests', function () {
fs = {
createReadStream: sinon.stub().returns(readStream),
createWriteStream: sinon.stub().returns(writeStream),
unlink: sinon.stub().yields(),
open: sinon.stub().yields(null, fd),
stat: sinon.stub().yields(null, stat),
}
fsPromises = {
unlink: sinon.stub().resolves(),
open: sinon.stub().resolves(fd),
stat: sinon.stub().resolves(stat),
}
glob = sinon.stub().yields(null, globs)
stream = {
pipeline: sinon.stub().yields(),
Stream = {
Transform: StreamModule.Transform,
}
StreamPromises = {
pipeline: sinon.stub().resolves(),
}
Hash = {
end: sinon.stub(),
read: sinon.stub().returns(md5),
@@ -53,12 +67,14 @@ describe('FSPersistorTests', function () {
crypto = {
createHash: sinon.stub().returns(Hash),
}
FSPersistor = new (SandboxedModule.require(modulePath, {
FSPersistor = new (SandboxedModule.require(MODULE_PATH, {
requires: {
'./Errors': Errors,
fs,
'fs/promises': fsPromises,
glob,
stream,
stream: Stream,
'stream/promises': StreamPromises,
crypto,
'node-uuid': uuid,
// imported by PersistorHelper but otherwise unused here
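The test doubles change in step with the production code: callback-style fakes built with sinon's yields() become promise-style fakes built with resolves() and rejects(). A condensed sketch of the pattern (the stub objects below are illustrative, not copied from the test file):

const sinon = require('sinon')

// Before: fs.unlink(path, cb) and friends were faked with yields()
const fsCallbackStyle = {
  unlink: sinon.stub().yields(), // invokes the callback with no error
  stat: sinon.stub().yields(null, { size: 4 }),
}

// After: the fs/promises fakes resolve or reject instead
const fsPromiseStyle = {
  unlink: sinon.stub().resolves(),
  stat: sinon.stub().resolves({ size: 4 }),
}

// Error paths move from yields(err) to rejects(err)
fsPromiseStyle.unlink.rejects(new Error('boom'))

module.exports = { fsCallbackStyle, fsPromiseStyle }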
@@ -76,11 +92,14 @@ describe('FSPersistorTests', function () {
expect(fs.createWriteStream).to.have.been.calledWith(
`${location}/${filteredFilenames[0]}`
)
expect(stream.pipeline).to.have.been.calledWith(readStream, writeStream)
expect(StreamPromises.pipeline).to.have.been.calledWith(
readStream,
writeStream
)
})
it('should return an error if the file cannot be stored', async function () {
stream.pipeline.yields(error)
StreamPromises.pipeline.rejects(error)
await expect(
FSPersistor.sendFile(location, files[0], localFilesystemPath)
).to.eventually.be.rejected.and.have.property('cause', error)
@@ -90,16 +109,19 @@ describe('FSPersistorTests', function () {
describe('sendStream', function () {
it('should write the stream to disk', async function () {
await FSPersistor.sendStream(location, files[0], remoteStream)
expect(stream.pipeline).to.have.been.calledWith(remoteStream, writeStream)
expect(StreamPromises.pipeline).to.have.been.calledWith(
remoteStream,
writeStream
)
})
it('should delete the temporary file', async function () {
await FSPersistor.sendStream(location, files[0], remoteStream)
expect(fs.unlink).to.have.been.calledWith(tempFile)
expect(fsPromises.unlink).to.have.been.calledWith(tempFile)
})
it('should wrap the error from the filesystem', async function () {
stream.pipeline.yields(error)
StreamPromises.pipeline.rejects(error)
await expect(FSPersistor.sendStream(location, files[0], remoteStream))
.to.eventually.be.rejected.and.be.instanceOf(Errors.WriteError)
.and.have.property('cause', error)
@@ -127,7 +149,7 @@ describe('FSPersistorTests', function () {
sourceMd5: '00000000',
})
} catch (_) {}
expect(fs.unlink).to.have.been.calledWith(
expect(fsPromises.unlink).to.have.been.calledWith(
`${location}/${filteredFilenames[0]}`
)
})
@@ -137,7 +159,7 @@ describe('FSPersistorTests', function () {
describe('getObjectStream', function () {
it('should use correct file location', async function () {
await FSPersistor.getObjectStream(location, files[0], {})
expect(fs.open).to.have.been.calledWith(
expect(fsPromises.open).to.have.been.calledWith(
`${location}/${filteredFilenames[0]}`
)
})
@@ -157,7 +179,7 @@ describe('FSPersistorTests', function () {
it('should give a NotFoundError if the file does not exist', async function () {
const err = new Error()
err.code = 'ENOENT'
fs.open.yields(err)
fsPromises.open.rejects(err)
await expect(FSPersistor.getObjectStream(location, files[0], {}))
.to.eventually.be.rejected.and.be.an.instanceOf(Errors.NotFoundError)
@@ -165,7 +187,7 @@
})
it('should wrap any other error', async function () {
fs.open.yields(error)
fsPromises.open.rejects(error)
await expect(FSPersistor.getObjectStream(location, files[0], {}))
.to.eventually.be.rejectedWith('failed to open file for streaming')
.and.be.an.instanceOf(Errors.ReadError)
@@ -180,12 +202,12 @@ describe('FSPersistorTests', function () {
noentError.code = 'ENOENT'
beforeEach(function () {
fs.stat
.yields(error)
fsPromises.stat
.rejects(error)
.withArgs(`${location}/${filteredFilenames[0]}`)
.yields(null, { size })
.resolves({ size })
.withArgs(`${location}/${badFilename}`)
.yields(noentError)
.rejects(noentError)
})
it('should return the file size', async function () {
@@ -222,20 +244,23 @@ describe('FSPersistorTests', function () {
it('Should pipe the source to the target', async function () {
await FSPersistor.copyObject(location, files[0], files[1])
expect(stream.pipeline).to.have.been.calledWith(readStream, writeStream)
expect(StreamPromises.pipeline).to.have.been.calledWith(
readStream,
writeStream
)
})
})
describe('deleteObject', function () {
it('Should call unlink with correct options', async function () {
await FSPersistor.deleteObject(location, files[0])
expect(fs.unlink).to.have.been.calledWith(
expect(fsPromises.unlink).to.have.been.calledWith(
`${location}/${filteredFilenames[0]}`
)
})
it('Should propagate the error', async function () {
fs.unlink.yields(error)
fsPromises.unlink.rejects(error)
await expect(
FSPersistor.deleteObject(location, files[0])
).to.eventually.be.rejected.and.have.property('cause', error)
@@ -253,7 +278,7 @@ describe('FSPersistorTests', function () {
it('Should call unlink on the returned files', async function () {
await FSPersistor.deleteDirectory(location, files[0])
for (const filename of globs) {
expect(fs.unlink).to.have.been.calledWith(filename)
expect(fsPromises.unlink).to.have.been.calledWith(filename)
}
})
@@ -271,17 +296,17 @@ describe('FSPersistorTests', function () {
noentError.code = 'ENOENT'
beforeEach(function () {
fs.stat
.yields(error)
fsPromises.stat
.rejects(error)
.withArgs(`${location}/${filteredFilenames[0]}`)
.yields(null, {})
.resolves({})
.withArgs(`${location}/${badFilename}`)
.yields(noentError)
.rejects(noentError)
})
it('Should call stat with correct options', async function () {
await FSPersistor.checkIfObjectExists(location, files[0])
expect(fs.stat).to.have.been.calledWith(
expect(fsPromises.stat).to.have.been.calledWith(
`${location}/${filteredFilenames[0]}`
)
})

View file

@@ -26,6 +26,7 @@ describe('GcsPersistorTests', function () {
GcsNotFoundError,
ReadStream,
Stream,
StreamPromises,
GcsBucket,
GcsFile,
GcsPersistor,
@@ -78,10 +79,13 @@ describe('GcsPersistorTests', function () {
}
Stream = {
pipeline: sinon.stub().yields(),
Transform,
}
StreamPromises = {
pipeline: sinon.stub().resolves(),
}
GcsFile = {
delete: sinon.stub().resolves(),
createReadStream: sinon.stub().returns(ReadStream),
@@ -136,6 +140,7 @@ describe('GcsPersistorTests', function () {
'./Errors': Errors,
fs: Fs,
stream: Stream,
'stream/promises': StreamPromises,
crypto,
},
globals: { console, Buffer },
@@ -373,7 +378,7 @@
})
it('should meter the stream and pass it to GCS', function () {
expect(Stream.pipeline).to.have.been.calledWith(
expect(StreamPromises.pipeline).to.have.been.calledWith(
ReadStream,
sinon.match.instanceOf(Transform),
WriteStream
@@ -433,14 +438,9 @@
describe('when the upload fails', function () {
let error
beforeEach(async function () {
Stream.pipeline
.withArgs(
ReadStream,
sinon.match.instanceOf(Transform),
WriteStream,
sinon.match.any
)
.yields(genericError)
StreamPromises.pipeline
.withArgs(ReadStream, sinon.match.instanceOf(Transform), WriteStream)
.rejects(genericError)
try {
await GcsPersistor.sendStream(bucket, key, ReadStream)
} catch (err) {
@@ -475,7 +475,7 @@
})
it('should upload the stream via the meter', function () {
expect(Stream.pipeline).to.have.been.calledWith(
expect(StreamPromises.pipeline).to.have.been.calledWith(
ReadStream,
sinon.match.instanceOf(Transform),
WriteStream

View file

@@ -23,7 +23,13 @@ describe('MigrationPersistorTests', function () {
const size = 33
const md5 = 'ffffffff'
let Settings, Logger, Stream, MigrationPersistor, fileStream, newPersistor
let Settings,
Logger,
Stream,
StreamPromises,
MigrationPersistor,
fileStream,
newPersistor
beforeEach(function () {
fileStream = {
@@ -64,10 +70,13 @@ describe('MigrationPersistorTests', function () {
}
Stream = {
pipeline: sinon.stub().yields(),
PassThrough: sinon.stub(),
}
StreamPromises = {
pipeline: sinon.stub().resolves(),
}
Logger = {
warn: sinon.stub(),
}
@@ -75,6 +84,7 @@ describe('MigrationPersistorTests', function () {
MigrationPersistor = SandboxedModule.require(modulePath, {
requires: {
stream: Stream,
'stream/promises': StreamPromises,
'./Errors': Errors,
'@overleaf/logger': Logger,
},

View file

@@ -1,8 +1,9 @@
const chai = require('chai')
const { expect } = chai
const SandboxedModule = require('sandboxed-module')
const StreamPromises = require('stream/promises')
const modulePath = '../../src/PersistorFactory.js'
const MODULE_PATH = '../../src/PersistorFactory.js'
describe('PersistorManager', function () {
let PersistorFactory, FSPersistor, S3Persistor, Settings, GcsPersistor
@@ -33,8 +34,9 @@ describe('PersistorManager', function () {
info() {},
err() {},
},
'stream/promises': StreamPromises,
}
PersistorFactory = SandboxedModule.require(modulePath, { requires })
PersistorFactory = SandboxedModule.require(MODULE_PATH, { requires })
})
it('should implement the S3 wrapped method when S3 is configured', function () {

View file

@@ -1,11 +1,11 @@
const sinon = require('sinon')
const chai = require('chai')
const { expect } = chai
const modulePath = '../../src/S3Persistor.js'
const SandboxedModule = require('sandboxed-module')
const Errors = require('../../src/Errors')
const MODULE_PATH = '../../src/S3Persistor.js'
describe('S3PersistorTests', function () {
const defaultS3Key = 'frog'
const defaultS3Secret = 'prince'
@@ -35,6 +35,7 @@ describe('S3PersistorTests', function () {
Fs,
ReadStream,
Stream,
StreamPromises,
S3Persistor,
S3Client,
S3ReadStream,
@@ -65,10 +66,13 @@ describe('S3PersistorTests', function () {
}
Stream = {
pipeline: sinon.stub().yields(),
Transform,
}
StreamPromises = {
pipeline: sinon.stub().resolves(),
}
EmptyPromise = {
promise: sinon.stub().resolves(),
}
@@ -145,13 +149,14 @@ describe('S3PersistorTests', function () {
warn: sinon.stub(),
}
S3Persistor = new (SandboxedModule.require(modulePath, {
S3Persistor = new (SandboxedModule.require(MODULE_PATH, {
requires: {
'aws-sdk/clients/s3': S3,
'@overleaf/logger': Logger,
'./Errors': Errors,
fs: Fs,
stream: Stream,
'stream/promises': StreamPromises,
crypto,
},
globals: { console, Buffer },