From ffeb1cccb6d699cd4b20c1bc32a49aec4718997c Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 21 May 2015 16:56:05 +0100 Subject: [PATCH 1/5] move pack migration code into PackManager --- .../app/coffee/PackManager.coffee | 85 ++++++++++++++ services/track-changes/pack.coffee | 109 +++--------------- 2 files changed, 101 insertions(+), 93 deletions(-) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 0b72ea158c..3ff39040c1 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -1,5 +1,7 @@ async = require "async" _ = require "underscore" +{db, ObjectId} = require "./mongojs" +BSON=db.bson.BSON module.exports = PackManager = # The following functions implement methods like a mongo find, but @@ -249,3 +251,86 @@ module.exports = PackManager = return true newResults.slice(0, limit) if limit? return newResults + + convertDocsToPacks: (docs, callback) -> + MAX_SIZE = 1024*1024 # make these configurable parameters + MAX_COUNT = 1024 + MIN_COUNT = 100 + KEEP_OPS = 100 + packs = [] + top = null + # keep the last KEEP_OPS as individual ops + docs = docs.slice(0,-KEEP_OPS) + + docs.forEach (d,i) -> + if d.pack? + # util.log "skipping existing pack of #{d.pack.length}" + top = null if top? # flush the current pack + return # and try next + sz = BSON.calculateObjectSize(d) + if top? && top.pack.length < MAX_COUNT && top.sz + sz < MAX_SIZE + top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id} + top.sz += sz + top.v_end = d.v + top.meta.end_ts = d.meta.end_ts + return + else if sz < MAX_SIZE + # create a new pack + top = _.clone(d) + top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ] + top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts } + top.sz = sz + delete top.op + delete top._id + packs.push top + else + # keep the op + # util.log "keeping large op unchanged (#{sz} bytes)" + + # only store packs with a sufficient number of ops, discard others + packs = packs.filter (packObj) -> + packObj.pack.length > MIN_COUNT + callback(null, packs) + + checkHistory: (docs, callback) -> + errors = [] + prev = null + error = (args...) -> + errors.push args + docs.forEach (d,i) -> + if d.pack? + n = d.pack.length + last = d.pack[n-1] + error('bad pack v_end', d) if d.v_end != last.v + error('bad pack start_ts', d) if d.meta.start_ts != d.pack[0].meta.start_ts + error('bad pack end_ts', d) if d.meta.end_ts != last.meta.end_ts + d.pack.forEach (p, i) -> + prev = v + v = p.v + error('bad version', v, 'in', p) if v <= prev + else + prev = v + v = d.v + error('bad version', v, 'in', d) if v <= prev + if errors.length + callback(errors) + else + callback() + + insertPack: (packObj, callback) -> + bulk = db.docHistory.initializeOrderedBulkOp() + expect_nInserted = 1 + expect_nRemoved = packObj.pack.length + bulk.insert packObj + packObj.pack.forEach (op) -> + bulk.find({_id:op._id}).removeOne() + bulk.execute (err, result) -> + if err? 
+ callback(err, result) + else if result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved + callback(new Error( + msg: 'unexpected result' + expected: {expect_nInserted, expect_nRemoved} + ), result) + else + callback(err, result) diff --git a/services/track-changes/pack.coffee b/services/track-changes/pack.coffee index 27e9a347b8..d32b36ad9d 100644 --- a/services/track-changes/pack.coffee +++ b/services/track-changes/pack.coffee @@ -1,12 +1,11 @@ Settings = require "settings-sharelatex" fs = require("fs") -mongojs = require("mongojs") -ObjectId = mongojs.ObjectId -db = mongojs(Settings.mongo.url, ['docHistory']) +{db, ObjectId} = require "./app/coffee/mongojs" async = require("async") BSON=db.bson.BSON util = require 'util' _ = require 'underscore' +PackManager = require "./app/coffee/PackManager.coffee" lineReader = require "line-reader" cli = require "cli" @@ -15,11 +14,6 @@ options = cli.parse { 'fast': [false, 'no delays on writes'] } -MAX_SIZE = 1024*1024 -MAX_COUNT = 1024 -MIN_COUNT = 100 -KEEP_OPS = 100 - DB_WRITE_DELAY = if options.fast then 0 else 2000 DOCUMENT_PACK_DELAY = if options.fast then 0 else 1000 @@ -28,7 +22,7 @@ packDocHistory = (doc_id, callback) -> getDocHistory doc_id, (err, docs) -> return callback(err) if err? origDocs = docs.length - convertDocsToPacks docs, (err, packs) -> + PackManager.convertDocsToPacks docs, (err, packs) -> return callback(err) if err? util.log "docs #{origDocs} packs #{packs.length}" if packs.length @@ -45,103 +39,31 @@ packDocHistory = (doc_id, callback) -> # retrieve document ops/packs and check them getDocHistory = (doc_id, callback) -> - db.docHistory.find({doc_id:mongojs.ObjectId(doc_id)}).sort {v:1}, (err, docs) -> + db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) -> return callback(err) if err? # for safety, do a consistency check of the history - checkHistory doc_id, docs, (err) -> + util.log "checking history for #{doc_id}" + PackManager.checkHistory docs, (err) -> return callback(err) if err? callback err, docs -convertDocsToPacks = (docs, callback) -> - packs = [] - top = null - # keep the last KEEP_OPS as individual ops - docs = docs.slice(0,-KEEP_OPS) - - docs.forEach (d,i) -> - if d.pack? - util.log "skipping existing pack of #{d.pack.length}" - top = null if top? # flush the current pack - return # and try next - sz = BSON.calculateObjectSize(d) - if top? && top.pack.length < MAX_COUNT && top.sz + sz < MAX_SIZE - top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id} - top.sz += sz - top.v_end = d.v - top.meta.end_ts = d.meta.end_ts - return - else if sz < MAX_SIZE - # create a new pack - top = _.clone(d) - top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ] - top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts } - top.sz = sz - delete top.op - delete top._id - packs.push top - else - # keep the op - util.log "keeping large op unchanged (#{sz} bytes)" - - # only store packs with a sufficient number of ops, discard others - packs = packs.filter (packObj) -> - packObj.pack.length > MIN_COUNT - callback(null, packs) +safeInsert = (packObj, callback) -> + if shutdownRequested + return callback('shutdown') + PackManager.insertPack packObj, (err, result) -> + setTimeout () -> + callback(err,result) + , DB_WRITE_DELAY savePacks = (packs, callback) -> - async.eachSeries packs, insertPack, (err, result) -> + async.eachSeries packs, safeInsert, (err, result) -> if err? 
- console.log err + util.log "error writing packs" callback err, result else util.log "done writing packs" callback() -insertPack = (packObj, callback) -> - if shutdownRequested - return callback('shutdown') - bulk = db.docHistory.initializeOrderedBulkOp(); - expect_nInserted = 1 - expect_nRemoved = packObj.pack.length - util.log "insert #{expect_nInserted} pack, remove #{expect_nRemoved} ops" - bulk.insert packObj - packObj.pack.forEach (op) -> - bulk.find({_id:op._id}).removeOne() - bulk.execute (err, result) -> - if err? or result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved - console.log err, result - console.log 'nInserted', result.nInserted, 'nRemoved', result.nRemoved - setTimeout () -> - callback(err, result) - , DB_WRITE_DELAY - -checkHistory = (doc_id, docs, callback) -> - util.log "checking history for #{doc_id}" - errors = 0 - prev = null - error = (args...) -> - errors++ - console.log.apply(null, args) - docs.forEach (d,i) -> - if d.pack? - n = d.pack.length - last = d.pack[n-1] - error('bad pack v_end', d) if d.v_end != last.v - error('bad pack start_ts', d) if d.meta.start_ts != d.pack[0].meta.start_ts - error('bad pack end_ts', d) if d.meta.end_ts != last.meta.end_ts - d.pack.forEach (p, i) -> - prev = v - v = p.v - error('bad version', v, 'in', p) if v <= prev - else - prev = v - v = d.v - error('bad version', v, 'in', d) if v <= prev - if errors - callback({errcount: errors}) - else - callback() - readFile = (file, callback) -> ids = [] lineReader.eachLine file, (line) -> @@ -168,6 +90,7 @@ readFile todoFile, (err, todo) -> async.eachSeries pending, (doc_id, callback) -> packDocHistory doc_id, (err, result) -> if err? + console.log "ERROR:", err, result return callback(err) else if not options['dry-run'] fs.appendFileSync doneFile, doc_id + '\n' From adc2866a7d6255d6332a5b0b3488bbb841654478 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 22 May 2015 11:15:47 +0100 Subject: [PATCH 2/5] add check to exclude temporary ops from packs --- services/track-changes/app/coffee/PackManager.coffee | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 3ff39040c1..51ccecbc8e 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -263,10 +263,14 @@ module.exports = PackManager = docs = docs.slice(0,-KEEP_OPS) docs.forEach (d,i) -> + # skip existing packs if d.pack? - # util.log "skipping existing pack of #{d.pack.length}" - top = null if top? # flush the current pack - return # and try next + top = null + return + # skip temporary ops (we could pack these into temporary packs in future) + if d.expiresAt? + top = null + return sz = BSON.calculateObjectSize(d) if top? && top.pack.length < MAX_COUNT && top.sz + sz < MAX_SIZE top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id} @@ -308,6 +312,7 @@ module.exports = PackManager = prev = v v = p.v error('bad version', v, 'in', p) if v <= prev + #error('expired op', p, 'in pack') if p.expiresAt? 
else prev = v v = d.v From 78f0bdbae3933b50174acaf9ac45328d155c894f Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 22 May 2015 14:12:29 +0100 Subject: [PATCH 3/5] fix name of temporary parameter to match other methods --- services/track-changes/app/coffee/MongoManager.coffee | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index b1ea5cb180..1b99c53ab8 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ -29,11 +29,11 @@ module.exports = MongoManager = else callback null, null - insertCompressedUpdates: (project_id, doc_id, updates, permanent, callback = (error) ->) -> + insertCompressedUpdates: (project_id, doc_id, updates, temporary, callback = (error) ->) -> jobs = [] for update in updates do (update) -> - jobs.push (callback) -> MongoManager.insertCompressedUpdate project_id, doc_id, update, permanent, callback + jobs.push (callback) -> MongoManager.insertCompressedUpdate project_id, doc_id, update, temporary, callback async.series jobs, callback insertCompressedUpdate: (project_id, doc_id, update, temporary, callback = (error) ->) -> From 5c4afd5303df3e8b63e37e1a5119413097b16a4e Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 22 May 2015 14:13:34 +0100 Subject: [PATCH 4/5] add docHistoryStats collection to keep track of updates to docs --- .../track-changes/app/coffee/MongoManager.coffee | 12 +++++++++++- services/track-changes/app/coffee/mongojs.coffee | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index 1b99c53ab8..1beb2b8983 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ -34,7 +34,14 @@ module.exports = MongoManager = for update in updates do (update) -> jobs.push (callback) -> MongoManager.insertCompressedUpdate project_id, doc_id, update, temporary, callback - async.series jobs, callback + async.series jobs, (err, results) -> + if not temporary + # keep track of updates to be packed + db.docHistoryStats.update {doc_id:ObjectId(doc_id)}, {$inc:{updates:updates.length}}, {upsert:true}, () -> + callback(err,results) + else + callback(err,results) + insertCompressedUpdate: (project_id, doc_id, update, temporary, callback = (error) ->) -> update = { @@ -113,3 +120,6 @@ module.exports = MongoManager = db.projectHistoryMetaData.ensureIndex { project_id: 1 }, { background: true } # TTL index for auto deleting week old temporary ops db.docHistory.ensureIndex { expiresAt: 1 }, { expireAfterSeconds: 0, background: true } + # For finding documents which need packing + db.docHistoryStats.ensureIndex { doc_id: 1 }, { background: true } + db.docHistoryStats.ensureIndex { updates: -1, doc_id: 1 }, { background: true } diff --git a/services/track-changes/app/coffee/mongojs.coffee b/services/track-changes/app/coffee/mongojs.coffee index 8c2971945c..32efbc9a1d 100644 --- a/services/track-changes/app/coffee/mongojs.coffee +++ b/services/track-changes/app/coffee/mongojs.coffee @@ -1,6 +1,6 @@ Settings = require "settings-sharelatex" mongojs = require "mongojs" -db = mongojs.connect(Settings.mongo.url, ["docHistory", "projectHistoryMetaData"]) +db = mongojs.connect(Settings.mongo.url, ["docHistory", "projectHistoryMetaData", "docHistoryStats"]) module.exports = db: db 
ObjectId: mongojs.ObjectId From 1811ac2145f5ca611b9c08b407da8752ea2a54de Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 22 May 2015 15:27:15 +0100 Subject: [PATCH 5/5] added support for cleaning old expired ops in packs --- .../app/coffee/PackManager.coffee | 31 +++++++++++++++++++ services/track-changes/pack.coffee | 5 ++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 51ccecbc8e..c41a1e6bd8 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -339,3 +339,34 @@ module.exports = PackManager = ), result) else callback(err, result) + + deleteExpiredPackOps: (docs, callback) -> + now = Date.now() + toRemove = [] + toUpdate = [] + docs.forEach (d,i) -> + if d.pack? + newPack = d.pack.filter (op) -> + if op.expiresAt? then op.expiresAt > now else true + if newPack.length == 0 + toRemove.push d + else if newPack.length < d.pack.length + # adjust the pack properties + d.pack = newPack + first = d.pack[0] + last = d.pack[d.pack.length - 1] + d.v_end = last.v + d.meta.start_ts = first.meta.start_ts + d.meta.end_ts = last.meta.end_ts + toUpdate.push d + if toRemove.length or toUpdate.length + bulk = db.docHistory.initializeOrderedBulkOp() + toRemove.forEach (pack) -> + console.log "would remove", pack + #bulk.find({_id:pack._id}).removeOne() + toUpdate.forEach (pack) -> + console.log "would update", pack + #bulk.find({_id:pack._id}).updateOne(pack); + bulk.execute callback + else + callback() diff --git a/services/track-changes/pack.coffee b/services/track-changes/pack.coffee index d32b36ad9d..a9fb76f23a 100644 --- a/services/track-changes/pack.coffee +++ b/services/track-changes/pack.coffee @@ -45,7 +45,10 @@ getDocHistory = (doc_id, callback) -> util.log "checking history for #{doc_id}" PackManager.checkHistory docs, (err) -> return callback(err) if err? - callback err, docs + callback(err, docs) + #PackManager.deleteExpiredPackOps docs, (err) -> + # return callback(err) if err? + # callback err, docs safeInsert = (packObj, callback) -> if shutdownRequested
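Notes on the series:

Patch 1 lifts convertDocsToPacks, checkHistory and insertPack out of the standalone pack.coffee migration script into PackManager, so the packing logic lives next to the pack-aware query code. The packer is a greedy accumulator: each op joins the pack currently being built unless that would push it past the size budget (MAX_SIZE, measured with BSON.calculateObjectSize) or the op budget (MAX_COUNT); an existing pack flushes the accumulator, the last KEEP_OPS ops are left unpacked, and packs of MIN_COUNT or fewer ops are discarded. A runnable sketch of the same greedy strategy, with JSON length standing in for the BSON size and packOps as a hypothetical name:

    # Greedy packing sketch; the KEEP_OPS trimming and the MIN_COUNT
    # filter are omitted for brevity.
    MAX_SIZE = 1024 * 1024   # size budget per pack, in bytes
    MAX_COUNT = 1024         # op budget per pack

    packOps = (ops) ->
      packs = []
      top = null   # the pack currently being filled
      for op in ops
        sz = JSON.stringify(op).length
        if top? and top.pack.length < MAX_COUNT and top.sz + sz < MAX_SIZE
          # op fits: append it and extend the pack's version/time range
          top.pack.push op
          top.sz += sz
          top.v_end = op.v
          top.meta.end_ts = op.meta.end_ts
        else
          # budget exceeded, or no open pack: start a new one
          top =
            pack: [op]
            sz: sz
            v_end: op.v
            meta: {start_ts: op.meta.start_ts, end_ts: op.meta.end_ts}
          packs.push top
      packs

    # three small ops collapse into a single pack
    ops = ({v: i, op: [], meta: {start_ts: i, end_ts: i}} for i in [1..3])
    console.log packOps(ops).length   # => 1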
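Still in patch 1, insertPack writes the new pack document and deletes the individual ops it replaces in one ordered bulk operation, then checks the nInserted/nRemoved counts the server reports against what the pack implies. A minimal sketch of that pattern, assuming an illustrative local database (the connection string and the simplified error are not from the patch):

    mongojs = require "mongojs"
    db = mongojs "localhost/test", ["docHistory"]

    insertPack = (packObj, callback) ->
      bulk = db.docHistory.initializeOrderedBulkOp()
      # ordered: the pack is inserted before any op is removed
      bulk.insert packObj
      for op in packObj.pack
        bulk.find({_id: op._id}).removeOne()
      bulk.execute (err, result) ->
        return callback(err) if err?
        # expect exactly one insert and one removal per packed op
        if result.nInserted != 1 or result.nRemoved != packObj.pack.length
          return callback(new Error("unexpected bulk result"), result)
        callback(null, result)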
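The pack.coffee driver keeps the operational safeguards on its side of the split: safeInsert refuses to start once shutdownRequested is set and spaces writes out by DB_WRITE_DELAY, and each finished doc id is appended to the done file so an interrupted run can resume. A sketch of that throttled-series pattern; the SIGINT wiring and the throttled/logItem helpers are hypothetical, not taken from the script:

    async = require "async"

    DB_WRITE_DELAY = 2000   # ms between writes; the script uses 0 with --fast
    shutdownRequested = false
    process.on "SIGINT", -> shutdownRequested = true

    # wrap a worker so each call is delayed and shutdown is honoured
    throttled = (worker) ->
      (item, callback) ->
        return callback("shutdown") if shutdownRequested
        worker item, (err, result) ->
          setTimeout (-> callback(err, result)), DB_WRITE_DELAY

    logItem = (item, cb) -> console.log("writing", item); cb()
    async.eachSeries [1, 2, 3], throttled(logItem), (err) ->
      console.log "done", err ? ""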
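Patch 2 closes a correctness hole: ops written with an expiresAt date are temporary (the TTL index on docHistory reaps them), so folding one into a permanent pack would effectively resurrect it. The packer now flushes the current pack and skips any doc that is already a pack or carries an expiry; the commented-out checkHistory assertion is the matching consistency check, left disabled for now. Patch 3 is a pure rename, making the misleading "permanent" parameter of insertCompressedUpdates match the "temporary" flag used everywhere else. The new skip rule reduces to a predicate like this (packable is a hypothetical name):

    # a doc may join a pack only if it is a bare, permanent op
    packable = (d) -> not d.pack? and not d.expiresAt?

    console.log packable({v: 1, op: []})                        # => true
    console.log packable({v: 2, op: [], expiresAt: new Date})   # => false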
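Patch 4 gives the packer a work queue: whenever permanent compressed updates are inserted, a per-doc counter in the new docHistoryStats collection is bumped with an upserted $inc, and the {updates: -1, doc_id: 1} index lets a background job visit the most-updated documents first. The stats write is fire-and-forget: its own outcome is ignored and the original error is passed through, so the counter is a scheduling hint rather than a source of truth. A sketch of the counter write, again against an illustrative local database (countUpdates is a hypothetical name):

    mongojs = require "mongojs"
    {ObjectId} = mongojs
    db = mongojs "localhost/test", ["docHistoryStats"]

    # upsert: the first write creates the row, later writes increment it
    countUpdates = (doc_id, n, callback) ->
      db.docHistoryStats.update {doc_id: ObjectId(doc_id)},
        {$inc: {updates: n}},
        {upsert: true},
        callback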
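Patch 5 adds deleteExpiredPackOps for temporary ops that slipped into packs before patch 2 landed: expired entries are filtered out of each pack, v_end, start_ts and end_ts are recomputed from the survivors, and a pack left empty is removed outright. It ships as a dry run: the destructive bulk calls are commented out in favour of "would remove"/"would update" logging, and the call site in getDocHistory is likewise disabled. One thing to watch when the guards come off: with both bulk calls commented out, the bulk is executed empty, which the driver may reject. The pure part of the rule, as a hypothetical helper:

    # prune expired ops from one pack document; no database writes here
    prunePack = (d, now = Date.now()) ->
      keep = d.pack.filter (op) -> not op.expiresAt? or op.expiresAt > now
      return {remove: true} if keep.length == 0
      d.pack = keep
      d.v_end = keep[keep.length - 1].v
      d.meta.start_ts = keep[0].meta.start_ts
      d.meta.end_ts = keep[keep.length - 1].meta.end_ts
      {remove: false, doc: d}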