From 18b34d1bcbe11c654fe419b0fe138c5952d2dc59 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 13 Feb 2015 17:01:37 +0000 Subject: [PATCH] clean up packing script --- services/track-changes/pack.coffee | 78 +++++++++++++++--------------- 1 file changed, 38 insertions(+), 40 deletions(-) diff --git a/services/track-changes/pack.coffee b/services/track-changes/pack.coffee index 4bc10c829d..aa0c079261 100644 --- a/services/track-changes/pack.coffee +++ b/services/track-changes/pack.coffee @@ -6,15 +6,31 @@ util = require 'util' _ = require 'underscore' MAX_SIZE = 1024*1024 MAX_COUNT = 1024 +MIN_COUNT = 100 +KEEP_OPS = 100 -packOps = (doc_id, callback) -> - console.log 'packing', doc_id +insertPack = (packObj, callback) -> + bulk = db.docHistory.initializeOrderedBulkOp(); + expect_nInserted = 1 + expect_nRemoved = packObj.pack.length + console.log 'inserting', expect_nInserted, 'pack, will remove', expect_nRemoved, 'ops' + bulk.insert packObj + packObj.pack.forEach (op) -> + bulk.find({_id:op._id}).removeOne() + bulk.execute (err, result) -> + if err? or result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved + console.log err, result + console.log 'nInserted', result.nInserted, 'nRemoved', result.nRemoved + callback(err, result) + +packDocHistory = (doc_id, callback) -> + console.log 'packing doc_id', doc_id db.docHistory.find({doc_id:mongojs.ObjectId(doc_id),pack:{$exists:false}}).sort {v:1}, (err, docs) -> packs = [] top = null - if docs.length < 100 - console.log 'only', docs.length, 'ops, skipping' - return + origDocs = docs.length + # keep the last KEEP_OPS as individual ops + docs = docs.slice(0,-KEEP_OPS) docs.forEach (d,i) -> sz = BSON.calculateObjectSize(d) if top? && top.pack.length < MAX_COUNT && top.sz + sz < MAX_SIZE @@ -23,7 +39,7 @@ packOps = (doc_id, callback) -> top.v_end = d.v top.meta.end_ts = d.meta.end_ts return - else + else if sz < MAX_SIZE # create a new pack top = _.clone(d) top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ] @@ -32,41 +48,23 @@ packOps = (doc_id, callback) -> delete top.op delete top._id packs.push top - # never store the last pack, keep some unpacked ops - packs.pop() - # - if packs.length > docs.length - console.log 'not enough compression', packs.length, 'vs', docs.length, 'skipping' - return + else + # keep the op + console.log 'keeping large op unchanged' - console.log 'docs', docs.length, 'packs', packs.length - tasks = [] - for pack, i in packs - console.log 'storing pack', i - do (pack) -> - task = (cb) -> - bulk = db.docHistory.initializeOrderedBulkOp(); - console.log 'insert', pack - bulk.insert pack - pack.pack.forEach (op) -> - console.log 'will remove', op._id - bulk.find({_id:op._id}).removeOne() - bulk.execute (err, result) -> - console.log 'ok?', result.ok, 'nInserted', result.nInserted, 'nRemoved', result.nRemoved - cb(err, result) - tasks.push task + # only store packs with a sufficient number of ops, discard others + packs = packs.filter (packObj) -> + packObj.pack.length > MIN_COUNT - async.series tasks, (err, result) -> - console.log 'writing packs', err, result.length + console.log 'docs', origDocs, 'packs', packs.length + async.each packs, insertPack, (err, result) -> + if err? + console.log 'err', err + console.log 'done writing packs' callback err, result -packOpsTask = (doc_id) -> - return (callback) -> - packOps doc_id, callback - -tasks = (packOpsTask id for id in process.argv.slice(2)) - -async.series tasks, (err, results) -> - console.log 'closing db' - #console.log util.inspect(results,{depth:null}) - db.close() +async.each process.argv.slice(2), (doc_id, callback) -> + packDocHistory(doc_id, callback) + , (err, results) -> + console.log 'closing db' + db.close()