# overleaf/services/track-changes/app/coffee/PackWorker.coffee
Settings = require "settings-sharelatex"
async = require "async"
_ = require "underscore"
{db, ObjectId, BSON} = require "./mongojs"
logger = require "logger-sharelatex"
logger.initialize("track-changes-packworker")
# report crashes to Sentry only when a DSN is configured
if Settings.sentry?.dsn?
	logger.initializeErrorReporting(Settings.sentry.dsn)
LockManager = require "./LockManager"
PackManager = require "./PackManager"

# this worker script is forked by the main process to look for
# document histories which can be packed

# tunables come from the command line, with defaults:
#   argv[2] LIMIT                — max documents to pack per run
#   argv[3] DOCUMENT_PACK_DELAY  — ms pause between documents
#   argv[4] TIMEOUT              — ms before a graceful shutdown is requested
LIMIT = Number(process.argv[2]) || 1000
DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000
TIMEOUT = Number(process.argv[4]) || 30*60*1000
# flag checked between documents in processUpdates; flipped by the timer below
shutDownRequested = false

# After TIMEOUT ms, ask the worker to stop at the next document boundary.
# If it is still alive 5 minutes after that, force a hard exit.
shutDownTimer = setTimeout () ->
	logger.log "pack timed out, requesting shutdown"
	# start the shutdown on the next pack
	shutDownRequested = true
	# do a hard shutdown after a further 5 minutes
	setTimeout () ->
		logger.error "HARD TIMEOUT in pack worker"
		process.exit()
	, 5*60*1000
, TIMEOUT

logger.log "checking for updates, limit=#{LIMIT}, delay=#{DOCUMENT_PACK_DELAY}, timeout=#{TIMEOUT}"
# work around for https://github.com/mafintosh/mongojs/issues/224
# mongojs does not expose a working close(); reach into the underlying
# driver, locate the server/topology object, and destroy it directly.
db.close = (callback) ->
	this._getServer (err, server) ->
		if err?
			return callback(err)
		# newer driver versions nest the destroyable object under .topology
		target = if server.destroy? then server else server.topology
		target.destroy(true, true)
		callback()
# Release every resource that keeps the event loop alive (shutdown timer,
# mongo connection, LockManager's redis connection) so the process can exit
# naturally; if something still hangs, force-exit after a further 5 seconds.
finish = () ->
	if shutDownTimer?
		logger.log 'cancelling timeout'
		clearTimeout shutDownTimer
	logger.log 'closing db'
	db.close () ->
		logger.log 'closing LockManager Redis Connection'
		LockManager.close () ->
			logger.log 'ready to exit from pack worker'
			hardTimeout = setTimeout () ->
				logger.error 'hard exit from pack worker'
				process.exit(1)
			, 5*1000
			# unref so this safety timer does not itself keep the process alive
			hardTimeout.unref()

process.on 'exit', (code) ->
	logger.log {code}, 'pack worker exited'
# Pack each doc_id in `pending` strictly in sequence, waiting
# DOCUMENT_PACK_DELAY ms between documents. Stops early on a pack error or
# when shutDownRequested has been set by the timeout; always calls finish().
processUpdates = (pending) ->
	async.eachSeries pending, (doc_id, callback) ->
		PackManager.packDocHistory doc_id, (err, result) ->
			if err?
				logger.error {err, result}, "error in pack worker"
				return callback(err)
			if shutDownRequested
				logger.error "shutting down pack worker"
				# sentinel error recognised (and not logged) in the final callback
				return callback(new Error("shutdown"))
			setTimeout () ->
				callback(err, result)
			, DOCUMENT_PACK_DELAY
	, (err, results) ->
		if err? and err.message != "shutdown"
			logger.error {err}, 'error in pack worker processUpdates'
		finish()
# find the documents which can be packed, by checking the number of
# unpacked updates in the docHistoryStats collection; most-backlogged first
db.docHistoryStats.find({
	update_count: {$gt : PackManager.MIN_COUNT}
}).sort({
	update_count:-1
}).limit LIMIT, (err, results) ->
	if err?
		# was logger.log — a failed query is an error, log it as one
		logger.error {err}, 'error checking for updates'
		finish()
		return
	results = _.filter results, (doc) ->
		if doc.last_checked? and doc.last_checked > doc.last_update
			# skip documents which don't have any updates since last check
			return false
		else if doc.last_packed? and doc.last_packed > doc.last_update
			# skip documents which don't have any updates since last pack
			return false
		else
			return true
	pending = _.pluck results, 'doc_id'
	logger.log "found #{pending.length} documents to pack"
	processUpdates pending