overleaf/services/track-changes/app/js/UpdatesManager.js

883 lines
26 KiB
JavaScript

/* eslint-disable
camelcase,
handle-callback-err,
no-unused-vars,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS103: Rewrite code to no longer use __guard__
* DS205: Consider reworking code to avoid use of IIFEs
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let fiveMinutes, UpdatesManager
const MongoManager = require('./MongoManager')
const PackManager = require('./PackManager')
const RedisManager = require('./RedisManager')
const UpdateCompressor = require('./UpdateCompressor')
const LockManager = require('./LockManager')
const WebApiManager = require('./WebApiManager')
const UpdateTrimmer = require('./UpdateTrimmer')
const logger = require('logger-sharelatex')
const async = require('async')
const _ = require('underscore')
const Settings = require('settings-sharelatex')
const keys = Settings.redis.lock.key_schema
module.exports = UpdatesManager = {
compressAndSaveRawUpdates(
project_id,
doc_id,
rawUpdates,
temporary,
callback
) {
let i
if (callback == null) {
callback = function (error) {}
}
const { length } = rawUpdates
if (length === 0) {
return callback()
}
// check that ops are in the correct order
for (i = 0; i < rawUpdates.length; i++) {
const op = rawUpdates[i]
if (i > 0) {
const thisVersion = op != null ? op.v : undefined
const prevVersion = __guard__(rawUpdates[i - 1], (x) => x.v)
if (!(prevVersion < thisVersion)) {
logger.error(
{
project_id,
doc_id,
rawUpdates,
temporary,
thisVersion,
prevVersion
},
'op versions out of order'
)
}
}
}
// FIXME: we no longer need the lastCompressedUpdate, so change functions not to need it
// CORRECTION: we do use it to log the time in case of error
return MongoManager.peekLastCompressedUpdate(doc_id, function (
error,
lastCompressedUpdate,
lastVersion
) {
// lastCompressedUpdate is the most recent update in Mongo, and
// lastVersion is its sharejs version number.
//
// The peekLastCompressedUpdate method may pass the update back
// as 'null' (for example if the previous compressed update has
// been archived). In this case it can still pass back the
// lastVersion from the update to allow us to check consistency.
let op
if (error != null) {
return callback(error)
}
// Ensure that raw updates start where lastVersion left off
if (lastVersion != null) {
const discardedUpdates = []
rawUpdates = rawUpdates.slice(0)
while (rawUpdates[0] != null && rawUpdates[0].v <= lastVersion) {
discardedUpdates.push(rawUpdates.shift())
}
if (discardedUpdates.length) {
logger.error(
{ project_id, doc_id, discardedUpdates, temporary, lastVersion },
'discarded updates already present'
)
}
if (rawUpdates[0] != null && rawUpdates[0].v !== lastVersion + 1) {
const ts = __guard__(
lastCompressedUpdate != null
? lastCompressedUpdate.meta
: undefined,
(x1) => x1.end_ts
)
const last_timestamp = ts != null ? new Date(ts) : 'unknown time'
error = new Error(
`Tried to apply raw op at version ${rawUpdates[0].v} to last compressed update with version ${lastVersion} from ${last_timestamp}`
)
logger.error(
{
err: error,
doc_id,
project_id,
prev_end_ts: ts,
temporary,
lastCompressedUpdate
},
'inconsistent doc versions'
)
if (
(Settings.trackchanges != null
? Settings.trackchanges.continueOnError
: undefined) &&
rawUpdates[0].v > lastVersion + 1
) {
// we have lost some ops - continue to write into the database, we can't recover at this point
lastCompressedUpdate = null
} else {
return callback(error)
}
}
}
if (rawUpdates.length === 0) {
return callback()
}
// some old large ops in redis need to be rejected, they predate
// the size limit that now prevents them going through the system
const REJECT_LARGE_OP_SIZE = 4 * 1024 * 1024
for (var rawUpdate of Array.from(rawUpdates)) {
const opSizes = (() => {
const result = []
for (op of Array.from(
(rawUpdate != null ? rawUpdate.op : undefined) || []
)) {
result.push(
(op.i != null ? op.i.length : undefined) ||
(op.d != null ? op.d.length : undefined)
)
}
return result
})()
const size = _.max(opSizes)
if (size > REJECT_LARGE_OP_SIZE) {
error = new Error(
`dropped op exceeding maximum allowed size of ${REJECT_LARGE_OP_SIZE}`
)
logger.error(
{ err: error, doc_id, project_id, size, rawUpdate },
'dropped op - too big'
)
rawUpdate.op = []
}
}
const compressedUpdates = UpdateCompressor.compressRawUpdates(
null,
rawUpdates
)
return PackManager.insertCompressedUpdates(
project_id,
doc_id,
lastCompressedUpdate,
compressedUpdates,
temporary,
function (error, result) {
if (error != null) {
return callback(error)
}
if (result != null) {
logger.log(
{
project_id,
doc_id,
orig_v:
lastCompressedUpdate != null
? lastCompressedUpdate.v
: undefined,
new_v: result.v
},
'inserted updates into pack'
)
}
return callback()
}
)
})
},
// Check whether the updates are temporary (per-project property)
_prepareProjectForUpdates(project_id, callback) {
if (callback == null) {
callback = function (error, temporary) {}
}
return UpdateTrimmer.shouldTrimUpdates(project_id, function (
error,
temporary
) {
if (error != null) {
return callback(error)
}
return callback(null, temporary)
})
},
// Check for project id on document history (per-document property)
_prepareDocForUpdates(project_id, doc_id, callback) {
if (callback == null) {
callback = function (error) {}
}
return MongoManager.backportProjectId(project_id, doc_id, function (error) {
if (error != null) {
return callback(error)
}
return callback(null)
})
},
// Apply updates for specific project/doc after preparing at project and doc level
REDIS_READ_BATCH_SIZE: 100,
processUncompressedUpdates(project_id, doc_id, temporary, callback) {
// get the updates as strings from redis (so we can delete them after they are applied)
if (callback == null) {
callback = function (error) {}
}
return RedisManager.getOldestDocUpdates(
doc_id,
UpdatesManager.REDIS_READ_BATCH_SIZE,
function (error, docUpdates) {
if (error != null) {
return callback(error)
}
const { length } = docUpdates
// parse the redis strings into ShareJs updates
return RedisManager.expandDocUpdates(docUpdates, function (
error,
rawUpdates
) {
if (error != null) {
logger.err(
{ project_id, doc_id, docUpdates },
'failed to parse docUpdates'
)
return callback(error)
}
logger.log(
{ project_id, doc_id, rawUpdates },
'retrieved raw updates from redis'
)
return UpdatesManager.compressAndSaveRawUpdates(
project_id,
doc_id,
rawUpdates,
temporary,
function (error) {
if (error != null) {
return callback(error)
}
logger.log(
{ project_id, doc_id },
'compressed and saved doc updates'
)
// delete the applied updates from redis
return RedisManager.deleteAppliedDocUpdates(
project_id,
doc_id,
docUpdates,
function (error) {
if (error != null) {
return callback(error)
}
if (length === UpdatesManager.REDIS_READ_BATCH_SIZE) {
// There might be more updates
logger.log(
{ project_id, doc_id },
'continuing processing updates'
)
return setTimeout(
() =>
UpdatesManager.processUncompressedUpdates(
project_id,
doc_id,
temporary,
callback
),
0
)
} else {
logger.log(
{ project_id, doc_id },
'all raw updates processed'
)
return callback()
}
}
)
}
)
})
}
)
},
// Process updates for a doc when we flush it individually
processUncompressedUpdatesWithLock(project_id, doc_id, callback) {
if (callback == null) {
callback = function (error) {}
}
return UpdatesManager._prepareProjectForUpdates(project_id, function (
error,
temporary
) {
if (error != null) {
return callback(error)
}
return UpdatesManager._processUncompressedUpdatesForDocWithLock(
project_id,
doc_id,
temporary,
callback
)
})
},
// Process updates for a doc when the whole project is flushed (internal method)
_processUncompressedUpdatesForDocWithLock(
project_id,
doc_id,
temporary,
callback
) {
if (callback == null) {
callback = function (error) {}
}
return UpdatesManager._prepareDocForUpdates(project_id, doc_id, function (
error
) {
if (error != null) {
return callback(error)
}
return LockManager.runWithLock(
keys.historyLock({ doc_id }),
(releaseLock) =>
UpdatesManager.processUncompressedUpdates(
project_id,
doc_id,
temporary,
releaseLock
),
callback
)
})
},
// Process all updates for a project, only check project-level information once
processUncompressedUpdatesForProject(project_id, callback) {
if (callback == null) {
callback = function (error) {}
}
return RedisManager.getDocIdsWithHistoryOps(project_id, function (
error,
doc_ids
) {
if (error != null) {
return callback(error)
}
return UpdatesManager._prepareProjectForUpdates(project_id, function (
error,
temporary
) {
const jobs = []
for (const doc_id of Array.from(doc_ids)) {
;((doc_id) =>
jobs.push((cb) =>
UpdatesManager._processUncompressedUpdatesForDocWithLock(
project_id,
doc_id,
temporary,
cb
)
))(doc_id)
}
return async.parallelLimit(jobs, 5, callback)
})
})
},
// flush all outstanding changes
flushAll(limit, callback) {
if (callback == null) {
callback = function (error, result) {}
}
return RedisManager.getProjectIdsWithHistoryOps(function (
error,
project_ids
) {
let project_id
if (error != null) {
return callback(error)
}
logger.log(
{
count: project_ids != null ? project_ids.length : undefined,
project_ids
},
'found projects'
)
const jobs = []
project_ids = _.shuffle(project_ids) // randomise to avoid hitting same projects each time
const selectedProjects =
limit < 0 ? project_ids : project_ids.slice(0, limit)
for (project_id of Array.from(selectedProjects)) {
;((project_id) =>
jobs.push((cb) =>
UpdatesManager.processUncompressedUpdatesForProject(
project_id,
(err) => cb(null, { failed: err != null, project_id })
)
))(project_id)
}
return async.series(jobs, function (error, result) {
let x
if (error != null) {
return callback(error)
}
const failedProjects = (() => {
const result1 = []
for (x of Array.from(result)) {
if (x.failed) {
result1.push(x.project_id)
}
}
return result1
})()
const succeededProjects = (() => {
const result2 = []
for (x of Array.from(result)) {
if (!x.failed) {
result2.push(x.project_id)
}
}
return result2
})()
return callback(null, {
failed: failedProjects,
succeeded: succeededProjects,
all: project_ids
})
})
})
},
getDanglingUpdates(callback) {
if (callback == null) {
callback = function (error, doc_ids) {}
}
return RedisManager.getAllDocIdsWithHistoryOps(function (
error,
all_doc_ids
) {
if (error != null) {
return callback(error)
}
return RedisManager.getProjectIdsWithHistoryOps(function (
error,
all_project_ids
) {
if (error != null) {
return callback(error)
}
// function to get doc_ids for each project
const task = (cb) =>
async.concatSeries(
all_project_ids,
RedisManager.getDocIdsWithHistoryOps,
cb
)
// find the dangling doc ids
return task(function (error, project_doc_ids) {
const dangling_doc_ids = _.difference(all_doc_ids, project_doc_ids)
logger.log(
{ all_doc_ids, all_project_ids, project_doc_ids, dangling_doc_ids },
'checking for dangling doc ids'
)
return callback(null, dangling_doc_ids)
})
})
})
},
getDocUpdates(project_id, doc_id, options, callback) {
if (options == null) {
options = {}
}
if (callback == null) {
callback = function (error, updates) {}
}
return UpdatesManager.processUncompressedUpdatesWithLock(
project_id,
doc_id,
function (error) {
if (error != null) {
return callback(error)
}
// console.log "options", options
return PackManager.getOpsByVersionRange(
project_id,
doc_id,
options.from,
options.to,
function (error, updates) {
if (error != null) {
return callback(error)
}
return callback(null, updates)
}
)
}
)
},
getDocUpdatesWithUserInfo(project_id, doc_id, options, callback) {
if (options == null) {
options = {}
}
if (callback == null) {
callback = function (error, updates) {}
}
return UpdatesManager.getDocUpdates(project_id, doc_id, options, function (
error,
updates
) {
if (error != null) {
return callback(error)
}
return UpdatesManager.fillUserInfo(updates, function (error, updates) {
if (error != null) {
return callback(error)
}
return callback(null, updates)
})
})
},
getSummarizedProjectUpdates(project_id, options, callback) {
if (options == null) {
options = {}
}
if (callback == null) {
callback = function (error, updates) {}
}
if (!options.min_count) {
options.min_count = 25
}
let summarizedUpdates = []
const { before } = options
let nextBeforeTimestamp = null
return UpdatesManager.processUncompressedUpdatesForProject(
project_id,
function (error) {
if (error != null) {
return callback(error)
}
return PackManager.makeProjectIterator(project_id, before, function (
err,
iterator
) {
if (err != null) {
return callback(err)
}
// repeatedly get updates and pass them through the summariser to get an final output with user info
return async.whilst(
() =>
// console.log "checking iterator.done", iterator.done()
summarizedUpdates.length < options.min_count && !iterator.done(),
(cb) =>
iterator.next(function (err, partialUpdates) {
if (err != null) {
return callback(err)
}
// logger.log {partialUpdates}, 'got partialUpdates'
if (partialUpdates.length === 0) {
return cb()
} // # FIXME should try to avoid this happening
nextBeforeTimestamp =
partialUpdates[partialUpdates.length - 1].meta.end_ts
// add the updates to the summary list
summarizedUpdates = UpdatesManager._summarizeUpdates(
partialUpdates,
summarizedUpdates
)
return cb()
}),
() =>
// finally done all updates
// console.log 'summarized Updates', summarizedUpdates
UpdatesManager.fillSummarizedUserInfo(
summarizedUpdates,
function (err, results) {
if (err != null) {
return callback(err)
}
return callback(
null,
results,
!iterator.done() ? nextBeforeTimestamp : undefined
)
}
)
)
})
}
)
},
exportProject(projectId, consumer) {
// Flush anything before collecting updates.
UpdatesManager.processUncompressedUpdatesForProject(projectId, (err) => {
if (err) return consumer(err)
// Fetch all the packs.
const before = undefined
PackManager.makeProjectIterator(projectId, before, (err, iterator) => {
if (err) return consumer(err)
const accumulatedUserIds = new Set()
async.whilst(
() => !iterator.done(),
(cb) =>
iterator.next((err, updatesFromASinglePack) => {
if (err) return cb(err)
if (updatesFromASinglePack.length === 0) {
// This should not happen when `iterator.done() == false`.
// Emitting an empty array would signal the consumer the final
// call.
return cb()
}
updatesFromASinglePack.forEach((update) => {
accumulatedUserIds.add(
// Super defensive access on update details.
String(update && update.meta && update.meta.user_id)
)
})
// Emit updates and wait for the consumer.
consumer(null, { updates: updatesFromASinglePack }, cb)
}),
(err) => {
if (err) return consumer(err)
// Adding undefined can happen for broken updates.
accumulatedUserIds.delete('undefined')
consumer(null, {
updates: [],
userIds: Array.from(accumulatedUserIds).sort()
})
}
)
})
})
},
fetchUserInfo(users, callback) {
if (callback == null) {
callback = function (error, fetchedUserInfo) {}
}
const jobs = []
const fetchedUserInfo = {}
for (const user_id in users) {
;((user_id) =>
jobs.push((callback) =>
WebApiManager.getUserInfo(user_id, function (error, userInfo) {
if (error != null) {
return callback(error)
}
fetchedUserInfo[user_id] = userInfo
return callback()
})
))(user_id)
}
return async.series(jobs, function (err) {
if (err != null) {
return callback(err)
}
return callback(null, fetchedUserInfo)
})
},
fillUserInfo(updates, callback) {
let update, user_id
if (callback == null) {
callback = function (error, updates) {}
}
const users = {}
for (update of Array.from(updates)) {
;({ user_id } = update.meta)
if (UpdatesManager._validUserId(user_id)) {
users[user_id] = true
}
}
return UpdatesManager.fetchUserInfo(users, function (
error,
fetchedUserInfo
) {
if (error != null) {
return callback(error)
}
for (update of Array.from(updates)) {
;({ user_id } = update.meta)
delete update.meta.user_id
if (UpdatesManager._validUserId(user_id)) {
update.meta.user = fetchedUserInfo[user_id]
}
}
return callback(null, updates)
})
},
fillSummarizedUserInfo(updates, callback) {
let update, user_id, user_ids
if (callback == null) {
callback = function (error, updates) {}
}
const users = {}
for (update of Array.from(updates)) {
user_ids = update.meta.user_ids || []
for (user_id of Array.from(user_ids)) {
if (UpdatesManager._validUserId(user_id)) {
users[user_id] = true
}
}
}
return UpdatesManager.fetchUserInfo(users, function (
error,
fetchedUserInfo
) {
if (error != null) {
return callback(error)
}
for (update of Array.from(updates)) {
user_ids = update.meta.user_ids || []
update.meta.users = []
delete update.meta.user_ids
for (user_id of Array.from(user_ids)) {
if (UpdatesManager._validUserId(user_id)) {
update.meta.users.push(fetchedUserInfo[user_id])
} else {
update.meta.users.push(null)
}
}
}
return callback(null, updates)
})
},
_validUserId(user_id) {
if (user_id == null) {
return false
} else {
return !!user_id.match(/^[a-f0-9]{24}$/)
}
},
TIME_BETWEEN_DISTINCT_UPDATES: (fiveMinutes = 5 * 60 * 1000),
SPLIT_ON_DELETE_SIZE: 16, // characters
_summarizeUpdates(updates, existingSummarizedUpdates) {
if (existingSummarizedUpdates == null) {
existingSummarizedUpdates = []
}
const summarizedUpdates = existingSummarizedUpdates.slice()
let previousUpdateWasBigDelete = false
for (const update of Array.from(updates)) {
var doc_id
const earliestUpdate = summarizedUpdates[summarizedUpdates.length - 1]
let shouldConcat = false
// If a user inserts some text, then deletes a big chunk including that text,
// the update we show might concat the insert and delete, and there will be no sign
// of that insert having happened, or be able to restore to it (restoring after a big delete is common).
// So, we split the summary on 'big' deletes. However, we've stepping backwards in time with
// most recent changes considered first, so if this update is a big delete, we want to start
// a new summarized update next timge, hence we monitor the previous update.
if (previousUpdateWasBigDelete) {
shouldConcat = false
} else if (
earliestUpdate &&
earliestUpdate.meta.end_ts - update.meta.start_ts <
this.TIME_BETWEEN_DISTINCT_UPDATES
) {
// We're going backwards in time through the updates, so only combine if this update starts less than 5 minutes before
// the end of current summarized block, so no block spans more than 5 minutes.
shouldConcat = true
}
let isBigDelete = false
for (const op of Array.from(update.op || [])) {
if (op.d != null && op.d.length > this.SPLIT_ON_DELETE_SIZE) {
isBigDelete = true
}
}
previousUpdateWasBigDelete = isBigDelete
if (shouldConcat) {
// check if the user in this update is already present in the earliest update,
// if not, add them to the users list of the earliest update
earliestUpdate.meta.user_ids = _.union(earliestUpdate.meta.user_ids, [
update.meta.user_id
])
doc_id = update.doc_id.toString()
const doc = earliestUpdate.docs[doc_id]
if (doc != null) {
doc.fromV = Math.min(doc.fromV, update.v)
doc.toV = Math.max(doc.toV, update.v)
} else {
earliestUpdate.docs[doc_id] = {
fromV: update.v,
toV: update.v
}
}
earliestUpdate.meta.start_ts = Math.min(
earliestUpdate.meta.start_ts,
update.meta.start_ts
)
earliestUpdate.meta.end_ts = Math.max(
earliestUpdate.meta.end_ts,
update.meta.end_ts
)
} else {
const newUpdate = {
meta: {
user_ids: [],
start_ts: update.meta.start_ts,
end_ts: update.meta.end_ts
},
docs: {}
}
newUpdate.docs[update.doc_id.toString()] = {
fromV: update.v,
toV: update.v
}
newUpdate.meta.user_ids.push(update.meta.user_id)
summarizedUpdates.push(newUpdate)
}
}
return summarizedUpdates
}
}
function __guard__(value, transform) {
return typeof value !== 'undefined' && value !== null
? transform(value)
: undefined
}