From ffeb1cccb6d699cd4b20c1bc32a49aec4718997c Mon Sep 17 00:00:00 2001
From: Brian Gough
Date: Thu, 21 May 2015 16:56:05 +0100
Subject: [PATCH] move pack migration code into PackManager

---
 .../app/coffee/PackManager.coffee  |  85 ++++++++++++++
 services/track-changes/pack.coffee | 109 +++----------
 2 files changed, 101 insertions(+), 93 deletions(-)

diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee
index 0b72ea158c..3ff39040c1 100644
--- a/services/track-changes/app/coffee/PackManager.coffee
+++ b/services/track-changes/app/coffee/PackManager.coffee
@@ -1,5 +1,7 @@
 async = require "async"
 _ = require "underscore"
+{db, ObjectId} = require "./mongojs"
+BSON=db.bson.BSON
 
 module.exports = PackManager =
   # The following functions implement methods like a mongo find, but
@@ -249,3 +251,86 @@ module.exports = PackManager =
       return true
     newResults.slice(0, limit) if limit?
     newResults
+
+  convertDocsToPacks: (docs, callback) ->
+    MAX_SIZE = 1024*1024 # make these configurable parameters
+    MAX_COUNT = 1024
+    MIN_COUNT = 100
+    KEEP_OPS = 100
+    packs = []
+    top = null
+    # keep the last KEEP_OPS as individual ops
+    docs = docs.slice(0,-KEEP_OPS)
+
+    docs.forEach (d,i) ->
+      if d.pack?
+        # util.log "skipping existing pack of #{d.pack.length}"
+        top = null if top? # flush the current pack
+        return # and try next
+      sz = BSON.calculateObjectSize(d)
+      if top? && top.pack.length < MAX_COUNT && top.sz + sz < MAX_SIZE
+        top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id}
+        top.sz += sz
+        top.v_end = d.v
+        top.meta.end_ts = d.meta.end_ts
+        return
+      else if sz < MAX_SIZE
+        # create a new pack
+        top = _.clone(d)
+        top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ]
+        top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts }
+        top.sz = sz
+        delete top.op
+        delete top._id
+        packs.push top
+      else
+        # keep the op
+        # util.log "keeping large op unchanged (#{sz} bytes)"
+
+    # only store packs with a sufficient number of ops, discard others
+    packs = packs.filter (packObj) ->
+      packObj.pack.length > MIN_COUNT
+    callback(null, packs)
+
+  checkHistory: (docs, callback) ->
+    errors = []
+    prev = null
+    error = (args...) ->
+      errors.push args
+    docs.forEach (d,i) ->
+      if d.pack?
+        n = d.pack.length
+        last = d.pack[n-1]
+        error('bad pack v_end', d) if d.v_end != last.v
+        error('bad pack start_ts', d) if d.meta.start_ts != d.pack[0].meta.start_ts
+        error('bad pack end_ts', d) if d.meta.end_ts != last.meta.end_ts
+        d.pack.forEach (p, i) ->
+          prev = v
+          v = p.v
+          error('bad version', v, 'in', p) if v <= prev
+      else
+        prev = v
+        v = d.v
+        error('bad version', v, 'in', d) if v <= prev
+    if errors.length
+      callback(errors)
+    else
+      callback()
+
+  insertPack: (packObj, callback) ->
+    bulk = db.docHistory.initializeOrderedBulkOp()
+    expect_nInserted = 1
+    expect_nRemoved = packObj.pack.length
+    bulk.insert packObj
+    packObj.pack.forEach (op) ->
+      bulk.find({_id:op._id}).removeOne()
+    bulk.execute (err, result) ->
+      if err?
+        callback(err, result)
+      else if result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved
+        callback(new Error(
+          msg: 'unexpected result'
+          expected: {expect_nInserted, expect_nRemoved}
+        ), result)
+      else
+        callback(err, result)

diff --git a/services/track-changes/pack.coffee b/services/track-changes/pack.coffee
index 27e9a347b8..d32b36ad9d 100644
--- a/services/track-changes/pack.coffee
+++ b/services/track-changes/pack.coffee
@@ -1,12 +1,11 @@
 Settings = require "settings-sharelatex"
 fs = require("fs")
-mongojs = require("mongojs")
-ObjectId = mongojs.ObjectId
-db = mongojs(Settings.mongo.url, ['docHistory'])
+{db, ObjectId} = require "./app/coffee/mongojs"
 async = require("async")
 BSON=db.bson.BSON
 util = require 'util'
 _ = require 'underscore'
+PackManager = require "./app/coffee/PackManager.coffee"
 lineReader = require "line-reader"
 cli = require "cli"
@@ -15,11 +14,6 @@ options = cli.parse {
   'fast': [false, 'no delays on writes']
 }
 
-MAX_SIZE = 1024*1024
-MAX_COUNT = 1024
-MIN_COUNT = 100
-KEEP_OPS = 100
-
 DB_WRITE_DELAY = if options.fast then 0 else 2000
 DOCUMENT_PACK_DELAY = if options.fast then 0 else 1000
 
@@ -28,7 +22,7 @@ packDocHistory = (doc_id, callback) ->
   getDocHistory doc_id, (err, docs) ->
     return callback(err) if err?
     origDocs = docs.length
-    convertDocsToPacks docs, (err, packs) ->
+    PackManager.convertDocsToPacks docs, (err, packs) ->
       return callback(err) if err?
       util.log "docs #{origDocs} packs #{packs.length}" if packs.length
@@ -45,103 +39,31 @@
 # retrieve document ops/packs and check them
 getDocHistory = (doc_id, callback) ->
-  db.docHistory.find({doc_id:mongojs.ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
+  db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
     return callback(err) if err?
     # for safety, do a consistency check of the history
-    checkHistory doc_id, docs, (err) ->
+    util.log "checking history for #{doc_id}"
+    PackManager.checkHistory docs, (err) ->
       return callback(err) if err?
       callback err, docs
 
-convertDocsToPacks = (docs, callback) ->
-  packs = []
-  top = null
-  # keep the last KEEP_OPS as individual ops
-  docs = docs.slice(0,-KEEP_OPS)
-
-  docs.forEach (d,i) ->
-    if d.pack?
-      util.log "skipping existing pack of #{d.pack.length}"
-      top = null if top? # flush the current pack
-      return # and try next
-    sz = BSON.calculateObjectSize(d)
-    if top? && top.pack.length < MAX_COUNT && top.sz + sz < MAX_SIZE
-      top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id}
-      top.sz += sz
-      top.v_end = d.v
-      top.meta.end_ts = d.meta.end_ts
-      return
-    else if sz < MAX_SIZE
-      # create a new pack
-      top = _.clone(d)
-      top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ]
-      top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts }
-      top.sz = sz
-      delete top.op
-      delete top._id
-      packs.push top
-    else
-      # keep the op
-      util.log "keeping large op unchanged (#{sz} bytes)"
-
-  # only store packs with a sufficient number of ops, discard others
-  packs = packs.filter (packObj) ->
-    packObj.pack.length > MIN_COUNT
-  callback(null, packs)
+safeInsert = (packObj, callback) ->
+  if shutdownRequested
+    return callback('shutdown')
+  PackManager.insertPack packObj, (err, result) ->
+    setTimeout () ->
+      callback(err,result)
+    , DB_WRITE_DELAY
 
 savePacks = (packs, callback) ->
-  async.eachSeries packs, insertPack, (err, result) ->
+  async.eachSeries packs, safeInsert, (err, result) ->
     if err?
-      console.log err
+      util.log "error writing packs"
       callback err, result
     else
       util.log "done writing packs"
       callback()
 
-insertPack = (packObj, callback) ->
-  if shutdownRequested
-    return callback('shutdown')
-  bulk = db.docHistory.initializeOrderedBulkOp();
-  expect_nInserted = 1
-  expect_nRemoved = packObj.pack.length
-  util.log "insert #{expect_nInserted} pack, remove #{expect_nRemoved} ops"
-  bulk.insert packObj
-  packObj.pack.forEach (op) ->
-    bulk.find({_id:op._id}).removeOne()
-  bulk.execute (err, result) ->
-    if err? or result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved
-      console.log err, result
-      console.log 'nInserted', result.nInserted, 'nRemoved', result.nRemoved
-    setTimeout () ->
-      callback(err, result)
-    , DB_WRITE_DELAY
-
-checkHistory = (doc_id, docs, callback) ->
-  util.log "checking history for #{doc_id}"
-  errors = 0
-  prev = null
-  error = (args...) ->
-    errors++
-    console.log.apply(null, args)
-  docs.forEach (d,i) ->
-    if d.pack?
-      n = d.pack.length
-      last = d.pack[n-1]
-      error('bad pack v_end', d) if d.v_end != last.v
-      error('bad pack start_ts', d) if d.meta.start_ts != d.pack[0].meta.start_ts
-      error('bad pack end_ts', d) if d.meta.end_ts != last.meta.end_ts
-      d.pack.forEach (p, i) ->
-        prev = v
-        v = p.v
-        error('bad version', v, 'in', p) if v <= prev
-    else
-      prev = v
-      v = d.v
-      error('bad version', v, 'in', d) if v <= prev
-  if errors
-    callback({errcount: errors})
-  else
-    callback()
-
 readFile = (file, callback) ->
   ids = []
   lineReader.eachLine file, (line) ->
@@ -168,6 +90,7 @@ readFile todoFile, (err, todo) ->
   async.eachSeries pending, (doc_id, callback) ->
     packDocHistory doc_id, (err, result) ->
       if err?
+        console.log "ERROR:", err, result
        return callback(err)
       else if not options['dry-run']
         fs.appendFileSync doneFile, doc_id + '\n'
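
For reference, the migration flow after this refactoring composes the three moved PackManager functions. The following is a minimal sketch, not part of the patch: it assumes the app/coffee/mongojs wrapper exports db and ObjectId as required above, and the doc_id value is a hypothetical example.

# Minimal sketch of the post-refactor flow (not part of the patch).
# Assumes app/coffee/mongojs exports db and ObjectId as used above;
# the doc_id value below is hypothetical.
async = require "async"
{db, ObjectId} = require "./app/coffee/mongojs"
PackManager = require "./app/coffee/PackManager.coffee"

doc_id = "5374ab371f9bbb55664e0422" # hypothetical document id

# fetch the ops in version order, as getDocHistory does
db.docHistory.find({doc_id: ObjectId(doc_id)}).sort {v: 1}, (err, docs) ->
  throw err if err?
  # consistency-check the history before converting anything
  PackManager.checkHistory docs, (errors) ->
    throw errors if errors?
    # group small ops into packs; the last KEEP_OPS ops stay unpacked
    PackManager.convertDocsToPacks docs, (err, packs) ->
      throw err if err?
      # each insertPack bulk-inserts one pack and removes the ops it replaced
      async.eachSeries packs, PackManager.insertPack, (err) ->
        throw err if err?
        console.log "converted #{docs.length} ops into #{packs.length} packs"

Note the split of concerns this commit introduces: PackManager.insertPack does only the bulk insert/remove, while the script-level safeInsert wrapper keeps the shutdownRequested check and DB_WRITE_DELAY pacing, so the library code stays reusable outside the migration script.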