diff --git a/services/track-changes/app.coffee b/services/track-changes/app.coffee index 08a8b66113..b0ae08dca5 100644 --- a/services/track-changes/app.coffee +++ b/services/track-changes/app.coffee @@ -7,6 +7,8 @@ Metrics = require "metrics-sharelatex" Metrics.initialize("track-changes") Metrics.mongodb.monitor(Path.resolve(__dirname + "/node_modules/mongojs/node_modules/mongodb"), logger) +child_process = require "child_process" + HttpController = require "./app/js/HttpController" express = require "express" app = express() @@ -23,6 +25,21 @@ app.post "/project/:project_id/flush", HttpController.flushProject app.post "/project/:project_id/doc/:doc_id/version/:version/restore", HttpController.restore +app.post "/doc/:doc_id/pack", HttpController.packDoc + +packWorker = null # use a single packing worker + +app.post "/pack", (req, res, next) -> + if packWorker? + res.send "pack already running" + else + logger.log "running pack" + packWorker = child_process.fork(__dirname + '/app/js/PackWorker.js') + packWorker.on 'exit', (code, signal) -> + logger.log {code, signal}, "history auto pack exited" + packWorker = null + res.send "pack started" + app.get "/status", (req, res, next) -> res.send "track-changes is alive" diff --git a/services/track-changes/app/coffee/HttpController.coffee b/services/track-changes/app/coffee/HttpController.coffee index 71df05a8f8..d802f193bc 100644 --- a/services/track-changes/app/coffee/HttpController.coffee +++ b/services/track-changes/app/coffee/HttpController.coffee @@ -1,5 +1,6 @@ UpdatesManager = require "./UpdatesManager" DiffManager = require "./DiffManager" +PackManager = require "./PackManager" RestoreManager = require "./RestoreManager" logger = require "logger-sharelatex" @@ -19,6 +20,13 @@ module.exports = HttpController = return next(error) if error? res.send 204 + packDoc: (req, res, next = (error) ->) -> + doc_id = req.params.doc_id + logger.log doc_id: doc_id, "packing doc history" + PackManager.packDocHistory doc_id, (error) -> + return next(error) if error? + res.send 204 + getDiff: (req, res, next = (error) ->) -> doc_id = req.params.doc_id project_id = req.params.project_id diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 43de0dfc87..3311dbbb5a 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -2,6 +2,8 @@ async = require "async" _ = require "underscore" {db, ObjectId} = require "./mongojs" BSON=db.bson.BSON +logger = require "logger-sharelatex" +LockManager = require "./LockManager" module.exports = PackManager = # The following functions implement methods like a mongo find, but @@ -325,26 +327,101 @@ module.exports = PackManager = insertPack: (packObj, callback) -> bulk = db.docHistory.initializeOrderedBulkOp() + doc_id = packObj.doc_id expect_nInserted = 1 expect_nRemoved = packObj.pack.length + logger.log {doc_id: doc_id}, "adding pack, removing #{expect_nRemoved} ops" bulk.insert packObj packObj.pack.forEach (op) -> bulk.find({_id:op._id}).removeOne() bulk.execute (err, result) -> if err? + logger.error {doc_id: doc_id}, "error adding pack" callback(err, result) else if result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved + logger.error {doc_id: doc_id, result}, "unexpected result adding pack" callback(new Error( msg: 'unexpected result' expected: {expect_nInserted, expect_nRemoved} ), result) else - db.docHistoryStats.update {doc_id:packObj.doc_id}, { + db.docHistoryStats.update {doc_id:doc_id}, { $inc:{update_count:-expect_nRemoved}, $currentDate:{last_packed:true} }, {upsert:true}, () -> callback(err, result) + # retrieve document ops/packs and check them + getDocHistory: (doc_id, callback) -> + db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) -> + return callback(err) if err? + # for safety, do a consistency check of the history + logger.log {doc_id}, "checking history for document" + PackManager.checkHistory docs, (err) -> + return callback(err) if err? + callback(err, docs) + #PackManager.deleteExpiredPackOps docs, (err) -> + # return callback(err) if err? + # callback err, docs + + packDocHistory: (doc_id, options, callback) -> + if typeof callback == "undefined" and typeof options == 'function' + callback = options + options = {} + LockManager.runWithLock( + "HistoryLock:#{doc_id}", + (releaseLock) -> + PackManager._packDocHistory(doc_id, options, releaseLock) + , callback + ) + + _packDocHistory: (doc_id, options, callback) -> + logger.log {doc_id},"starting pack operation for document history" + + PackManager.getDocHistory doc_id, (err, docs) -> + return callback(err) if err? + origDocs = 0 + origPacks = 0 + for d in docs + if d.pack? then origPacks++ else origDocs++ + PackManager.convertDocsToPacks docs, (err, packs) -> + return callback(err) if err? + total = 0 + for p in packs + total = total + p.pack.length + logger.log {doc_id, origDocs, origPacks, newPacks: packs.length, totalOps: total}, "document stats" + if packs.length + if options['dry-run'] + logger.log {doc_id}, 'dry-run, skipping write packs' + return callback() + PackManager.savePacks packs, (err) -> + return callback(err) if err? + # check the history again + PackManager.getDocHistory doc_id, callback + else + logger.log {doc_id}, "no packs to write" + # keep a record that we checked this one to avoid rechecking it + db.docHistoryStats.update {doc_id:doc_id}, { + $currentDate:{last_checked:true} + }, {upsert:true}, () -> + callback null, null + + DB_WRITE_DELAY: 2000 + + savePacks: (packs, callback) -> + async.eachSeries packs, PackManager.safeInsert, (err, result) -> + if err? + logger.log {err, result}, "error writing packs" + callback err, result + else + callback() + + safeInsert: (packObj, callback) -> + PackManager.insertPack packObj, (err, result) -> + setTimeout () -> + callback(err,result) + , PackManager.DB_WRITE_DELAY + deleteExpiredPackOps: (docs, callback) -> now = Date.now() toRemove = [] diff --git a/services/track-changes/app/coffee/PackWorker.coffee b/services/track-changes/app/coffee/PackWorker.coffee new file mode 100644 index 0000000000..8366343de5 --- /dev/null +++ b/services/track-changes/app/coffee/PackWorker.coffee @@ -0,0 +1,60 @@ +async = require "async" +_ = require "underscore" +{db, ObjectId} = require "./mongojs" +BSON=db.bson.BSON +logger = require "logger-sharelatex" +logger.initialize("track-changes-packworker") +LockManager = require "./LockManager" +PackManager = require "./PackManager" + +# this worker script is forked by the main process to look for +# document histories which can be packed + +DOCUMENT_PACK_DELAY = 1000 + +logger.log 'checking for updates' + +finish = () -> + logger.log 'closing db' + db.close () -> + logger.log 'exiting from pack worker' + process.exit() + +processUpdates = (pending) -> + async.eachSeries pending, (doc_id, callback) -> + PackManager.packDocHistory doc_id, (err, result) -> + if err? + logger.error {err, result}, "error in pack worker" + return callback(err) + setTimeout () -> + callback(err, result) + , DOCUMENT_PACK_DELAY + , (err, results) -> + if err? + logger.error {err}, 'error in pack worker processUpdates' + finish() + +# find the documents which can be packed, by checking the number of +# unpacked updates in the docHistoryStats collection + +db.docHistoryStats.find({ + update_count: {$gt : PackManager.MIN_COUNT} +}).sort({ + update_count:-1 +}).limit 1000, (err, results) -> + if err? + logger.log {err}, 'error checking for updates' + finish() + return + results = _.filter results, (doc) -> + if doc.last_checked? and doc.last_checked > doc.last_update + # skip documents which don't have any updates since last check + return false + else if doc.last_packed? and doc.last_packed > doc.last_update + # skip documents which don't have any updates since last pack + return false + else + return true + pending = _.pluck results, 'doc_id' + logger.log "found #{pending.length} documents to pack" + processUpdates pending diff --git a/services/track-changes/pack.coffee b/services/track-changes/pack.coffee deleted file mode 100644 index 59e64aadf4..0000000000 --- a/services/track-changes/pack.coffee +++ /dev/null @@ -1,134 +0,0 @@ -Settings = require "settings-sharelatex" -fs = require("fs") -{db, ObjectId} = require "./app/coffee/mongojs" -async = require("async") -BSON=db.bson.BSON -util = require 'util' -_ = require 'underscore' -PackManager = require "./app/coffee/PackManager.coffee" - -lineReader = require "line-reader" -cli = require "cli" -options = cli.parse { - 'update': ['u', 'find documents to pack from database'] - 'dry-run': ['n', 'do not write to database'], - 'fast': [false, 'no delays on writes'] -} - -DB_WRITE_DELAY = if options.fast then 0 else 2000 -DOCUMENT_PACK_DELAY = if options.fast then 0 else 1000 - -packDocHistory = (doc_id, callback) -> - util.log "starting pack operation for #{doc_id}" - getDocHistory doc_id, (err, docs) -> - return callback(err) if err? - origDocs = 0 - origPacks = 0 - for d in docs - if d.pack? then origPacks++ else origDocs++ - PackManager.convertDocsToPacks docs, (err, packs) -> - return callback(err) if err? - total = 0 - for p in packs - total = total + p.pack.length - util.log "docs #{origDocs} packs #{origPacks} => packs #{packs.length} of #{total} ops" - if packs.length - if options['dry-run'] - util.log 'dry-run, skipping write packs' - return callback() - savePacks packs, (err) -> - return callback(err) if err? - # check the history again - getDocHistory doc_id, callback - else - util.log "no packs to write" - callback null, null - -# retrieve document ops/packs and check them -getDocHistory = (doc_id, callback) -> - db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) -> - return callback(err) if err? - # for safety, do a consistency check of the history - util.log "checking history for #{doc_id}" - PackManager.checkHistory docs, (err) -> - return callback(err) if err? - callback(err, docs) - #PackManager.deleteExpiredPackOps docs, (err) -> - # return callback(err) if err? - # callback err, docs - -safeInsert = (packObj, callback) -> - if shutdownRequested - return callback('shutdown') - PackManager.insertPack packObj, (err, result) -> - setTimeout () -> - callback(err,result) - , DB_WRITE_DELAY - -savePacks = (packs, callback) -> - async.eachSeries packs, safeInsert, (err, result) -> - if err? - util.log "error writing packs" - callback err, result - else - util.log "done writing packs" - callback() - -readFile = (file, callback) -> - ids = [] - lineReader.eachLine file, (line) -> - result = line.match(/[0-9a-f]{24}/) - if result? - ids.push result[0] - .then () -> - callback(null, ids) - -shutdownRequested = false -process.on 'SIGINT', () -> - util.log "Gracefully shutting down from SIGINT" - shutdownRequested = true - -processUpdates = (pending) -> - async.eachSeries pending, (doc_id, callback) -> - packDocHistory doc_id, (err, result) -> - if err? - console.log "ERROR:", err, result - return callback(err) - else if not options['dry-run'] && doneFile? - fs.appendFileSync doneFile, doc_id + '\n' - if shutdownRequested - return callback('shutdown') - setTimeout () -> - callback(err, result) - , DOCUMENT_PACK_DELAY - , (err, results) -> - if err? - console.log 'error:', err - util.log 'closing db' - db.close() - -if options['update'] - util.log 'checking for updates' - db.docHistoryStats.find({ - update_count: {$gt : PackManager.MIN_COUNT} - }).sort({ - update_count:-1 - }).limit 1000, (error, results) -> - if err? - utils.log 'error', error - db.close() - return - util.log "found #{results.length} documents to pack" - pending = _.pluck results, 'doc_id' - processUpdates pending -else - todoFile = cli.args[1] - doneFile = cli.args[2] - util.log "reading from #{todoFile}" - util.log "logging progress to #{doneFile}" - fs.appendFileSync doneFile, '# starting pack run at ' + new Date() + '\n' - - readFile todoFile, (err, todo) -> - readFile doneFile, (err, done) -> - pending = _.difference todo, done - processUpdates pending