mirror of
https://github.com/overleaf/overleaf.git
synced 2025-01-01 17:31:01 +00:00
132 lines
4.1 KiB
CoffeeScript
132 lines
4.1 KiB
CoffeeScript
Settings = require "settings-sharelatex"
|
|
async = require "async"
|
|
_ = require "underscore"
|
|
{db, ObjectId, BSON} = require "./mongojs"
|
|
fs = require "fs"
|
|
Metrics = require "metrics-sharelatex"
|
|
Metrics.initialize("track-changes")
|
|
logger = require "logger-sharelatex"
|
|
logger.initialize("track-changes-packworker")
|
|
if Settings.sentry?.dsn?
|
|
logger.initializeErrorReporting(Settings.sentry.dsn)
|
|
|
|
DAYS = 24 * 3600 * 1000
|
|
|
|
LockManager = require "./LockManager"
|
|
PackManager = require "./PackManager"
|
|
|
|
# this worker script is forked by the main process to look for
|
|
# document histories which can be archived
|
|
|
|
source = process.argv[2]
|
|
DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000
|
|
TIMEOUT = Number(process.argv[4]) || 30*60*1000
|
|
|
|
if source.match(/[^0-9]/)
|
|
file = fs.readFileSync source
|
|
result = for line in file.toString().split('\n')
|
|
[project_id, doc_id] = line.split(' ')
|
|
{doc_id, project_id}
|
|
pending = _.filter result, (row) -> row?.doc_id?.match(/^[a-f0-9]{24}$/)
|
|
else
|
|
LIMIT = Number(process.argv[2]) || 1000
|
|
|
|
shutDownRequested = false
|
|
shutDownTimer = setTimeout () ->
|
|
logger.log "pack timed out, requesting shutdown"
|
|
# start the shutdown on the next pack
|
|
shutDownRequested = true
|
|
# do a hard shutdown after a further 5 minutes
|
|
setTimeout () ->
|
|
logger.error "HARD TIMEOUT in pack archive worker"
|
|
process.exit()
|
|
, 5*60*1000
|
|
, TIMEOUT
|
|
|
|
logger.log "checking for updates, limit=#{LIMIT}, delay=#{DOCUMENT_PACK_DELAY}, timeout=#{TIMEOUT}"
|
|
|
|
# work around for https://github.com/mafintosh/mongojs/issues/224
|
|
db.close = (callback) ->
|
|
this._getServer (err, server) ->
|
|
return callback(err) if err?
|
|
server = if server.destroy? then server else server.topology
|
|
server.destroy(true, true)
|
|
callback()
|
|
|
|
finish = () ->
|
|
if shutDownTimer?
|
|
logger.log 'cancelling timeout'
|
|
clearTimeout shutDownTimer
|
|
logger.log 'closing db'
|
|
db.close () ->
|
|
logger.log 'closing LockManager Redis Connection'
|
|
LockManager.close () ->
|
|
logger.log 'ready to exit from pack archive worker'
|
|
hardTimeout = setTimeout () ->
|
|
logger.error 'hard exit from pack archive worker'
|
|
process.exit(1)
|
|
, 5*1000
|
|
hardTimeout.unref()
|
|
|
|
process.on 'exit', (code) ->
|
|
logger.log {code}, 'pack archive worker exited'
|
|
|
|
processUpdates = (pending) ->
|
|
async.eachSeries pending, (result, callback) ->
|
|
{_id, project_id, doc_id} = result
|
|
logger.log {project_id, doc_id}, "processing"
|
|
if not project_id? or not doc_id?
|
|
logger.log {project_id, doc_id}, "skipping pack, missing project/doc id"
|
|
return callback()
|
|
handler = (err, result) ->
|
|
if err? and err.code is "InternalError" and err.retryable
|
|
logger.warn {err, result}, "ignoring S3 error in pack archive worker"
|
|
# Ignore any s3 errors due to random problems
|
|
err = null
|
|
if err?
|
|
logger.error {err, result}, "error in pack archive worker"
|
|
return callback(err)
|
|
if shutDownRequested
|
|
logger.error "shutting down pack archive worker"
|
|
return callback(new Error("shutdown"))
|
|
setTimeout () ->
|
|
callback(err, result)
|
|
, DOCUMENT_PACK_DELAY
|
|
if not _id?
|
|
PackManager.pushOldPacks project_id, doc_id, handler
|
|
else
|
|
PackManager.processOldPack project_id, doc_id, _id, handler
|
|
, (err, results) ->
|
|
if err? and err.message != "shutdown"
|
|
logger.error {err}, 'error in pack archive worker processUpdates'
|
|
finish()
|
|
|
|
# find the packs which can be archived
|
|
|
|
ObjectIdFromDate = (date) ->
|
|
id = Math.floor(date.getTime() / 1000).toString(16) + "0000000000000000";
|
|
return ObjectId(id)
|
|
|
|
# new approach, two passes
|
|
# find packs to be marked as finalised:true, those which have a newer pack present
|
|
# then only consider finalised:true packs for archiving
|
|
|
|
if pending?
|
|
logger.log "got #{pending.length} entries from #{source}"
|
|
processUpdates pending
|
|
else
|
|
db.docHistory.find({
|
|
expiresAt: {$exists: false}
|
|
project_id: {$exists: true}
|
|
v_end: {$exists: true}
|
|
_id: {$lt: ObjectIdFromDate(new Date(Date.now() - 7 * DAYS))}
|
|
}, {_id:1, doc_id:1, project_id:1}).sort({
|
|
last_checked:1
|
|
}).limit LIMIT, (err, results) ->
|
|
if err?
|
|
logger.log {err}, 'error checking for updates'
|
|
finish()
|
|
return
|
|
pending = _.uniq results, false, (result) -> result.doc_id.toString()
|
|
logger.log "found #{pending.length} documents to archive"
|
|
processUpdates pending
|