MongoManager = require "./MongoManager"
PackManager = require "./PackManager"
RedisManager = require "./RedisManager"
UpdateCompressor = require "./UpdateCompressor"
LockManager = require "./LockManager"
WebApiManager = require "./WebApiManager"
UpdateTrimmer = require "./UpdateTrimmer"
logger = require "logger-sharelatex"
async = require "async"
_ = require "underscore"
Settings = require "settings-sharelatex"
keys = Settings.redis.lock.key_schema

module.exports = UpdatesManager =
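	# Compress a batch of raw updates for a doc and store them as packs via
	# PackManager, discarding any updates that are already recorded.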
	compressAndSaveRawUpdates: (project_id, doc_id, rawUpdates, temporary, callback = (error) ->) ->
		length = rawUpdates.length
		if length == 0
			return callback()

		# check that ops are in the correct order
		for op, i in rawUpdates when i > 0
			thisVersion = op?.v
			prevVersion = rawUpdates[i - 1]?.v
			if not (prevVersion < thisVersion)
				logger.error project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, temporary: temporary, thisVersion: thisVersion, prevVersion: prevVersion, "op versions out of order"

		# FIXME: we no longer need the lastCompressedUpdate, so change functions not to need it
		# CORRECTION: we do use it to log the time in case of error
		MongoManager.peekLastCompressedUpdate doc_id, (error, lastCompressedUpdate, lastVersion) ->
			# lastCompressedUpdate is the most recent update in Mongo, and
			# lastVersion is its sharejs version number.
			#
			# The peekLastCompressedUpdate method may pass the update back
			# as 'null' (for example if the previous compressed update has
			# been archived). In this case it can still pass back the
			# lastVersion from the update to allow us to check consistency.
			return callback(error) if error?

			# Ensure that raw updates start where lastVersion left off
			if lastVersion?
				discardedUpdates = []
				rawUpdates = rawUpdates.slice(0)
				while rawUpdates[0]? and rawUpdates[0].v <= lastVersion
					discardedUpdates.push rawUpdates.shift()
				if discardedUpdates.length
					logger.error project_id: project_id, doc_id: doc_id, discardedUpdates: discardedUpdates, temporary: temporary, lastVersion: lastVersion, "discarded updates already present"

				if rawUpdates[0]? and rawUpdates[0].v != lastVersion + 1
					ts = lastCompressedUpdate?.meta?.end_ts
					last_timestamp = if ts? then new Date(ts) else 'unknown time'
					error = new Error("Tried to apply raw op at version #{rawUpdates[0].v} to last compressed update with version #{lastVersion} from #{last_timestamp}")
					logger.error err: error, doc_id: doc_id, project_id: project_id, prev_end_ts: ts, temporary: temporary, lastCompressedUpdate: lastCompressedUpdate, "inconsistent doc versions"
					if Settings.trackchanges?.continueOnError and rawUpdates[0].v > lastVersion + 1
						# we have lost some ops - continue to write into the database, we can't recover at this point
						lastCompressedUpdate = null
					else
						return callback error

			if rawUpdates.length == 0
				return callback()

			# some old large ops in redis need to be rejected, they predate
			# the size limit that now prevents them going through the system
			REJECT_LARGE_OP_SIZE = 4 * 1024 * 1024
			for rawUpdate in rawUpdates
				opSizes = ((op.i?.length || op.d?.length) for op in rawUpdate?.op or [])
				size = _.max opSizes
				if size > REJECT_LARGE_OP_SIZE
					error = new Error("dropped op exceeding maximum allowed size of #{REJECT_LARGE_OP_SIZE}")
					logger.error err: error, doc_id: doc_id, project_id: project_id, size: size, rawUpdate: rawUpdate, "dropped op - too big"
					rawUpdate.op = []

			compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates
			PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) ->
				return callback(error) if error?
				logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result?
				callback()

	# Check whether the updates are temporary (per-project property)
	_prepareProjectForUpdates: (project_id, callback = (error, temporary) ->) ->
		UpdateTrimmer.shouldTrimUpdates project_id, (error, temporary) ->
			return callback(error) if error?
			callback(null, temporary)

	# Check for project id on document history (per-document property)
	_prepareDocForUpdates: (project_id, doc_id, callback = (error) ->) ->
		MongoManager.backportProjectId project_id, doc_id, (error) ->
			return callback(error) if error?
			callback(null)

	# Apply updates for specific project/doc after preparing at project and doc level
	REDIS_READ_BATCH_SIZE: 100
	processUncompressedUpdates: (project_id, doc_id, temporary, callback = (error) ->) ->
		# get the updates as strings from redis (so we can delete them after they are applied)
		RedisManager.getOldestDocUpdates doc_id, UpdatesManager.REDIS_READ_BATCH_SIZE, (error, docUpdates) ->
			return callback(error) if error?
			length = docUpdates.length
			# parse the redis strings into ShareJs updates
			RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) ->
				if error?
					logger.err project_id: project_id, doc_id: doc_id, docUpdates: docUpdates, "failed to parse docUpdates"
					return callback(error)
				logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis"
				UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
					return callback(error) if error?
					logger.log project_id: project_id, doc_id: doc_id, "compressed and saved doc updates"
					# delete the applied updates from redis
					RedisManager.deleteAppliedDocUpdates project_id, doc_id, docUpdates, (error) ->
						return callback(error) if error?
						if length == UpdatesManager.REDIS_READ_BATCH_SIZE
							# There might be more updates
							logger.log project_id: project_id, doc_id: doc_id, "continuing processing updates"
							setTimeout () ->
								UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, callback
							, 0
						else
							logger.log project_id: project_id, doc_id: doc_id, "all raw updates processed"
							callback()

	# Process updates for a doc when we flush it individually
	processUncompressedUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
		UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) ->
			return callback(error) if error?
			UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, callback

	# Process updates for a doc when the whole project is flushed (internal method)
	_processUncompressedUpdatesForDocWithLock: (project_id, doc_id, temporary, callback = (error) ->) ->
		UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) ->
			return callback(error) if error?
			LockManager.runWithLock(
				keys.historyLock({doc_id}),
				(releaseLock) ->
					UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, releaseLock
				callback
			)

	# Process all updates for a project, only check project-level information once
	processUncompressedUpdatesForProject: (project_id, callback = (error) ->) ->
		RedisManager.getDocIdsWithHistoryOps project_id, (error, doc_ids) ->
			return callback(error) if error?
			UpdatesManager._prepareProjectForUpdates project_id, (error, temporary) ->
				return callback(error) if error?
				jobs = []
				for doc_id in doc_ids
					do (doc_id) ->
						jobs.push (cb) ->
							UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, cb
				async.parallelLimit jobs, 5, callback

	# flush all outstanding changes
	flushAll: (limit, callback = (error, result) ->) ->
		RedisManager.getProjectIdsWithHistoryOps (error, project_ids) ->
			return callback(error) if error?
			logger.log {count: project_ids?.length, project_ids: project_ids}, "found projects"
			jobs = []
			project_ids = _.shuffle project_ids # randomise to avoid hitting same projects each time
			selectedProjects = if limit < 0 then project_ids else project_ids[0...limit]
			for project_id in selectedProjects
				do (project_id) ->
					jobs.push (cb) ->
						UpdatesManager.processUncompressedUpdatesForProject project_id, (err) ->
							return cb(null, {failed: err?, project_id: project_id})
			async.series jobs, (error, result) ->
				return callback(error) if error?
				failedProjects = (x.project_id for x in result when x.failed)
				succeededProjects = (x.project_id for x in result when not x.failed)
				callback(null, {failed: failedProjects, succeeded: succeededProjects, all: project_ids})
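
	# Find doc ids with history ops in redis that are not associated with any project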
	getDanglingUpdates: (callback = (error, doc_ids) ->) ->
		RedisManager.getAllDocIdsWithHistoryOps (error, all_doc_ids) ->
			return callback(error) if error?
			RedisManager.getProjectIdsWithHistoryOps (error, all_project_ids) ->
				return callback(error) if error?
				# function to get doc_ids for each project
				task = (cb) -> async.concatSeries all_project_ids, RedisManager.getDocIdsWithHistoryOps, cb
				# find the dangling doc ids
				task (error, project_doc_ids) ->
					dangling_doc_ids = _.difference(all_doc_ids, project_doc_ids)
					logger.log {all_doc_ids: all_doc_ids, all_project_ids: all_project_ids, project_doc_ids: project_doc_ids, dangling_doc_ids: dangling_doc_ids}, "checking for dangling doc ids"
					callback(null, dangling_doc_ids)
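
	# Get the updates for a single doc, flushing any uncompressed updates from redis first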
	getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) ->
		UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) ->
			return callback(error) if error?
			#console.log "options", options
			PackManager.getOpsByVersionRange project_id, doc_id, options.from, options.to, (error, updates) ->
				return callback(error) if error?
				callback null, updates
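
	# As getDocUpdates, but with user info filled in for each update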
	getDocUpdatesWithUserInfo: (project_id, doc_id, options = {}, callback = (error, updates) ->) ->
		UpdatesManager.getDocUpdates project_id, doc_id, options, (error, updates) ->
			return callback(error) if error?
			UpdatesManager.fillUserInfo updates, (error, updates) ->
				return callback(error) if error?
				callback null, updates
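
	# Get a project's updates summarized into chunks for the history list, working backwards from options.before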
	getSummarizedProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) ->
		options.min_count ||= 25
		summarizedUpdates = []
		before = options.before
		nextBeforeTimestamp = null
		UpdatesManager.processUncompressedUpdatesForProject project_id, (error) ->
			return callback(error) if error?
			PackManager.makeProjectIterator project_id, before, (err, iterator) ->
				return callback(err) if err?
				# repeatedly get updates and pass them through the summariser to get a final output with user info
				async.whilst () ->
					#console.log "checking iterator.done", iterator.done()
					return summarizedUpdates.length < options.min_count and not iterator.done()
				, (cb) ->
					iterator.next (err, partialUpdates) ->
						return callback(err) if err?
						#logger.log {partialUpdates}, 'got partialUpdates'
						return cb() if partialUpdates.length is 0 ## FIXME should try to avoid this happening
						nextBeforeTimestamp = partialUpdates[partialUpdates.length - 1].meta.end_ts
						# add the updates to the summary list
						summarizedUpdates = UpdatesManager._summarizeUpdates partialUpdates, summarizedUpdates
						cb()
				, () ->
					# finally done with all updates
					#console.log 'summarized Updates', summarizedUpdates
					UpdatesManager.fillSummarizedUserInfo summarizedUpdates, (err, results) ->
						return callback(err) if err?
						callback null, results, if not iterator.done() then nextBeforeTimestamp else undefined
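
	# Look up user info for each user id via the web API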
	fetchUserInfo: (users, callback = (error, fetchedUserInfo) ->) ->
		jobs = []
		fetchedUserInfo = {}
		for user_id of users
			do (user_id) ->
				jobs.push (callback) ->
					WebApiManager.getUserInfo user_id, (error, userInfo) ->
						return callback(error) if error?
						fetchedUserInfo[user_id] = userInfo
						callback()

		async.series jobs, (err) ->
			return callback(err) if err?
			callback(null, fetchedUserInfo)
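
	# Replace meta.user_id on each update with the fetched user info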
	fillUserInfo: (updates, callback = (error, updates) ->) ->
		users = {}
		for update in updates
			user_id = update.meta.user_id
			if UpdatesManager._validUserId(user_id)
				users[user_id] = true

		UpdatesManager.fetchUserInfo users, (error, fetchedUserInfo) ->
			return callback(error) if error?
			for update in updates
				user_id = update.meta.user_id
				delete update.meta.user_id
				if UpdatesManager._validUserId(user_id)
					update.meta.user = fetchedUserInfo[user_id]
			callback null, updates
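
	# Replace meta.user_ids on each summarized update with a list of fetched user info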
	fillSummarizedUserInfo: (updates, callback = (error, updates) ->) ->
		users = {}
		for update in updates
			user_ids = update.meta.user_ids or []
			for user_id in user_ids
				if UpdatesManager._validUserId(user_id)
					users[user_id] = true

		UpdatesManager.fetchUserInfo users, (error, fetchedUserInfo) ->
			return callback(error) if error?
			for update in updates
				user_ids = update.meta.user_ids or []
				update.meta.users = []
				delete update.meta.user_ids
				for user_id in user_ids
					if UpdatesManager._validUserId(user_id)
						update.meta.users.push fetchedUserInfo[user_id]
					else
						update.meta.users.push null
			callback null, updates

	_validUserId: (user_id) ->
		if !user_id?
			return false
		else
			return !!user_id.match(/^[a-f0-9]{24}$/)

	TIME_BETWEEN_DISTINCT_UPDATES: fiveMinutes = 5 * 60 * 1000
	SPLIT_ON_DELETE_SIZE: 16 # characters

	_summarizeUpdates: (updates, existingSummarizedUpdates = []) ->
		summarizedUpdates = existingSummarizedUpdates.slice()
		previousUpdateWasBigDelete = false
		for update in updates
			earliestUpdate = summarizedUpdates[summarizedUpdates.length - 1]
			shouldConcat = false

			# If a user inserts some text, then deletes a big chunk including that text,
			# the update we show might concat the insert and delete, and there will be no sign
			# of that insert having happened, or any way to restore to it (restoring after a big delete is common).
			# So we split the summary on 'big' deletes. However, we're stepping backwards in time with
			# the most recent changes considered first, so if this update is a big delete, we want to start
			# a new summarized update next time, hence we monitor the previous update.
			if previousUpdateWasBigDelete
				shouldConcat = false
			else if earliestUpdate and earliestUpdate.meta.end_ts - update.meta.start_ts < @TIME_BETWEEN_DISTINCT_UPDATES
				# We're going backwards in time through the updates, so only combine if this update starts
				# less than 5 minutes before the end of the current summarized block, so no block spans
				# more than 5 minutes.
				shouldConcat = true

			isBigDelete = false
			for op in update.op or []
				if op.d? and op.d.length > @SPLIT_ON_DELETE_SIZE
					isBigDelete = true
			previousUpdateWasBigDelete = isBigDelete

			if shouldConcat
				# check if the user in this update is already present in the earliest update,
				# if not, add them to the users list of the earliest update
				earliestUpdate.meta.user_ids = _.union earliestUpdate.meta.user_ids, [update.meta.user_id]

				doc_id = update.doc_id.toString()
				doc = earliestUpdate.docs[doc_id]
				if doc?
					doc.fromV = Math.min(doc.fromV, update.v)
					doc.toV = Math.max(doc.toV, update.v)
				else
					earliestUpdate.docs[doc_id] =
						fromV: update.v
						toV: update.v

				earliestUpdate.meta.start_ts = Math.min(earliestUpdate.meta.start_ts, update.meta.start_ts)
				earliestUpdate.meta.end_ts = Math.max(earliestUpdate.meta.end_ts, update.meta.end_ts)
			else
				newUpdate =
					meta:
						user_ids: []
						start_ts: update.meta.start_ts
						end_ts: update.meta.end_ts
					docs: {}

				newUpdate.docs[update.doc_id.toString()] =
					fromV: update.v
					toV: update.v
				newUpdate.meta.user_ids.push update.meta.user_id

				summarizedUpdates.push newUpdate

		return summarizedUpdates