Merge pull request #12904 from overleaf/bg-history-v1-streams-for-node-18

use pipeline for stream operations in history-v1

GitOrigin-RevId: 301a78c2c264d4951ab23054067d6be381778fcf
This commit is contained in:
Brian Gough 2023-05-09 10:10:28 +01:00 committed by Copybot
parent 74a8d6111a
commit 333b54c237
3 changed files with 59 additions and 40 deletions

View file

@ -4,6 +4,7 @@
const BPromise = require('bluebird')
const fs = BPromise.promisifyAll(require('fs'))
const crypto = require('crypto')
const { pipeline } = require('stream')
const assert = require('./assert')
function getGitBlobHeader(byteLength) {
@ -34,12 +35,14 @@ exports.fromStream = BPromise.method(function blobHashFromStream(
const hash = getBlobHash(byteLength)
return new BPromise(function (resolve, reject) {
stream.on('end', function () {
hash.end()
resolve(hash.read())
pipeline(stream, hash, function (err) {
if (err) {
reject(err)
} else {
hash.end()
resolve(hash.read())
}
})
stream.on('error', reject)
stream.pipe(hash)
})
})

View file

@ -3,6 +3,7 @@
const Archive = require('archiver')
const BPromise = require('bluebird')
const fs = require('fs')
const { pipeline } = require('stream')
const core = require('overleaf-editor-core')
const Snapshot = core.Snapshot
@ -104,12 +105,14 @@ ProjectArchive.prototype.writeZip = function projectArchiveToZip(
})
const streamArchiveToFile = new BPromise(function (resolve, reject) {
archive.on('error', reject)
const stream = fs.createWriteStream(zipFilePath)
stream.on('error', reject)
stream.on('finish', resolve)
archive.pipe(stream)
pipeline(archive, stream, function (err) {
if (err) {
reject(err)
} else {
resolve()
}
})
})
return BPromise.join(streamArchiveToFile, addFilesToArchiveAndFinalize)

View file

@ -8,17 +8,18 @@
const BPromise = require('bluebird')
const zlib = require('zlib')
const stringToStream = require('string-to-stream')
function promiseWriteStreamFinish(writeStream) {
return new BPromise(function (resolve, reject) {
writeStream.on('finish', resolve)
writeStream.on('error', reject)
})
}
const { pipeline, Writable } = require('stream')
function promisePipe(readStream, writeStream) {
readStream.pipe(writeStream)
return promiseWriteStreamFinish(writeStream)
return new BPromise(function (resolve, reject) {
pipeline(readStream, writeStream, function (err) {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}
/**
@ -32,22 +33,36 @@ function promisePipe(readStream, writeStream) {
*/
exports.promisePipe = promisePipe
class WritableBuffer extends Writable {
constructor(options) {
super(options)
this.buffers = []
}
_write(chunk, encoding, callback) {
this.buffers.push(chunk)
callback()
}
_final(callback) {
callback()
}
contents() {
return Buffer.concat(this.buffers)
}
}
function readStreamToBuffer(readStream) {
return new BPromise(function (resolve, reject) {
const buffers = []
readStream.on('readable', function () {
while (true) {
const buffer = this.read()
if (!buffer) {
break
}
buffers.push(buffer)
const bufferStream = new WritableBuffer()
pipeline(readStream, bufferStream, function (err) {
if (err) {
reject(err)
} else {
resolve(bufferStream.contents())
}
})
readStream.on('end', function () {
resolve(Buffer.concat(buffers))
})
readStream.on('error', reject)
})
}
@ -62,17 +77,15 @@ exports.readStreamToBuffer = readStreamToBuffer
function gunzipStreamToBuffer(readStream) {
const gunzip = zlib.createGunzip()
const gunzipStream = readStream.pipe(gunzip)
const bufferStream = new WritableBuffer()
return new BPromise(function (resolve, reject) {
const buffers = []
gunzipStream.on('data', function (buffer) {
buffers.push(buffer)
pipeline(readStream, gunzip, bufferStream, function (err) {
if (err) {
reject(err)
} else {
resolve(bufferStream.contents())
}
})
gunzipStream.on('end', function () {
resolve(Buffer.concat(buffers))
})
readStream.on('error', reject)
gunzipStream.on('error', reject)
})
}