added checkpointing and logging to pack script

Brian Gough 2015-02-23 12:13:38 +00:00
parent 4c0eea9916
commit b19f3835d5


@@ -1,19 +1,27 @@
-mongojs = require "mongojs"
-async = require "async"
-db = mongojs.connect("localhost/sharelatex", ["docHistory"])
+Settings = require "settings-sharelatex"
+fs = require("fs")
+mongojs = require("mongojs")
+ObjectId = mongojs.ObjectId
+db = mongojs(Settings.mongo.url, ['docHistory'])
+async = require("async")
 BSON=db.bson.BSON
 util = require 'util'
 _ = require 'underscore'
+lineReader = require "line-reader"
 
 MAX_SIZE = 1024*1024
 MAX_COUNT = 1024
 MIN_COUNT = 100
 KEEP_OPS = 100
 
 insertPack = (packObj, callback) ->
+	if shutdownRequested
+		return callback('shutdown')
 	bulk = db.docHistory.initializeOrderedBulkOp();
 	expect_nInserted = 1
 	expect_nRemoved = packObj.pack.length
-	console.log 'inserting', expect_nInserted, 'pack, will remove', expect_nRemoved, 'ops'
+	util.log "insert #{expect_nInserted} pack, remove #{expect_nRemoved} ops"
 	bulk.insert packObj
 	packObj.pack.forEach (op) ->
 		bulk.find({_id:op._id}).removeOne()
@@ -24,7 +32,7 @@ insertPack = (packObj, callback) ->
 		callback(err, result)
 
 packDocHistory = (doc_id, callback) ->
-	console.log 'packing doc_id', doc_id
+	util.log "starting pack operation for #{doc_id}"
 	db.docHistory.find({doc_id:mongojs.ObjectId(doc_id),pack:{$exists:false}}).sort {v:1}, (err, docs) ->
 		packs = []
 		top = null
@@ -50,21 +58,56 @@ packDocHistory = (doc_id, callback) ->
 					packs.push top
 			else
 				# keep the op
-				console.log 'keeping large op unchanged'
+				util.log "keeping large op unchanged (#{sz} bytes)"
 		# only store packs with a sufficient number of ops, discard others
 		packs = packs.filter (packObj) ->
 			packObj.pack.length > MIN_COUNT
-		console.log 'docs', origDocs, 'packs', packs.length
-		async.each packs, insertPack, (err, result) ->
-			if err?
-				console.log 'err', err
-			console.log 'done writing packs'
-			callback err, result
+		util.log "docs #{origDocs} packs #{packs.length}"
+		if packs.length
+			async.each packs, insertPack, (err, result) ->
+				if err?
+					console.log doc_id, err
+				else
+					util.log "done writing packs"
+				callback err, result
+		else
+			util.log "no packs to write"
+			callback null, null
 
-async.each process.argv.slice(2), (doc_id, callback) ->
-	packDocHistory(doc_id, callback)
-, (err, results) ->
-	console.log 'closing db'
-	db.close()
+readFile = (file, callback) ->
+	ids = []
+	lineReader.eachLine file, (line) ->
+		result = line.match(/[0-9a-f]{24}/)
+		if result?
+			ids.push result[0]
+	.then () ->
+		callback(null, ids)
+
+todoFile = process.argv[2]
+doneFile = process.argv[3]
+
+fs.appendFileSync doneFile, '# starting pack run at ' + new Date() + '\n'
+
+shutdownRequested = false
+process.on 'SIGINT', () ->
+	util.log "Gracefully shutting down from SIGINT"
+	shutdownRequested = true
+
+readFile todoFile, (err, todo) ->
+	readFile doneFile, (err, done) ->
+		pending = _.difference todo, done
+		async.eachSeries pending, (doc_id, callback) ->
+			packDocHistory doc_id, (err, result) ->
+				if err?
+					return callback(err)
+				else
+					fs.appendFileSync doneFile, doc_id + '\n'
+				if shutdownRequested
+					return callback('shutdown')
+				callback(err, result)
+		, (err, results) ->
+			if err?
+				console.log 'error:', err
+			util.log 'closing db'
+			db.close()
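
The checkpointing added here works by diffing two id lists: the run reads a todo file and a done file, processes only the ids not yet recorded as done, and appends each doc_id to the done file as it completes, so a run interrupted by SIGINT (which sets shutdownRequested and ends the series cleanly) can be resumed with the same command. Below is a minimal sketch of how a todo file might be produced; the helper itself, the todo.txt name, and the pack.coffee script name are assumptions for illustration, not part of this commit:

	# Hypothetical helper (not in this commit): write every doc_id that
	# still has unpacked ops, one per line, for the pack script to consume.
	Settings = require "settings-sharelatex"
	mongojs = require "mongojs"
	fs = require "fs"

	db = mongojs(Settings.mongo.url, ['docHistory'])

	# distinct doc_ids that have at least one op not yet inside a pack
	db.docHistory.distinct 'doc_id', {pack: {$exists: false}}, (err, ids) ->
		throw err if err?
		fs.writeFileSync 'todo.txt', ids.join('\n') + '\n'
		db.close()

A run would then look something like: coffee pack.coffee todo.txt done.txt. Note that readFile extracts the first 24-character hex string from each line, so the '# starting pack run at <date>' markers appended to the done file do not match the id pattern and are simply skipped when computing the pending list.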