/*
 * decaffeinate suggestions:
 * DS101: Remove unnecessary use of Array.from
 * DS102: Remove unnecessary code created because of implicit returns
 * DS103: Rewrite code to no longer use __guard__
 * DS205: Consider reworking code to avoid use of IIFEs
 * DS207: Consider shorter variations of null checks
 * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
 */
let fiveMinutes, UpdatesManager;
const MongoManager = require("./MongoManager");
const PackManager = require("./PackManager");
const RedisManager = require("./RedisManager");
const UpdateCompressor = require("./UpdateCompressor");
const LockManager = require("./LockManager");
const WebApiManager = require("./WebApiManager");
const UpdateTrimmer = require("./UpdateTrimmer");
const logger = require("logger-sharelatex");
const async = require("async");
const _ = require("underscore");
const Settings = require("settings-sharelatex");
const keys = Settings.redis.lock.key_schema;

module.exports = (UpdatesManager = {
  compressAndSaveRawUpdates(project_id, doc_id, rawUpdates, temporary, callback) {
    let i;
    if (callback == null) {
      callback = function(error) {};
    }
    const { length } = rawUpdates;
    if (length === 0) {
      return callback();
    }

    // check that ops are in the correct order
    for (i = 0; i < rawUpdates.length; i++) {
      const op = rawUpdates[i];
      if (i > 0) {
        const thisVersion = op != null ? op.v : undefined;
        const prevVersion = __guard__(rawUpdates[i - 1], x => x.v);
        if (!(prevVersion < thisVersion)) {
          logger.error(
            { project_id, doc_id, rawUpdates, temporary, thisVersion, prevVersion },
            "op versions out of order"
          );
        }
      }
    }

    // FIXME: we no longer need the lastCompressedUpdate, so change functions not to need it
    // CORRECTION: we do use it to log the time in case of error
    return MongoManager.peekLastCompressedUpdate(doc_id, function(error, lastCompressedUpdate, lastVersion) {
      // lastCompressedUpdate is the most recent update in Mongo, and
      // lastVersion is its sharejs version number.
      //
      // The peekLastCompressedUpdate method may pass the update back
      // as 'null' (for example if the previous compressed update has
      // been archived). In this case it can still pass back the
      // lastVersion from the update to allow us to check consistency.
      let op;
      if (error != null) {
        return callback(error);
      }

      // Ensure that raw updates start where lastVersion left off
      if (lastVersion != null) {
        const discardedUpdates = [];
        rawUpdates = rawUpdates.slice(0);
        while ((rawUpdates[0] != null) && (rawUpdates[0].v <= lastVersion)) {
          discardedUpdates.push(rawUpdates.shift());
        }
        if (discardedUpdates.length) {
          logger.error(
            { project_id, doc_id, discardedUpdates, temporary, lastVersion },
            "discarded updates already present"
          );
        }

        if ((rawUpdates[0] != null) && (rawUpdates[0].v !== (lastVersion + 1))) {
          const ts = __guard__(lastCompressedUpdate != null ? lastCompressedUpdate.meta : undefined, x1 => x1.end_ts);
          const last_timestamp = (ts != null) ? new Date(ts) : 'unknown time';
          error = new Error(`Tried to apply raw op at version ${rawUpdates[0].v} to last compressed update with version ${lastVersion} from ${last_timestamp}`);
          logger.error(
            { err: error, doc_id, project_id, prev_end_ts: ts, temporary, lastCompressedUpdate },
            "inconsistent doc versions"
          );
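          // Settings.trackchanges.continueOnError lets us tolerate a gap in the
          // version sequence: drop the reference to the last compressed update
          // and keep writing, instead of failing the flush.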
          if (
            (Settings.trackchanges != null ? Settings.trackchanges.continueOnError : undefined) &&
            (rawUpdates[0].v > (lastVersion + 1))
          ) {
            // we have lost some ops - continue to write into the database, we can't recover at this point
            lastCompressedUpdate = null;
          } else {
            return callback(error);
          }
        }
      }

      if (rawUpdates.length === 0) {
        return callback();
      }

      // some old large ops in redis need to be rejected, they predate
      // the size limit that now prevents them going through the system
      const REJECT_LARGE_OP_SIZE = 4 * 1024 * 1024;
      for (var rawUpdate of Array.from(rawUpdates)) {
        const opSizes = (() => {
          const result = [];
          for (op of Array.from((rawUpdate != null ? rawUpdate.op : undefined) || [])) {
            result.push(
              (op.i != null ? op.i.length : undefined) ||
                (op.d != null ? op.d.length : undefined)
            );
          }
          return result;
        })();
        const size = _.max(opSizes);
        if (size > REJECT_LARGE_OP_SIZE) {
          error = new Error(`dropped op exceeding maximum allowed size of ${REJECT_LARGE_OP_SIZE}`);
          logger.error({ err: error, doc_id, project_id, size, rawUpdate }, "dropped op - too big");
          rawUpdate.op = [];
        }
      }

      const compressedUpdates = UpdateCompressor.compressRawUpdates(null, rawUpdates);
      return PackManager.insertCompressedUpdates(project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, function(error, result) {
        if (error != null) {
          return callback(error);
        }
        if (result != null) {
          logger.log(
            {
              project_id,
              doc_id,
              orig_v: lastCompressedUpdate != null ? lastCompressedUpdate.v : undefined,
              new_v: result.v
            },
            "inserted updates into pack"
          );
        }
        return callback();
      });
    });
  },

  // Check whether the updates are temporary (per-project property)
  _prepareProjectForUpdates(project_id, callback) {
    if (callback == null) {
      callback = function(error, temporary) {};
    }
    return UpdateTrimmer.shouldTrimUpdates(project_id, function(error, temporary) {
      if (error != null) {
        return callback(error);
      }
      return callback(null, temporary);
    });
  },

  // Check for project id on document history (per-document property)
  _prepareDocForUpdates(project_id, doc_id, callback) {
    if (callback == null) {
      callback = function(error) {};
    }
    return MongoManager.backportProjectId(project_id, doc_id, function(error) {
      if (error != null) {
        return callback(error);
      }
      return callback(null);
    });
  },

  // Apply updates for specific project/doc after preparing at project and doc level
  REDIS_READ_BATCH_SIZE: 100,

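  // Read up to REDIS_READ_BATCH_SIZE raw updates from redis, compress and store
  // them, then delete them from redis; if a full batch came back there may be
  // more waiting, so the method reschedules itself until the doc's queue is drained.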
  processUncompressedUpdates(project_id, doc_id, temporary, callback) {
    // get the updates as strings from redis (so we can delete them after they are applied)
    if (callback == null) {
      callback = function(error) {};
    }
    return RedisManager.getOldestDocUpdates(doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, function(error, docUpdates) {
      if (error != null) {
        return callback(error);
      }
      const { length } = docUpdates;
      // parse the redis strings into ShareJs updates
      return RedisManager.expandDocUpdates(docUpdates, function(error, rawUpdates) {
        if (error != null) {
          logger.err({ project_id, doc_id, docUpdates }, "failed to parse docUpdates");
          return callback(error);
        }
        logger.log({ project_id, doc_id, rawUpdates }, "retrieved raw updates from redis");
        return UpdatesManager.compressAndSaveRawUpdates(project_id, doc_id, rawUpdates, temporary, function(error) {
          if (error != null) {
            return callback(error);
          }
          logger.log({ project_id, doc_id }, "compressed and saved doc updates");
          // delete the applied updates from redis
          return RedisManager.deleteAppliedDocUpdates(project_id, doc_id, docUpdates, function(error) {
            if (error != null) {
              return callback(error);
            }
            if (length === UpdatesManager.REDIS_READ_BATCH_SIZE) {
              // There might be more updates
              logger.log({ project_id, doc_id }, "continuing processing updates");
              return setTimeout(() => UpdatesManager.processUncompressedUpdates(project_id, doc_id, temporary, callback), 0);
            } else {
              logger.log({ project_id, doc_id }, "all raw updates processed");
              return callback();
            }
          });
        });
      });
    });
  },

  // Process updates for a doc when we flush it individually
  processUncompressedUpdatesWithLock(project_id, doc_id, callback) {
    if (callback == null) {
      callback = function(error) {};
    }
    return UpdatesManager._prepareProjectForUpdates(project_id, function(error, temporary) {
      if (error != null) {
        return callback(error);
      }
      return UpdatesManager._processUncompressedUpdatesForDocWithLock(project_id, doc_id, temporary, callback);
    });
  },

  // Process updates for a doc when the whole project is flushed (internal method)
  _processUncompressedUpdatesForDocWithLock(project_id, doc_id, temporary, callback) {
    if (callback == null) {
      callback = function(error) {};
    }
    return UpdatesManager._prepareDocForUpdates(project_id, doc_id, function(error) {
      if (error != null) {
        return callback(error);
      }
      return LockManager.runWithLock(
        keys.historyLock({ doc_id }),
        releaseLock => UpdatesManager.processUncompressedUpdates(project_id, doc_id, temporary, releaseLock),
        callback
      );
    });
  },

  // Process all updates for a project, only check project-level information once
  processUncompressedUpdatesForProject(project_id, callback) {
    if (callback == null) {
      callback = function(error) {};
    }
    return RedisManager.getDocIdsWithHistoryOps(project_id, function(error, doc_ids) {
      if (error != null) {
        return callback(error);
      }
      return UpdatesManager._prepareProjectForUpdates(project_id, function(error, temporary) {
        const jobs = [];
        for (let doc_id of Array.from(doc_ids)) {
          (doc_id =>
            jobs.push(cb => UpdatesManager._processUncompressedUpdatesForDocWithLock(project_id, doc_id, temporary, cb))
          )(doc_id);
        }
        return async.parallelLimit(jobs, 5, callback);
      });
    });
  },

  // flush all outstanding changes
  flushAll(limit, callback) {
    if (callback == null) {
      callback = function(error, result) {};
    }
    return RedisManager.getProjectIdsWithHistoryOps(function(error, project_ids) {
      let project_id;
      if (error != null) {
        return callback(error);
      }
      logger.log({ count: project_ids != null ? project_ids.length : undefined, project_ids }, "found projects");
      const jobs = [];
      project_ids = _.shuffle(project_ids); // randomise to avoid hitting same projects each time
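      // a negative limit means no limit: flush every project that has pending history ops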
      const selectedProjects = limit < 0 ? project_ids : project_ids.slice(0, limit);
      for (project_id of Array.from(selectedProjects)) {
        (project_id =>
          jobs.push(cb =>
            UpdatesManager.processUncompressedUpdatesForProject(project_id, err =>
              cb(null, { failed: err != null, project_id })
            )
          )
        )(project_id);
      }
      return async.series(jobs, function(error, result) {
        let x;
        if (error != null) {
          return callback(error);
        }
        const failedProjects = (() => {
          const result1 = [];
          for (x of Array.from(result)) {
            if (x.failed) {
              result1.push(x.project_id);
            }
          }
          return result1;
        })();
        const succeededProjects = (() => {
          const result2 = [];
          for (x of Array.from(result)) {
            if (!x.failed) {
              result2.push(x.project_id);
            }
          }
          return result2;
        })();
        return callback(null, { failed: failedProjects, succeeded: succeededProjects, all: project_ids });
      });
    });
  },

  getDanglingUpdates(callback) {
    if (callback == null) {
      callback = function(error, doc_ids) {};
    }
    return RedisManager.getAllDocIdsWithHistoryOps(function(error, all_doc_ids) {
      if (error != null) {
        return callback(error);
      }
      return RedisManager.getProjectIdsWithHistoryOps(function(error, all_project_ids) {
        if (error != null) {
          return callback(error);
        }
        // function to get doc_ids for each project
        const task = cb => async.concatSeries(all_project_ids, RedisManager.getDocIdsWithHistoryOps, cb);
        // find the dangling doc ids
        return task(function(error, project_doc_ids) {
          const dangling_doc_ids = _.difference(all_doc_ids, project_doc_ids);
          logger.log({ all_doc_ids, all_project_ids, project_doc_ids, dangling_doc_ids }, "checking for dangling doc ids");
          return callback(null, dangling_doc_ids);
        });
      });
    });
  },

  getDocUpdates(project_id, doc_id, options, callback) {
    if (options == null) {
      options = {};
    }
    if (callback == null) {
      callback = function(error, updates) {};
    }
    return UpdatesManager.processUncompressedUpdatesWithLock(project_id, doc_id, function(error) {
      if (error != null) {
        return callback(error);
      }
      //console.log "options", options
      return PackManager.getOpsByVersionRange(project_id, doc_id, options.from, options.to, function(error, updates) {
        if (error != null) {
          return callback(error);
        }
        return callback(null, updates);
      });
    });
  },

  getDocUpdatesWithUserInfo(project_id, doc_id, options, callback) {
    if (options == null) {
      options = {};
    }
    if (callback == null) {
      callback = function(error, updates) {};
    }
    return UpdatesManager.getDocUpdates(project_id, doc_id, options, function(error, updates) {
      if (error != null) {
        return callback(error);
      }
      return UpdatesManager.fillUserInfo(updates, function(error, updates) {
        if (error != null) {
          return callback(error);
        }
        return callback(null, updates);
      });
    });
  },

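  // Page backwards through the project's history, folding updates into summary
  // blocks until at least options.min_count summaries have been collected or
  // there is nothing left to read. The third callback argument is a timestamp
  // that can be passed back as options.before to fetch the next (older) page.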
  getSummarizedProjectUpdates(project_id, options, callback) {
    if (options == null) {
      options = {};
    }
    if (callback == null) {
      callback = function(error, updates) {};
    }
    if (!options.min_count) {
      options.min_count = 25;
    }
    let summarizedUpdates = [];
    const { before } = options;
    let nextBeforeTimestamp = null;
    return UpdatesManager.processUncompressedUpdatesForProject(project_id, function(error) {
      if (error != null) {
        return callback(error);
      }
      return PackManager.makeProjectIterator(project_id, before, function(err, iterator) {
        if (err != null) {
          return callback(err);
        }
        // repeatedly get updates and pass them through the summariser to get a final output with user info
        return async.whilst(
          () =>
            //console.log "checking iterator.done", iterator.done()
            (summarizedUpdates.length < options.min_count) && !iterator.done(),
          cb =>
            iterator.next(function(err, partialUpdates) {
              if (err != null) {
                return callback(err);
              }
              //logger.log {partialUpdates}, 'got partialUpdates'
              if (partialUpdates.length === 0) {
                return cb();
              } // FIXME should try to avoid this happening
              nextBeforeTimestamp = partialUpdates[partialUpdates.length - 1].meta.end_ts;
              // add the updates to the summary list
              summarizedUpdates = UpdatesManager._summarizeUpdates(partialUpdates, summarizedUpdates);
              return cb();
            }),
          () =>
            // finally done all updates
            //console.log 'summarized Updates', summarizedUpdates
            UpdatesManager.fillSummarizedUserInfo(summarizedUpdates, function(err, results) {
              if (err != null) {
                return callback(err);
              }
              return callback(null, results, !iterator.done() ? nextBeforeTimestamp : undefined);
            })
        );
      });
    });
  },

  fetchUserInfo(users, callback) {
    if (callback == null) {
      callback = function(error, fetchedUserInfo) {};
    }
    const jobs = [];
    const fetchedUserInfo = {};
    for (let user_id in users) {
      (user_id =>
        jobs.push(callback =>
          WebApiManager.getUserInfo(user_id, function(error, userInfo) {
            if (error != null) {
              return callback(error);
            }
            fetchedUserInfo[user_id] = userInfo;
            return callback();
          })
        )
      )(user_id);
    }
    return async.series(jobs, function(err) {
      if (err != null) {
        return callback(err);
      }
      return callback(null, fetchedUserInfo);
    });
  },

  fillUserInfo(updates, callback) {
    let update, user_id;
    if (callback == null) {
      callback = function(error, updates) {};
    }
    const users = {};
    for (update of Array.from(updates)) {
      ({ user_id } = update.meta);
      if (UpdatesManager._validUserId(user_id)) {
        users[user_id] = true;
      }
    }
    return UpdatesManager.fetchUserInfo(users, function(error, fetchedUserInfo) {
      if (error != null) {
        return callback(error);
      }
      for (update of Array.from(updates)) {
        ({ user_id } = update.meta);
        delete update.meta.user_id;
        if (UpdatesManager._validUserId(user_id)) {
          update.meta.user = fetchedUserInfo[user_id];
        }
      }
      return callback(null, updates);
    });
  },

  fillSummarizedUserInfo(updates, callback) {
    let update, user_id, user_ids;
    if (callback == null) {
      callback = function(error, updates) {};
    }
    const users = {};
    for (update of Array.from(updates)) {
      user_ids = update.meta.user_ids || [];
      for (user_id of Array.from(user_ids)) {
        if (UpdatesManager._validUserId(user_id)) {
          users[user_id] = true;
        }
      }
    }
    return UpdatesManager.fetchUserInfo(users, function(error, fetchedUserInfo) {
      if (error != null) {
        return callback(error);
      }
      for (update of Array.from(updates)) {
        user_ids = update.meta.user_ids || [];
        update.meta.users = [];
        delete update.meta.user_ids;
        for (user_id of Array.from(user_ids)) {
          if (UpdatesManager._validUserId(user_id)) {
            update.meta.users.push(fetchedUserInfo[user_id]);
          } else {
            update.meta.users.push(null);
          }
        }
      }
      return callback(null, updates);
    });
  },

  _validUserId(user_id) {
    if (user_id == null) {
      return false;
    } else {
      return !!user_id.match(/^[a-f0-9]{24}$/);
    }
  },

  TIME_BETWEEN_DISTINCT_UPDATES: (fiveMinutes = 5 * 60 * 1000),
  SPLIT_ON_DELETE_SIZE: 16, // characters

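  // Fold a batch of raw updates (most recent first) into summary blocks. As a
  // rough illustration (the numbers are invented): two edits to the same doc
  // three minutes apart merge into one block spanning versions fromV..toV,
  // while an edit that starts more than TIME_BETWEEN_DISTINCT_UPDATES before
  // the current block's end, or one that follows a delete of more than
  // SPLIT_ON_DELETE_SIZE characters, starts a new block.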
  _summarizeUpdates(updates, existingSummarizedUpdates) {
    if (existingSummarizedUpdates == null) {
      existingSummarizedUpdates = [];
    }
    const summarizedUpdates = existingSummarizedUpdates.slice();
    let previousUpdateWasBigDelete = false;
    for (let update of Array.from(updates)) {
      var doc_id;
      const earliestUpdate = summarizedUpdates[summarizedUpdates.length - 1];
      let shouldConcat = false;

      // If a user inserts some text, then deletes a big chunk including that text,
      // the update we show might concat the insert and delete, and there will be no sign
      // of that insert having happened, or any way to restore to it (restoring after a big delete is common).
      // So, we split the summary on 'big' deletes. However, we're stepping backwards in time with
      // most recent changes considered first, so if this update is a big delete, we want to start
      // a new summarized update next time, hence we monitor the previous update.
      if (previousUpdateWasBigDelete) {
        shouldConcat = false;
      } else if (earliestUpdate && ((earliestUpdate.meta.end_ts - update.meta.start_ts) < this.TIME_BETWEEN_DISTINCT_UPDATES)) {
        // We're going backwards in time through the updates, so only combine if this update starts less than 5 minutes before
        // the end of the current summarized block, so no block spans more than 5 minutes.
        shouldConcat = true;
      }

      let isBigDelete = false;
      for (let op of Array.from(update.op || [])) {
        if ((op.d != null) && (op.d.length > this.SPLIT_ON_DELETE_SIZE)) {
          isBigDelete = true;
        }
      }

      previousUpdateWasBigDelete = isBigDelete;

      if (shouldConcat) {
        // check if the user in this update is already present in the earliest update,
        // if not, add them to the users list of the earliest update
        earliestUpdate.meta.user_ids = _.union(earliestUpdate.meta.user_ids, [update.meta.user_id]);

        doc_id = update.doc_id.toString();
        const doc = earliestUpdate.docs[doc_id];
        if (doc != null) {
          doc.fromV = Math.min(doc.fromV, update.v);
          doc.toV = Math.max(doc.toV, update.v);
        } else {
          earliestUpdate.docs[doc_id] = {
            fromV: update.v,
            toV: update.v
          };
        }

        earliestUpdate.meta.start_ts = Math.min(earliestUpdate.meta.start_ts, update.meta.start_ts);
        earliestUpdate.meta.end_ts = Math.max(earliestUpdate.meta.end_ts, update.meta.end_ts);
      } else {
        const newUpdate = {
          meta: {
            user_ids: [],
            start_ts: update.meta.start_ts,
            end_ts: update.meta.end_ts
          },
          docs: {}
        };

        newUpdate.docs[update.doc_id.toString()] = {
          fromV: update.v,
          toV: update.v
        };
        newUpdate.meta.user_ids.push(update.meta.user_id);
        summarizedUpdates.push(newUpdate);
      }
    }
    return summarizedUpdates;
  }
});

function __guard__(value, transform) {
  return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined;
}