archive head packs after sufficient time

This commit is contained in:
Brian Gough 2016-04-06 13:30:09 +01:00
parent 7a0c2900ab
commit 6ab75795a2
2 changed files with 50 additions and 15 deletions

View file

@ -23,11 +23,13 @@ module.exports = MongoManager =
MongoManager.getLastCompressedUpdate doc_id, (error, update) ->
return callback(error) if error?
if update?
if update.broken
# the update is marked as broken so we will force a new op
if update.broken # marked as broken so we will force a new op
return callback null, null
else if update.pack?
return callback null, update, update.pack[0]?.v
if update.finalised # no more ops can be appended
return callback null, null, update.pack[0]?.v
else
return callback null, update, update.pack[0]?.v
else
return callback null, update, update.v
else

View file

@ -272,8 +272,9 @@ module.exports = PackManager =
db.docHistory.find(query, {pack:false}).sort {v:1}, (err, packs) ->
return callback(err) if err?
return callback() if not packs?
return callback() if packs?.length <= 1
packs.pop() # discard the last pack, it's still in progress
return callback() if not packs?.length
last = packs.pop() # discard the last pack, if it's still in progress
packs.push(last) if last.finalised # it's finalised so we push it back to archive it
callback(null, packs)
# findPacks: (project_id, doc_id, queryFilter, callback) ->
@ -380,19 +381,51 @@ module.exports = PackManager =
return markAsChecked(err) if err?
return markAsChecked() if not pack?
return callback() if pack.expiresAt? # return directly
PackManager.updateIndexIfNeeded project_id, doc_id, (err) ->
PackManager.finaliseIfNeeded project_id, doc_id, pack._id, pack, (err) ->
return markAsChecked(err) if err?
PackManager.findUnarchivedPacks project_id, doc_id, (err, unarchivedPacks) ->
PackManager.updateIndexIfNeeded project_id, doc_id, (err) ->
return markAsChecked(err) if err?
if not unarchivedPacks?.length
logger.log "no packs need archiving"
return markAsChecked()
async.eachSeries unarchivedPacks, (pack, cb) ->
PackManager.archivePack project_id, doc_id, pack._id, cb
, (err) ->
PackManager.findUnarchivedPacks project_id, doc_id, (err, unarchivedPacks) ->
return markAsChecked(err) if err?
logger.log "done processing"
markAsChecked()
if not unarchivedPacks?.length
logger.log "no packs need archiving"
return markAsChecked()
async.eachSeries unarchivedPacks, (pack, cb) ->
PackManager.archivePack project_id, doc_id, pack._id, cb
, (err) ->
return markAsChecked(err) if err?
logger.log "done processing"
markAsChecked()
finaliseIfNeeded: (project_id, doc_id, pack_id, pack, callback) ->
logger.log {project_id, doc_id}, "archiving old packs"
sz = pack.sz / (1024 * 1024) # in fractions of a megabyte
n = pack.n / 1024 # in fraction of 1024 ops
age = (Date.now() - pack.meta.end_ts) / DAYS
if age < 30 # always keep if less than 1 month old
return callback()
# compute an archiving threshold which decreases for each month of age
archive_threshold = 30 / age
if sz > archive_threshold or n > archive_threshold or age > 90
PackManager.markPackAsFinalisedWithLock project_id, doc_id, pack_id, callback
else
callback()
markPackAsFinalisedWithLock: (project_id, doc_id, pack_id, callback) ->
LockManager.runWithLock(
"HistoryLock:#{doc_id}",
(releaseLock) ->
PackManager._markPackAsFinalised project_id, doc_id, pack_id, releaseLock
callback
)
_markPackAsFinalised: (project_id, doc_id, pack_id, callback) ->
logger.log {project_id, doc_id, pack_id}, "marking pack as finalised"
db.docHistory.findAndModify {
query: {_id: pack_id}
update: {$set: {finalised: true}}
}, callback
updateIndexIfNeeded: (project_id, doc_id, callback) ->
logger.log {project_id, doc_id}, "archiving old packs"