diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index e906257add..383facbf2e 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ -23,11 +23,13 @@ module.exports = MongoManager = MongoManager.getLastCompressedUpdate doc_id, (error, update) -> return callback(error) if error? if update? - if update.broken - # the update is marked as broken so we will force a new op + if update.broken # marked as broken so we will force a new op return callback null, null else if update.pack? - return callback null, update, update.pack[0]?.v + if update.finalised # no more ops can be appended + return callback null, null, update.pack[0]?.v + else + return callback null, update, update.pack[0]?.v else return callback null, update, update.v else diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 9d17d8bd17..27a825ff05 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -272,8 +272,9 @@ module.exports = PackManager = db.docHistory.find(query, {pack:false}).sort {v:1}, (err, packs) -> return callback(err) if err? return callback() if not packs? - return callback() if packs?.length <= 1 - packs.pop() # discard the last pack, it's still in progress + return callback() if not packs?.length + last = packs.pop() # discard the last pack, if it's still in progress + packs.push(last) if last.finalised # it's finalised so we push it back to archive it callback(null, packs) # findPacks: (project_id, doc_id, queryFilter, callback) -> @@ -380,19 +381,51 @@ module.exports = PackManager = return markAsChecked(err) if err? return markAsChecked() if not pack? return callback() if pack.expiresAt? # return directly - PackManager.updateIndexIfNeeded project_id, doc_id, (err) -> + PackManager.finaliseIfNeeded project_id, doc_id, pack._id, pack, (err) -> return markAsChecked(err) if err? - PackManager.findUnarchivedPacks project_id, doc_id, (err, unarchivedPacks) -> + PackManager.updateIndexIfNeeded project_id, doc_id, (err) -> return markAsChecked(err) if err? - if not unarchivedPacks?.length - logger.log "no packs need archiving" - return markAsChecked() - async.eachSeries unarchivedPacks, (pack, cb) -> - PackManager.archivePack project_id, doc_id, pack._id, cb - , (err) -> + PackManager.findUnarchivedPacks project_id, doc_id, (err, unarchivedPacks) -> return markAsChecked(err) if err? - logger.log "done processing" - markAsChecked() + if not unarchivedPacks?.length + logger.log "no packs need archiving" + return markAsChecked() + async.eachSeries unarchivedPacks, (pack, cb) -> + PackManager.archivePack project_id, doc_id, pack._id, cb + , (err) -> + return markAsChecked(err) if err? + logger.log "done processing" + markAsChecked() + + finaliseIfNeeded: (project_id, doc_id, pack_id, pack, callback) -> + logger.log {project_id, doc_id}, "archiving old packs" + sz = pack.sz / (1024 * 1024) # in fractions of a megabyte + n = pack.n / 1024 # in fraction of 1024 ops + age = (Date.now() - pack.meta.end_ts) / DAYS + if age < 30 # always keep if less than 1 month old + return callback() + # compute an archiving threshold which decreases for each month of age + archive_threshold = 30 / age + if sz > archive_threshold or n > archive_threshold or age > 90 + PackManager.markPackAsFinalisedWithLock project_id, doc_id, pack_id, callback + else + callback() + + markPackAsFinalisedWithLock: (project_id, doc_id, pack_id, callback) -> + LockManager.runWithLock( + "HistoryLock:#{doc_id}", + (releaseLock) -> + PackManager._markPackAsFinalised project_id, doc_id, pack_id, releaseLock + callback + ) + + _markPackAsFinalised: (project_id, doc_id, pack_id, callback) -> + logger.log {project_id, doc_id, pack_id}, "marking pack as finalised" + db.docHistory.findAndModify { + query: {_id: pack_id} + update: {$set: {finalised: true}} + }, callback + updateIndexIfNeeded: (project_id, doc_id, callback) -> logger.log {project_id, doc_id}, "archiving old packs"