overleaf/services/web/TpdsWorker.coffee

123 lines
3.7 KiB
CoffeeScript
Raw Normal View History

2014-02-12 10:23:40 +00:00
async = require('async')
request = require('request')
keys = require('./app/js/infrastructure/Keys')
settings = require('settings-sharelatex')
logger = require('logger-sharelatex')
_ = require('underscore')
childProcess = require("child_process")
metrics = require("./app/js/infrastructure/Metrics")
# Five minutes expressed in milliseconds. workerRegistration passes this to
# setTimeout as the delay after which a job's callback is invoked regardless
# of whether its processing function has reported back.
fiveMinutes = 5 * 60 * 1000
# Handlers for queued jobs, keyed by method name. workerRegistration looks a
# handler up with `processingFuncs[method]` and calls it as
# `handler(options, callback)`, where `callback(err)` is node-style.
processingFuncs =

  # Writes a document's lines as the body of the HTTP request described by
  # `options` (which is handed to `request` unchanged).
  #
  # options.docLines - array of lines; joined with "\n" to build the body.
  # callback(err)    - called once, on the request's first 'error' or 'end'.
  #
  # When docLines is missing or empty the problem is logged and the callback
  # is invoked without an error, so the job is not reported as failed.
  sendDoc: (options, callback) ->
    if !options.docLines? || options.docLines.length == 0
      logger.err options:options, "doc lines not added to options for processing"
      return callback()
    # Both 'error' and 'end' are wired to the callback; guard so that a
    # request emitting both cannot complete the job twice.
    done = _.once callback
    docLines = options.docLines.join("\n")
    post = request(options)
    post.on 'error', (err) ->
      if err?
        done(err)
      else
        done()
    post.on 'end', done
    post.write(docLines, 'utf-8')

  # Performs the HTTP request described by `options`. The response and body
  # are ignored; only a transport-level error is passed to the callback.
  standardHttpRequest: (options, callback) ->
    request options, (err, response, body) ->
      if err?
        callback(err)
      else
        callback()

  # Fetches `options.streamOrigin` and pipes it into the HTTP request
  # described by `options`.
  #
  # _callback(err) is invoked at most once:
  #   - with an error if the origin emits 'error', answers with a non-2xx
  #     status, or produces neither a response nor an error within 2 seconds;
  #   - with the destination's error if the destination request fails;
  #   - on the destination request's 'end' event otherwise.
  pipeStreamFrom: (options, _callback) ->
    # Several events below can each finish the job; only the first may.
    callback = _.once _callback

    # NOTE(review): this looks like leftover debugging for one hard-coded
    # path. It issues a second, independent request to the origin just to
    # log the body. Confirm whether it is still wanted.
    if options.filePath == "/droppy/main.tex"
      request options.streamOrigin, (err, res, body) ->
        logger.log options:options, body:body

    origin = request(options.streamOrigin)
    cancelled = false
    gotResponse = false
    responseTimer = null

    origin.on 'response', (res) ->
      return if cancelled
      gotResponse = true
      clearTimeout responseTimer
      if 200 <= res.statusCode < 300
        dest = request(options)
        origin.pipe(dest)
        dest.on "error", (err) ->
          logger.error err:err, options:options, "something went wrong in pipeStreamFrom dest"
          callback(err)
        dest.on 'end', callback
      else
        error = new Error("received non-success status code: #{res.statusCode}")
        logger.error err: error, options: options, "something went wrong connecting to origin"
        callback(error)

    origin.on 'error', (err) ->
      return if cancelled
      gotResponse = true
      clearTimeout responseTimer
      logger.error err:err, options:options, "something went wrong in pipeStreamFrom origin"
      callback(err)

    # Give the origin 2 seconds to produce a response or an error.
    responseTimer = setTimeout () ->
      return if gotResponse
      cancelled = true
      # Nothing will consume the origin any more, so stop the request instead
      # of leaving it open. Events it emits afterwards are ignored through the
      # `cancelled` checks above.
      origin.abort()
      error = new Error("timeout")
      logger.error err: error, options: options, "timeout"
      callback(error)
    , 2000
