mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-30 06:45:21 -05:00
f7275a6c4b
Upgrade logger and metrics in all services GitOrigin-RevId: 2baf63eeeab77fb3559cf763ddacfbf4b745cd0b
262 lines
6.9 KiB
JavaScript
262 lines
6.9 KiB
JavaScript
const { callbackify } = require('util')
|
|
const MongoManager = require('./MongoManager').promises
|
|
const Errors = require('./Errors')
|
|
const logger = require('@overleaf/logger')
|
|
const settings = require('@overleaf/settings')
|
|
const crypto = require('crypto')
|
|
const Streamifier = require('streamifier')
|
|
const RangeManager = require('./RangeManager')
|
|
const PersistorManager = require('./PersistorManager')
|
|
const pMap = require('p-map')
|
|
|
|
// Tuning values read once from the shared settings object at module load.
// PARALLEL_JOBS is the pMap concurrency used by the *AllDocs loops, the
// *_BATCH_SIZE values are handed to the MongoManager batch queries, and
// DESTROY_RETRY_COUNT bounds the retry loop in destroyArchiveWithRetry.
const {
  parallelArchiveJobs: PARALLEL_JOBS,
  archiveBatchSize: ARCHIVE_BATCH_SIZE,
  unArchiveBatchSize: UN_ARCHIVE_BATCH_SIZE,
  destroyBatchSize: DESTROY_BATCH_SIZE,
  destroyRetryCount: DESTROY_RETRY_COUNT,
} = settings
|
|
|
|
module.exports = {
|
|
archiveAllDocs: callbackify(archiveAllDocs),
|
|
archiveDocById: callbackify(archiveDocById),
|
|
archiveDoc: callbackify(archiveDoc),
|
|
unArchiveAllDocs: callbackify(unArchiveAllDocs),
|
|
unarchiveDoc: callbackify(unarchiveDoc),
|
|
destroyAllDocs: callbackify(destroyAllDocs),
|
|
destroyDoc: callbackify(destroyDoc),
|
|
getDoc: callbackify(getDoc),
|
|
promises: {
|
|
archiveAllDocs,
|
|
archiveDocById,
|
|
archiveDoc,
|
|
unArchiveAllDocs,
|
|
unarchiveDoc,
|
|
destroyAllDocs,
|
|
destroyDoc,
|
|
getDoc,
|
|
},
|
|
}
|
|
|
|
// Archive every not-yet-archived doc of a project.
//
// Docs are fetched repeatedly via MongoManager.getNonArchivedProjectDocs
// (ARCHIVE_BATCH_SIZE is passed along, presumably as the batch limit) and each
// batch is pushed through archiveDoc with at most PARALLEL_JOBS in flight.
// The loop stops as soon as a fetch yields nothing or an empty list; any
// archiveDoc failure rejects the returned promise.
async function archiveAllDocs(projectId) {
  const fetchNextBatch = () =>
    MongoManager.getNonArchivedProjectDocs(projectId, ARCHIVE_BATCH_SIZE)

  let batch = await fetchNextBatch()
  while (batch && batch.length !== 0) {
    await pMap(batch, doc => archiveDoc(projectId, doc), {
      concurrency: PARALLEL_JOBS,
    })
    batch = await fetchNextBatch()
  }
}
|
|
|
|
// Archive a single doc identified by project id and doc id.
//
// Loads the doc (lines, ranges, rev and the inS3 flag) from Mongo, rejects
// with Errors.NotFoundError when it does not exist, does nothing when it is
// already flagged as inS3, and otherwise delegates to archiveDoc.
async function archiveDocById(projectId, docId) {
  const projection = { lines: true, ranges: true, rev: true, inS3: true }
  const doc = await MongoManager.findDoc(projectId, docId, projection)

  if (!doc) {
    throw new Errors.NotFoundError(
      `Cannot find doc ${docId} in project ${projectId}`
    )
  }

  // TODO(das7pad): consider refactoring MongoManager.findDoc to take a query
  if (!doc.inS3) {
    return archiveDoc(projectId, doc)
  }
}
|
|
|
|
// Serialise a doc and send it to the persistor, then flag it as archived.
//
// The object is stored under the key `<projectId>/<docId>` as JSON of the form
// { lines, ranges, schema_v: 1 }. The md5 of that JSON is passed to the
// persistor as `sourceMd5`. Rejects when the doc has no lines or when the
// serialised JSON contains a null byte.
async function archiveDoc(projectId, doc) {
  const docId = doc._id
  logger.log(
    { project_id: projectId, doc_id: docId },
    'sending doc to persistor'
  )
  const key = `${projectId}/${docId}`

  if (doc.lines === null || doc.lines === undefined) {
    throw new Error('doc has no lines')
  }

  const payload = {
    lines: doc.lines,
    ranges: doc.ranges,
    schema_v: 1,
  }
  const json = JSON.stringify(payload)

  // this should never happen, but protects against memory-corruption errors that
  // have happened in the past
  if (json.includes('\u0000')) {
    const error = new Error('null bytes detected')
    logger.err({ err: error, doc }, error.message)
    throw error
  }

  const sourceMd5 = crypto.createHash('md5').update(json).digest('hex')
  const stream = Streamifier.createReadStream(json)
  await PersistorManager.sendStream(settings.docstore.bucket, key, stream, {
    sourceMd5,
  })

  // Only mark the doc as archived once the upload has resolved.
  await MongoManager.markDocAsArchived(docId, doc.rev)
}
|
|
|
|
// Unarchive the archived docs of a project, batch by batch.
//
// When settings.docstore.keepSoftDeletedDocsArchived is set, only the docs
// returned by getNonDeletedArchivedProjectDocs are restored; otherwise all
// docs returned by getArchivedProjectDocs are. Each batch is run through
// unarchiveDoc with at most PARALLEL_JOBS in flight, and the loop ends when a
// fetch returns nothing or an empty list.
async function unArchiveAllDocs(projectId) {
  for (;;) {
    const batch = settings.docstore.keepSoftDeletedDocsArchived
      ? await MongoManager.getNonDeletedArchivedProjectDocs(
          projectId,
          UN_ARCHIVE_BATCH_SIZE
        )
      : await MongoManager.getArchivedProjectDocs(
          projectId,
          UN_ARCHIVE_BATCH_SIZE
        )

    const nothingLeft = !batch || batch.length === 0
    if (nothingLeft) {
      return
    }

    await pMap(batch, doc => unarchiveDoc(projectId, doc._id), {
      concurrency: PARALLEL_JOBS,
    })
  }
}
|
|
|
|
// get the doc from the PersistorManager without storing it in mongo
//
// Downloads the object stored under `<projectId>/<docId>`, checks its md5
// against the hash reported by the persistor (rejecting with
// Errors.Md5MismatchError on a mismatch) and converts the parsed JSON into the
// fields to store in mongo: `lines`, plus `ranges` when present.
async function getDoc(projectId, docId) {
  const bucket = settings.docstore.bucket
  const key = `${projectId}/${docId}`

  const sourceMd5 = await PersistorManager.getObjectMd5Hash(bucket, key)
  const stream = await PersistorManager.getObjectStream(bucket, key)
  stream.resume()
  const buffer = await _streamToBuffer(stream)

  const md5 = crypto.createHash('md5').update(buffer).digest('hex')
  if (sourceMd5 !== md5) {
    throw new Errors.Md5MismatchError('md5 mismatch when downloading doc', {
      key,
      sourceMd5,
      md5,
    })
  }

  const archived = JSON.parse(buffer.toString())

  // schema_v 1 is the object format written by archiveDoc
  if (archived.schema_v === 1 && archived.lines != null) {
    const mongoDoc = { lines: archived.lines }
    if (archived.ranges != null) {
      mongoDoc.ranges = RangeManager.jsonRangesToMongo(archived.ranges)
    }
    return mongoDoc
  }

  // a bare array is taken to be the lines themselves
  // (presumably an older archive format -- confirm)
  if (Array.isArray(archived)) {
    return { lines: archived }
  }

  throw new Error("I don't understand the doc format in s3")
}
|
|
|
|
// get the doc and unarchive it to mongo
//
// Looks up the doc's inS3 flag; rejects with Errors.NotFoundError when the doc
// does not exist in mongo and returns early when it is not flagged as inS3.
// Otherwise the archived content is fetched with getDoc, written back through
// MongoManager.upsertIntoDocCollection and the persistor object is deleted.
async function unarchiveDoc(projectId, docId) {
  logger.log(
    { project_id: projectId, doc_id: docId },
    'getting doc from persistor'
  )
  const key = `${projectId}/${docId}`
  const originalDoc = await MongoManager.findDoc(projectId, docId, { inS3: 1 })
  if (!originalDoc) {
    // findDoc found nothing: report it the same way archiveDocById does
    // instead of failing with a TypeError on `originalDoc.inS3`
    throw new Errors.NotFoundError(
      `Cannot find doc ${docId} in project ${projectId}`
    )
  }
  if (!originalDoc.inS3) {
    // return if it's not actually in S3 as there's nothing to do
    return
  }
  let mongoDoc
  try {
    mongoDoc = await getDoc(projectId, docId)
  } catch (err) {
    // if we get a 404, we could be in a race and something else has unarchived the doc already
    if (err instanceof Errors.NotFoundError) {
      const doc = await MongoManager.findDoc(projectId, docId, { inS3: 1 })
      if (doc && !doc.inS3) {
        // the doc has been unarchived while we were looking for it, so no error
        return
      }
      // if the doc vanished from mongo as well, fall through and surface the
      // original persistor error rather than a TypeError
    }
    throw err
  }
  // write to mongo first, delete the archived copy only once that succeeded
  await MongoManager.upsertIntoDocCollection(projectId, docId, mongoDoc)
  await PersistorManager.deleteObject(settings.docstore.bucket, key)
}
|
|
|
|
// Destroy every doc of a project, including deleted ones.
//
// Repeatedly asks MongoManager.getProjectsDocs for doc ids (include_deleted
// set, limit DESTROY_BATCH_SIZE) and runs destroyDoc over each batch with at
// most PARALLEL_JOBS in flight, until a fetch returns nothing or an empty
// list.
async function destroyAllDocs(projectId) {
  const query = { include_deleted: true, limit: DESTROY_BATCH_SIZE }
  const fetchNextBatch = () =>
    MongoManager.getProjectsDocs(projectId, { ...query }, { _id: 1 })

  for (;;) {
    const batch = await fetchNextBatch()
    if (!batch || batch.length === 0) {
      return
    }
    await pMap(batch, doc => destroyDoc(projectId, doc._id), {
      concurrency: PARALLEL_JOBS,
    })
  }
}
|
|
|
|
// Remove a single doc from the persistor (when archived) and from mongo.
//
// Rejects with Errors.NotFoundError when the doc is not in mongo. The archived
// copy is removed first, so the mongo record is only destroyed once the
// persistor delete has succeeded.
async function destroyDoc(projectId, docId) {
  logger.log(
    { project_id: projectId, doc_id: docId },
    'removing doc from mongo and persistor'
  )

  const mongoDoc = await MongoManager.findDoc(projectId, docId, { inS3: 1 })
  if (!mongoDoc) {
    throw new Errors.NotFoundError('Doc not found in Mongo')
  }

  const isArchived = mongoDoc.inS3
  if (isArchived) {
    await destroyArchiveWithRetry(projectId, docId)
  }

  await MongoManager.destroyDoc(docId)
}
|
|
|
|
// Delete the archived copy of a doc from the persistor, retrying on failure.
//
// The delete is attempted once and then retried up to DESTROY_RETRY_COUNT
// times. Every failure is logged as a warning together with the 1-based
// attempt number; once all attempts are used up the last error is thrown.
async function destroyArchiveWithRetry(projectId, docId) {
  const key = `${projectId}/${docId}`
  let lastError

  for (let attempt = 1; attempt - 1 <= DESTROY_RETRY_COUNT; attempt++) {
    try {
      await PersistorManager.deleteObject(settings.docstore.bucket, key)
      return
    } catch (err) {
      lastError = err
      logger.warn(
        { projectId, docId, err, attempt },
        'destroying archive failed'
      )
    }
  }

  throw lastError
}
|
|
|
|
// Collect everything a readable stream emits into a single Buffer.
//
// Resolves with the concatenation of all 'data' chunks once the stream emits
// 'end', and rejects with the stream's error if it emits 'error'.
async function _streamToBuffer(stream) {
  const chunks = []
  return new Promise((resolve, reject) => {
    stream.on('data', chunk => {
      // Buffer.concat only accepts Buffers/Uint8Arrays; a stream with an
      // encoding set (or in object mode) hands out strings, which would make
      // the 'end' handler throw outside of the promise.
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
    })
    stream.on('error', reject)
    stream.on('end', () => resolve(Buffer.concat(chunks)))
  })
}
|