const RedisManager = require('./RedisManager')
const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager')
const DocumentManager = require('./DocumentManager')
const HistoryManager = require('./HistoryManager')
const async = require('async')
const logger = require('@overleaf/logger')
const Metrics = require('./Metrics')
const Errors = require('./Errors')
const { promisifyAll } = require('@overleaf/promise-utils')

module.exports = {
  flushProjectWithLocks,
  flushAndDeleteProjectWithLocks,
  queueFlushAndDeleteProject,
  getProjectDocsTimestamps,
  getProjectDocsAndFlushIfOld,
  clearProjectState,
  updateProjectWithLocks,
}

module.exports.promises = promisifyAll(module.exports)
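
/**
 * Flush every doc currently loaded in Redis for the given project.
 * Individual flush errors are logged and collected; if any doc fails,
 * the callback receives a single generic error.
 */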
function flushProjectWithLocks(projectId, _callback) {
  const timer = new Metrics.Timer('projectManager.flushProjectWithLocks')
  const callback = function (...args) {
    timer.done()
    _callback(...args)
  }

  RedisManager.getDocIdsInProject(projectId, (error, docIds) => {
    if (error) {
      return callback(error)
    }
    const errors = []
    const jobs = docIds.map(docId => callback => {
      DocumentManager.flushDocIfLoadedWithLock(projectId, docId, error => {
        if (error instanceof Errors.NotFoundError) {
          logger.warn(
            { err: error, projectId, docId },
            'found deleted doc when flushing'
          )
          callback()
        } else if (error) {
          logger.error({ err: error, projectId, docId }, 'error flushing doc')
          errors.push(error)
          callback()
        } else {
          callback()
        }
      })
    })

    logger.debug({ projectId, docIds }, 'flushing docs')
    async.series(jobs, () => {
      if (errors.length > 0) {
        callback(new Error('Errors flushing docs. See log for details'))
      } else {
        callback(null)
      }
    })
  })
}
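
/**
 * Flush and delete every doc in the project from Redis, then flush the
 * project's history (options are forwarded to the history flush). Doc
 * deletion errors are logged and collected; if any doc fails, a single
 * generic error is returned, otherwise any history flush error is
 * passed through.
 */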
function flushAndDeleteProjectWithLocks(projectId, options, _callback) {
  const timer = new Metrics.Timer(
    'projectManager.flushAndDeleteProjectWithLocks'
  )
  const callback = function (...args) {
    timer.done()
    _callback(...args)
  }

  RedisManager.getDocIdsInProject(projectId, (error, docIds) => {
    if (error) {
      return callback(error)
    }
    const errors = []
    const jobs = docIds.map(docId => callback => {
      DocumentManager.flushAndDeleteDocWithLock(projectId, docId, {}, error => {
        if (error) {
          logger.error({ err: error, projectId, docId }, 'error deleting doc')
          errors.push(error)
        }
        callback()
      })
    })

    logger.debug({ projectId, docIds }, 'deleting docs')
    async.series(jobs, () =>
      // When deleting the project here we want to ensure that project
      // history is completely flushed because the project may be
      // deleted in web after this call completes, and so further
      // attempts to flush would fail after that.
      HistoryManager.flushProjectChanges(projectId, options, error => {
        if (errors.length > 0) {
          callback(new Error('Errors deleting docs. See log for details'))
        } else if (error) {
          callback(error)
        } else {
          callback(null)
        }
      })
    )
  })
}
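
/**
 * Add the project to the flush-and-delete queue in Redis and record a
 * metric, rather than flushing and deleting it inline.
 */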
function queueFlushAndDeleteProject(projectId, callback) {
  RedisManager.queueFlushAndDeleteProject(projectId, error => {
    if (error) {
      logger.error(
        { projectId, error },
        'error adding project to flush and delete queue'
      )
      return callback(error)
    }
    Metrics.inc('queued-delete')
    callback()
  })
}
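
/**
 * Fetch the timestamps for all docs in the project that are currently
 * in Redis. Returns an empty list when no docs are loaded.
 */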
function getProjectDocsTimestamps(projectId, callback) {
  RedisManager.getDocIdsInProject(projectId, (error, docIds) => {
    if (error) {
      return callback(error)
    }
    if (docIds.length === 0) {
      return callback(null, [])
    }
    RedisManager.getDocTimestamps(docIds, (error, timestamps) => {
      if (error) {
        return callback(error)
      }
      callback(null, timestamps)
    })
  })
}
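
/**
 * Return the lines and version of every doc in the project, flushing
 * each doc if it is old (see DocumentManager.getDocAndFlushIfOldWithLock).
 * The supplied project state hash is checked against the one stored in
 * Redis; if the project structure has changed, a ProjectStateChangedError
 * is returned instead. Note that excludeVersions is not used here.
 */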
function getProjectDocsAndFlushIfOld(
  projectId,
  projectStateHash,
  excludeVersions,
  _callback
) {
  const timer = new Metrics.Timer('projectManager.getProjectDocsAndFlushIfOld')
  const callback = function (...args) {
    timer.done()
    _callback(...args)
  }

  RedisManager.checkOrSetProjectState(
    projectId,
    projectStateHash,
    (error, projectStateChanged) => {
      if (error) {
        logger.error(
          { err: error, projectId },
          'error getting/setting project state in getProjectDocsAndFlushIfOld'
        )
        return callback(error)
      }
      // we can't return docs if project structure has changed
      if (projectStateChanged) {
        return callback(
          Errors.ProjectStateChangedError('project state changed')
        )
      }
      // project structure hasn't changed, return doc content from redis
      RedisManager.getDocIdsInProject(projectId, (error, docIds) => {
        if (error) {
          logger.error(
            { err: error, projectId },
            'error getting doc ids in getProjectDocs'
          )
          return callback(error)
        }
        // get the doc lines from redis
        const jobs = docIds.map(docId => cb => {
          DocumentManager.getDocAndFlushIfOldWithLock(
            projectId,
            docId,
            (err, lines, version) => {
              if (err) {
                logger.error(
                  { err, projectId, docId },
                  'error getting project doc lines in getProjectDocsAndFlushIfOld'
                )
                return cb(err)
              }
              const doc = { _id: docId, lines, v: version } // create a doc object to return
              cb(null, doc)
            }
          )
        })
        async.series(jobs, (error, docs) => {
          if (error) {
            return callback(error)
          }
          callback(null, docs)
        })
      })
    }
  )
}
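
/**
 * Clear the project state hash stored in Redis (see
 * RedisManager.checkOrSetProjectState).
 */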
function clearProjectState(projectId, callback) {
  RedisManager.clearProjectState(projectId, callback)
}
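
/**
 * Apply a batch of project structure updates (add/rename of docs and
 * files), queueing each one for project history. Each update is given a
 * version of the form `${projectVersion}.${subversion}`, and an async
 * history flush is triggered once enough ops have accumulated.
 */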
function updateProjectWithLocks(
  projectId,
  projectHistoryId,
  userId,
  updates,
  projectVersion,
  source,
  _callback
) {
  const timer = new Metrics.Timer('projectManager.updateProject')
  const callback = function (...args) {
    timer.done()
    _callback(...args)
  }

  let projectSubversion = 0 // project versions can have multiple operations
  let projectOpsLength = 0

  function handleUpdate(update, cb) {
    update.version = `${projectVersion}.${projectSubversion++}`
    switch (update.type) {
      case 'add-doc':
        ProjectHistoryRedisManager.queueAddEntity(
          projectId,
          projectHistoryId,
          'doc',
          update.id,
          userId,
          update,
          source,
          (error, count) => {
            projectOpsLength = count
            cb(error)
          }
        )
        break
      case 'rename-doc':
        if (!update.newPathname) {
          // an empty newPathname signifies a delete, so there is no need to
          // update the pathname in redis
          ProjectHistoryRedisManager.queueRenameEntity(
            projectId,
            projectHistoryId,
            'doc',
            update.id,
            userId,
            update,
            source,
            (error, count) => {
              projectOpsLength = count
              cb(error)
            }
          )
        } else {
          // rename the doc in redis before queuing the update
          DocumentManager.renameDocWithLock(
            projectId,
            update.id,
            userId,
            update,
            projectHistoryId,
            error => {
              if (error) {
                return cb(error)
              }
              ProjectHistoryRedisManager.queueRenameEntity(
                projectId,
                projectHistoryId,
                'doc',
                update.id,
                userId,
                update,
                source,
                (error, count) => {
                  projectOpsLength = count
                  cb(error)
                }
              )
            }
          )
        }
        break
      case 'add-file':
        ProjectHistoryRedisManager.queueAddEntity(
          projectId,
          projectHistoryId,
          'file',
          update.id,
          userId,
          update,
          source,
          (error, count) => {
            projectOpsLength = count
            cb(error)
          }
        )
        break
      case 'rename-file':
        ProjectHistoryRedisManager.queueRenameEntity(
          projectId,
          projectHistoryId,
          'file',
          update.id,
          userId,
          update,
          source,
          (error, count) => {
            projectOpsLength = count
            cb(error)
          }
        )
        break
      default:
        cb(new Error(`Unknown update type: ${update.type}`))
    }
  }

  async.eachSeries(updates, handleUpdate, error => {
    if (error) {
      return callback(error)
    }
    if (
      HistoryManager.shouldFlushHistoryOps(
        projectOpsLength,
        updates.length,
        HistoryManager.FLUSH_PROJECT_EVERY_N_OPS
      )
    ) {
      HistoryManager.flushProjectChangesAsync(projectId)
    }
    callback()
  })
}