mirror of
https://github.com/overleaf/overleaf.git
synced 2025-01-04 09:23:36 +00:00
3ead64344f
Read updates from Redis in smaller batches GitOrigin-RevId: 06901e4a9e43976e446c014d5d46c2488691c205
616 lines
19 KiB
JavaScript
616 lines
19 KiB
JavaScript
import { promisify } from 'util'
|
|
import logger from '@overleaf/logger'
|
|
import async from 'async'
|
|
import metrics from '@overleaf/metrics'
|
|
import Settings from '@overleaf/settings'
|
|
import OError from '@overleaf/o-error'
|
|
import * as HistoryStoreManager from './HistoryStoreManager.js'
|
|
import * as UpdateTranslator from './UpdateTranslator.js'
|
|
import * as BlobManager from './BlobManager.js'
|
|
import * as RedisManager from './RedisManager.js'
|
|
import * as ErrorRecorder from './ErrorRecorder.js'
|
|
import * as LockManager from './LockManager.js'
|
|
import * as UpdateCompressor from './UpdateCompressor.js'
|
|
import * as WebApiManager from './WebApiManager.js'
|
|
import * as SyncManager from './SyncManager.js'
|
|
import * as Versions from './Versions.js'
|
|
import * as Errors from './Errors.js'
|
|
import { Profiler } from './Profiler.js'
|
|
|
|
const keys = Settings.redis.lock.key_schema
|
|
|
|
export const REDIS_READ_BATCH_SIZE = 500
|
|
|
|
/**
|
|
* Container for functions that need to be mocked in tests
|
|
*
|
|
* TODO: Rewrite tests in terms of exported functions only
|
|
*/
|
|
export const _mocks = {}
|
|
|
|
export function getRawUpdates(projectId, batchSize, callback) {
|
|
RedisManager.getRawUpdatesBatch(projectId, batchSize, (error, batch) => {
|
|
if (error != null) {
|
|
return callback(OError.tag(error))
|
|
}
|
|
|
|
let updates
|
|
try {
|
|
updates = RedisManager.parseDocUpdates(batch.rawUpdates)
|
|
} catch (error) {
|
|
return callback(OError.tag(error))
|
|
}
|
|
|
|
_getHistoryId(projectId, updates, (error, historyId) => {
|
|
if (error != null) {
|
|
return callback(OError.tag(error))
|
|
}
|
|
HistoryStoreManager.getMostRecentChunk(
|
|
projectId,
|
|
historyId,
|
|
(error, chunk) => {
|
|
if (error != null) {
|
|
return callback(OError.tag(error))
|
|
}
|
|
callback(null, { project_id: projectId, chunk, updates })
|
|
}
|
|
)
|
|
})
|
|
})
|
|
}
|
|
|
|
// Process all updates for a project, only check project-level information once
|
|
export function processUpdatesForProject(projectId, callback) {
|
|
LockManager.runWithLock(
|
|
keys.projectHistoryLock({ project_id: projectId }),
|
|
(extendLock, releaseLock) => {
|
|
_countAndProcessUpdates(
|
|
projectId,
|
|
extendLock,
|
|
REDIS_READ_BATCH_SIZE,
|
|
releaseLock
|
|
)
|
|
},
|
|
(error, queueSize) => {
|
|
if (error) {
|
|
OError.tag(error)
|
|
}
|
|
ErrorRecorder.record(projectId, queueSize, error, callback)
|
|
// clear the timestamp in the background if the queue is now empty
|
|
RedisManager.clearDanglingFirstOpTimestamp(projectId, () => {})
|
|
}
|
|
)
|
|
}
|
|
|
|
export function processUpdatesForProjectUsingBisect(
|
|
projectId,
|
|
amountToProcess,
|
|
callback
|
|
) {
|
|
LockManager.runWithLock(
|
|
keys.projectHistoryLock({ project_id: projectId }),
|
|
(extendLock, releaseLock) => {
|
|
_countAndProcessUpdates(
|
|
projectId,
|
|
extendLock,
|
|
amountToProcess,
|
|
releaseLock
|
|
)
|
|
},
|
|
(error, queueSize) => {
|
|
if (amountToProcess === 0 || queueSize === 0) {
|
|
// no further processing possible
|
|
if (error != null) {
|
|
ErrorRecorder.record(
|
|
projectId,
|
|
queueSize,
|
|
OError.tag(error),
|
|
callback
|
|
)
|
|
} else {
|
|
callback()
|
|
}
|
|
} else {
|
|
if (error != null) {
|
|
// decrease the batch size when we hit an error
|
|
processUpdatesForProjectUsingBisect(
|
|
projectId,
|
|
Math.floor(amountToProcess / 2),
|
|
callback
|
|
)
|
|
} else {
|
|
// otherwise continue processing with the same batch size
|
|
processUpdatesForProjectUsingBisect(
|
|
projectId,
|
|
amountToProcess,
|
|
callback
|
|
)
|
|
}
|
|
}
|
|
}
|
|
)
|
|
}
|
|
|
|
export function processSingleUpdateForProject(projectId, callback) {
|
|
LockManager.runWithLock(
|
|
keys.projectHistoryLock({ project_id: projectId }),
|
|
(
|
|
extendLock,
|
|
releaseLock // set the batch size to 1 for single-stepping
|
|
) => {
|
|
_countAndProcessUpdates(projectId, extendLock, 1, releaseLock)
|
|
},
|
|
(
|
|
error,
|
|
queueSize // no need to clear the flush marker when single stepping
|
|
) => {
|
|
// it will be cleared up on the next background flush if
|
|
// the queue is empty
|
|
ErrorRecorder.record(projectId, queueSize, error, callback)
|
|
}
|
|
)
|
|
}
|
|
|
|
_mocks._countAndProcessUpdates = (
|
|
projectId,
|
|
extendLock,
|
|
batchSize,
|
|
callback
|
|
) => {
|
|
RedisManager.countUnprocessedUpdates(projectId, (error, queueSize) => {
|
|
if (error != null) {
|
|
return callback(OError.tag(error))
|
|
}
|
|
if (queueSize > 0) {
|
|
logger.debug({ projectId, queueSize }, 'processing uncompressed updates')
|
|
RedisManager.getUpdatesInBatches(
|
|
projectId,
|
|
batchSize,
|
|
(updates, cb) => {
|
|
_processUpdatesBatch(projectId, updates, extendLock, cb)
|
|
},
|
|
error => {
|
|
if (error) {
|
|
return callback(error)
|
|
}
|
|
callback(null, queueSize)
|
|
}
|
|
)
|
|
} else {
|
|
logger.debug({ projectId }, 'no updates to process')
|
|
callback(null, queueSize)
|
|
}
|
|
})
|
|
}
|
|
|
|
function _countAndProcessUpdates(...args) {
|
|
_mocks._countAndProcessUpdates(...args)
|
|
}
|
|
|
|
function _processUpdatesBatch(projectId, updates, extendLock, callback) {
|
|
// If the project doesn't have a history then we can bail out here
|
|
_getHistoryId(projectId, updates, (error, historyId) => {
|
|
if (error != null) {
|
|
return callback(OError.tag(error))
|
|
}
|
|
|
|
if (historyId == null) {
|
|
logger.debug(
|
|
{ projectId },
|
|
'discarding updates as project does not use history'
|
|
)
|
|
return callback()
|
|
}
|
|
|
|
_processUpdates(projectId, historyId, updates, extendLock, error => {
|
|
if (error != null) {
|
|
return callback(OError.tag(error))
|
|
}
|
|
callback()
|
|
})
|
|
})
|
|
}
|
|
|
|
export function _getHistoryId(projectId, updates, callback) {
|
|
let idFromUpdates = null
|
|
|
|
// check that all updates have the same history id
|
|
for (const update of updates) {
|
|
if (update.projectHistoryId != null) {
|
|
if (idFromUpdates == null) {
|
|
idFromUpdates = update.projectHistoryId.toString()
|
|
} else if (idFromUpdates !== update.projectHistoryId.toString()) {
|
|
metrics.inc('updates.batches.project-history-id.inconsistent-update')
|
|
logger.warn(
|
|
{
|
|
projectId,
|
|
updates,
|
|
idFromUpdates,
|
|
currentId: update.projectHistoryId,
|
|
},
|
|
'inconsistent project history id between updates'
|
|
)
|
|
return callback(
|
|
new OError('inconsistent project history id between updates')
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
WebApiManager.getHistoryId(projectId, (error, idFromWeb, cached) => {
|
|
if (error != null && idFromUpdates != null) {
|
|
// present only on updates
|
|
// 404s from web are an error
|
|
metrics.inc('updates.batches.project-history-id.from-updates')
|
|
return callback(null, idFromUpdates)
|
|
} else if (error != null) {
|
|
return callback(OError.tag(error))
|
|
}
|
|
|
|
if (idFromWeb == null && idFromUpdates == null) {
|
|
// present on neither web nor updates
|
|
callback(null, null)
|
|
} else if (idFromWeb != null && idFromUpdates == null) {
|
|
// present only on web
|
|
metrics.inc('updates.batches.project-history-id.from-web')
|
|
callback(null, idFromWeb)
|
|
} else if (idFromWeb == null && idFromUpdates != null) {
|
|
// present only on updates
|
|
metrics.inc('updates.batches.project-history-id.from-updates')
|
|
callback(null, idFromUpdates)
|
|
} else if (idFromWeb.toString() !== idFromUpdates.toString()) {
|
|
// inconsistent between web and updates
|
|
metrics.inc('updates.batches.project-history-id.inconsistent-with-web')
|
|
logger.warn(
|
|
{
|
|
projectId,
|
|
idFromWeb,
|
|
idFromUpdates,
|
|
idWasCached: cached,
|
|
updates,
|
|
},
|
|
'inconsistent project history id between updates and web'
|
|
)
|
|
callback(
|
|
new OError('inconsistent project history id between updates and web')
|
|
)
|
|
} else {
|
|
// the same on web and updates
|
|
metrics.inc('updates.batches.project-history-id.from-updates')
|
|
callback(null, idFromWeb)
|
|
}
|
|
})
|
|
}
|
|
|
|
function _handleOpsOutOfOrderError(projectId, projectHistoryId, err, ...rest) {
|
|
const adjustedLength = Math.max(rest.length, 1)
|
|
const results = rest.slice(0, adjustedLength - 1)
|
|
const callback = rest[adjustedLength - 1]
|
|
ErrorRecorder.getFailureRecord(projectId, (error, failureRecord) => {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
// Bypass ops-out-of-order errors in the stored chunk when in forceDebug mode
|
|
if (failureRecord != null && failureRecord.forceDebug === true) {
|
|
logger.warn(
|
|
{ projectId, projectHistoryId },
|
|
'ops out of order in chunk, forced continue'
|
|
)
|
|
callback(null, ...results) // return results without error
|
|
} else {
|
|
logger.warn(
|
|
{ projectId, projectHistoryId },
|
|
'ops out of order in chunk, returning error'
|
|
)
|
|
callback(err, ...results)
|
|
}
|
|
})
|
|
}
|
|
|
|
function _getMostRecentVersionWithDebug(projectId, projectHistoryId, callback) {
|
|
HistoryStoreManager.getMostRecentVersion(
|
|
projectId,
|
|
projectHistoryId,
|
|
(err, ...results) => {
|
|
if (err instanceof Errors.OpsOutOfOrderError) {
|
|
_handleOpsOutOfOrderError(
|
|
projectId,
|
|
projectHistoryId,
|
|
err,
|
|
...results,
|
|
callback
|
|
)
|
|
} else {
|
|
callback(err, ...results)
|
|
}
|
|
}
|
|
)
|
|
}
|
|
|
|
function _processUpdates(
|
|
projectId,
|
|
projectHistoryId,
|
|
updates,
|
|
extendLock,
|
|
callback
|
|
) {
|
|
const profile = new Profiler('_processUpdates', {
|
|
project_id: projectId,
|
|
projectHistoryId,
|
|
})
|
|
// skip updates first if we're in a sync, we might not need to do anything else
|
|
SyncManager.skipUpdatesDuringSync(
|
|
projectId,
|
|
updates,
|
|
(error, filteredUpdates, newSyncState) => {
|
|
profile.log('skipUpdatesDuringSync')
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
if (filteredUpdates.length === 0) {
|
|
// return early if there are no updates to apply
|
|
return SyncManager.setResyncState(projectId, newSyncState, callback)
|
|
}
|
|
// only make request to history service if we have actual updates to process
|
|
_getMostRecentVersionWithDebug(
|
|
projectId,
|
|
projectHistoryId,
|
|
(error, baseVersion, projectStructureAndDocVersions) => {
|
|
if (projectStructureAndDocVersions == null) {
|
|
projectStructureAndDocVersions = { project: null, docs: {} }
|
|
}
|
|
profile.log('getMostRecentVersion')
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
async.waterfall(
|
|
[
|
|
cb => {
|
|
cb = profile.wrap('expandSyncUpdates', cb)
|
|
SyncManager.expandSyncUpdates(
|
|
projectId,
|
|
projectHistoryId,
|
|
filteredUpdates,
|
|
extendLock,
|
|
cb
|
|
)
|
|
},
|
|
(expandedUpdates, cb) => {
|
|
let unappliedUpdates
|
|
try {
|
|
unappliedUpdates = _skipAlreadyAppliedUpdates(
|
|
projectId,
|
|
expandedUpdates,
|
|
projectStructureAndDocVersions
|
|
)
|
|
} catch (err) {
|
|
return cb(err)
|
|
}
|
|
profile.log('skipAlreadyAppliedUpdates')
|
|
const compressedUpdates =
|
|
UpdateCompressor.compressRawUpdates(unappliedUpdates)
|
|
const timeTaken = profile
|
|
.log('compressRawUpdates')
|
|
.getTimeDelta()
|
|
if (timeTaken >= 1000) {
|
|
logger.debug(
|
|
{ projectId, updates: unappliedUpdates, timeTaken },
|
|
'slow compression of raw updates'
|
|
)
|
|
}
|
|
cb = profile.wrap('createBlobs', cb)
|
|
BlobManager.createBlobsForUpdates(
|
|
projectId,
|
|
projectHistoryId,
|
|
compressedUpdates,
|
|
extendLock,
|
|
cb
|
|
)
|
|
},
|
|
(updatesWithBlobs, cb) => {
|
|
cb = profile.wrap('convertToChanges', cb)
|
|
UpdateTranslator.convertToChanges(
|
|
projectId,
|
|
updatesWithBlobs,
|
|
cb
|
|
)
|
|
},
|
|
(changes, cb) => {
|
|
changes = changes.map(change => change.toRaw())
|
|
let change
|
|
const numChanges = changes.length
|
|
const byteLength = Buffer.byteLength(
|
|
JSON.stringify(changes),
|
|
'utf8'
|
|
)
|
|
let numOperations = 0
|
|
for (change of changes) {
|
|
if (change.operations != null) {
|
|
numOperations += change.operations.length
|
|
}
|
|
}
|
|
|
|
metrics.timing('history-store.request.changes', numChanges, 1)
|
|
metrics.timing('history-store.request.bytes', byteLength, 1)
|
|
metrics.timing(
|
|
'history-store.request.operations',
|
|
numOperations,
|
|
1
|
|
)
|
|
|
|
// thresholds taken from write_latex/main/lib/history_exporter.rb
|
|
if (numChanges > 1000) {
|
|
metrics.inc('history-store.request.exceeds-threshold.changes')
|
|
}
|
|
if (byteLength > Math.pow(1024, 2)) {
|
|
metrics.inc('history-store.request.exceeds-threshold.bytes')
|
|
const changeLengths = changes.map(change =>
|
|
Buffer.byteLength(JSON.stringify(change), 'utf8')
|
|
)
|
|
logger.warn(
|
|
{ projectId, byteLength, changeLengths },
|
|
'change size exceeds limit'
|
|
)
|
|
}
|
|
|
|
cb = profile.wrap('sendChanges', cb)
|
|
// this is usually the longest request, so extend the lock before starting it
|
|
extendLock(error => {
|
|
if (error != null) {
|
|
return cb(error)
|
|
}
|
|
if (changes.length === 0) {
|
|
return cb()
|
|
} // avoid unnecessary requests to history service
|
|
HistoryStoreManager.sendChanges(
|
|
projectId,
|
|
projectHistoryId,
|
|
changes,
|
|
baseVersion,
|
|
cb
|
|
)
|
|
})
|
|
},
|
|
cb => {
|
|
cb = profile.wrap('setResyncState', cb)
|
|
SyncManager.setResyncState(projectId, newSyncState, cb)
|
|
},
|
|
],
|
|
error => {
|
|
profile.end()
|
|
callback(error)
|
|
}
|
|
)
|
|
}
|
|
)
|
|
}
|
|
)
|
|
}
|
|
|
|
_mocks._skipAlreadyAppliedUpdates = (
|
|
projectId,
|
|
updates,
|
|
projectStructureAndDocVersions
|
|
) => {
|
|
function alreadySeenProjectVersion(previousProjectStructureVersion, update) {
|
|
return (
|
|
UpdateTranslator.isProjectStructureUpdate(update) &&
|
|
previousProjectStructureVersion != null &&
|
|
update.version != null &&
|
|
Versions.gte(previousProjectStructureVersion, update.version)
|
|
)
|
|
}
|
|
|
|
function alreadySeenDocVersion(previousDocVersions, update) {
|
|
if (UpdateTranslator.isTextUpdate(update) && update.v != null) {
|
|
const docId = update.doc
|
|
return (
|
|
previousDocVersions[docId] != null &&
|
|
previousDocVersions[docId].v != null &&
|
|
Versions.gte(previousDocVersions[docId].v, update.v)
|
|
)
|
|
} else {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// check that the incoming updates are in the correct order (we do not
|
|
// want to send out of order updates to the history service)
|
|
let incomingProjectStructureVersion = null
|
|
const incomingDocVersions = {}
|
|
for (const update of updates) {
|
|
if (alreadySeenProjectVersion(incomingProjectStructureVersion, update)) {
|
|
logger.warn(
|
|
{ projectId, update, incomingProjectStructureVersion },
|
|
'incoming project structure updates are out of order'
|
|
)
|
|
throw new Errors.OpsOutOfOrderError(
|
|
'project structure version out of order on incoming updates'
|
|
)
|
|
} else if (alreadySeenDocVersion(incomingDocVersions, update)) {
|
|
logger.warn(
|
|
{ projectId, update, incomingDocVersions },
|
|
'incoming doc updates are out of order'
|
|
)
|
|
throw new Errors.OpsOutOfOrderError(
|
|
'doc version out of order on incoming updates'
|
|
)
|
|
}
|
|
// update the current project structure and doc versions
|
|
if (UpdateTranslator.isProjectStructureUpdate(update)) {
|
|
incomingProjectStructureVersion = update.version
|
|
} else if (UpdateTranslator.isTextUpdate(update)) {
|
|
incomingDocVersions[update.doc] = { v: update.v }
|
|
}
|
|
}
|
|
|
|
// discard updates already applied
|
|
const updatesToApply = []
|
|
const previousProjectStructureVersion = projectStructureAndDocVersions.project
|
|
const previousDocVersions = projectStructureAndDocVersions.docs
|
|
if (projectStructureAndDocVersions != null) {
|
|
const updateProjectVersions = []
|
|
for (const update of updates) {
|
|
if (update != null && update.version != null) {
|
|
updateProjectVersions.push(update.version)
|
|
}
|
|
}
|
|
logger.debug(
|
|
{ projectId, projectStructureAndDocVersions, updateProjectVersions },
|
|
'comparing updates with existing project versions'
|
|
)
|
|
}
|
|
for (const update of updates) {
|
|
if (alreadySeenProjectVersion(previousProjectStructureVersion, update)) {
|
|
metrics.inc('updates.discarded_project_structure_version')
|
|
logger.debug(
|
|
{ projectId, update, previousProjectStructureVersion },
|
|
'discarding previously applied project structure update'
|
|
)
|
|
continue
|
|
}
|
|
if (alreadySeenDocVersion(previousDocVersions, update)) {
|
|
metrics.inc('updates.discarded_doc_version')
|
|
logger.debug(
|
|
{ projectId, update, previousDocVersions },
|
|
'discarding previously applied doc update'
|
|
)
|
|
continue
|
|
}
|
|
// remove non-BMP characters from resync updates that have bypassed the normal docupdater flow
|
|
_sanitizeUpdate(update)
|
|
// if all checks above are ok then accept the update
|
|
updatesToApply.push(update)
|
|
}
|
|
|
|
return updatesToApply
|
|
}
|
|
|
|
export function _skipAlreadyAppliedUpdates(...args) {
|
|
return _mocks._skipAlreadyAppliedUpdates(...args)
|
|
}
|
|
|
|
function _sanitizeUpdate(update) {
|
|
// adapted from docupdater's UpdateManager, we should clean these in docupdater
|
|
// too but we already have queues with this problem so we will handle it here
|
|
// too for robustness.
|
|
// Replace high and low surrogate characters with 'replacement character' (\uFFFD)
|
|
const removeBadChars = str => str.replace(/[\uD800-\uDFFF]/g, '\uFFFD')
|
|
// clean up any bad chars in resync diffs
|
|
if (update.op) {
|
|
for (const op of update.op) {
|
|
if (op.i != null) {
|
|
op.i = removeBadChars(op.i)
|
|
}
|
|
}
|
|
}
|
|
// clean up any bad chars in resync new docs
|
|
if (update.docLines != null) {
|
|
update.docLines = removeBadChars(update.docLines)
|
|
}
|
|
return update
|
|
}
|
|
|
|
export const promises = {
|
|
processUpdatesForProject: promisify(processUpdatesForProject),
|
|
}
|