async = require "async" _ = require "underscore" {db, ObjectId, BSON} = require "./mongojs" logger = require "logger-sharelatex" LockManager = require "./LockManager" MongoAWS = require "./MongoAWS" ProjectIterator = require "./ProjectIterator" # Sharejs operations are stored in a 'pack' object # # e.g. a single sharejs update looks like # # { # "doc_id" : 549dae9e0a2a615c0c7f0c98, # "project_id" : 549dae9c0a2a615c0c7f0c8c, # "op" : [ {"p" : 6981, "d" : "?" } ], # "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 }, # "v" : 17082 # } # # and a pack looks like this # # { # "doc_id" : 549dae9e0a2a615c0c7f0c98, # "project_id" : 549dae9c0a2a615c0c7f0c8c, # "pack" : [ U1, U2, U3, ...., UN], # "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 }, # "v" : 17082 # "v_end" : ... # } # # where U1, U2, U3, .... are single updates stripped of their # doc_id and project_id fields (which are the same for all the # updates in the pack). # # The pack itself has v and meta fields, this makes it possible to # treat packs and single updates in a similar way. # # The v field of the pack itself is from the first entry U1, the # v_end field from UN. The meta.end_ts field of the pack itself is # from the last entry UN, the meta.start_ts field from U1. 
DAYS = 24 * 3600 * 1000 # one day in milliseconds

