Merge pull request #4 from sharelatex/move-packing-into-packmanager

Move packing into packmanager
This commit is contained in:
Brian Gough 2015-06-04 16:44:30 +01:00
commit c2e5c2c751
5 changed files with 163 additions and 135 deletions
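
In short: the packing logic moves out of the standalone root-level script (deleted at the end of this diff) and into PackManager, where it runs under the per-document history lock and can be triggered over HTTP, either for a single document (POST /doc/:doc_id/pack) or as a background sweep (POST /pack) that forks a PackWorker child process. A minimal sketch of calling the new API directly, assuming the compiled module lives at app/js/PackManager and using a made-up document id:

PackManager = require "./app/js/PackManager"

# pack one document's history; the per-document HistoryLock is taken internally
PackManager.packDocHistory "54f1e8e2c1d2a3b4c5d6e7f8", {'dry-run': true}, (error) ->
	throw error if error?
	console.log "dry-run pack finished"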

View file

@@ -7,6 +7,8 @@ Metrics = require "metrics-sharelatex"
Metrics.initialize("track-changes")
Metrics.mongodb.monitor(Path.resolve(__dirname + "/node_modules/mongojs/node_modules/mongodb"), logger)
child_process = require "child_process"
HttpController = require "./app/js/HttpController"
express = require "express"
app = express()
@@ -23,6 +25,21 @@ app.post "/project/:project_id/flush", HttpController.flushProject
app.post "/project/:project_id/doc/:doc_id/version/:version/restore", HttpController.restore app.post "/project/:project_id/doc/:doc_id/version/:version/restore", HttpController.restore
app.post "/doc/:doc_id/pack", HttpController.packDoc

packWorker = null # use a single packing worker

app.post "/pack", (req, res, next) ->
	if packWorker?
		res.send "pack already running"
	else
		logger.log "running pack"
		packWorker = child_process.fork(__dirname + '/app/js/PackWorker.js')
		packWorker.on 'exit', (code, signal) ->
			logger.log {code, signal}, "history auto pack exited"
			packWorker = null
		res.send "pack started"
app.get "/status", (req, res, next) -> app.get "/status", (req, res, next) ->
res.send "track-changes is alive" res.send "track-changes is alive"

View file

@@ -1,5 +1,6 @@
UpdatesManager = require "./UpdatesManager"
DiffManager = require "./DiffManager"
PackManager = require "./PackManager"
RestoreManager = require "./RestoreManager"
logger = require "logger-sharelatex"
@@ -19,6 +20,13 @@ module.exports = HttpController =
			return next(error) if error?
			res.send 204

	packDoc: (req, res, next = (error) ->) ->
		doc_id = req.params.doc_id
		logger.log doc_id: doc_id, "packing doc history"
		PackManager.packDocHistory doc_id, (error) ->
			return next(error) if error?
			res.send 204

	getDiff: (req, res, next = (error) ->) ->
		doc_id = req.params.doc_id
		project_id = req.params.project_id

View file

@@ -2,6 +2,8 @@ async = require "async"
_ = require "underscore"
{db, ObjectId} = require "./mongojs"
BSON=db.bson.BSON
logger = require "logger-sharelatex"
LockManager = require "./LockManager"
module.exports = PackManager =
	# The following functions implement methods like a mongo find, but
@@ -325,26 +327,101 @@
	insertPack: (packObj, callback) ->
		bulk = db.docHistory.initializeOrderedBulkOp()
		doc_id = packObj.doc_id
		expect_nInserted = 1
		expect_nRemoved = packObj.pack.length
		logger.log {doc_id: doc_id}, "adding pack, removing #{expect_nRemoved} ops"
		bulk.insert packObj
		packObj.pack.forEach (op) ->
			bulk.find({_id:op._id}).removeOne()
		bulk.execute (err, result) ->
			if err?
				logger.error {doc_id: doc_id}, "error adding pack"
				callback(err, result)
			else if result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved
				logger.error {doc_id: doc_id, result}, "unexpected result adding pack"
				callback(new Error(
					msg: 'unexpected result'
					expected: {expect_nInserted, expect_nRemoved}
				), result)
			else
				db.docHistoryStats.update {doc_id:doc_id}, {
					$inc:{update_count:-expect_nRemoved},
					$currentDate:{last_packed:true}
				}, {upsert:true}, () ->
					callback(err, result)

	# retrieve document ops/packs and check them

	getDocHistory: (doc_id, callback) ->
		db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
			return callback(err) if err?
			# for safety, do a consistency check of the history
			logger.log {doc_id}, "checking history for document"
			PackManager.checkHistory docs, (err) ->
				return callback(err) if err?
				callback(err, docs)
				#PackManager.deleteExpiredPackOps docs, (err) ->
				#	return callback(err) if err?
				#	callback err, docs

	packDocHistory: (doc_id, options, callback) ->
		if typeof callback == "undefined" and typeof options == 'function'
			callback = options
			options = {}
		LockManager.runWithLock(
			"HistoryLock:#{doc_id}",
			(releaseLock) ->
				PackManager._packDocHistory(doc_id, options, releaseLock)
			, callback
		)

	_packDocHistory: (doc_id, options, callback) ->
		logger.log {doc_id}, "starting pack operation for document history"
		PackManager.getDocHistory doc_id, (err, docs) ->
			return callback(err) if err?
			origDocs = 0
			origPacks = 0
			for d in docs
				if d.pack? then origPacks++ else origDocs++
			PackManager.convertDocsToPacks docs, (err, packs) ->
				return callback(err) if err?
				total = 0
				for p in packs
					total = total + p.pack.length
				logger.log {doc_id, origDocs, origPacks, newPacks: packs.length, totalOps: total}, "document stats"
				if packs.length
					if options['dry-run']
						logger.log {doc_id}, 'dry-run, skipping write packs'
						return callback()
					PackManager.savePacks packs, (err) ->
						return callback(err) if err?
						# check the history again
						PackManager.getDocHistory doc_id, callback
				else
					logger.log {doc_id}, "no packs to write"
					# keep a record that we checked this one to avoid rechecking it
					db.docHistoryStats.update {doc_id:doc_id}, {
						$currentDate:{last_checked:true}
					}, {upsert:true}, () ->
						callback null, null

	DB_WRITE_DELAY: 2000

	savePacks: (packs, callback) ->
		async.eachSeries packs, PackManager.safeInsert, (err, result) ->
			if err?
				logger.log {err, result}, "error writing packs"
				callback err, result
			else
				callback()

	safeInsert: (packObj, callback) ->
		PackManager.insertPack packObj, (err, result) ->
			setTimeout () ->
				callback(err, result)
			, PackManager.DB_WRITE_DELAY

	deleteExpiredPackOps: (docs, callback) ->
		now = Date.now()
		toRemove = []
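
Note that packDocHistory wraps the real work in LockManager.runWithLock, so an HTTP-triggered pack and the background worker cannot pack the same document at the same time. A sketch of the contract this assumes of LockManager, inferred from the call site above rather than spelled out in this diff (doPackingWork is a hypothetical stand-in):

LockManager = require "./LockManager"

doc_id = "54f1e8e2c1d2a3b4c5d6e7f8"        # hypothetical id
doPackingWork = (cb) -> cb(null, "done")   # stand-in for _packDocHistory

LockManager.runWithLock "HistoryLock:#{doc_id}",
	(releaseLock) ->
		doPackingWork (err, result) ->
			releaseLock(err, result)  # frees the lock, then forwards err/result
	, (err, result) ->
		console.log "pack finished, lock released"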

View file

@@ -0,0 +1,60 @@
async = require "async"
_ = require "underscore"
{db, ObjectId} = require "./mongojs"
BSON=db.bson.BSON
logger = require "logger-sharelatex"
logger.initialize("track-changes-packworker")
LockManager = require "./LockManager"
PackManager = require "./PackManager"

# this worker script is forked by the main process to look for
# document histories which can be packed

DOCUMENT_PACK_DELAY = 1000

logger.log 'checking for updates'

finish = () ->
	logger.log 'closing db'
	db.close () ->
		logger.log 'exiting from pack worker'
		process.exit()

processUpdates = (pending) ->
	async.eachSeries pending, (doc_id, callback) ->
		PackManager.packDocHistory doc_id, (err, result) ->
			if err?
				logger.error {err, result}, "error in pack worker"
				return callback(err)
			setTimeout () ->
				callback(err, result)
			, DOCUMENT_PACK_DELAY
	, (err, results) ->
		if err?
			logger.error {err}, 'error in pack worker processUpdates'
		finish()

# find the documents which can be packed, by checking the number of
# unpacked updates in the docHistoryStats collection

db.docHistoryStats.find({
	update_count: {$gt : PackManager.MIN_COUNT}
}).sort({
	update_count:-1
}).limit 1000, (err, results) ->
	if err?
		logger.log {err}, 'error checking for updates'
		finish()
		return
	results = _.filter results, (doc) ->
		if doc.last_checked? and doc.last_checked > doc.last_update
			# skip documents which don't have any updates since last check
			return false
		else if doc.last_packed? and doc.last_packed > doc.last_update
			# skip documents which don't have any updates since last pack
			return false
		else
			return true
	pending = _.pluck results, 'doc_id'
	logger.log "found #{pending.length} documents to pack"
	processUpdates pending
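
The worker's scheduling is driven entirely by the docHistoryStats collection: insertPack decrements update_count and stamps last_packed, a no-op pack run stamps last_checked, and the filter above skips documents with no activity since either stamp. An illustrative sketch of the document shape the worker consumes; the field names come from the queries in this commit, while the values, and the assumption that last_update is stamped elsewhere when new ops arrive, are made up:

{ObjectId} = require "./mongojs"

statsDoc =
	doc_id: ObjectId("54f1e8e2c1d2a3b4c5d6e7f8")  # hypothetical id
	update_count: 512         # unpacked ops; insertPack decrements this
	last_update: new Date()   # assumed: set when new ops are written
	last_packed: new Date()   # set by insertPack via $currentDate
	last_checked: new Date()  # set when a pack run finds nothing to write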

View file

@@ -1,134 +0,0 @@
Settings = require "settings-sharelatex"
fs = require("fs")
{db, ObjectId} = require "./app/coffee/mongojs"
async = require("async")
BSON=db.bson.BSON
util = require 'util'
_ = require 'underscore'
PackManager = require "./app/coffee/PackManager.coffee"
lineReader = require "line-reader"
cli = require "cli"

options = cli.parse {
	'update': ['u', 'find documents to pack from database']
	'dry-run': ['n', 'do not write to database'],
	'fast': [false, 'no delays on writes']
}

DB_WRITE_DELAY = if options.fast then 0 else 2000
DOCUMENT_PACK_DELAY = if options.fast then 0 else 1000

packDocHistory = (doc_id, callback) ->
	util.log "starting pack operation for #{doc_id}"
	getDocHistory doc_id, (err, docs) ->
		return callback(err) if err?
		origDocs = 0
		origPacks = 0
		for d in docs
			if d.pack? then origPacks++ else origDocs++
		PackManager.convertDocsToPacks docs, (err, packs) ->
			return callback(err) if err?
			total = 0
			for p in packs
				total = total + p.pack.length
			util.log "docs #{origDocs} packs #{origPacks} => packs #{packs.length} of #{total} ops"
			if packs.length
				if options['dry-run']
					util.log 'dry-run, skipping write packs'
					return callback()
				savePacks packs, (err) ->
					return callback(err) if err?
					# check the history again
					getDocHistory doc_id, callback
			else
				util.log "no packs to write"
				callback null, null

# retrieve document ops/packs and check them

getDocHistory = (doc_id, callback) ->
	db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
		return callback(err) if err?
		# for safety, do a consistency check of the history
		util.log "checking history for #{doc_id}"
		PackManager.checkHistory docs, (err) ->
			return callback(err) if err?
			callback(err, docs)
			#PackManager.deleteExpiredPackOps docs, (err) ->
			#	return callback(err) if err?
			#	callback err, docs

safeInsert = (packObj, callback) ->
	if shutdownRequested
		return callback('shutdown')
	PackManager.insertPack packObj, (err, result) ->
		setTimeout () ->
			callback(err, result)
		, DB_WRITE_DELAY

savePacks = (packs, callback) ->
	async.eachSeries packs, safeInsert, (err, result) ->
		if err?
			util.log "error writing packs"
			callback err, result
		else
			util.log "done writing packs"
			callback()

readFile = (file, callback) ->
	ids = []
	lineReader.eachLine file, (line) ->
		result = line.match(/[0-9a-f]{24}/)
		if result?
			ids.push result[0]
	.then () ->
		callback(null, ids)

shutdownRequested = false
process.on 'SIGINT', () ->
	util.log "Gracefully shutting down from SIGINT"
	shutdownRequested = true

processUpdates = (pending) ->
	async.eachSeries pending, (doc_id, callback) ->
		packDocHistory doc_id, (err, result) ->
			if err?
				console.log "ERROR:", err, result
				return callback(err)
			else if not options['dry-run'] && doneFile?
				fs.appendFileSync doneFile, doc_id + '\n'
			if shutdownRequested
				return callback('shutdown')
			setTimeout () ->
				callback(err, result)
			, DOCUMENT_PACK_DELAY
	, (err, results) ->
		if err?
			console.log 'error:', err
		util.log 'closing db'
		db.close()

if options['update']
	util.log 'checking for updates'
	db.docHistoryStats.find({
		update_count: {$gt : PackManager.MIN_COUNT}
	}).sort({
		update_count:-1
	}).limit 1000, (error, results) ->
		if err?
			utils.log 'error', error
			db.close()
			return
		util.log "found #{results.length} documents to pack"
		pending = _.pluck results, 'doc_id'
		processUpdates pending
else
	todoFile = cli.args[1]
	doneFile = cli.args[2]
	util.log "reading from #{todoFile}"
	util.log "logging progress to #{doneFile}"
	fs.appendFileSync doneFile, '# starting pack run at ' + new Date() + '\n'
	readFile todoFile, (err, todo) ->
		readFile doneFile, (err, done) ->
			pending = _.difference todo, done
			processUpdates pending