mirror of
https://github.com/overleaf/overleaf.git
synced 2025-01-02 17:53:11 +00:00
832 lines
24 KiB
JavaScript
832 lines
24 KiB
JavaScript
/* eslint-disable
|
|
camelcase,
|
|
handle-callback-err,
|
|
no-unused-vars,
|
|
*/
|
|
// TODO: This file was created by bulk-decaffeinate.
|
|
// Fix any style issues and re-enable lint.
|
|
/*
|
|
* decaffeinate suggestions:
|
|
* DS101: Remove unnecessary use of Array.from
|
|
* DS102: Remove unnecessary code created because of implicit returns
|
|
* DS103: Rewrite code to no longer use __guard__
|
|
* DS205: Consider reworking code to avoid use of IIFEs
|
|
* DS207: Consider shorter variations of null checks
|
|
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
|
*/
|
|
let fiveMinutes, UpdatesManager
|
|
const MongoManager = require('./MongoManager')
|
|
const PackManager = require('./PackManager')
|
|
const RedisManager = require('./RedisManager')
|
|
const UpdateCompressor = require('./UpdateCompressor')
|
|
const LockManager = require('./LockManager')
|
|
const WebApiManager = require('./WebApiManager')
|
|
const UpdateTrimmer = require('./UpdateTrimmer')
|
|
const logger = require('logger-sharelatex')
|
|
const async = require('async')
|
|
const _ = require('underscore')
|
|
const Settings = require('settings-sharelatex')
|
|
const keys = Settings.redis.lock.key_schema
|
|
|
|
module.exports = UpdatesManager = {
|
|
compressAndSaveRawUpdates(
|
|
project_id,
|
|
doc_id,
|
|
rawUpdates,
|
|
temporary,
|
|
callback
|
|
) {
|
|
let i
|
|
if (callback == null) {
|
|
callback = function (error) {}
|
|
}
|
|
const { length } = rawUpdates
|
|
if (length === 0) {
|
|
return callback()
|
|
}
|
|
|
|
// check that ops are in the correct order
|
|
for (i = 0; i < rawUpdates.length; i++) {
|
|
const op = rawUpdates[i]
|
|
if (i > 0) {
|
|
const thisVersion = op != null ? op.v : undefined
|
|
const prevVersion = __guard__(rawUpdates[i - 1], (x) => x.v)
|
|
if (!(prevVersion < thisVersion)) {
|
|
logger.error(
|
|
{
|
|
project_id,
|
|
doc_id,
|
|
rawUpdates,
|
|
temporary,
|
|
thisVersion,
|
|
prevVersion
|
|
},
|
|
'op versions out of order'
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
// FIXME: we no longer need the lastCompressedUpdate, so change functions not to need it
|
|
// CORRECTION: we do use it to log the time in case of error
|
|
return MongoManager.peekLastCompressedUpdate(doc_id, function (
|
|
error,
|
|
lastCompressedUpdate,
|
|
lastVersion
|
|
) {
|
|
// lastCompressedUpdate is the most recent update in Mongo, and
|
|
// lastVersion is its sharejs version number.
|
|
//
|
|
// The peekLastCompressedUpdate method may pass the update back
|
|
// as 'null' (for example if the previous compressed update has
|
|
// been archived). In this case it can still pass back the
|
|
// lastVersion from the update to allow us to check consistency.
|
|
let op
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
|
|
// Ensure that raw updates start where lastVersion left off
|
|
if (lastVersion != null) {
|
|
const discardedUpdates = []
|
|
rawUpdates = rawUpdates.slice(0)
|
|
while (rawUpdates[0] != null && rawUpdates[0].v <= lastVersion) {
|
|
discardedUpdates.push(rawUpdates.shift())
|
|
}
|
|
if (discardedUpdates.length) {
|
|
logger.error(
|
|
{ project_id, doc_id, discardedUpdates, temporary, lastVersion },
|
|
'discarded updates already present'
|
|
)
|
|
}
|
|
|
|
if (rawUpdates[0] != null && rawUpdates[0].v !== lastVersion + 1) {
|
|
const ts = __guard__(
|
|
lastCompressedUpdate != null
|
|
? lastCompressedUpdate.meta
|
|
: undefined,
|
|
(x1) => x1.end_ts
|
|
)
|
|
const last_timestamp = ts != null ? new Date(ts) : 'unknown time'
|
|
error = new Error(
|
|
`Tried to apply raw op at version ${rawUpdates[0].v} to last compressed update with version ${lastVersion} from ${last_timestamp}`
|
|
)
|
|
logger.error(
|
|
{
|
|
err: error,
|
|
doc_id,
|
|
project_id,
|
|
prev_end_ts: ts,
|
|
temporary,
|
|
lastCompressedUpdate
|
|
},
|
|
'inconsistent doc versions'
|
|
)
|
|
if (
|
|
(Settings.trackchanges != null
|
|
? Settings.trackchanges.continueOnError
|
|
: undefined) &&
|
|
rawUpdates[0].v > lastVersion + 1
|
|
) {
|
|
// we have lost some ops - continue to write into the database, we can't recover at this point
|
|
lastCompressedUpdate = null
|
|
} else {
|
|
return callback(error)
|
|
}
|
|
}
|
|
}
|
|
|
|
if (rawUpdates.length === 0) {
|
|
return callback()
|
|
}
|
|
|
|
// some old large ops in redis need to be rejected, they predate
|
|
// the size limit that now prevents them going through the system
|
|
const REJECT_LARGE_OP_SIZE = 4 * 1024 * 1024
|
|
for (var rawUpdate of Array.from(rawUpdates)) {
|
|
const opSizes = (() => {
|
|
const result = []
|
|
for (op of Array.from(
|
|
(rawUpdate != null ? rawUpdate.op : undefined) || []
|
|
)) {
|
|
result.push(
|
|
(op.i != null ? op.i.length : undefined) ||
|
|
(op.d != null ? op.d.length : undefined)
|
|
)
|
|
}
|
|
return result
|
|
})()
|
|
const size = _.max(opSizes)
|
|
if (size > REJECT_LARGE_OP_SIZE) {
|
|
error = new Error(
|
|
`dropped op exceeding maximum allowed size of ${REJECT_LARGE_OP_SIZE}`
|
|
)
|
|
logger.error(
|
|
{ err: error, doc_id, project_id, size, rawUpdate },
|
|
'dropped op - too big'
|
|
)
|
|
rawUpdate.op = []
|
|
}
|
|
}
|
|
|
|
const compressedUpdates = UpdateCompressor.compressRawUpdates(
|
|
null,
|
|
rawUpdates
|
|
)
|
|
return PackManager.insertCompressedUpdates(
|
|
project_id,
|
|
doc_id,
|
|
lastCompressedUpdate,
|
|
compressedUpdates,
|
|
temporary,
|
|
function (error, result) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
if (result != null) {
|
|
logger.log(
|
|
{
|
|
project_id,
|
|
doc_id,
|
|
orig_v:
|
|
lastCompressedUpdate != null
|
|
? lastCompressedUpdate.v
|
|
: undefined,
|
|
new_v: result.v
|
|
},
|
|
'inserted updates into pack'
|
|
)
|
|
}
|
|
return callback()
|
|
}
|
|
)
|
|
})
|
|
},
|
|
|
|
// Check whether the updates are temporary (per-project property)
|
|
_prepareProjectForUpdates(project_id, callback) {
|
|
if (callback == null) {
|
|
callback = function (error, temporary) {}
|
|
}
|
|
return UpdateTrimmer.shouldTrimUpdates(project_id, function (
|
|
error,
|
|
temporary
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return callback(null, temporary)
|
|
})
|
|
},
|
|
|
|
// Check for project id on document history (per-document property)
|
|
_prepareDocForUpdates(project_id, doc_id, callback) {
|
|
if (callback == null) {
|
|
callback = function (error) {}
|
|
}
|
|
return MongoManager.backportProjectId(project_id, doc_id, function (error) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return callback(null)
|
|
})
|
|
},
|
|
|
|
// Apply updates for specific project/doc after preparing at project and doc level
|
|
REDIS_READ_BATCH_SIZE: 100,
|
|
processUncompressedUpdates(project_id, doc_id, temporary, callback) {
|
|
// get the updates as strings from redis (so we can delete them after they are applied)
|
|
if (callback == null) {
|
|
callback = function (error) {}
|
|
}
|
|
return RedisManager.getOldestDocUpdates(
|
|
doc_id,
|
|
UpdatesManager.REDIS_READ_BATCH_SIZE,
|
|
function (error, docUpdates) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
const { length } = docUpdates
|
|
// parse the redis strings into ShareJs updates
|
|
return RedisManager.expandDocUpdates(docUpdates, function (
|
|
error,
|
|
rawUpdates
|
|
) {
|
|
if (error != null) {
|
|
logger.err(
|
|
{ project_id, doc_id, docUpdates },
|
|
'failed to parse docUpdates'
|
|
)
|
|
return callback(error)
|
|
}
|
|
logger.log(
|
|
{ project_id, doc_id, rawUpdates },
|
|
'retrieved raw updates from redis'
|
|
)
|
|
return UpdatesManager.compressAndSaveRawUpdates(
|
|
project_id,
|
|
doc_id,
|
|
rawUpdates,
|
|
temporary,
|
|
function (error) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
logger.log(
|
|
{ project_id, doc_id },
|
|
'compressed and saved doc updates'
|
|
)
|
|
// delete the applied updates from redis
|
|
return RedisManager.deleteAppliedDocUpdates(
|
|
project_id,
|
|
doc_id,
|
|
docUpdates,
|
|
function (error) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
if (length === UpdatesManager.REDIS_READ_BATCH_SIZE) {
|
|
// There might be more updates
|
|
logger.log(
|
|
{ project_id, doc_id },
|
|
'continuing processing updates'
|
|
)
|
|
return setTimeout(
|
|
() =>
|
|
UpdatesManager.processUncompressedUpdates(
|
|
project_id,
|
|
doc_id,
|
|
temporary,
|
|
callback
|
|
),
|
|
0
|
|
)
|
|
} else {
|
|
logger.log(
|
|
{ project_id, doc_id },
|
|
'all raw updates processed'
|
|
)
|
|
return callback()
|
|
}
|
|
}
|
|
)
|
|
}
|
|
)
|
|
})
|
|
}
|
|
)
|
|
},
|
|
|
|
// Process updates for a doc when we flush it individually
|
|
processUncompressedUpdatesWithLock(project_id, doc_id, callback) {
|
|
if (callback == null) {
|
|
callback = function (error) {}
|
|
}
|
|
return UpdatesManager._prepareProjectForUpdates(project_id, function (
|
|
error,
|
|
temporary
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return UpdatesManager._processUncompressedUpdatesForDocWithLock(
|
|
project_id,
|
|
doc_id,
|
|
temporary,
|
|
callback
|
|
)
|
|
})
|
|
},
|
|
|
|
// Process updates for a doc when the whole project is flushed (internal method)
|
|
_processUncompressedUpdatesForDocWithLock(
|
|
project_id,
|
|
doc_id,
|
|
temporary,
|
|
callback
|
|
) {
|
|
if (callback == null) {
|
|
callback = function (error) {}
|
|
}
|
|
return UpdatesManager._prepareDocForUpdates(project_id, doc_id, function (
|
|
error
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return LockManager.runWithLock(
|
|
keys.historyLock({ doc_id }),
|
|
(releaseLock) =>
|
|
UpdatesManager.processUncompressedUpdates(
|
|
project_id,
|
|
doc_id,
|
|
temporary,
|
|
releaseLock
|
|
),
|
|
callback
|
|
)
|
|
})
|
|
},
|
|
|
|
// Process all updates for a project, only check project-level information once
|
|
processUncompressedUpdatesForProject(project_id, callback) {
|
|
if (callback == null) {
|
|
callback = function (error) {}
|
|
}
|
|
return RedisManager.getDocIdsWithHistoryOps(project_id, function (
|
|
error,
|
|
doc_ids
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return UpdatesManager._prepareProjectForUpdates(project_id, function (
|
|
error,
|
|
temporary
|
|
) {
|
|
const jobs = []
|
|
for (const doc_id of Array.from(doc_ids)) {
|
|
;((doc_id) =>
|
|
jobs.push((cb) =>
|
|
UpdatesManager._processUncompressedUpdatesForDocWithLock(
|
|
project_id,
|
|
doc_id,
|
|
temporary,
|
|
cb
|
|
)
|
|
))(doc_id)
|
|
}
|
|
return async.parallelLimit(jobs, 5, callback)
|
|
})
|
|
})
|
|
},
|
|
|
|
// flush all outstanding changes
|
|
flushAll(limit, callback) {
|
|
if (callback == null) {
|
|
callback = function (error, result) {}
|
|
}
|
|
return RedisManager.getProjectIdsWithHistoryOps(function (
|
|
error,
|
|
project_ids
|
|
) {
|
|
let project_id
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
logger.log(
|
|
{
|
|
count: project_ids != null ? project_ids.length : undefined,
|
|
project_ids
|
|
},
|
|
'found projects'
|
|
)
|
|
const jobs = []
|
|
project_ids = _.shuffle(project_ids) // randomise to avoid hitting same projects each time
|
|
const selectedProjects =
|
|
limit < 0 ? project_ids : project_ids.slice(0, limit)
|
|
for (project_id of Array.from(selectedProjects)) {
|
|
;((project_id) =>
|
|
jobs.push((cb) =>
|
|
UpdatesManager.processUncompressedUpdatesForProject(
|
|
project_id,
|
|
(err) => cb(null, { failed: err != null, project_id })
|
|
)
|
|
))(project_id)
|
|
}
|
|
return async.series(jobs, function (error, result) {
|
|
let x
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
const failedProjects = (() => {
|
|
const result1 = []
|
|
for (x of Array.from(result)) {
|
|
if (x.failed) {
|
|
result1.push(x.project_id)
|
|
}
|
|
}
|
|
return result1
|
|
})()
|
|
const succeededProjects = (() => {
|
|
const result2 = []
|
|
for (x of Array.from(result)) {
|
|
if (!x.failed) {
|
|
result2.push(x.project_id)
|
|
}
|
|
}
|
|
return result2
|
|
})()
|
|
return callback(null, {
|
|
failed: failedProjects,
|
|
succeeded: succeededProjects,
|
|
all: project_ids
|
|
})
|
|
})
|
|
})
|
|
},
|
|
|
|
getDanglingUpdates(callback) {
|
|
if (callback == null) {
|
|
callback = function (error, doc_ids) {}
|
|
}
|
|
return RedisManager.getAllDocIdsWithHistoryOps(function (
|
|
error,
|
|
all_doc_ids
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return RedisManager.getProjectIdsWithHistoryOps(function (
|
|
error,
|
|
all_project_ids
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
// function to get doc_ids for each project
|
|
const task = (cb) =>
|
|
async.concatSeries(
|
|
all_project_ids,
|
|
RedisManager.getDocIdsWithHistoryOps,
|
|
cb
|
|
)
|
|
// find the dangling doc ids
|
|
return task(function (error, project_doc_ids) {
|
|
const dangling_doc_ids = _.difference(all_doc_ids, project_doc_ids)
|
|
logger.log(
|
|
{ all_doc_ids, all_project_ids, project_doc_ids, dangling_doc_ids },
|
|
'checking for dangling doc ids'
|
|
)
|
|
return callback(null, dangling_doc_ids)
|
|
})
|
|
})
|
|
})
|
|
},
|
|
|
|
getDocUpdates(project_id, doc_id, options, callback) {
|
|
if (options == null) {
|
|
options = {}
|
|
}
|
|
if (callback == null) {
|
|
callback = function (error, updates) {}
|
|
}
|
|
return UpdatesManager.processUncompressedUpdatesWithLock(
|
|
project_id,
|
|
doc_id,
|
|
function (error) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
// console.log "options", options
|
|
return PackManager.getOpsByVersionRange(
|
|
project_id,
|
|
doc_id,
|
|
options.from,
|
|
options.to,
|
|
function (error, updates) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return callback(null, updates)
|
|
}
|
|
)
|
|
}
|
|
)
|
|
},
|
|
|
|
getDocUpdatesWithUserInfo(project_id, doc_id, options, callback) {
|
|
if (options == null) {
|
|
options = {}
|
|
}
|
|
if (callback == null) {
|
|
callback = function (error, updates) {}
|
|
}
|
|
return UpdatesManager.getDocUpdates(project_id, doc_id, options, function (
|
|
error,
|
|
updates
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return UpdatesManager.fillUserInfo(updates, function (error, updates) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return callback(null, updates)
|
|
})
|
|
})
|
|
},
|
|
|
|
getSummarizedProjectUpdates(project_id, options, callback) {
|
|
if (options == null) {
|
|
options = {}
|
|
}
|
|
if (callback == null) {
|
|
callback = function (error, updates) {}
|
|
}
|
|
if (!options.min_count) {
|
|
options.min_count = 25
|
|
}
|
|
let summarizedUpdates = []
|
|
const { before } = options
|
|
let nextBeforeTimestamp = null
|
|
return UpdatesManager.processUncompressedUpdatesForProject(
|
|
project_id,
|
|
function (error) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
return PackManager.makeProjectIterator(project_id, before, function (
|
|
err,
|
|
iterator
|
|
) {
|
|
if (err != null) {
|
|
return callback(err)
|
|
}
|
|
// repeatedly get updates and pass them through the summariser to get an final output with user info
|
|
return async.whilst(
|
|
() =>
|
|
// console.log "checking iterator.done", iterator.done()
|
|
summarizedUpdates.length < options.min_count && !iterator.done(),
|
|
|
|
(cb) =>
|
|
iterator.next(function (err, partialUpdates) {
|
|
if (err != null) {
|
|
return callback(err)
|
|
}
|
|
// logger.log {partialUpdates}, 'got partialUpdates'
|
|
if (partialUpdates.length === 0) {
|
|
return cb()
|
|
} // # FIXME should try to avoid this happening
|
|
nextBeforeTimestamp =
|
|
partialUpdates[partialUpdates.length - 1].meta.end_ts
|
|
// add the updates to the summary list
|
|
summarizedUpdates = UpdatesManager._summarizeUpdates(
|
|
partialUpdates,
|
|
summarizedUpdates
|
|
)
|
|
return cb()
|
|
}),
|
|
|
|
() =>
|
|
// finally done all updates
|
|
// console.log 'summarized Updates', summarizedUpdates
|
|
UpdatesManager.fillSummarizedUserInfo(
|
|
summarizedUpdates,
|
|
function (err, results) {
|
|
if (err != null) {
|
|
return callback(err)
|
|
}
|
|
return callback(
|
|
null,
|
|
results,
|
|
!iterator.done() ? nextBeforeTimestamp : undefined
|
|
)
|
|
}
|
|
)
|
|
)
|
|
})
|
|
}
|
|
)
|
|
},
|
|
|
|
fetchUserInfo(users, callback) {
|
|
if (callback == null) {
|
|
callback = function (error, fetchedUserInfo) {}
|
|
}
|
|
const jobs = []
|
|
const fetchedUserInfo = {}
|
|
for (const user_id in users) {
|
|
;((user_id) =>
|
|
jobs.push((callback) =>
|
|
WebApiManager.getUserInfo(user_id, function (error, userInfo) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
fetchedUserInfo[user_id] = userInfo
|
|
return callback()
|
|
})
|
|
))(user_id)
|
|
}
|
|
|
|
return async.series(jobs, function (err) {
|
|
if (err != null) {
|
|
return callback(err)
|
|
}
|
|
return callback(null, fetchedUserInfo)
|
|
})
|
|
},
|
|
|
|
fillUserInfo(updates, callback) {
|
|
let update, user_id
|
|
if (callback == null) {
|
|
callback = function (error, updates) {}
|
|
}
|
|
const users = {}
|
|
for (update of Array.from(updates)) {
|
|
;({ user_id } = update.meta)
|
|
if (UpdatesManager._validUserId(user_id)) {
|
|
users[user_id] = true
|
|
}
|
|
}
|
|
|
|
return UpdatesManager.fetchUserInfo(users, function (
|
|
error,
|
|
fetchedUserInfo
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
for (update of Array.from(updates)) {
|
|
;({ user_id } = update.meta)
|
|
delete update.meta.user_id
|
|
if (UpdatesManager._validUserId(user_id)) {
|
|
update.meta.user = fetchedUserInfo[user_id]
|
|
}
|
|
}
|
|
return callback(null, updates)
|
|
})
|
|
},
|
|
|
|
fillSummarizedUserInfo(updates, callback) {
|
|
let update, user_id, user_ids
|
|
if (callback == null) {
|
|
callback = function (error, updates) {}
|
|
}
|
|
const users = {}
|
|
for (update of Array.from(updates)) {
|
|
user_ids = update.meta.user_ids || []
|
|
for (user_id of Array.from(user_ids)) {
|
|
if (UpdatesManager._validUserId(user_id)) {
|
|
users[user_id] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
return UpdatesManager.fetchUserInfo(users, function (
|
|
error,
|
|
fetchedUserInfo
|
|
) {
|
|
if (error != null) {
|
|
return callback(error)
|
|
}
|
|
for (update of Array.from(updates)) {
|
|
user_ids = update.meta.user_ids || []
|
|
update.meta.users = []
|
|
delete update.meta.user_ids
|
|
for (user_id of Array.from(user_ids)) {
|
|
if (UpdatesManager._validUserId(user_id)) {
|
|
update.meta.users.push(fetchedUserInfo[user_id])
|
|
} else {
|
|
update.meta.users.push(null)
|
|
}
|
|
}
|
|
}
|
|
return callback(null, updates)
|
|
})
|
|
},
|
|
|
|
_validUserId(user_id) {
|
|
if (user_id == null) {
|
|
return false
|
|
} else {
|
|
return !!user_id.match(/^[a-f0-9]{24}$/)
|
|
}
|
|
},
|
|
|
|
TIME_BETWEEN_DISTINCT_UPDATES: (fiveMinutes = 5 * 60 * 1000),
|
|
SPLIT_ON_DELETE_SIZE: 16, // characters
|
|
_summarizeUpdates(updates, existingSummarizedUpdates) {
|
|
if (existingSummarizedUpdates == null) {
|
|
existingSummarizedUpdates = []
|
|
}
|
|
const summarizedUpdates = existingSummarizedUpdates.slice()
|
|
let previousUpdateWasBigDelete = false
|
|
for (const update of Array.from(updates)) {
|
|
var doc_id
|
|
const earliestUpdate = summarizedUpdates[summarizedUpdates.length - 1]
|
|
let shouldConcat = false
|
|
|
|
// If a user inserts some text, then deletes a big chunk including that text,
|
|
// the update we show might concat the insert and delete, and there will be no sign
|
|
// of that insert having happened, or be able to restore to it (restoring after a big delete is common).
|
|
// So, we split the summary on 'big' deletes. However, we've stepping backwards in time with
|
|
// most recent changes considered first, so if this update is a big delete, we want to start
|
|
// a new summarized update next timge, hence we monitor the previous update.
|
|
if (previousUpdateWasBigDelete) {
|
|
shouldConcat = false
|
|
} else if (
|
|
earliestUpdate &&
|
|
earliestUpdate.meta.end_ts - update.meta.start_ts <
|
|
this.TIME_BETWEEN_DISTINCT_UPDATES
|
|
) {
|
|
// We're going backwards in time through the updates, so only combine if this update starts less than 5 minutes before
|
|
// the end of current summarized block, so no block spans more than 5 minutes.
|
|
shouldConcat = true
|
|
}
|
|
|
|
let isBigDelete = false
|
|
for (const op of Array.from(update.op || [])) {
|
|
if (op.d != null && op.d.length > this.SPLIT_ON_DELETE_SIZE) {
|
|
isBigDelete = true
|
|
}
|
|
}
|
|
|
|
previousUpdateWasBigDelete = isBigDelete
|
|
|
|
if (shouldConcat) {
|
|
// check if the user in this update is already present in the earliest update,
|
|
// if not, add them to the users list of the earliest update
|
|
earliestUpdate.meta.user_ids = _.union(earliestUpdate.meta.user_ids, [
|
|
update.meta.user_id
|
|
])
|
|
|
|
doc_id = update.doc_id.toString()
|
|
const doc = earliestUpdate.docs[doc_id]
|
|
if (doc != null) {
|
|
doc.fromV = Math.min(doc.fromV, update.v)
|
|
doc.toV = Math.max(doc.toV, update.v)
|
|
} else {
|
|
earliestUpdate.docs[doc_id] = {
|
|
fromV: update.v,
|
|
toV: update.v
|
|
}
|
|
}
|
|
|
|
earliestUpdate.meta.start_ts = Math.min(
|
|
earliestUpdate.meta.start_ts,
|
|
update.meta.start_ts
|
|
)
|
|
earliestUpdate.meta.end_ts = Math.max(
|
|
earliestUpdate.meta.end_ts,
|
|
update.meta.end_ts
|
|
)
|
|
} else {
|
|
const newUpdate = {
|
|
meta: {
|
|
user_ids: [],
|
|
start_ts: update.meta.start_ts,
|
|
end_ts: update.meta.end_ts
|
|
},
|
|
docs: {}
|
|
}
|
|
|
|
newUpdate.docs[update.doc_id.toString()] = {
|
|
fromV: update.v,
|
|
toV: update.v
|
|
}
|
|
newUpdate.meta.user_ids.push(update.meta.user_id)
|
|
summarizedUpdates.push(newUpdate)
|
|
}
|
|
}
|
|
|
|
return summarizedUpdates
|
|
}
|
|
}
|
|
|
|
function __guard__(value, transform) {
|
|
return typeof value !== 'undefined' && value !== null
|
|
? transform(value)
|
|
: undefined
|
|
}
|