Mirror of https://github.com/overleaf/overleaf.git, synced 2024-11-21 20:47:08 -05:00
Merge pull request #21880 from overleaf/jpa-history-store

[history-v1] make HistoryStore generic and add backupHistoryStore

GitOrigin-RevId: 65d275de182dbcf5d4b6bf3c610b71b58db68e70

parent 9cc6f2a9d5
commit dd15ade2b9

6 changed files with 143 additions and 160 deletions
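In outline, the change below turns HistoryStore from a module-level singleton bound to one hard-coded bucket into a class that takes its persistor and bucket as constructor arguments, so a second instance can point at the backup bucket. A minimal sketch of the pattern, not part of the diff: MemoryPersistor here is a hypothetical stand-in implementing just the persistor surface HistoryStore uses.

const Stream = require('node:stream')

// Hypothetical in-memory persistor for illustration only.
class MemoryPersistor {
  #objects = new Map()
  async sendStream(bucket, key, stream) {
    const chunks = []
    for await (const chunk of stream) chunks.push(chunk)
    this.#objects.set(`${bucket}/${key}`, Buffer.concat(chunks))
  }
  async getObjectStream(bucket, key) {
    return Stream.Readable.from([this.#objects.get(`${bucket}/${key}`)])
  }
  async deleteObject(bucket, key) {
    this.#objects.delete(`${bucket}/${key}`)
  }
}

// The point of the change: one class, any persistor/bucket pair.
// const { HistoryStore } = require('./lib/history_store') // path assumed
// const liveStore = new HistoryStore(new MemoryPersistor(), 'chunks')
// const backupStore = new HistoryStore(new MemoryPersistor(), 'backup-chunks')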
@@ -109,7 +109,7 @@ class History {
    * @param {BlobStore} blobStore
    * @param {number} [concurrency] applies separately to files, changes and
    *   operations
-   * @return {Promise.<Object>}
+   * @return {Promise<import('overleaf-editor-core/lib/types').RawHistory>}
    */
   async store(blobStore, concurrency) {
     assert.maybe.number(concurrency, 'bad concurrency')
@@ -2,7 +2,7 @@ exports.BatchBlobStore = require('./lib/batch_blob_store')
 exports.blobHash = require('./lib/blob_hash')
 exports.HashCheckBlobStore = require('./lib/hash_check_blob_store')
 exports.chunkStore = require('./lib/chunk_store')
-exports.historyStore = require('./lib/history_store')
+exports.historyStore = require('./lib/history_store').historyStore
 exports.knex = require('./lib/knex')
 exports.mongodb = require('./lib/mongodb')
 exports.persistChanges = require('./lib/persist_changes')
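Because history_store.js now exports an object rather than a singleton instance (see the final history_store hunk below), the storage index picks out the .historyStore property. For consumers, the two shapes compare roughly like this sketch, with the require path taken from the diff:

// Before: the module itself was the singleton instance.
// const historyStore = require('./lib/history_store')

// After: the module exports the class plus a default instance.
const { HistoryStore, historyStore } = require('./lib/history_store')
// `historyStore` is prebuilt against the configured chunk bucket;
// `new HistoryStore(...)` can target any other persistor/bucket pair.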
@@ -8,6 +8,7 @@ import {
   PerProjectEncryptedS3Persistor,
   RootKeyEncryptionKey,
 } from '@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js'
+import { HistoryStore } from './history_store.js'
 
 const persistorConfig = _.cloneDeep(config.get('backupPersistor'))
 const { chunksBucket, deksBucket, globalBlobsBucket, projectBlobsBucket } =
@@ -105,3 +106,8 @@ export const backupPersistor = new PerProjectEncryptedS3Persistor({
     [projectBlobsBucket]: persistorConfig.tieringStorageClass,
   },
 })
+
+export const backupHistoryStore = new HistoryStore(
+  backupPersistor,
+  chunksBucket
+)
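With backupHistoryStore exported, backup code can address the backup chunks bucket through the same API as the primary store. A hedged sketch of the intended use; the import path and helper function are illustrative, not from the diff:

import { backupHistoryStore } from './backup_persistor.mjs' // path assumed
import { historyStore } from './history_store.js'

// Hypothetical helper: mirror one chunk's raw history into the backup bucket.
async function backupChunk(projectId, chunkId) {
  const rawHistory = await historyStore.loadRaw(projectId, chunkId)
  await backupHistoryStore.storeRaw(projectId, chunkId, rawHistory)
}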
@@ -27,7 +27,7 @@ const { Chunk, History, Snapshot } = require('overleaf-editor-core')
 const assert = require('../assert')
 const BatchBlobStore = require('../batch_blob_store')
 const { BlobStore } = require('../blob_store')
-const historyStore = require('../history_store')
+const { historyStore } = require('../history_store')
 const mongoBackend = require('./mongo')
 const postgresBackend = require('./postgres')
 const { ChunkVersionConflictError } = require('./errors')
@@ -81,7 +81,7 @@ async function lazyLoadHistoryFiles(history, batchBlobStore) {
 /**
  * Load the latest Chunk stored for a project, including blob metadata.
  *
- * @param {number|string} projectId
+ * @param {string} projectId
  * @return {Promise.<Chunk>}
  */
 async function loadLatest(projectId) {
@@ -1,10 +1,13 @@
+// @ts-check
 'use strict'
 
-const BPromise = require('bluebird')
 const core = require('overleaf-editor-core')
 
 const config = require('config')
 const path = require('node:path')
+const Stream = require('node:stream')
+const { promisify } = require('node:util')
+const zlib = require('node:zlib')
 
 const OError = require('@overleaf/o-error')
 const objectPersistor = require('@overleaf/object-persistor')
@@ -17,26 +20,47 @@ const streams = require('./streams')
 
 const Chunk = core.Chunk
 
-const BUCKET = config.get('chunkStore.bucket')
+const gzip = promisify(zlib.gzip)
 
 class LoadError extends OError {
-  constructor(projectId, chunkId) {
-    super('HistoryStore: failed to load chunk history', { projectId, chunkId })
+  /**
+   * @param {number|string} projectId
+   * @param {number|string} chunkId
+   * @param {any} cause
+   */
+  constructor(projectId, chunkId, cause) {
+    super(
+      'HistoryStore: failed to load chunk history',
+      { projectId, chunkId },
+      cause
+    )
     this.projectId = projectId
     this.chunkId = chunkId
   }
 }
-HistoryStore.LoadError = LoadError
 
 class StoreError extends OError {
-  constructor(projectId, chunkId) {
-    super('HistoryStore: failed to store chunk history', { projectId, chunkId })
+  /**
+   * @param {number|string} projectId
+   * @param {number|string} chunkId
+   * @param {any} cause
+   */
+  constructor(projectId, chunkId, cause) {
+    super(
+      'HistoryStore: failed to store chunk history',
+      { projectId, chunkId },
+      cause
+    )
     this.projectId = projectId
     this.chunkId = chunkId
   }
 }
-HistoryStore.StoreError = StoreError
 
+/**
+ * @param {number|string} projectId
+ * @param {number|string} chunkId
+ * @return {string}
+ */
 function getKey(projectId, chunkId) {
   return path.join(projectKey.format(projectId), projectKey.pad(chunkId))
 }
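A side effect of the new constructors: the underlying error is attached as OError's third (cause) argument at construction time, replacing the .withCause(err) calls in the methods below. Assuming @overleaf/o-error's usual helpers, a caller might unpack it like this sketch; the require path and function are illustrative:

const OError = require('@overleaf/o-error')
const { historyStore } = require('./lib/history_store') // path assumed

async function tryLoad(projectId, chunkId) {
  try {
    return await historyStore.loadRaw(projectId, chunkId)
  } catch (err) {
    // info: { projectId, chunkId }; stack: LoadError plus the persistor cause
    console.error(OError.getFullInfo(err))
    console.error(OError.getFullStack(err))
    throw err
  }
}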
@@ -53,86 +77,99 @@ function getKey(projectId, chunkId) {
  *
  * @class
  */
-function HistoryStore() {}
+class HistoryStore {
+  #persistor
+  #bucket
+  constructor(persistor, bucket) {
+    this.#persistor = persistor
+    this.#bucket = bucket
+  }
 
-/**
- * Load the raw object for a History.
- *
- * @param {number} projectId
- * @param {number} chunkId
- * @return {Promise.<Object>}
- */
-HistoryStore.prototype.loadRaw = function historyStoreLoadRaw(
-  projectId,
-  chunkId
-) {
-  assert.projectId(projectId, 'bad projectId')
-  assert.chunkId(chunkId, 'bad chunkId')
+  /**
+   * Load the raw object for a History.
+   *
+   * @param {number|string} projectId
+   * @param {number|string} chunkId
+   * @return {Promise<import('overleaf-editor-core/lib/types').RawHistory>}
+   */
+  async loadRaw(projectId, chunkId) {
+    assert.projectId(projectId, 'bad projectId')
+    assert.chunkId(chunkId, 'bad chunkId')
 
-  const key = getKey(projectId, chunkId)
+    const key = getKey(projectId, chunkId)
 
-  logger.debug({ projectId, chunkId }, 'loadRaw started')
-  return BPromise.resolve()
-    .then(() => persistor.getObjectStream(BUCKET, key))
-    .then(streams.gunzipStreamToBuffer)
-    .then(buffer => JSON.parse(buffer))
-    .catch(err => {
+    logger.debug({ projectId, chunkId }, 'loadRaw started')
+    try {
+      const buf = await streams.gunzipStreamToBuffer(
+        await this.#persistor.getObjectStream(this.#bucket, key)
+      )
+      return JSON.parse(buf.toString('utf-8'))
+    } catch (err) {
       if (err instanceof objectPersistor.Errors.NotFoundError) {
         throw new Chunk.NotPersistedError(projectId)
       }
-      throw new HistoryStore.LoadError(projectId, chunkId).withCause(err)
-    })
-    .finally(() => logger.debug({ projectId, chunkId }, 'loadRaw finished'))
-}
+      throw new LoadError(projectId, chunkId, err)
+    } finally {
+      logger.debug({ projectId, chunkId }, 'loadRaw finished')
+    }
+  }
 
-/**
- * Compress and store a {@link History}.
- *
- * @param {number} projectId
- * @param {number} chunkId
- * @param {Object} rawHistory
- * @return {Promise}
- */
-HistoryStore.prototype.storeRaw = function historyStoreStoreRaw(
-  projectId,
-  chunkId,
-  rawHistory
-) {
-  assert.projectId(projectId, 'bad projectId')
-  assert.chunkId(chunkId, 'bad chunkId')
-  assert.object(rawHistory, 'bad rawHistory')
-
-  const key = getKey(projectId, chunkId)
-
-  logger.debug({ projectId, chunkId }, 'storeRaw started')
-  return BPromise.resolve()
-    .then(() => streams.gzipStringToStream(JSON.stringify(rawHistory)))
-    .then(stream =>
-      persistor.sendStream(BUCKET, key, stream, {
-        contentType: 'application/json',
-        contentEncoding: 'gzip',
-      })
-    )
-    .catch(err => {
-      throw new HistoryStore.StoreError(projectId, chunkId).withCause(err)
-    })
-    .finally(() => logger.debug({ projectId, chunkId }, 'storeRaw finished'))
-}
+  /**
+   * Compress and store a {@link History}.
+   *
+   * @param {number|string} projectId
+   * @param {number|string} chunkId
+   * @param {import('overleaf-editor-core/lib/types').RawHistory} rawHistory
+   */
+  async storeRaw(projectId, chunkId, rawHistory) {
+    assert.projectId(projectId, 'bad projectId')
+    assert.chunkId(chunkId, 'bad chunkId')
+    assert.object(rawHistory, 'bad rawHistory')
+
+    const key = getKey(projectId, chunkId)
+
+    logger.debug({ projectId, chunkId }, 'storeRaw started')
+
+    const buf = await gzip(JSON.stringify(rawHistory))
+    try {
+      await this.#persistor.sendStream(
+        this.#bucket,
+        key,
+        Stream.Readable.from([buf]),
+        {
+          contentType: 'application/json',
+          contentEncoding: 'gzip',
+          contentLength: buf.byteLength,
+        }
+      )
+    } catch (err) {
+      throw new StoreError(projectId, chunkId, err)
+    } finally {
+      logger.debug({ projectId, chunkId }, 'storeRaw finished')
+    }
+  }
 
-/**
- * Delete multiple chunks from bucket. Expects an Array of objects with
- * projectId and chunkId properties
- * @param {Array} chunks
- * @return {Promise}
- */
-HistoryStore.prototype.deleteChunks = function historyDeleteChunks(chunks) {
-  logger.debug({ chunks }, 'deleteChunks started')
-  return BPromise.all(
-    chunks.map(chunk => {
-      const key = getKey(chunk.projectId, chunk.chunkId)
-      return persistor.deleteObject(BUCKET, key)
-    })
-  ).finally(() => logger.debug({ chunks }, 'deleteChunks finished'))
-}
+  /**
+   * Delete multiple chunks from bucket. Expects an Array of objects with
+   * projectId and chunkId properties
+   * @param {Array<{projectId: string,chunkId:string}>} chunks
+   */
+  async deleteChunks(chunks) {
+    logger.debug({ chunks }, 'deleteChunks started')
+    try {
+      await Promise.all(
+        chunks.map(chunk => {
+          const key = getKey(chunk.projectId, chunk.chunkId)
+          return this.#persistor.deleteObject(this.#bucket, key)
+        })
+      )
+    } finally {
+      logger.debug({ chunks }, 'deleteChunks finished')
+    }
+  }
+}
 
-module.exports = new HistoryStore()
+module.exports = {
+  HistoryStore,
+  historyStore: new HistoryStore(persistor, config.get('chunkStore.bucket')),
+}
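Behaviour on the wire is unchanged: chunks are still gzipped JSON stored under projectKey-formatted keys; only the calling convention moves from bluebird chains to async/await on a class instance. A hedged round-trip sketch against the default instance; the ids, function, and require path are illustrative:

const { historyStore } = require('./lib/history_store') // path assumed

async function roundTrip(rawHistory) {
  await historyStore.storeRaw('1234', '5678', rawHistory) // gzip + upload
  const loaded = await historyStore.loadRaw('1234', '5678') // download + gunzip + parse
  await historyStore.deleteChunks([{ projectId: '1234', chunkId: '5678' }])
  return loaded
}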
@@ -1,3 +1,4 @@
+// @ts-check
 /**
  * Promises are promises and streams are streams, and ne'er the twain shall
  * meet.
@@ -5,52 +6,20 @@
  */
 'use strict'
 
-const BPromise = require('bluebird')
+const Stream = require('node:stream')
 const zlib = require('node:zlib')
-const { WritableBuffer, ReadableString } = require('@overleaf/stream-utils')
-const { pipeline } = require('node:stream')
-
-/**
- * Pipe a read stream to a write stream. The promise resolves when the write
- * stream finishes.
- *
- * @function
- * @param {stream.Readable} readStream
- * @param {stream.Writable} writeStream
- * @return {Promise}
- */
-function promisePipe(readStream, writeStream) {
-  return new BPromise(function (resolve, reject) {
-    pipeline(readStream, writeStream, function (err) {
-      if (err) {
-        reject(err)
-      } else {
-        resolve()
-      }
-    })
-  })
-}
-
-exports.promisePipe = promisePipe
+const { WritableBuffer } = require('@overleaf/stream-utils')
 
 /**
  * Create a promise for the result of reading a stream to a buffer.
  *
  * @function
- * @param {stream.Readable} readStream
- * @return {Promise.<Buffer>}
+ * @param {Stream.Readable} readStream
+ * @return {Promise<Buffer>}
  */
-function readStreamToBuffer(readStream) {
-  return new BPromise(function (resolve, reject) {
-    const bufferStream = new WritableBuffer()
-    pipeline(readStream, bufferStream, function (err) {
-      if (err) {
-        reject(err)
-      } else {
-        resolve(bufferStream.contents())
-      }
-    })
-  })
+async function readStreamToBuffer(readStream) {
+  const bufferStream = new WritableBuffer()
+  await Stream.promises.pipeline(readStream, bufferStream)
+  return bufferStream.contents()
 }
 
 exports.readStreamToBuffer = readStreamToBuffer
@@ -58,43 +27,14 @@ exports.readStreamToBuffer = readStreamToBuffer
 
 /**
  * Create a promise for the result of un-gzipping a stream to a buffer.
  *
  * @function
- * @param {stream.Readable} readStream
- * @return {Promise.<Buffer>}
+ * @param {NodeJS.ReadableStream} readStream
+ * @return {Promise<Buffer>}
  */
-function gunzipStreamToBuffer(readStream) {
+async function gunzipStreamToBuffer(readStream) {
   const gunzip = zlib.createGunzip()
   const bufferStream = new WritableBuffer()
-  return new BPromise(function (resolve, reject) {
-    pipeline(readStream, gunzip, bufferStream, function (err) {
-      if (err) {
-        reject(err)
-      } else {
-        resolve(bufferStream.contents())
-      }
-    })
-  })
+  await Stream.promises.pipeline(readStream, gunzip, bufferStream)
+  return bufferStream.contents()
 }
 
 exports.gunzipStreamToBuffer = gunzipStreamToBuffer
-
-/**
- * Create a write stream that gzips the given string.
- *
- * @function
- * @param {string} string
- * @return {Promise.<stream.Readable>}
- */
-function gzipStringToStream(string) {
-  return new BPromise(function (resolve, reject) {
-    zlib.gzip(Buffer.from(string), function (error, result) {
-      if (error) {
-        reject(error)
-      } else {
-        resolve(new ReadableString(result))
-      }
-    })
-  })
-}
-
-exports.gzipStringToStream = gzipStringToStream
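streams.js loses its bluebird scaffolding: promisePipe and gzipStringToStream go away because Stream.promises.pipeline and a promisified zlib.gzip (see the history_store hunk above) cover the same ground. A hedged equivalence sketch; the require path is assumed:

const Stream = require('node:stream')
const zlib = require('node:zlib')
const { promisify } = require('node:util')
const { gunzipStreamToBuffer } = require('./lib/streams') // path assumed

const gzip = promisify(zlib.gzip)

async function demo() {
  // Stands in for the removed gzipStringToStream: gzip to a buffer,
  // then wrap the buffer as a readable stream.
  const buf = await gzip(JSON.stringify({ hello: 'world' }))
  const readStream = Stream.Readable.from([buf])
  // gunzipStreamToBuffer now rides on Stream.promises.pipeline internally.
  const out = await gunzipStreamToBuffer(readStream)
  return JSON.parse(out.toString('utf-8')) // { hello: 'world' }
}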