From dd15ade2b90ddc4f9bb74e562e6af53820716961 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Fri, 15 Nov 2024 15:53:27 +0100 Subject: [PATCH] Merge pull request #21880 from overleaf/jpa-history-store [history-v1] make HistoryStore generic and add backupHistoryStore GitOrigin-RevId: 65d275de182dbcf5d4b6bf3c610b71b58db68e70 --- libraries/overleaf-editor-core/lib/history.js | 2 +- services/history-v1/storage/index.js | 2 +- .../storage/lib/backupPersistor.mjs | 6 + .../storage/lib/chunk_store/index.js | 4 +- .../history-v1/storage/lib/history_store.js | 201 +++++++++++------- services/history-v1/storage/lib/streams.js | 88 ++------ 6 files changed, 143 insertions(+), 160 deletions(-) diff --git a/libraries/overleaf-editor-core/lib/history.js b/libraries/overleaf-editor-core/lib/history.js index a2e222acd3..d9d1253f34 100644 --- a/libraries/overleaf-editor-core/lib/history.js +++ b/libraries/overleaf-editor-core/lib/history.js @@ -109,7 +109,7 @@ class History { * @param {BlobStore} blobStore * @param {number} [concurrency] applies separately to files, changes and * operations - * @return {Promise.} + * @return {Promise} */ async store(blobStore, concurrency) { assert.maybe.number(concurrency, 'bad concurrency') diff --git a/services/history-v1/storage/index.js b/services/history-v1/storage/index.js index a0fd471829..7fd1d589ea 100644 --- a/services/history-v1/storage/index.js +++ b/services/history-v1/storage/index.js @@ -2,7 +2,7 @@ exports.BatchBlobStore = require('./lib/batch_blob_store') exports.blobHash = require('./lib/blob_hash') exports.HashCheckBlobStore = require('./lib/hash_check_blob_store') exports.chunkStore = require('./lib/chunk_store') -exports.historyStore = require('./lib/history_store') +exports.historyStore = require('./lib/history_store').historyStore exports.knex = require('./lib/knex') exports.mongodb = require('./lib/mongodb') exports.persistChanges = require('./lib/persist_changes') diff --git a/services/history-v1/storage/lib/backupPersistor.mjs b/services/history-v1/storage/lib/backupPersistor.mjs index 959c914371..72ab9d45e3 100644 --- a/services/history-v1/storage/lib/backupPersistor.mjs +++ b/services/history-v1/storage/lib/backupPersistor.mjs @@ -8,6 +8,7 @@ import { PerProjectEncryptedS3Persistor, RootKeyEncryptionKey, } from '@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js' +import { HistoryStore } from './history_store.js' const persistorConfig = _.cloneDeep(config.get('backupPersistor')) const { chunksBucket, deksBucket, globalBlobsBucket, projectBlobsBucket } = @@ -105,3 +106,8 @@ export const backupPersistor = new PerProjectEncryptedS3Persistor({ [projectBlobsBucket]: persistorConfig.tieringStorageClass, }, }) + +export const backupHistoryStore = new HistoryStore( + backupPersistor, + chunksBucket +) diff --git a/services/history-v1/storage/lib/chunk_store/index.js b/services/history-v1/storage/lib/chunk_store/index.js index 806bb81f60..9ccf948820 100644 --- a/services/history-v1/storage/lib/chunk_store/index.js +++ b/services/history-v1/storage/lib/chunk_store/index.js @@ -27,7 +27,7 @@ const { Chunk, History, Snapshot } = require('overleaf-editor-core') const assert = require('../assert') const BatchBlobStore = require('../batch_blob_store') const { BlobStore } = require('../blob_store') -const historyStore = require('../history_store') +const { historyStore } = require('../history_store') const mongoBackend = require('./mongo') const postgresBackend = require('./postgres') const { ChunkVersionConflictError } = require('./errors') @@ -81,7 +81,7 @@ async function lazyLoadHistoryFiles(history, batchBlobStore) { /** * Load the latest Chunk stored for a project, including blob metadata. * - * @param {number|string} projectId + * @param {string} projectId * @return {Promise.} */ async function loadLatest(projectId) { diff --git a/services/history-v1/storage/lib/history_store.js b/services/history-v1/storage/lib/history_store.js index e9d6fe1043..58c46d92c0 100644 --- a/services/history-v1/storage/lib/history_store.js +++ b/services/history-v1/storage/lib/history_store.js @@ -1,10 +1,13 @@ +// @ts-check 'use strict' -const BPromise = require('bluebird') const core = require('overleaf-editor-core') const config = require('config') const path = require('node:path') +const Stream = require('node:stream') +const { promisify } = require('node:util') +const zlib = require('node:zlib') const OError = require('@overleaf/o-error') const objectPersistor = require('@overleaf/object-persistor') @@ -17,26 +20,47 @@ const streams = require('./streams') const Chunk = core.Chunk -const BUCKET = config.get('chunkStore.bucket') +const gzip = promisify(zlib.gzip) class LoadError extends OError { - constructor(projectId, chunkId) { - super('HistoryStore: failed to load chunk history', { projectId, chunkId }) + /** + * @param {number|string} projectId + * @param {number|string} chunkId + * @param {any} cause + */ + constructor(projectId, chunkId, cause) { + super( + 'HistoryStore: failed to load chunk history', + { projectId, chunkId }, + cause + ) this.projectId = projectId this.chunkId = chunkId } } -HistoryStore.LoadError = LoadError class StoreError extends OError { - constructor(projectId, chunkId) { - super('HistoryStore: failed to store chunk history', { projectId, chunkId }) + /** + * @param {number|string} projectId + * @param {number|string} chunkId + * @param {any} cause + */ + constructor(projectId, chunkId, cause) { + super( + 'HistoryStore: failed to store chunk history', + { projectId, chunkId }, + cause + ) this.projectId = projectId this.chunkId = chunkId } } -HistoryStore.StoreError = StoreError +/** + * @param {number|string} projectId + * @param {number|string} chunkId + * @return {string} + */ function getKey(projectId, chunkId) { return path.join(projectKey.format(projectId), projectKey.pad(chunkId)) } @@ -53,86 +77,99 @@ function getKey(projectId, chunkId) { * * @class */ -function HistoryStore() {} +class HistoryStore { + #persistor + #bucket + constructor(persistor, bucket) { + this.#persistor = persistor + this.#bucket = bucket + } -/** - * Load the raw object for a History. - * - * @param {number} projectId - * @param {number} chunkId - * @return {Promise.} - */ -HistoryStore.prototype.loadRaw = function historyStoreLoadRaw( - projectId, - chunkId -) { - assert.projectId(projectId, 'bad projectId') - assert.chunkId(chunkId, 'bad chunkId') + /** + * Load the raw object for a History. + * + * @param {number|string} projectId + * @param {number|string} chunkId + * @return {Promise} + */ + async loadRaw(projectId, chunkId) { + assert.projectId(projectId, 'bad projectId') + assert.chunkId(chunkId, 'bad chunkId') - const key = getKey(projectId, chunkId) + const key = getKey(projectId, chunkId) - logger.debug({ projectId, chunkId }, 'loadRaw started') - return BPromise.resolve() - .then(() => persistor.getObjectStream(BUCKET, key)) - .then(streams.gunzipStreamToBuffer) - .then(buffer => JSON.parse(buffer)) - .catch(err => { + logger.debug({ projectId, chunkId }, 'loadRaw started') + try { + const buf = await streams.gunzipStreamToBuffer( + await this.#persistor.getObjectStream(this.#bucket, key) + ) + return JSON.parse(buf.toString('utf-8')) + } catch (err) { if (err instanceof objectPersistor.Errors.NotFoundError) { throw new Chunk.NotPersistedError(projectId) } - throw new HistoryStore.LoadError(projectId, chunkId).withCause(err) - }) - .finally(() => logger.debug({ projectId, chunkId }, 'loadRaw finished')) + throw new LoadError(projectId, chunkId, err) + } finally { + logger.debug({ projectId, chunkId }, 'loadRaw finished') + } + } + + /** + * Compress and store a {@link History}. + * + * @param {number|string} projectId + * @param {number|string} chunkId + * @param {import('overleaf-editor-core/lib/types').RawHistory} rawHistory + */ + async storeRaw(projectId, chunkId, rawHistory) { + assert.projectId(projectId, 'bad projectId') + assert.chunkId(chunkId, 'bad chunkId') + assert.object(rawHistory, 'bad rawHistory') + + const key = getKey(projectId, chunkId) + + logger.debug({ projectId, chunkId }, 'storeRaw started') + + const buf = await gzip(JSON.stringify(rawHistory)) + try { + await this.#persistor.sendStream( + this.#bucket, + key, + Stream.Readable.from([buf]), + { + contentType: 'application/json', + contentEncoding: 'gzip', + contentLength: buf.byteLength, + } + ) + } catch (err) { + throw new StoreError(projectId, chunkId, err) + } finally { + logger.debug({ projectId, chunkId }, 'storeRaw finished') + } + } + + /** + * Delete multiple chunks from bucket. Expects an Array of objects with + * projectId and chunkId properties + * @param {Array<{projectId: string,chunkId:string}>} chunks + */ + async deleteChunks(chunks) { + logger.debug({ chunks }, 'deleteChunks started') + try { + await Promise.all( + chunks.map(chunk => { + const key = getKey(chunk.projectId, chunk.chunkId) + return this.#persistor.deleteObject(this.#bucket, key) + }) + ) + } finally { + logger.debug({ chunks }, 'deleteChunks finished') + } + } } -/** - * Compress and store a {@link History}. - * - * @param {number} projectId - * @param {number} chunkId - * @param {Object} rawHistory - * @return {Promise} - */ -HistoryStore.prototype.storeRaw = function historyStoreStoreRaw( - projectId, - chunkId, - rawHistory -) { - assert.projectId(projectId, 'bad projectId') - assert.chunkId(chunkId, 'bad chunkId') - assert.object(rawHistory, 'bad rawHistory') - - const key = getKey(projectId, chunkId) - - logger.debug({ projectId, chunkId }, 'storeRaw started') - return BPromise.resolve() - .then(() => streams.gzipStringToStream(JSON.stringify(rawHistory))) - .then(stream => - persistor.sendStream(BUCKET, key, stream, { - contentType: 'application/json', - contentEncoding: 'gzip', - }) - ) - .catch(err => { - throw new HistoryStore.StoreError(projectId, chunkId).withCause(err) - }) - .finally(() => logger.debug({ projectId, chunkId }, 'storeRaw finished')) +module.exports = { + HistoryStore, + historyStore: new HistoryStore(persistor, config.get('chunkStore.bucket')), } - -/** - * Delete multiple chunks from bucket. Expects an Array of objects with - * projectId and chunkId properties - * @param {Array} chunks - * @return {Promise} - */ -HistoryStore.prototype.deleteChunks = function historyDeleteChunks(chunks) { - logger.debug({ chunks }, 'deleteChunks started') - return BPromise.all( - chunks.map(chunk => { - const key = getKey(chunk.projectId, chunk.chunkId) - return persistor.deleteObject(BUCKET, key) - }) - ).finally(() => logger.debug({ chunks }, 'deleteChunks finished')) -} - -module.exports = new HistoryStore() diff --git a/services/history-v1/storage/lib/streams.js b/services/history-v1/storage/lib/streams.js index c30e2d0e09..e60e5aa725 100644 --- a/services/history-v1/storage/lib/streams.js +++ b/services/history-v1/storage/lib/streams.js @@ -1,3 +1,4 @@ +// @ts-check /** * Promises are promises and streams are streams, and ne'er the twain shall * meet. @@ -5,52 +6,20 @@ */ 'use strict' -const BPromise = require('bluebird') +const Stream = require('node:stream') const zlib = require('node:zlib') -const { WritableBuffer, ReadableString } = require('@overleaf/stream-utils') -const { pipeline } = require('node:stream') - -/** - * Pipe a read stream to a write stream. The promise resolves when the write - * stream finishes. - * - * @function - * @param {stream.Readable} readStream - * @param {stream.Writable} writeStream - * @return {Promise} - */ -function promisePipe(readStream, writeStream) { - return new BPromise(function (resolve, reject) { - pipeline(readStream, writeStream, function (err) { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} - -exports.promisePipe = promisePipe +const { WritableBuffer } = require('@overleaf/stream-utils') /** * Create a promise for the result of reading a stream to a buffer. * - * @function - * @param {stream.Readable} readStream - * @return {Promise.} + * @param {Stream.Readable} readStream + * @return {Promise} */ -function readStreamToBuffer(readStream) { - return new BPromise(function (resolve, reject) { - const bufferStream = new WritableBuffer() - pipeline(readStream, bufferStream, function (err) { - if (err) { - reject(err) - } else { - resolve(bufferStream.contents()) - } - }) - }) +async function readStreamToBuffer(readStream) { + const bufferStream = new WritableBuffer() + await Stream.promises.pipeline(readStream, bufferStream) + return bufferStream.contents() } exports.readStreamToBuffer = readStreamToBuffer @@ -58,43 +27,14 @@ exports.readStreamToBuffer = readStreamToBuffer /** * Create a promise for the result of un-gzipping a stream to a buffer. * - * @function - * @param {stream.Readable} readStream - * @return {Promise.} + * @param {NodeJS.ReadableStream} readStream + * @return {Promise} */ -function gunzipStreamToBuffer(readStream) { +async function gunzipStreamToBuffer(readStream) { const gunzip = zlib.createGunzip() const bufferStream = new WritableBuffer() - return new BPromise(function (resolve, reject) { - pipeline(readStream, gunzip, bufferStream, function (err) { - if (err) { - reject(err) - } else { - resolve(bufferStream.contents()) - } - }) - }) + await Stream.promises.pipeline(readStream, gunzip, bufferStream) + return bufferStream.contents() } exports.gunzipStreamToBuffer = gunzipStreamToBuffer - -/** - * Create a write stream that gzips the given string. - * - * @function - * @param {string} string - * @return {Promise.} - */ -function gzipStringToStream(string) { - return new BPromise(function (resolve, reject) { - zlib.gzip(Buffer.from(string), function (error, result) { - if (error) { - reject(error) - } else { - resolve(new ReadableString(result)) - } - }) - }) -} - -exports.gzipStringToStream = gzipStringToStream