import _ from 'lodash' import async from 'async' import logger from '@overleaf/logger' import OError from '@overleaf/o-error' import * as ChunkTranslator from './ChunkTranslator.js' import * as HistoryApiManager from './HistoryApiManager.js' import * as HistoryStoreManager from './HistoryStoreManager.js' import * as LabelsManager from './LabelsManager.js' import * as UpdatesProcessor from './UpdatesProcessor.js' import * as WebApiManager from './WebApiManager.js' const MAX_CHUNK_REQUESTS = 5 const TIME_BETWEEN_DISTINCT_UPDATES = 5 * 60 * 1000 // five minutes export function getSummarizedProjectUpdates(projectId, options, callback) { // Some notes on versions: // // Versions of the project are like the fenceposts between updates. // An update applies to a certain version of the project, and gives us the // next version. // // When we ask for updates 'before' a version, this includes the update // that created the version equal to 'before'. // // A chunk in OL has a 'startVersion', which is the version of the project // before any of the updates in it were applied. This is the same version as // the last update in the previous chunk would have created. // // If we ask the OL history store for the chunk with version that is the end of one // chunk and the start of another, it will return the older chunk, i.e. // the chunk with the updates that led up to that version. // // So once we read in the updates from a chunk, and want to get the updates from // the previous chunk, we ask OL for the chunk with the version equal to the // 'startVersion' of the newer chunk we just read. let nextVersionToRequest if (options == null) { options = {} } if (!options.min_count) { options.min_count = 25 } if (options.before != null) { // The version is of the doc, so we want the updates before that version, // which includes the update that created that version. nextVersionToRequest = options.before } else { // Return the latest updates first if no nextVersionToRequest is set. nextVersionToRequest = null } UpdatesProcessor.processUpdatesForProject(projectId, function (error) { if (error) { return callback(OError.tag(error)) } LabelsManager.getLabels(projectId, function (error, labels) { if (error) { return callback(OError.tag(error)) } const labelsByVersion = {} for (const label of labels) { if (labelsByVersion[label.version] == null) { labelsByVersion[label.version] = [] } labelsByVersion[label.version].push(label) } WebApiManager.getHistoryId(projectId, function (error, historyId) { if (error) return callback(error) let chunksRequested = 0 let summarizedUpdates = [] const shouldRequestMoreUpdates = cb => { return cb( null, chunksRequested < MAX_CHUNK_REQUESTS && (nextVersionToRequest == null || nextVersionToRequest > 0) && summarizedUpdates.length < options.min_count ) } const getNextBatchOfUpdates = cb => _getProjectUpdates( projectId, historyId, nextVersionToRequest, function (error, updateSet, startVersion) { if (error) { return cb(OError.tag(error)) } // Updates are returned in time order, but we want to go back in time updateSet.reverse() updateSet = discardUnwantedUpdates(updateSet) summarizedUpdates = _summarizeUpdates( updateSet, labelsByVersion, summarizedUpdates ) nextVersionToRequest = startVersion chunksRequested += 1 cb() } ) function discardUnwantedUpdates(updateSet) { // We're getting whole chunks from the OL history store, but we might // only want updates from before a certain version if (options.before == null) { return updateSet } else { return updateSet.filter(u => u.v < options.before) } } // If the project doesn't have a history then we can bail out here HistoryApiManager.shouldUseProjectHistory( projectId, function (error, shouldUseProjectHistory) { if (error) { return callback(OError.tag(error)) } if (shouldUseProjectHistory) { async.whilst( shouldRequestMoreUpdates, getNextBatchOfUpdates, function (error) { if (error) { return callback(OError.tag(error)) } callback( null, summarizedUpdates, nextVersionToRequest > 0 ? nextVersionToRequest : undefined ) } ) } else { logger.debug( { projectId }, 'returning no updates as project does not use history' ) callback(null, []) } } ) }) }) }) } function _getProjectUpdates(projectId, historyId, version, callback) { function getChunk(cb) { if (version != null) { HistoryStoreManager.getChunkAtVersion(projectId, historyId, version, cb) } else { HistoryStoreManager.getMostRecentChunk(projectId, historyId, cb) } } getChunk(function (error, chunk) { if (error) { return callback(OError.tag(error)) } const oldestVersion = chunk.chunk.startVersion ChunkTranslator.convertToSummarizedUpdates( chunk, function (error, updateSet) { if (error) { return callback(OError.tag(error)) } callback(error, updateSet, oldestVersion) } ) }) } function _summarizeUpdates(updates, labels, existingSummarizedUpdates) { if (existingSummarizedUpdates == null) { existingSummarizedUpdates = [] } const summarizedUpdates = existingSummarizedUpdates.slice() for (const update of updates) { // The client needs to know the exact version that a delete happened, in order // to be able to restore. So even when summarizing, retain the version that each // projectOp happened at. for (const projectOp of update.project_ops) { projectOp.atV = update.v } const summarizedUpdate = summarizedUpdates[summarizedUpdates.length - 1] const labelsForVersion = labels[update.v + 1] || [] if ( summarizedUpdate && _shouldMergeUpdate(update, summarizedUpdate, labelsForVersion) ) { _mergeUpdate(update, summarizedUpdate) } else { const newUpdate = { fromV: update.v, toV: update.v + 1, meta: { users: update.meta.users, start_ts: update.meta.start_ts, end_ts: update.meta.end_ts, }, labels: labelsForVersion, pathnames: new Set(update.pathnames), project_ops: update.project_ops.slice(), // Clone since we'll modify } if (update.meta.origin) { newUpdate.meta.origin = update.meta.origin } summarizedUpdates.push(newUpdate) } } return summarizedUpdates } /** * Given an update, the latest summarized update, and the labels that apply to * the update, figure out if we can merge the update into the summarized * update. */ function _shouldMergeUpdate(update, summarizedUpdate, labels) { // Split updates on labels if (labels.length > 0) { return false } // Split updates on origin if (update.meta.origin) { if (summarizedUpdate.meta.origin) { if (update.meta.origin.kind !== summarizedUpdate.meta.origin.kind) { return false } } else { return false } } else if (summarizedUpdate.meta.origin) { return false } // Split updates if it's been too long since the last update. We're going // backwards in time through the updates, so the update comes before the summarized update. if ( summarizedUpdate.meta.end_ts - update.meta.start_ts >= TIME_BETWEEN_DISTINCT_UPDATES ) { return false } // Do not merge text operations and file operations, except for history resyncs const updateHasTextOps = update.pathnames.length > 0 const updateHasFileOps = update.project_ops.length > 0 const summarizedUpdateHasTextOps = summarizedUpdate.pathnames.size > 0 const summarizedUpdateHasFileOps = summarizedUpdate.project_ops.length > 0 const isHistoryResync = update.meta.origin && ['history-resync', 'history-migration'].includes(update.meta.origin.kind) if ( !isHistoryResync && ((updateHasTextOps && summarizedUpdateHasFileOps) || (updateHasFileOps && summarizedUpdateHasTextOps)) ) { return false } return true } /** * Merge an update into a summarized update. * * This mutates the summarized update. */ function _mergeUpdate(update, summarizedUpdate) { // check if the user in this update is already present in the earliest update, // if not, add them to the users list of the earliest update summarizedUpdate.meta.users = _.uniqBy( _.union(summarizedUpdate.meta.users, update.meta.users), function (user) { if (user == null) { return null } if (user.id == null) { return user } return user.id } ) summarizedUpdate.fromV = Math.min(summarizedUpdate.fromV, update.v) summarizedUpdate.toV = Math.max(summarizedUpdate.toV, update.v + 1) summarizedUpdate.meta.start_ts = Math.min( summarizedUpdate.meta.start_ts, update.meta.start_ts ) summarizedUpdate.meta.end_ts = Math.max( summarizedUpdate.meta.end_ts, update.meta.end_ts ) // Add file operations for (const op of update.project_ops || []) { summarizedUpdate.project_ops.push(op) if (op.add) { // Merging a file creation. Remove any corresponding edit since that's redundant. summarizedUpdate.pathnames.delete(op.add.pathname) } } // Add edit operations for (const pathname of update.pathnames || []) { summarizedUpdate.pathnames.add(pathname) } }