overleaf/services/track-changes/app/coffee/UpdatesManager.coffee

281 lines
12 KiB
CoffeeScript

MongoManager = require "./MongoManager"
RedisManager = require "./RedisManager"
UpdateCompressor = require "./UpdateCompressor"
LockManager = require "./LockManager"
WebApiManager = require "./WebApiManager"
UpdateTrimmer = require "./UpdateTrimmer"
logger = require "logger-sharelatex"
async = require "async"
DocArchiveManager = require "./DocArchiveManager"
_ = require "underscore"
Settings = require "settings-sharelatex"
module.exports = UpdatesManager =
compressAndSaveRawUpdates: (project_id, doc_id, rawUpdates, temporary, callback = (error) ->) ->
length = rawUpdates.length
if length == 0
return callback()
MongoManager.peekLastCompressedUpdate doc_id, (error, lastCompressedUpdate, lastVersion) ->
# lastCompressedUpdate is the most recent update in Mongo.
#
# The peekLastCompressedUpdate method may pass it back as 'null'
# to force the start of a new compressed update, even when there
# was a previous compressed update in Mongo. In this case it
# passes back the lastVersion from the update to check
# consistency.
# when lastVersion is not provided, default to lastCompressedUpdate.v
lastVersion ?= lastCompressedUpdate?.v
return callback(error) if error?
# Ensure that raw updates start where lastVersion left off
if lastVersion?
rawUpdates = rawUpdates.slice(0)
while rawUpdates[0]? and rawUpdates[0].v <= lastVersion
rawUpdates.shift()
if rawUpdates[0]? and rawUpdates[0].v != lastVersion + 1
ts = lastCompressedUpdate?.meta?.end_ts
last_timestamp = if ts? then new Date(ts) else 'unknown time'
error = new Error("Tried to apply raw op at version #{rawUpdates[0].v} to last compressed update with version #{lastVersion} from #{last_timestamp}")
logger.error err: error, doc_id: doc_id, project_id: project_id, prev_end_ts: ts, "inconsistent doc versions"
if Settings.trackchanges?.continueOnError and rawUpdates[0].v > lastVersion + 1
# we have lost some ops - continue to write into the database, we can't recover at this point
lastCompressedUpdate = null
else
return callback error
compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates
if not lastCompressedUpdate?
# no existing update, insert everything
updateToModify = null
updatesToInsert = compressedUpdates
else
# there are existing updates, see what happens when we
# compress them together with the new ones
[firstUpdate, additionalUpdates...] = compressedUpdates
if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate)
# first update version hasn't changed, skip it and insert remaining updates
# this is an optimisation, we could update the existing op with itself
updateToModify = null
updatesToInsert = additionalUpdates
else
# first update version did changed, modify it and insert remaining updates
updateToModify = firstUpdate
updatesToInsert = additionalUpdates
MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result?
MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) ->
return callback(error) if error?
logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates"
callback()
REDIS_READ_BATCH_SIZE: 100
processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) ->
UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) ->
return callback(error) if error?
MongoManager.backportProjectId project_id, doc_id, (error) ->
return callback(error) if error?
# get the updates as strings from redis (so we can delete them after they are applied)
RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) ->
return callback(error) if error?
length = docUpdates.length
# parse the redis strings into ShareJs updates
RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) ->
return callback(error) if error?
UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
return callback(error) if error?
logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
# delete the applied updates from redis
RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) ->
return callback(error) if error?
if length == UpdatesManager.REDIS_READ_BATCH_SIZE
# There might be more updates
logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
setTimeout () ->
UpdatesManager.processUncompressedUpdates project_id, doc_id, callback
, 0
else
logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
callback()
processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
LockManager.runWithLock(
"HistoryLock:#{doc_id}",
(releaseLock) ->
UpdatesManager.processUncompressedUpdates project_id, doc_id, releaseLock
callback
)
processUncompressedUpdatesForProject: (project_id, callback = (error) ->) ->
RedisManager.getDocIdsWithHistoryOps project_id, (error, doc_ids) ->
return callback(error) if error?
jobs = []
for doc_id in doc_ids
do (doc_id) ->
jobs.push (callback) ->
UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, callback
async.parallelLimit jobs, 5, callback
getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) ->
return callback(error) if error?
MongoManager.getDocUpdates doc_id, options, callback
getDocUpdatesWithUserInfo: (project_id, doc_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.getDocUpdates project_id, doc_id, options, (error, updates) ->
return callback(error) if error?
UpdatesManager.fillUserInfo updates, (error, updates) ->
return callback(error) if error?
callback null, updates
getProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.processUncompressedUpdatesForProject project_id, (error) ->
return callback(error) if error?
MongoManager.getProjectUpdates project_id, options, (error, updates) ->
jobs = []
for update in updates
if update.inS3
do (update) ->
jobs.push (callback) -> DocArchiveManager.unArchiveDocChanges update.project_id, update.doc_id, callback
if jobs.length?
async.series jobs, (err) ->
MongoManager.getProjectUpdates project_id, options, callback
else
callback(error, updates)
getProjectUpdatesWithUserInfo: (project_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.getProjectUpdates project_id, options, (error, updates) ->
return callback(error) if error?
UpdatesManager.fillUserInfo updates, (error, updates) ->
return callback(error) if error?
callback null, updates
getSummarizedProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) ->
options.min_count ||= 25
summarizedUpdates = []
before = options.before
do fetchNextBatch = () ->
UpdatesManager._extendBatchOfSummarizedUpdates project_id, summarizedUpdates, before, options.min_count, (error, updates, nextBeforeUpdate) ->
return callback(error) if error?
if !nextBeforeUpdate? or updates.length >= options.min_count
callback null, updates, nextBeforeUpdate
else
before = nextBeforeUpdate
summarizedUpdates = updates
fetchNextBatch()
_extendBatchOfSummarizedUpdates: (
project_id,
existingSummarizedUpdates,
before, desiredLength,
callback = (error, summarizedUpdates, endOfDatabase) ->
) ->
UpdatesManager.getProjectUpdatesWithUserInfo project_id, { before: before, limit: 3 * desiredLength }, (error, updates) ->
return callback(error) if error?
# Suppose in this request we have fetch the solid updates. In the next request we need
# to fetch the dotted updates. These are defined by having an end timestamp less than
# the last update's end timestamp (updates are ordered by descending end_ts). I.e.
# start_ts--v v--end_ts
# doc1: |......| |...| |-------|
# doc2: |------------------|
# ^----- Next time, fetch all updates with an
# end_ts less than this
#
if updates? and updates.length > 0
nextBeforeTimestamp = updates[updates.length - 1].meta.end_ts
if nextBeforeTimestamp >= before
error = new Error("history order is broken")
logger.error err: error, project_id:project_id, nextBeforeTimestamp: nextBeforeTimestamp, before:before, "error in project history"
return callback(error)
else
nextBeforeTimestamp = null
summarizedUpdates = UpdatesManager._summarizeUpdates(
updates, existingSummarizedUpdates
)
callback null,
summarizedUpdates,
nextBeforeTimestamp
fillUserInfo: (updates, callback = (error, updates) ->) ->
users = {}
for update in updates
if UpdatesManager._validUserId(update.meta.user_id)
users[update.meta.user_id] = true
jobs = []
for user_id of users
do (user_id) ->
jobs.push (callback) ->
WebApiManager.getUserInfo user_id, (error, userInfo) ->
return callback(error) if error?
users[user_id] = userInfo
callback()
async.series jobs, (error) ->
return callback(error) if error?
for update in updates
user_id = update.meta.user_id
delete update.meta.user_id
if UpdatesManager._validUserId(user_id)
update.meta.user = users[user_id]
callback null, updates
_validUserId: (user_id) ->
if !user_id?
return false
else
return !!user_id.match(/^[a-f0-9]{24}$/)
TIME_BETWEEN_DISTINCT_UPDATES: fiveMinutes = 5 * 60 * 1000
_summarizeUpdates: (updates, existingSummarizedUpdates = []) ->
summarizedUpdates = existingSummarizedUpdates.slice()
for update in updates
earliestUpdate = summarizedUpdates[summarizedUpdates.length - 1]
if earliestUpdate and earliestUpdate.meta.start_ts - update.meta.end_ts < @TIME_BETWEEN_DISTINCT_UPDATES
# check if the user in this update is already present in the earliest update,
# if not, add them to the users list of the earliest update
userExists = false
for user in earliestUpdate.meta.users
if (!user and !update.meta.user) or (user?.id == update.meta.user?.id)
userExists = true
break
if !userExists
earliestUpdate.meta.users.push update.meta.user
doc_id = update.doc_id.toString()
doc = earliestUpdate.docs[doc_id]
if doc?
doc.fromV = Math.min(doc.fromV, update.v)
doc.toV = Math.max(doc.toV, update.v)
else
earliestUpdate.docs[doc_id] =
fromV: update.v
toV: update.v
earliestUpdate.meta.start_ts = Math.min(earliestUpdate.meta.start_ts, update.meta.start_ts)
earliestUpdate.meta.end_ts = Math.max(earliestUpdate.meta.end_ts, update.meta.end_ts)
else
newUpdate =
meta:
users: []
start_ts: update.meta.start_ts
end_ts: update.meta.end_ts
docs: {}
newUpdate.docs[update.doc_id.toString()] =
fromV: update.v
toV: update.v
newUpdate.meta.users.push update.meta.user
summarizedUpdates.push newUpdate
return summarizedUpdates