# overleaf/services/track-changes/app/coffee/PackManager.coffee
# 2017-05-15 10:34:24 +01:00
#
# 540 lines
# 21 KiB
# CoffeeScript

async = require "async"
_ = require "underscore"
{db, ObjectId, BSON} = require "./mongojs"
logger = require "logger-sharelatex"
LockManager = require "./LockManager"
MongoAWS = require "./MongoAWS"
Metrics = require "metrics-sharelatex"
ProjectIterator = require "./ProjectIterator"
Settings = require "settings-sharelatex"
keys = Settings.redis.lock.key_schema
# Sharejs operations are stored in a 'pack' object
#
# e.g. a single sharejs update looks like
#
# {
# "doc_id" : 549dae9e0a2a615c0c7f0c98,
# "project_id" : 549dae9c0a2a615c0c7f0c8c,
# "op" : [ {"p" : 6981, "d" : "?" } ],
# "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 },
# "v" : 17082
# }
#
# and a pack looks like this
#
# {
# "doc_id" : 549dae9e0a2a615c0c7f0c98,
# "project_id" : 549dae9c0a2a615c0c7f0c8c,
# "pack" : [ U1, U2, U3, ...., UN],
# "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 },
# "v" : 17082,
# "v_end" : ...
# }
#
# where U1, U2, U3, .... are single updates stripped of their
# doc_id and project_id fields (which are the same for all the
# updates in the pack).
#
# The pack itself has v and meta fields, this makes it possible to
# treat packs and single updates in a similar way.
#
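# The v field of the pack itself is from the first entry U1, the
# v_end field from UN. The meta.end_ts field of the pack itself is
# from the last entry UN, the meta.start_ts field from U1.
#
# Packs also carry bookkeeping fields: n (the number of updates in the
# pack), sz (the approximate BSON size of those updates in bytes) and
# temporary. Temporary packs additionally get expiresAt and last_checked
# dates; the archiving worker may later set last_checked and finalised,
# and sets expiresAt on packs once they have been archived to S3.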
DAYS = 1000 * 60 * 60 * 24 # number of milliseconds in a single day
module.exports = PackManager =
  # Limits for a single pack: once either is reached a new pack is started.
  MAX_SIZE: 1024*1024 # make these configurable parameters
  MAX_COUNT: 1024

  # Store newUpdates for a doc, splitting them across as many packs as needed so
  # that no pack exceeds MAX_COUNT updates or MAX_SIZE bytes (BSON size).
  # The first batch may be appended to lastUpdate (the doc's current last pack);
  # the remainder is handled by a recursive call with lastUpdate = null, which
  # forces new packs.
  #
  # lastUpdate - the most recent pack for the doc, or null
  # newUpdates - array of compressed updates to store
  # temporary  - true if the updates belong in expiring (temporary) packs
  # callback(error)
  insertCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
    return callback() if newUpdates.length == 0
    # never append permanent ops to a pack that will expire
    lastUpdate = null if lastUpdate?.expiresAt? and not temporary
    updatesToFlush = []
    updatesRemaining = newUpdates.slice()
    n = lastUpdate?.n || 0
    sz = lastUpdate?.sz || 0
    while updatesRemaining.length and n < PackManager.MAX_COUNT and sz < PackManager.MAX_SIZE
      nextUpdate = updatesRemaining[0]
      nextUpdateSize = BSON.calculateObjectSize(nextUpdate)
      # stop before overflowing the size limit, unless the pack is still empty:
      # an oversized update is then stored on its own instead of looping forever
      if nextUpdateSize + sz > PackManager.MAX_SIZE and n > 0
        break
      n++
      sz += nextUpdateSize
      updatesToFlush.push updatesRemaining.shift()
    PackManager.flushCompressedUpdates project_id, doc_id, lastUpdate, updatesToFlush, temporary, (error) ->
      return callback(error) if error?
      PackManager.insertCompressedUpdates project_id, doc_id, null, updatesRemaining, temporary, callback

  # Write one batch of updates (already sized to fit a single pack), either by
  # appending to lastUpdate or by creating a new pack. Appending is only allowed
  # permanent -> permanent, or temporary -> temporary when the existing pack was
  # started less than a day ago.
  # callback(error)
  flushCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
    return callback() if newUpdates.length == 0
    canAppend = false
    # check if it is safe to append to an existing pack
    if lastUpdate?
      if not temporary and not lastUpdate.expiresAt?
        # permanent pack appends to permanent pack
        canAppend = true
      age = Date.now() - lastUpdate.meta?.start_ts
      if temporary and lastUpdate.expiresAt? and age < 1 * DAYS
        # temporary pack appends to temporary pack if same day
        canAppend = true
    if canAppend
      PackManager.appendUpdatesToExistingPack project_id, doc_id, lastUpdate, newUpdates, temporary, callback
    else
      PackManager.insertUpdatesIntoNewPack project_id, doc_id, newUpdates, temporary, callback

  # Create a new docHistory pack document holding newUpdates. Temporary packs
  # expire after 7 days and get a last_checked date 30 days ahead so the
  # archiving worker skips them. Permanent packs are added to the doc's index.
  # callback(error)
  insertUpdatesIntoNewPack: (project_id, doc_id, newUpdates, temporary, callback = (error) ->) ->
    first = newUpdates[0]
    last = newUpdates[newUpdates.length - 1]
    n = newUpdates.length
    sz = BSON.calculateObjectSize(newUpdates)
    newPack =
      project_id: ObjectId(project_id.toString())
      doc_id: ObjectId(doc_id.toString())
      pack: newUpdates
      n: n
      sz: sz
      meta:
        start_ts: first.meta.start_ts
        end_ts: last.meta.end_ts
      v: first.v
      v_end: last.v
      temporary: temporary
    if temporary
      newPack.expiresAt = new Date(Date.now() + 7 * DAYS)
      newPack.last_checked = new Date(Date.now() + 30 * DAYS) # never check temporary packs
    logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack"
    db.docHistory.save newPack, (err, result) ->
      return callback(err) if err?
      Metrics.inc("insert-pack-" + if temporary then "temporary" else "permanent")
      if temporary
        return callback()
      else
        PackManager.updateIndex project_id, doc_id, callback

  # Atomically push newUpdates onto the existing pack lastUpdate, bumping its
  # n/sz counters, meta.end_ts and v_end. A temporary pack also has its expiry
  # pushed back to 7 days from now.
  # callback(error, modifiedPack) - modifiedPack only contains meta and v_end
  appendUpdatesToExistingPack: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
    first = newUpdates[0]
    last = newUpdates[newUpdates.length - 1]
    n = newUpdates.length
    sz = BSON.calculateObjectSize(newUpdates)
    query =
      _id: lastUpdate._id
      project_id: ObjectId(project_id.toString())
      doc_id: ObjectId(doc_id.toString())
      pack: {$exists: true}
    update =
      $push:
        "pack": {$each: newUpdates}
      $inc:
        "n": n
        "sz": sz
      $set:
        "meta.end_ts": last.meta.end_ts
        "v_end": last.v
    if lastUpdate.expiresAt and temporary
      update.$set.expiresAt = new Date(Date.now() + 7 * DAYS)
    logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack"
    Metrics.inc("append-pack-" + if temporary then "temporary" else "permanent")
    db.docHistory.findAndModify {query, update, new:true, fields:{meta:1,v_end:1}}, callback

  # Retrieve all changes for a document
  #
  # Returns every op with fromVersion <= v <= toVersion (either bound may be
  # null for "unbounded"), newest first, each tagged with project_id/doc_id.
  # Packs archived to S3 are restored into mongo first.
  # callback(error, updates)
  getOpsByVersionRange: (project_id, doc_id, fromVersion, toVersion, callback = (error, updates) ->) ->
    PackManager.loadPacksByVersionRange project_id, doc_id, fromVersion, toVersion, (error) ->
      # don't silently return partial history if unarchiving failed
      return callback(error) if error?
      query = {doc_id:ObjectId(doc_id.toString())}
      query.v = {$lte:toVersion} if toVersion?
      query.v_end = {$gte:fromVersion} if fromVersion?
      db.docHistory.find(query).sort {v:-1}, (err, result) ->
        return callback(err) if err?
        updates = []
        opInRange = (op) ->
          return false if fromVersion? and op.v < fromVersion
          return false if toVersion? and op.v > toVersion
          return true
        for docHistory in result
          # packs are visited in descending v order; reversing each pack's ops
          # keeps the combined list in descending version order
          for op in docHistory.pack.reverse() when opInRange(op)
            op.project_id = docHistory.project_id
            op.doc_id = docHistory.doc_id
            updates.push op
        callback(null, updates)

  # Using the doc's index, find packs overlapping [fromVersion, toVersion] and
  # make sure they are present in mongo (unarchiving from S3 where needed).
  # callback(error)
  loadPacksByVersionRange: (project_id, doc_id, fromVersion, toVersion, callback) ->
    PackManager.getIndex doc_id, (err, indexResult) ->
      return callback(err) if err?
      indexPacks = indexResult?.packs or []
      packInRange = (pack) ->
        return false if fromVersion? and pack.v_end < fromVersion
        return false if toVersion? and pack.v > toVersion
        return true
      neededIds = (pack._id for pack in indexPacks when packInRange(pack))
      if neededIds.length
        PackManager.fetchPacksIfNeeded project_id, doc_id, neededIds, callback
      else
        callback()

  # Unarchive from S3 (at most 4 at a time) any of pack_ids that are not
  # currently in the docHistory collection.
  # callback(error)
  fetchPacksIfNeeded: (project_id, doc_id, pack_ids, callback) ->
    db.docHistory.find {_id: {$in: (ObjectId(id) for id in pack_ids)}}, {_id:1}, (err, loadedPacks) ->
      return callback(err) if err?
      allPackIds = (id.toString() for id in pack_ids)
      loadedPackIds = (pack._id.toString() for pack in loadedPacks)
      packIdsToFetch = _.difference allPackIds, loadedPackIds
      logger.log {project_id, doc_id, loadedPackIds, allPackIds, packIdsToFetch}, "analysed packs"
      return callback() if packIdsToFetch.length is 0
      async.eachLimit packIdsToFetch, 4, (pack_id, cb) ->
        MongoAWS.unArchivePack project_id, doc_id, pack_id, cb
      , (err) ->
        return callback(err) if err?
        logger.log {project_id, doc_id}, "done unarchiving"
        callback()

  # Retrieve all changes across a project
  #
  # Collects pack metadata (without the ops) from docHistory, newest first, plus
  # any packs only known through docHistoryIndex (i.e. archived ones, flagged
  # with fromIndex), and wraps them in a ProjectIterator that loads pack
  # contents on demand via getPackById.
  # callback(error, iterator)
  makeProjectIterator: (project_id, before, callback) ->
    # get all the docHistory Entries
    db.docHistory.find({project_id: ObjectId(project_id)},{pack:false}).sort {"meta.end_ts":-1}, (err, packs) ->
      return callback(err) if err?
      allPacks = []
      seenIds = {}
      for pack in packs
        allPacks.push pack
        seenIds[pack._id] = true
      db.docHistoryIndex.find {project_id: ObjectId(project_id)}, (err, indexes) ->
        return callback(err) if err?
        for index in indexes
          for pack in index.packs when not seenIds[pack._id]
            pack.project_id = index.project_id
            pack.doc_id = index._id
            pack.fromIndex = true
            allPacks.push pack
            seenIds[pack._id] = true
        callback(null, new ProjectIterator(allPacks, before, PackManager.getPackById))

  # Load a full pack by id, unarchiving it from S3 if it is not in mongo.
  # callback(error, pack)
  getPackById: (project_id, doc_id, pack_id, callback) ->
    db.docHistory.findOne {_id: pack_id}, (err, pack) ->
      return callback(err) if err?
      if not pack?
        MongoAWS.unArchivePack project_id, doc_id, pack_id, callback
      else if pack.expiresAt? and pack.temporary is false
        # only do this for cached packs, not temporary ones to avoid older packs
        # being kept longer than newer ones (which messes up the last update version)
        # we only need to touch the TTL when listing the changes in the project
        # because diffs on individual documents are always done after that
        PackManager.increaseTTL pack, callback
      else
        callback(null, pack)

  # Push a cached pack's expiry back to 7 days from now, unless it already has
  # more than 6 days left.
  # callback(error, pack)
  increaseTTL: (pack, callback) ->
    if pack.expiresAt < new Date(Date.now() + 6 * DAYS)
      # update cache expiry since we are using this pack
      db.docHistory.findAndModify {
        query: {_id: pack._id}
        update: {$set: {expiresAt: new Date(Date.now() + 7 * DAYS)}}
      }, (err) ->
        return callback(err, pack)
    else
      callback(null, pack)

  # Manage docHistoryIndex collection

  # callback(error, index) - the doc's index document, or null if none exists
  getIndex: (doc_id, callback) ->
    db.docHistoryIndex.findOne {_id:ObjectId(doc_id.toString())}, callback

  # callback(error, result) - the index document with `packs` projected down to
  # the single entry matching pack_id (positional "packs.$" projection)
  getPackFromIndex: (doc_id, pack_id, callback) ->
    db.docHistoryIndex.findOne {_id:ObjectId(doc_id.toString()), "packs._id": pack_id}, {"packs.$":1}, callback

  # callback(error, pack) - the last entry of the doc's index, or undefined
  getLastPackFromIndex: (doc_id, callback) ->
    db.docHistoryIndex.findOne {_id: ObjectId(doc_id.toString())}, {packs:{$slice:-1}}, (err, indexPack) ->
      return callback(err) if err?
      return callback() if not indexPack?
      # the $slice projection leaves only the last element in indexPack.packs
      callback(null, indexPack.packs?[0])

  # Like getIndex, but also keys each index entry by its pack _id on the index
  # object so callers can test membership with index[pack_id].
  # callback(error, index)
  getIndexWithKeys: (doc_id, callback) ->
    PackManager.getIndex doc_id, (err, index) ->
      return callback(err) if err?
      return callback() if not index?
      for pack in index.packs or []
        index[pack._id] = pack
      callback(null, index)

  # Build the index for a doc from its completed packs.
  # callback(error)
  initialiseIndex: (project_id, doc_id, callback) ->
    PackManager.findCompletedPacks project_id, doc_id, (err, packs) ->
      return callback(err) if err?
      return callback() if not packs?
      PackManager.insertPacksIntoIndexWithLock project_id, doc_id, packs, callback

  # Add any completed packs that are not yet in the doc's index.
  # callback(error)
  updateIndex: (project_id, doc_id, callback) ->
    # find all packs prior to current pack
    PackManager.findUnindexedPacks project_id, doc_id, (err, newPacks) ->
      return callback(err) if err?
      return callback() if not newPacks? or newPacks.length is 0
      PackManager.insertPacksIntoIndexWithLock project_id, doc_id, newPacks, (err) ->
        return callback(err) if err?
        logger.log {project_id, doc_id, newPacks}, "added new packs to index"
        callback()

  # Permanent (non-expiring) packs for a doc in ascending v order, excluding the
  # last one unless it has been finalised (it may still be receiving appends).
  # callback(error, packs) - packs is undefined when there are none
  findCompletedPacks: (project_id, doc_id, callback) ->
    query = { doc_id: ObjectId(doc_id.toString()), expiresAt: {$exists:false} }
    db.docHistory.find(query, {pack:false}).sort {v:1}, (err, packs) ->
      return callback(err) if err?
      return callback() if not packs?.length
      last = packs.pop() # discard the last pack, if it's still in progress
      packs.push(last) if last.finalised # it's finalised so we push it back to archive it
      callback(null, packs)

  # All permanent (non-expiring) packs for a doc in ascending v order.
  # callback(error, packs) - packs is undefined when there are none
  findPacks: (project_id, doc_id, callback) ->
    query = { doc_id: ObjectId(doc_id.toString()), expiresAt: {$exists:false} }
    db.docHistory.find(query, {pack:false}).sort {v:1}, (err, packs) ->
      return callback(err) if err?
      return callback() if not packs?.length
      callback(null, packs)

  # Completed packs not yet present in the doc's index, stripped down to the
  # fields stored in index entries.
  # callback(error, newPacks)
  findUnindexedPacks: (project_id, doc_id, callback) ->
    PackManager.getIndexWithKeys doc_id, (err, indexResult) ->
      return callback(err) if err?
      PackManager.findCompletedPacks project_id, doc_id, (err, historyPacks) ->
        return callback(err) if err?
        return callback() if not historyPacks?
        # select only the new packs not already in the index
        newPacks = (pack for pack in historyPacks when not indexResult?[pack._id]?)
        newPacks = (_.omit(pack, 'doc_id', 'project_id', 'n', 'sz', 'last_checked', 'finalised') for pack in newPacks)
        if newPacks.length
          logger.log {project_id, doc_id, n: newPacks.length}, "found new packs"
        callback(null, newPacks)

  # Insert newPacks into the index while holding the doc's history index lock.
  # callback(error)
  insertPacksIntoIndexWithLock: (project_id, doc_id, newPacks, callback) ->
    LockManager.runWithLock(
      keys.historyIndexLock({doc_id}),
      (releaseLock) ->
        PackManager._insertPacksIntoIndex project_id, doc_id, newPacks, releaseLock
      callback
    )

  # Upsert the doc's index, pushing newPacks and keeping entries sorted by v.
  # Must be called with the history index lock held.
  _insertPacksIntoIndex: (project_id, doc_id, newPacks, callback) ->
    db.docHistoryIndex.findAndModify {
      query: {_id:ObjectId(doc_id.toString())}
      update:
        $setOnInsert: project_id: ObjectId(project_id.toString())
        $push:
          packs: {$each: newPacks, $sort: {v: 1}}
      upsert: true
    }, callback

  # Archiving packs to S3

  # Archive one pack: flag it as in progress in the index, upload it, verify the
  # uploaded copy matches mongo, flag it as archived, then set a short TTL on
  # the mongo copy. The in-progress flag is cleared if upload or verification
  # fails.
  # callback(error)
  archivePack: (project_id, doc_id, pack_id, callback) ->
    clearFlagOnError = (err, cb) ->
      if err? # clear the inS3 flag on error
        PackManager.clearPackAsArchiveInProgress project_id, doc_id, pack_id, (err2) ->
          return cb(err2) if err2?
          return cb(err)
      else
        cb()
    async.series [
      (cb) ->
        PackManager.checkArchiveNotInProgress project_id, doc_id, pack_id, cb
      (cb) ->
        PackManager.markPackAsArchiveInProgress project_id, doc_id, pack_id, cb
      (cb) ->
        MongoAWS.archivePack project_id, doc_id, pack_id, (err) ->
          clearFlagOnError(err, cb)
      (cb) ->
        PackManager.checkArchivedPack project_id, doc_id, pack_id, (err) ->
          clearFlagOnError(err, cb)
      (cb) ->
        PackManager.markPackAsArchived project_id, doc_id, pack_id, cb
      (cb) ->
        # must be the task's own cb: passing the outer callback here would leave
        # async.series unfinished
        PackManager.setTTLOnArchivedPack project_id, doc_id, pack_id, cb
    ], (err) ->
      callback(err)

  # Verify that the copy of the pack stored in S3 matches the one in mongo.
  # callback(error) - error if either copy is missing or they differ
  checkArchivedPack: (project_id, doc_id, pack_id, callback) ->
    db.docHistory.findOne {_id: pack_id}, (err, pack) ->
      return callback(err) if err?
      return callback new Error("pack not found") if not pack?
      MongoAWS.readArchivedPack project_id, doc_id, pack_id, (err, result) ->
        return callback(err) if err?
        return callback new Error("archived pack not found") if not result?
        delete result.last_checked
        delete pack.last_checked
        # need to compare ids as ObjectIds with .equals(); when they match, copy
        # the mongo instance across so _.isEqual compares the rest structurally
        for key in ['_id', 'project_id', 'doc_id']
          result[key] = pack[key] if result[key]?.equals?(pack[key])
        for op, i in result.pack or []
          mongoOp = pack.pack?[i]
          op._id = mongoOp._id if op._id?.equals? and mongoOp? and op._id.equals(mongoOp._id)
        if _.isEqual pack, result
          callback()
        else
          logger.err {pack, result, jsondiff: JSON.stringify(pack) is JSON.stringify(result)}, "difference when comparing packs"
          callback new Error("pack retrieved from s3 does not match pack in mongo")

  # Extra methods to test archive/unarchive for a doc_id

  # Run processOldPack on the doc's oldest permanent pack.
  pushOldPacks: (project_id, doc_id, callback) ->
    PackManager.findPacks project_id, doc_id, (err, packs) ->
      return callback(err) if err?
      return callback() if not packs?.length
      PackManager.processOldPack project_id, doc_id, packs[0]._id, callback

  # Bring every indexed pack for the doc back into mongo.
  pullOldPacks: (project_id, doc_id, callback) ->
    PackManager.loadPacksByVersionRange project_id, doc_id, null, null, callback

  # Processing old packs via worker

  # Worker entry point for one pack: finalise it if it is old enough, bring the
  # doc's index up to date, then archive every index entry not yet in S3.
  # The pack is marked as checked afterwards (on success or error) unless it is
  # an expiring pack.
  # callback(error)
  processOldPack: (project_id, doc_id, pack_id, callback) ->
    markAsChecked = (err) ->
      PackManager.markPackAsChecked project_id, doc_id, pack_id, (err2) ->
        return callback(err2) if err2?
        callback(err)
    logger.log {project_id, doc_id}, "processing old packs"
    db.docHistory.findOne {_id:pack_id}, (err, pack) ->
      return markAsChecked(err) if err?
      return markAsChecked() if not pack?
      return callback() if pack.expiresAt? # return directly
      PackManager.finaliseIfNeeded project_id, doc_id, pack._id, pack, (err) ->
        return markAsChecked(err) if err?
        PackManager.updateIndexIfNeeded project_id, doc_id, (err) ->
          return markAsChecked(err) if err?
          PackManager.findUnarchivedPacks project_id, doc_id, (err, unarchivedPacks) ->
            return markAsChecked(err) if err?
            if not unarchivedPacks?.length
              logger.log {project_id, doc_id}, "no packs need archiving"
              return markAsChecked()
            async.eachSeries unarchivedPacks, (pack, cb) ->
              PackManager.archivePack project_id, doc_id, pack._id, cb
            , (err) ->
              return markAsChecked(err) if err?
              logger.log {project_id, doc_id}, "done processing"
              markAsChecked()

  # Mark a pack as finalised (closed to further appends) when it is at least 30
  # days old and either its size (MB) or op count (/1024) exceeds a threshold of
  # 30/age-in-days, or it is more than 90 days old.
  # callback(error)
  finaliseIfNeeded: (project_id, doc_id, pack_id, pack, callback) ->
    sz = pack.sz / (1024 * 1024) # in fractions of a megabyte
    n = pack.n / 1024 # in fraction of 1024 ops
    age = (Date.now() - pack.meta.end_ts) / DAYS
    if age < 30 # always keep if less than 1 month old
      logger.log {project_id, doc_id, pack_id, age}, "less than 30 days old"
      return callback()
    # compute an archiving threshold which decreases for each month of age
    archive_threshold = 30 / age
    if sz > archive_threshold or n > archive_threshold or age > 90
      logger.log {project_id, doc_id, pack_id, age, archive_threshold, sz, n}, "meets archive threshold"
      PackManager.markPackAsFinalisedWithLock project_id, doc_id, pack_id, callback
    else
      logger.log {project_id, doc_id, pack_id, age, archive_threshold, sz, n}, "does not meet archive threshold"
      callback()

  # Set finalised on the pack while holding the doc's history lock.
  markPackAsFinalisedWithLock: (project_id, doc_id, pack_id, callback) ->
    LockManager.runWithLock(
      keys.historyLock({doc_id}),
      (releaseLock) ->
        PackManager._markPackAsFinalised project_id, doc_id, pack_id, releaseLock
      callback
    )

  # Set finalised: true on the pack. Must be called with the history lock held.
  _markPackAsFinalised: (project_id, doc_id, pack_id, callback) ->
    logger.log {project_id, doc_id, pack_id}, "marking pack as finalised"
    db.docHistory.findAndModify {
      query: {_id: pack_id}
      update: {$set: {finalised: true}}
    }, callback

  # Create the doc's index if it does not exist yet, otherwise add any new
  # completed packs to it.
  # callback(error)
  updateIndexIfNeeded: (project_id, doc_id, callback) ->
    logger.log {project_id, doc_id}, "archiving old packs"
    PackManager.getIndexWithKeys doc_id, (err, index) ->
      return callback(err) if err?
      if not index?
        PackManager.initialiseIndex project_id, doc_id, callback
      else
        PackManager.updateIndex project_id, doc_id, callback

  # Set the pack's last_checked to the current server time.
  markPackAsChecked: (project_id, doc_id, pack_id, callback) ->
    logger.log {project_id, doc_id, pack_id}, "marking pack as checked"
    db.docHistory.findAndModify {
      query: {_id: pack_id}
      update: {$currentDate: {"last_checked":true}}
    }, callback

  # Index entries with no inS3 flag at all (neither archived nor in progress).
  # callback(error, packs)
  findUnarchivedPacks: (project_id, doc_id, callback) ->
    PackManager.getIndex doc_id, (err, indexResult) ->
      return callback(err) if err?
      indexPacks = indexResult?.packs or []
      unArchivedPacks = (pack for pack in indexPacks when not pack.inS3?)
      if unArchivedPacks.length
        logger.log {project_id, doc_id, n: unArchivedPacks.length}, "find unarchived packs"
      callback(null, unArchivedPacks)

  # Archive locking flags
  #
  # The index entry's inS3 field tracks archiving: absent = not archived,
  # false = archive in progress, true = archived.

  # callback(error) - error unless the pack is in the index with no inS3 flag
  checkArchiveNotInProgress: (project_id, doc_id, pack_id, callback) ->
    logger.log {project_id, doc_id, pack_id}, "checking if archive in progress"
    PackManager.getPackFromIndex doc_id, pack_id, (err, result) ->
      return callback(err) if err?
      # the "packs.$" projection returns the index document with only the
      # matching entry in its packs array; the flag lives on that entry
      pack = result?.packs?[0]
      return callback new Error("pack not found in index") if not pack?
      if pack.inS3
        return callback new Error("pack archiving already done")
      else if pack.inS3?
        return callback new Error("pack archiving already in progress")
      else
        return callback()

  # Atomically set inS3: false on the entry, only if it has no inS3 flag yet.
  # callback(error) - error if the entry was not in that state
  markPackAsArchiveInProgress: (project_id, doc_id, pack_id, callback) ->
    logger.log {project_id, doc_id}, "marking pack as archive in progress status"
    db.docHistoryIndex.findAndModify {
      query: {_id:ObjectId(doc_id.toString()), packs: {$elemMatch: {"_id": pack_id, inS3: {$exists:false}}}}
      fields: { "packs.$": 1 }
      update: {$set: {"packs.$.inS3":false}}
    }, (err, result) ->
      return callback(err) if err?
      return callback new Error("archive is already in progress") if not result?
      logger.log {project_id, doc_id, pack_id}, "marked as archive in progress"
      callback()

  # Remove the in-progress (inS3: false) flag from the entry.
  clearPackAsArchiveInProgress: (project_id, doc_id, pack_id, callback) ->
    logger.log {project_id, doc_id, pack_id}, "clearing as archive in progress"
    db.docHistoryIndex.findAndModify {
      query: {_id:ObjectId(doc_id.toString()), "packs" : {$elemMatch: {"_id": pack_id, inS3: false}}}
      fields: { "packs.$": 1 }
      update: {$unset: {"packs.$.inS3":true}}
    }, callback

  # Atomically flip the entry from in progress (inS3: false) to archived
  # (inS3: true).
  # callback(error) - error if the entry was not marked as in progress
  markPackAsArchived: (project_id, doc_id, pack_id, callback) ->
    logger.log {project_id, doc_id, pack_id}, "marking pack as archived"
    db.docHistoryIndex.findAndModify {
      query: {_id:ObjectId(doc_id.toString()), "packs" : {$elemMatch: {"_id": pack_id, inS3: false}}}
      fields: { "packs.$": 1 }
      update: {$set: {"packs.$.inS3":true}}
    }, (err, result) ->
      return callback(err) if err?
      return callback new Error("archive is not marked as progress") if not result?
      logger.log {project_id, doc_id, pack_id}, "marked as archived"
      callback()

  # Let the mongo copy of an archived pack expire one day from now.
  # callback(error)
  setTTLOnArchivedPack: (project_id, doc_id, pack_id, callback) ->
    db.docHistory.findAndModify {
      query: {_id: pack_id}
      update: {$set: {expiresAt: new Date(Date.now() + 1*DAYS)}}
    }, (err) ->
      return callback(err) if err?
      logger.log {project_id, doc_id, pack_id}, "set expiry on pack"
      callback()
# _getOneDayInFutureWithRandomDelay: ->
# thirtyMins = 1000 * 60 * 30
# randomThirtyMinMax = Math.ceil(Math.random() * thirtyMins)
# return new Date(Date.now() + randomThirtyMinMax + 1*DAYS)