module.exports = PackManager =

	# Upper bounds for one pack: total BSON size of its updates (bytes) and
	# number of updates it holds. TODO: make these configurable parameters.
	MAX_SIZE: 1024*1024
	MAX_COUNT: 1024

	# Store newUpdates for a doc. Fills the pack described by lastUpdate first
	# (when appending to it is allowed), then keeps starting new packs,
	# respecting MAX_COUNT and MAX_SIZE, until every update has been flushed.
	#
	# lastUpdate - latest existing pack for the doc, or null; its n/sz fields
	#              are used to work out how much room is left in it
	# temporary  - when true, packs created here get an expiresAt TTL
	# callback   - (error) ->
	insertCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
		return callback() if newUpdates.length == 0

		# Do not budget against a pack we are not allowed to extend (e.g.
		# permanent updates must never go into a pack that will expire).
		lastUpdate = null if lastUpdate? and not PackManager._canAppendToPack(lastUpdate, temporary)

		updatesToFlush = []
		updatesRemaining = newUpdates.slice()

		n = lastUpdate?.n || 0
		sz = lastUpdate?.sz || 0

		while updatesRemaining.length and n < PackManager.MAX_COUNT and sz < PackManager.MAX_SIZE
			nextUpdate = updatesRemaining[0]
			nextUpdateSize = BSON.calculateObjectSize(nextUpdate)
			# an oversized update is still accepted when the pack is empty,
			# otherwise it is deferred to the next pack
			if nextUpdateSize + sz > PackManager.MAX_SIZE and n > 0
				break
			n++
			sz += nextUpdateSize
			updatesToFlush.push updatesRemaining.shift()

		PackManager.flushCompressedUpdates project_id, doc_id, lastUpdate, updatesToFlush, temporary, (error) ->
			return callback(error) if error?
			# remaining updates always start a fresh pack
			PackManager.insertCompressedUpdates project_id, doc_id, null, updatesRemaining, temporary, callback

	# Decide whether updates with the given `temporary` flag may be appended
	# to the existing pack `lastUpdate`:
	#  - permanent updates only go into a permanent pack (no expiresAt)
	#  - temporary updates only go into a temporary pack whose meta.start_ts
	#    is less than one day ago
	_canAppendToPack: (lastUpdate, temporary) ->
		return false if not lastUpdate?
		if temporary
			return false if not lastUpdate.expiresAt?
			age = Date.now() - lastUpdate.meta?.start_ts
			return age < 1 * DAYS
		else
			return not lastUpdate.expiresAt?

	# Write one batch of updates, either appended to lastUpdate's pack or as
	# a brand-new pack, depending on _canAppendToPack.
	flushCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
		return callback() if newUpdates.length == 0
		if PackManager._canAppendToPack(lastUpdate, temporary)
			PackManager.appendUpdatesToExistingPack project_id, doc_id, lastUpdate, newUpdates, temporary, callback
		else
			PackManager.insertUpdatesIntoNewPack project_id, doc_id, newUpdates, temporary, callback

	# Create a new docHistory pack from newUpdates. The pack's v/meta.start_ts
	# come from the first update and v_end/meta.end_ts from the last.
	# Temporary packs expire after 7 days; permanent packs are added to the
	# docHistoryIndex via updateIndex.
	insertUpdatesIntoNewPack: (project_id, doc_id, newUpdates, temporary, callback = (error) ->) ->
		first = newUpdates[0]
		last = newUpdates[newUpdates.length - 1]
		n = newUpdates.length
		sz = BSON.calculateObjectSize(newUpdates)
		newPack =
			project_id: ObjectId(project_id.toString())
			doc_id: ObjectId(doc_id.toString())
			pack: newUpdates
			n: n
			sz: sz
			meta:
				start_ts: first.meta.start_ts
				end_ts: last.meta.end_ts
			v: first.v
			v_end: last.v
			temporary: temporary
		if temporary
			newPack.expiresAt = new Date(Date.now() + 7 * DAYS)
		logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack"
		db.docHistory.save newPack, (err, result) ->
			return callback(err) if err?
			if temporary
				return callback()
			else
				PackManager.updateIndex project_id, doc_id, callback

	# Push newUpdates onto the existing pack lastUpdate, bumping its n/sz
	# counters and its meta.end_ts/v_end. A temporary pack that receives
	# temporary updates has its expiry pushed out to 7 days from now.
	appendUpdatesToExistingPack: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
		last = newUpdates[newUpdates.length - 1]
		n = newUpdates.length
		sz = BSON.calculateObjectSize(newUpdates)
		query =
			_id: lastUpdate._id
			project_id: ObjectId(project_id.toString())
			doc_id: ObjectId(doc_id.toString())
			pack: {$exists: true}
		update =
			$push:
				"pack": {$each: newUpdates}
			$inc:
				"n": n
				"sz": sz
			$set:
				"meta.end_ts": last.meta.end_ts
				"v_end": last.v
		if lastUpdate.expiresAt and temporary
			update.$set.expiresAt = new Date(Date.now() + 7 * DAYS)
		logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack"
		db.docHistory.findAndModify {query, update, new:true, fields:{meta:1,v_end:1}}, callback

	# Retrieve all changes for a document

	# Return the ops of doc_id whose v lies in [fromVersion, toVersion] (either
	# bound may be null for "unbounded"), newest first. Archived packs in the
	# range are restored from S3 first.
	getOpsByVersionRange: (project_id, doc_id, fromVersion, toVersion, callback = (error, updates) ->) ->
		PackManager.loadPacksByVersionRange project_id, doc_id, fromVersion, toVersion, (error) ->
			return callback(error) if error?
			query = {doc_id:ObjectId(doc_id.toString())}
			query.v = {$lte:toVersion} if toVersion?
			query.v_end = {$gte:fromVersion} if fromVersion?
			db.docHistory.find(query).sort {v:-1}, (err, result) ->
				return callback(err) if err?
				opInRange = (op) ->
					return false if fromVersion? and op.v < fromVersion
					return false if toVersion? and op.v > toVersion
					return true
				updates = []
				for docHistory in result
					# packs arrive newest first; each pack's ops are reversed so
					# the combined list also runs newest first (assumes ops are
					# stored in ascending v order inside a pack)
					for op in docHistory.pack.reverse() when opInRange(op)
						op.project_id = docHistory.project_id
						op.doc_id = docHistory.doc_id
						updates.push op
				callback(null, updates)

	# Make sure every indexed pack overlapping [fromVersion, toVersion] is
	# present in docHistory, unarchiving missing ones from S3.
	loadPacksByVersionRange: (project_id, doc_id, fromVersion, toVersion, callback) ->
		PackManager.getIndex doc_id, (err, indexResult) ->
			return callback(err) if err?
			indexPacks = indexResult?.packs or []
			packInRange = (pack) ->
				return false if fromVersion? and pack.v_end < fromVersion
				return false if toVersion? and pack.v > toVersion
				return true
			neededIds = (pack._id for pack in indexPacks when packInRange(pack))
			PackManager.fetchPacksIfNeeded project_id, doc_id, neededIds, callback

	# Unarchive (at most 4 at a time) any of pack_ids not currently present in
	# the docHistory collection.
	fetchPacksIfNeeded: (project_id, doc_id, pack_ids, callback) ->
		db.docHistory.find {_id: {$in: (ObjectId(id) for id in pack_ids)}}, {_id:1}, (err, loadedPacks) ->
			return callback(err) if err?
			allPackIds = (id.toString() for id in pack_ids)
			loadedPackIds = (pack._id.toString() for pack in loadedPacks)
			packIdsToFetch = _.difference allPackIds, loadedPackIds
			logger.log {loadedPackIds, allPackIds, packIdsToFetch}, "analysed packs"
			async.eachLimit packIdsToFetch, 4, (pack_id, cb) ->
				MongoAWS.unArchivePack project_id, doc_id, pack_id, cb
			, (err) ->
				return callback(err) if err?
				logger.log "done unarchiving"
				callback()

	# Retrieve all changes across a project

	# Build a ProjectIterator over every pack of the project: the pack
	# summaries in docHistory (without ops, sorted by meta.end_ts descending)
	# plus any index-only (archived) packs not already seen.
	makeProjectIterator: (project_id, before, callback) ->
		db.docHistory.find({project_id: ObjectId(project_id)},{pack:false}).sort {"meta.end_ts":-1}, (err, packs) ->
			return callback(err) if err?
			allPacks = []
			seenIds = {}
			for pack in packs
				allPacks.push pack
				seenIds[pack._id] = true
			db.docHistoryIndex.find {project_id: ObjectId(project_id)}, (err, indexes) ->
				return callback(err) if err?
				for index in indexes
					for pack in index.packs when not seenIds[pack._id]
						pack.project_id = index.project_id
						pack.doc_id = index._id
						pack.fromIndex = true
						allPacks.push pack
						seenIds[pack._id] = true
				callback(null, new ProjectIterator(allPacks, before, PackManager.getPackById))

	# Load a single pack, unarchiving it from S3 when it is not in docHistory.
	getPackById: (project_id, doc_id, pack_id, callback) ->
		db.docHistory.findOne {_id: pack_id}, (err, pack) ->
			return callback(err) if err?
			if not pack?
				MongoAWS.unArchivePack project_id, doc_id, pack_id, callback
			else if pack.expiresAt?
				# we only need to touch the TTL on the listing of changes in the project
				# because diffs on individual documents are always done after that
				PackManager.increaseTTL pack, callback
			else
				callback(null, pack)

	# Extend an expiring pack's TTL back to 7 days once it has fewer than 6
	# days left. Always calls back with the pack.
	increaseTTL: (pack, callback) ->
		if pack.expiresAt < new Date(Date.now() + 6 * DAYS)
			# update cache expiry since we are using this pack
			db.docHistory.findAndModify {
				query: {_id: pack._id}
				update: {$set: {expiresAt: new Date(Date.now() + 7 * DAYS)}}
			}, (err) ->
				return callback(err, pack)
		else
			callback(null, pack)

	# Manage docHistoryIndex collection

	# Fetch the index document (keyed by doc_id) for a doc.
	getIndex: (doc_id, callback) ->
		db.docHistoryIndex.findOne {_id:ObjectId(doc_id.toString())}, callback

	# Fetch the index document projected down to the single matching pack
	# entry; the entry is at result.packs[0].
	getPackFromIndex: (doc_id, pack_id, callback) ->
		db.docHistoryIndex.findOne {_id:ObjectId(doc_id.toString()), "packs._id": pack_id}, {"packs.$":1}, callback

	# Return the last pack entry recorded in the doc's index (or nothing).
	getLastPackFromIndex: (doc_id, callback) ->
		db.docHistoryIndex.findOne {_id: ObjectId(doc_id.toString())}, {packs:{$slice:-1}}, (err, indexPack) ->
			return callback(err) if err?
			return callback() if not indexPack?
			# the $slice projection leaves only the last entry in `packs`
			callback(null, indexPack.packs?[0])

	# Fetch the index and additionally key each pack entry by its _id on the
	# returned object, for quick membership checks.
	getIndexWithKeys: (doc_id, callback) ->
		PackManager.getIndex doc_id, (err, index) ->
			return callback(err) if err?
			return callback() if not index?
			for pack in index?.packs or []
				index[pack._id] = pack
			callback(null, index)

	# Create the index for a doc from all of its completed permanent packs.
	initialiseIndex: (project_id, doc_id, callback) ->
		PackManager.findCompletedPacks project_id, doc_id, (err, packs) ->
			return callback(err) if err?
			return callback() if not packs?
			PackManager.insertPacksIntoIndexWithLock project_id, doc_id, packs, callback

	# Add any completed permanent packs missing from the doc's index.
	updateIndex: (project_id, doc_id, callback) ->
		# find all packs prior to current pack
		PackManager.findUnindexedPacks project_id, doc_id, (err, newPacks) ->
			return callback(err) if err?
			return callback() if not newPacks?
			PackManager.insertPacksIntoIndexWithLock project_id, doc_id, newPacks, (err) ->
				return callback(err) if err?
				logger.log {project_id, doc_id, newPacks}, "added new packs to index"
				callback()

	# Return the doc's non-expiring packs (without ops) sorted by v, excluding
	# the most recent one, which may still be receiving updates. Calls back
	# with nothing when there is at most one such pack.
	findCompletedPacks: (project_id, doc_id, callback) ->
		query = { doc_id: ObjectId(doc_id.toString()), expiresAt: {$exists:false} }
		db.docHistory.find(query, {pack:false}).sort {v:1}, (err, packs) ->
			return callback(err) if err?
			return callback() if not packs?
			return callback() if packs?.length <= 1
			packs.pop() # discard the last pack, it's still in progress
			callback(null, packs)

	# Return completed packs not yet present in the index, stripped of the
	# doc_id, project_id, n and sz fields.
	findUnindexedPacks: (project_id, doc_id, callback) ->
		PackManager.getIndexWithKeys doc_id, (err, indexResult) ->
			return callback(err) if err?
			PackManager.findCompletedPacks project_id, doc_id, (err, historyPacks) ->
				return callback(err) if err?
				return callback() if not historyPacks?
				# select only the new packs not already in the index
				newPacks = (pack for pack in historyPacks when not indexResult?[pack._id]?)
				newPacks = (_.omit(pack, 'doc_id', 'project_id', 'n', 'sz') for pack in newPacks)
				logger.log {project_id, doc_id, n: newPacks.length}, "found new packs"
				callback(null, newPacks)

	# Insert packs into the index while holding the per-doc HistoryIndexLock.
	insertPacksIntoIndexWithLock: (project_id, doc_id, newPacks, callback) ->
		LockManager.runWithLock(
			"HistoryIndexLock:#{doc_id}",
			(releaseLock) ->
				PackManager._insertPacksIntoIndex project_id, doc_id, newPacks, releaseLock
			callback
		)

	# Upsert the index document and push newPacks, keeping `packs` sorted by v.
	_insertPacksIntoIndex: (project_id, doc_id, newPacks, callback) ->
		db.docHistoryIndex.findAndModify {
			query: {_id:ObjectId(doc_id.toString())}
			update:
				$setOnInsert:
					project_id: ObjectId(project_id.toString())
				$push:
					packs: {$each: newPacks, $sort: {v: 1}}
			upsert: true
		}, callback

	# Archiving packs to S3

	# Archive one pack to S3. Steps: claim it in the index (inS3 = false),
	# upload, verify the S3 copy matches mongo, mark it archived (inS3 = true),
	# then put a TTL on the mongo copy. The claim is released if the upload or
	# verification fails.
	archivePack: (project_id, doc_id, pack_id, callback) ->
		clearFlagOnError = (err, cb) ->
			if err? # clear the inS3 flag on error
				PackManager.clearPackAsArchiveInProgress project_id, doc_id, pack_id, (err2) ->
					return cb(err2) if err2?
					return cb(err)
			else
				cb()
		async.series [
			(cb) ->
				PackManager.checkArchiveNotInProgress project_id, doc_id, pack_id, cb
			(cb) ->
				PackManager.markPackAsArchiveInProgress project_id, doc_id, pack_id, cb
			(cb) ->
				MongoAWS.archivePack project_id, doc_id, pack_id, (err) ->
					clearFlagOnError(err, cb)
			(cb) ->
				PackManager.checkArchivedPack project_id, doc_id, pack_id, (err) ->
					clearFlagOnError(err, cb)
			(cb) ->
				PackManager.markPackAsArchived project_id, doc_id, pack_id, cb
			(cb) ->
				# hand the step callback (not the outer one) to the last step so
				# async.series completes and invokes `callback` exactly once
				PackManager.setTTLOnArchivedPack project_id, doc_id, pack_id, cb
		], callback

	# Compare the pack in mongo with the copy read back from S3, ignoring
	# last_checked and treating equal ObjectIds as identical.
	checkArchivedPack: (project_id, doc_id, pack_id, callback) ->
		db.docHistory.findOne {_id: pack_id}, (err, pack) ->
			return callback(err) if err?
			return callback new Error("pack not found") if not pack?
			MongoAWS.readArchivedPack project_id, doc_id, pack_id, (err, result) ->
				return callback(err) if err?
				return callback new Error("archived pack not found") if not result?
				delete result.last_checked
				delete pack.last_checked
				# need to compare ids as ObjectIds with .equals()
				for key in ['_id', 'project_id', 'doc_id']
					result[key] = pack[key] if result[key]?.equals?(pack[key])
				for op, i in (result.pack or [])
					mongoOp = pack.pack?[i]
					op._id = mongoOp._id if mongoOp? and op._id?.equals?(mongoOp._id)
				if _.isEqual pack, result
					callback()
				else
					logger.err {pack, result, jsondiff: JSON.stringify(pack) is JSON.stringify(result)}, "difference when comparing packs"
					callback new Error("pack retrieved from s3 does not match pack in mongo")

	# Extra methods to test archive/unarchive for a doc_id

	# Run the archiving worker starting from the doc's oldest completed pack.
	pushOldPacks: (project_id, doc_id, callback) ->
		PackManager.findCompletedPacks project_id, doc_id, (err, packs) ->
			return callback(err) if err?
			return callback() if not packs?.length
			PackManager.processOldPack project_id, doc_id, packs[0]._id, callback

	# Restore every indexed pack of the doc from S3 if missing locally.
	pullOldPacks: (project_id, doc_id, callback) ->
		PackManager.loadPacksByVersionRange project_id, doc_id, null, null, callback

	# Processing old packs via worker

	# Worker entry point: refresh the doc's index, archive every indexed pack
	# not yet marked inS3, then stamp pack_id with last_checked. Expiring
	# (temporary) packs are skipped without being marked.
	processOldPack: (project_id, doc_id, pack_id, callback) ->
		markAsChecked = (err) ->
			PackManager.markPackAsChecked project_id, doc_id, pack_id, (err2) ->
				return callback(err2) if err2?
				callback(err)
		logger.log {project_id, doc_id}, "processing old packs"
		db.docHistory.findOne {_id:pack_id}, (err, pack) ->
			return markAsChecked(err) if err?
			return markAsChecked() if not pack?
			return callback() if pack.expiresAt? # return directly
			PackManager.updateIndexIfNeeded project_id, doc_id, (err) ->
				return markAsChecked(err) if err?
				PackManager.findUnarchivedPacks project_id, doc_id, (err, unarchivedPacks) ->
					return markAsChecked(err) if err?
					if not unarchivedPacks?.length
						logger.log "no packs need archiving"
						return markAsChecked()
					async.eachSeries unarchivedPacks, (pack, cb) ->
						PackManager.archivePack project_id, doc_id, pack._id, cb
					, (err) ->
						return markAsChecked(err) if err?
						logger.log "done processing"
						markAsChecked()

	# Create the doc's index if it does not exist yet, otherwise update it.
	updateIndexIfNeeded: (project_id, doc_id, callback) ->
		logger.log {project_id, doc_id}, "archiving old packs"
		PackManager.getIndexWithKeys doc_id, (err, index) ->
			return callback(err) if err?
			if not index?
				PackManager.initialiseIndex project_id, doc_id, callback
			else
				PackManager.updateIndex project_id, doc_id, callback

	# Set the pack's last_checked field to the current date.
	markPackAsChecked: (project_id, doc_id, pack_id, callback) ->
		logger.log {project_id, doc_id, pack_id}, "marking pack as checked"
		db.docHistory.findAndModify {
			query: {_id: pack_id}
			update: {$currentDate: {"last_checked":true}}
		}, callback

	# Return the index entries that have no inS3 flag at all (neither archived
	# nor being archived).
	findUnarchivedPacks: (project_id, doc_id, callback) ->
		PackManager.getIndex doc_id, (err, indexResult) ->
			return callback(err) if err?
			indexPacks = indexResult?.packs or []
			unArchivedPacks = (pack for pack in indexPacks when not pack.inS3?)
			logger.log {project_id, doc_id, n: unArchivedPacks.length}, "find unarchived packs"
			callback(null, unArchivedPacks)

	# Archive locking flags
	# inS3 on an index entry: absent = not archived, false = archive in
	# progress, true = archived.

	# Fail if the pack's index entry is already archived or being archived.
	checkArchiveNotInProgress: (project_id, doc_id, pack_id, callback) ->
		logger.log {project_id, doc_id, pack_id}, "checking if archive in progress"
		PackManager.getPackFromIndex doc_id, pack_id, (err, result) ->
			return callback(err) if err?
			# the "packs.$" projection puts the matching entry at packs[0]
			pack = result?.packs?[0]
			return callback new Error("pack not found in index") if not pack?
			if pack.inS3
				return callback new Error("pack archiving already done")
			else if pack.inS3?
				return callback new Error("pack archiving already in progress")
			else
				return callback()

	# Atomically claim the pack for archiving (inS3 absent -> false); fails if
	# no unclaimed entry matched.
	markPackAsArchiveInProgress: (project_id, doc_id, pack_id, callback) ->
		logger.log {project_id, doc_id}, "marking pack as archive in progress status"
		db.docHistoryIndex.findAndModify {
			query: {_id:ObjectId(doc_id.toString()), packs: {$elemMatch: {"_id": pack_id, inS3: {$exists:false}}}}
			fields: { "packs.$": 1 }
			update: {$set: {"packs.$.inS3":false}}
		}, (err, result) ->
			return callback(err) if err?
			return callback new Error("archive is already in progress") if not result?
			logger.log {project_id, doc_id, pack_id}, "marked as archive in progress"
			callback()

	# Release an in-progress claim (inS3 false -> absent).
	clearPackAsArchiveInProgress: (project_id, doc_id, pack_id, callback) ->
		logger.log {project_id, doc_id, pack_id}, "clearing as archive in progress"
		db.docHistoryIndex.findAndModify {
			query: {_id:ObjectId(doc_id.toString()), "packs" : {$elemMatch: {"_id": pack_id, inS3: false}}}
			fields: { "packs.$": 1 }
			update: {$unset: {"packs.$.inS3":true}}
		}, callback

	# Complete an in-progress claim (inS3 false -> true); fails if the entry
	# was not marked in progress.
	markPackAsArchived: (project_id, doc_id, pack_id, callback) ->
		logger.log {project_id, doc_id, pack_id}, "marking pack as archived"
		db.docHistoryIndex.findAndModify {
			query: {_id:ObjectId(doc_id.toString()), "packs" : {$elemMatch: {"_id": pack_id, inS3: false}}}
			fields: { "packs.$": 1 }
			update: {$set: {"packs.$.inS3":true}}
		}, (err, result) ->
			return callback(err) if err?
			return callback new Error("archive is not marked as progress") if not result?
			logger.log {project_id, doc_id, pack_id}, "marked as archived"
			callback()

	# Give the mongo copy of an archived pack a one-day expiry.
	setTTLOnArchivedPack: (project_id, doc_id, pack_id, callback) ->
		db.docHistory.findAndModify {
			query: {_id: pack_id}
			update: {$set: {expiresAt: new Date(Date.now() + 1*DAYS)}}
		}, (err) ->
			return callback(err) if err?
			logger.log {project_id, doc_id, pack_id}, "set expiry on pack"
			callback()