diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee
index 43de0dfc87..3311dbbb5a 100644
--- a/services/track-changes/app/coffee/PackManager.coffee
+++ b/services/track-changes/app/coffee/PackManager.coffee
@@ -2,6 +2,8 @@ async = require "async"
 _ = require "underscore"
 {db, ObjectId} = require "./mongojs"
 BSON=db.bson.BSON
+logger = require "logger-sharelatex"
+LockManager = require "./LockManager"
 
 module.exports = PackManager =
 	# The following functions implement methods like a mongo find, but
@@ -325,26 +327,101 @@
 
 	insertPack: (packObj, callback) ->
 		bulk = db.docHistory.initializeOrderedBulkOp()
+		doc_id = packObj.doc_id
 		expect_nInserted = 1
 		expect_nRemoved = packObj.pack.length
+		logger.log {doc_id: doc_id}, "adding pack, removing #{expect_nRemoved} ops"
 		bulk.insert packObj
 		packObj.pack.forEach (op) ->
 			bulk.find({_id:op._id}).removeOne()
 		bulk.execute (err, result) ->
 			if err?
+				logger.error {doc_id: doc_id}, "error adding pack"
 				callback(err, result)
 			else if result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved
+				logger.error {doc_id: doc_id, result}, "unexpected result adding pack"
 				callback(new Error(
 					msg: 'unexpected result'
 					expected: {expect_nInserted, expect_nRemoved}
 				), result)
 			else
