mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
removed tpdsworker
This commit is contained in:
parent
fd53cf0124
commit
10c012d856
2 changed files with 0 additions and 126 deletions
|
@ -28,10 +28,6 @@ module.exports = (grunt) ->
|
|||
src: 'app.coffee'
|
||||
dest: 'app.js'
|
||||
|
||||
TpdsWorker:
|
||||
src: 'TpdsWorker.coffee'
|
||||
dest: 'TpdsWorker.js'
|
||||
|
||||
BackgroundJobsWorker:
|
||||
src: 'BackgroundJobsWorker.coffee'
|
||||
dest: 'BackgroundJobsWorker.js'
|
||||
|
|
|
@ -1,122 +0,0 @@
|
|||
async = require('async')
|
||||
request = require('request')
|
||||
keys = require('./app/js/infrastructure/Keys')
|
||||
settings = require('settings-sharelatex')
|
||||
logger = require('logger-sharelatex')
|
||||
_ = require('underscore')
|
||||
childProcess = require("child_process")
|
||||
metrics = require("./app/js/infrastructure/Metrics")
|
||||
|
||||
fiveMinutes = 5 * 60 * 1000
|
||||
|
||||
|
||||
processingFuncs =
|
||||
|
||||
sendDoc : (options, callback)->
|
||||
if !options.docLines? || options.docLines.length == 0
|
||||
logger.err options:options, "doc lines not added to options for processing"
|
||||
return callback()
|
||||
docLines = options.docLines.reduce (singleLine, line)-> "#{singleLine}\n#{line}"
|
||||
post = request(options)
|
||||
post.on 'error', (err)->
|
||||
if err?
|
||||
callback(err)
|
||||
else
|
||||
callback()
|
||||
post.on 'end', callback
|
||||
post.write(docLines, 'utf-8')
|
||||
|
||||
standardHttpRequest: (options, callback)->
|
||||
request options, (err, reponse, body)->
|
||||
if err?
|
||||
callback(err)
|
||||
else
|
||||
callback()
|
||||
|
||||
pipeStreamFrom: (options, _callback)->
|
||||
callback = (args...) ->
|
||||
_callback(args...)
|
||||
_callback = () ->
|
||||
|
||||
if options.filePath == "/droppy/main.tex"
|
||||
request options.streamOrigin, (err,res, body)->
|
||||
logger.log options:options, body:body
|
||||
|
||||
origin = request(options.streamOrigin)
|
||||
|
||||
cancelled = false
|
||||
gotResponse = false
|
||||
origin.on 'response', (res) ->
|
||||
return if cancelled
|
||||
gotResponse = true
|
||||
if 200 <= res.statusCode < 300
|
||||
dest = request(options)
|
||||
origin.pipe(dest)
|
||||
|
||||
dest.on "error", (err)->
|
||||
logger.error err:err, options:options, "something went wrong in pipeStreamFrom dest"
|
||||
callback(err)
|
||||
|
||||
dest.on 'end', callback
|
||||
else
|
||||
error = new Error("received non-success status code: #{res.statusCode}")
|
||||
logger.error err: error, options: options, "something went wrong connecting to origin"
|
||||
callback(error)
|
||||
|
||||
origin.on 'error', (err)->
|
||||
return if cancelled
|
||||
gotResponse = true
|
||||
logger.error err:err, options:options, "something went wrong in pipeStreamFrom origin"
|
||||
callback(err)
|
||||
|
||||
setTimeout () ->
|
||||
return if gotResponse
|
||||
cancelled = true
|
||||
error = new Error("timeout")
|
||||
logger.error err: error, options: options, "timeout"
|
||||
callback(error)
|
||||
, 2000
|
||||
|
||||
|
||||
|
||||
workerRegistration = (groupKey, method, options, callback)->
|
||||
callback = _.once callback
|
||||
setTimeout callback, fiveMinutes
|
||||
metrics.inc "tpds-worker-processing"
|
||||
logger.log groupKey:groupKey, method:method, options:options, "processing http request from queue"
|
||||
processingFuncs[method] options, (err)->
|
||||
if err?
|
||||
logger.err err:err, user_id:groupKey, method:method, options:options, "something went wrong processing tpdsUpdateSender update"
|
||||
return callback("skip-after-retry")
|
||||
callback()
|
||||
|
||||
|
||||
setupWebToTpdsWorkers = (queueName)->
|
||||
logger.log worker_count:worker_count, queueName:queueName, "fairy workers"
|
||||
worker_count = 4
|
||||
while worker_count-- > 0
|
||||
workerQueueRef = require('fairy').connect(settings.redis.fairy).queue(queueName)
|
||||
workerQueueRef.polling_interval = 100
|
||||
workerQueueRef.regist workerRegistration
|
||||
|
||||
|
||||
cleanupPreviousQueues = (queueName, callback)->
|
||||
#cleanup queues then setup workers
|
||||
fairy = require('fairy').connect(settings.redis.fairy)
|
||||
queuePrefix = "FAIRY:QUEUED:#{queueName}:"
|
||||
fairy.redis.keys "#{queuePrefix}*", (err, keys)->
|
||||
logger.log "#{keys.length} fairy queues need cleanup"
|
||||
queueNames = keys.map (key)->
|
||||
key.replace queuePrefix, ""
|
||||
cleanupJobs = queueNames.map (projectQueueName)->
|
||||
return (cb)->
|
||||
cleanup = childProcess.fork(__dirname + '/cleanup.js', [queueName, projectQueueName])
|
||||
cleanup.on 'exit', cb
|
||||
async.series cleanupJobs, callback
|
||||
|
||||
|
||||
cleanupPreviousQueues keys.queue.web_to_tpds_http_requests, ->
|
||||
setupWebToTpdsWorkers keys.queue.web_to_tpds_http_requests
|
||||
|
||||
cleanupPreviousQueues keys.queue.tpds_to_web_http_requests, ->
|
||||
setupWebToTpdsWorkers keys.queue.tpds_to_web_http_requests
|
Loading…
Reference in a new issue