2014-03-25 22:28:39 +00:00
2014-02-12 10:23:40 +00:00
# Worker function registered on each fairy queue (see setupWebToTpdsWorkers).
#
# groupKey - the job's group key; logged as `user_id` when processing fails.
# method   - name of the handler to run, looked up in processingFuncs.
# options  - passed to the handler unchanged.
# callback - invoked exactly once: with no arguments on success or when the
#            five-minute limit expires, or with "skip-after-retry" when the
#            handler reports an error.
workerRegistration = (groupKey, method, options, callback) ->
  callback = _.once callback
  # Safety net: release the job if the handler never calls back.
  safetyTimer = setTimeout callback, fiveMinutes
  metrics.inc "tpds-worker-processing"
  logger.log groupKey:groupKey, method:method, options:options, "processing http request from queue"
  processingFuncs[method] options, (err) ->
    # The handler answered, so the safety net is no longer needed. Without
    # this, every processed job would keep a timer pending for five minutes.
    clearTimeout safetyTimer
    if err?
      logger.err err:err, user_id:groupKey, method:method, options:options, "something went wrong processing tpdsUpdateSender update"
      return callback("skip-after-retry")
    callback()
# Registers workerRegistration as the worker on `queueName` four times, each
# through its own fairy connection, polling every 100 (the value is assigned
# to the queue's `polling_interval`).
#
# queueName - name of the fairy queue to consume.
setupWebToTpdsWorkers = (queueName) ->
  worker_count = 4
  # Log only after worker_count has its value; logging it before the
  # assignment would always record `undefined`.
  logger.log worker_count:worker_count, queueName:queueName, "fairy workers"
  for workerIndex in [0...worker_count]
    workerQueueRef = require('fairy').connect(settings.redis.fairy).queue(queueName)
    workerQueueRef.polling_interval = 100
    workerQueueRef.regist workerRegistration
# Cleans up the per-project queues left in redis for `queueName`, then calls
# `callback`.
#
# Lists every redis key starting with "FAIRY:QUEUED:<queueName>:" and, for
# each one, forks cleanup.js with [queueName, <key suffix>] as arguments.
# The forks run one after another (async.series); each step finishes when its
# child process emits 'exit'.
#
# queueName     - name of the fairy queue whose leftovers should be cleaned.
# callback(err) - called when all cleanup processes have exited, or
#                 immediately with the error if listing the keys fails.
cleanupPreviousQueues = (queueName, callback) ->
  #cleanup queues then setup workers
  fairy = require('fairy').connect(settings.redis.fairy)
  queuePrefix = "FAIRY:QUEUED:#{queueName}:"
  # The result is named queuedKeys so it does not shadow the module-level
  # `keys` import.
  fairy.redis.keys "#{queuePrefix}*", (err, queuedKeys) ->
    if err?
      # Without this guard a redis failure would surface as a TypeError on
      # `queuedKeys.length` instead of the real error.
      logger.err err:err, queueName:queueName, "could not list fairy queues for cleanup"
      return callback(err)
    logger.log "#{queuedKeys.length} fairy queues need cleanup"
    cleanupJobs = queuedKeys.map (key) ->
      projectQueueName = key.replace queuePrefix, ""
      (cb) ->
        cleanup = childProcess.fork(__dirname + '/cleanup.js', [queueName, projectQueueName])
        cleanup.on 'exit', cb
    async.series cleanupJobs, callback
# Start-up: for each HTTP-request queue, run cleanupPreviousQueues first and
# register the workers only once it has called back.
startQueueWorkers = (queueName) ->
  cleanupPreviousQueues queueName, ->
    setupWebToTpdsWorkers queueName

startQueueWorkers keys.queue.web_to_tpds_http_requests
startQueueWorkers keys.queue.tpds_to_web_http_requests