diff --git a/services/track-changes/app/coffee/PackWorker.coffee b/services/track-changes/app/coffee/PackWorker.coffee
index a6d31b8436..d1afd9f1fa 100644
--- a/services/track-changes/app/coffee/PackWorker.coffee
+++ b/services/track-changes/app/coffee/PackWorker.coffee
@@ -2,6 +2,7 @@ Settings = require "settings-sharelatex"
 async = require "async"
 _ = require "underscore"
 {db, ObjectId, BSON} = require "./mongojs"
+fs = require "fs"
 logger = require "logger-sharelatex"
 logger.initialize("track-changes-packworker")
 if Settings.sentry?.dsn?
@@ -15,10 +16,19 @@ PackManager = require "./PackManager"
 
 # this worker script is forked by the main process to look for
 # document histories which can be archived
-LIMIT = Number(process.argv[2]) || 1000
+source = process.argv[2]
 DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000
 TIMEOUT = Number(process.argv[4]) || 30*60*1000
+if source.match(/[^0-9]/)
+	file = fs.readFileSync source
+	result = for line in file.toString().split('\n')
+		[project_id, doc_id] = line.split(' ')
+		{doc_id, project_id}
+	pending = _.filter result, (row) -> row?.doc_id?.match(/^[a-f0-9]{24}$/)
+else
+	LIMIT = Number(process.argv[2]) || 1000
+
 
 shutDownRequested = false
 shutDownTimer = setTimeout () ->
 	logger.log "pack timed out, requesting shutdown"
@@ -62,10 +72,11 @@ process.on 'exit', (code) ->
 processUpdates = (pending) ->
 	async.eachSeries pending, (result, callback) ->
 		{_id, project_id, doc_id} = result
+		logger.log {project_id, doc_id}, "processing"
 		if not project_id? or not doc_id?
 			logger.log {project_id, doc_id}, "skipping pack, missing project/doc id"
 			return callback()
-		PackManager.processOldPack project_id, doc_id, _id, (err, result) ->
+		handler = (err, result) ->
 			if err?
 				logger.error {err, result}, "error in pack archive worker"
 				return callback(err)
@@ -75,6 +86,10 @@ processUpdates = (pending) ->
 			setTimeout () ->
 				callback(err, result)
 			, DOCUMENT_PACK_DELAY
+		if not _id?
+			PackManager.pushOldPacks project_id, doc_id, handler
+		else
+			PackManager.processOldPack project_id, doc_id, _id, handler
 	, (err, results) ->
 		if err? and err.message != "shutdown"
 			logger.error {err}, 'error in pack archive worker processUpdates'
@@ -90,18 +105,23 @@ ObjectIdFromDate = (date) ->
 
 # find packs to be marked as finalised:true, those which have a newer pack present
 # then only consider finalised:true packs for archiving
-db.docHistory.find({
-	expiresAt: {$exists: false}
-	project_id: {$exists: true}
-	v_end: {$exists: true}
-	_id: {$lt: ObjectIdFromDate(new Date(Date.now() - 7 * DAYS))}
-}, {_id:1, doc_id:1, project_id:1}).sort({
-	last_checked:1
-}).limit LIMIT, (err, results) ->
-	if err?
-		logger.log {err}, 'error checking for updates'
-		finish()
-		return
-	pending = _.uniq results, false, (result) -> result.doc_id.toString()
-	logger.log "found #{pending.length} documents to archive"
+if pending?
+	logger.log "got #{pending.length} entries from #{source}"
 	processUpdates pending
+else
+	db.docHistory.find({
+		expiresAt: {$exists: false}
+		project_id: {$exists: true}
+		v_end: {$exists: true}
+		$or: [ {n:512}, {n:1024} ]
+		_id: {$lt: ObjectIdFromDate(new Date(Date.now() - 7 * DAYS))}
+	}, {_id:1, doc_id:1, project_id:1}).sort({
+		last_checked:1
+	}).limit LIMIT, (err, results) ->
+		if err?
+			logger.log {err}, 'error checking for updates'
+			finish()
+			return
+		pending = _.uniq results, false, (result) -> result.doc_id.toString()
+		logger.log "found #{pending.length} documents to archive"
+		processUpdates pending