-				db.docHistoryStats.update {doc_id:packObj.doc_id}, {
+				db.docHistoryStats.update {doc_id:doc_id}, {
 					$inc:{update_count:-expect_nRemoved},
 					$currentDate:{last_packed:true}
 				}, {upsert:true}, () ->
 					callback(err, result)
+
+	# retrieve document ops/packs and check them
+	getDocHistory: (doc_id, callback) ->
+		db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
+			return callback(err) if err?
+			# for safety, do a consistency check of the history
+			logger.log {doc_id}, "checking history for document"
+			PackManager.checkHistory docs, (err) ->
+				return callback(err) if err?
+				callback(err, docs)
+				#PackManager.deleteExpiredPackOps docs, (err) ->
+				#	return callback(err) if err?
+				#	callback err, docs
+
+	packDocHistory: (doc_id, options, callback) ->
+		if typeof callback == "undefined" and typeof options == 'function'
+			callback = options
+			options = {}
+		LockManager.runWithLock(
+			"HistoryLock:#{doc_id}",
+			(releaseLock) ->
+				PackManager._packDocHistory(doc_id, options, releaseLock)
+			, callback
+		)
+
+	_packDocHistory: (doc_id, options, callback) ->
+		logger.log {doc_id},"starting pack operation for document history"
+
+		PackManager.getDocHistory doc_id, (err, docs) ->
+			return callback(err) if err?
+			origDocs = 0
+			origPacks = 0
+			for d in docs
+				if d.pack? then origPacks++ else origDocs++
+			PackManager.convertDocsToPacks docs, (err, packs) ->
+				return callback(err) if err?
+				total = 0
+				for p in packs
+					total = total + p.pack.length
+				logger.log {doc_id, origDocs, origPacks, newPacks: packs.length, totalOps: total}, "document stats"
+				if packs.length
+					if options['dry-run']
+						logger.log {doc_id}, 'dry-run, skipping write packs'
+						return callback()
+					PackManager.savePacks packs, (err) ->
+						return callback(err) if err?
+						# check the history again
+						PackManager.getDocHistory doc_id, callback
+				else
+					logger.log {doc_id}, "no packs to write"
+					# keep a record that we checked this one to avoid rechecking it
+					db.docHistoryStats.update {doc_id:doc_id}, {
+						$currentDate:{last_checked:true}
+					}, {upsert:true}, () ->
+						callback null, null
+
+	DB_WRITE_DELAY: 2000
+
+	savePacks: (packs, callback) ->
+		async.eachSeries packs, PackManager.safeInsert, (err, result) ->
+			if err?
+				logger.log {err, result}, "error writing packs"
+				callback err, result
+			else
+				callback()
+
+	safeInsert: (packObj, callback) ->
+		PackManager.insertPack packObj, (err, result) ->
+			setTimeout () ->
+				callback(err,result)
+			, PackManager.DB_WRITE_DELAY
 
 	deleteExpiredPackOps: (docs, callback) ->
 		now = Date.now()
 		toRemove = []
diff --git a/services/track-changes/pack.coffee b/services/track-changes/pack.coffee
deleted file mode 100644
index 59e64aadf4..0000000000
--- a/services/track-changes/pack.coffee
+++ /dev/null
@@ -1,134 +0,0 @@
-Settings = require "settings-sharelatex"
-fs = require("fs")
-{db, ObjectId} = require "./app/coffee/mongojs"
-async = require("async")
-BSON=db.bson.BSON
-util = require 'util'
-_ = require 'underscore'
-PackManager = require "./app/coffee/PackManager.coffee"
-
-lineReader = require "line-reader"
-cli = require "cli"
-options = cli.parse {
-	'update': ['u', 'find documents to pack from database']
-	'dry-run': ['n', 'do not write to database'],
-	'fast': [false, 'no delays on writes']
-}
-
-DB_WRITE_DELAY = if options.fast then 0 else 2000
-DOCUMENT_PACK_DELAY = if options.fast then 0 else 1000
-
-packDocHistory = (doc_id, callback) ->
-	util.log "starting pack operation for #{doc_id}"
-	getDocHistory doc_id, (err, docs) ->
-		return callback(err) if err?
-		origDocs = 0
-		origPacks = 0
-		for d in docs
-			if d.pack? then origPacks++ else origDocs++
-		PackManager.convertDocsToPacks docs, (err, packs) ->
-			return callback(err) if err?
-			total = 0
-			for p in packs
-				total = total + p.pack.length
-			util.log "docs #{origDocs} packs #{origPacks} => packs #{packs.length} of #{total} ops"
-			if packs.length
-				if options['dry-run']
-					util.log 'dry-run, skipping write packs'
-					return callback()
-				savePacks packs, (err) ->
-					return callback(err) if err?
-					# check the history again
-					getDocHistory doc_id, callback
-			else
-				util.log "no packs to write"
-				callback null, null
-
-# retrieve document ops/packs and check them
-getDocHistory = (doc_id, callback) ->
-	db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
-		return callback(err) if err?
-		# for safety, do a consistency check of the history
-		util.log "checking history for #{doc_id}"
-		PackManager.checkHistory docs, (err) ->
-			return callback(err) if err?
-			callback(err, docs)
-			#PackManager.deleteExpiredPackOps docs, (err) ->
-			#	return callback(err) if err?
-			#	callback err, docs
-
-safeInsert = (packObj, callback) ->
-	if shutdownRequested
-		return callback('shutdown')
-	PackManager.insertPack packObj, (err, result) ->
-		setTimeout () ->
-			callback(err,result)
-		, DB_WRITE_DELAY
-
-savePacks = (packs, callback) ->
-	async.eachSeries packs, safeInsert, (err, result) ->
-		if err?
-			util.log "error writing packs"
-			callback err, result
-		else
-			util.log "done writing packs"
-			callback()
-
-readFile = (file, callback) ->
-	ids = []
-	lineReader.eachLine file, (line) ->
-		result = line.match(/[0-9a-f]{24}/)
-		if result?
-			ids.push result[0]
-	.then () ->
-		callback(null, ids)
-
-shutdownRequested = false
-process.on 'SIGINT', () ->
-	util.log "Gracefully shutting down from SIGINT"
-	shutdownRequested = true
-
-processUpdates = (pending) ->
-	async.eachSeries pending, (doc_id, callback) ->
-		packDocHistory doc_id, (err, result) ->
-			if err?
-				console.log "ERROR:", err, result
-				return callback(err)
-			else if not options['dry-run'] && doneFile?
-				fs.appendFileSync doneFile, doc_id + '\n'
-			if shutdownRequested
-				return callback('shutdown')
-			setTimeout () ->
-				callback(err, result)
-			, DOCUMENT_PACK_DELAY
-	, (err, results) ->
-		if err?
-			console.log 'error:', err
-		util.log 'closing db'
-		db.close()
-
-if options['update']
-	util.log 'checking for updates'
-	db.docHistoryStats.find({
-		update_count: {$gt : PackManager.MIN_COUNT}
-	}).sort({
-		update_count:-1
-	}).limit 1000, (error, results) ->
-		if err?
-			utils.log 'error', error
-			db.close()
-			return
-		util.log "found #{results.length} documents to pack"
-		pending = _.pluck results, 'doc_id'
-		processUpdates pending
-else
-	todoFile = cli.args[1]
-	doneFile = cli.args[2]
-	util.log "reading from #{todoFile}"
-	util.log "logging progress to #{doneFile}"
-	fs.appendFileSync doneFile, '# starting pack run at ' + new Date() + '\n'
-
-	readFile todoFile, (err, todo) ->
-		readFile doneFile, (err, done) ->
-			pending = _.difference todo, done
-			processUpdates pending