2014-08-07 10:45:19 +00:00
|
|
|
Settings = require('settings-sharelatex')
|
|
|
|
logger = require('logger-sharelatex')
|
2016-06-07 16:58:18 +00:00
|
|
|
Keys = require('./UpdateKeys')
|
2014-10-07 11:08:36 +00:00
|
|
|
redis = require("redis-sharelatex")
|
2018-04-27 14:45:28 +00:00
|
|
|
Errors = require("./Errors")
|
2014-10-07 11:08:36 +00:00
|
|
|
|
2014-08-07 10:45:19 +00:00
|
|
|
UpdateManager = require('./UpdateManager')
|
2014-08-07 12:16:11 +00:00
|
|
|
Metrics = require('./Metrics')
|
2017-05-26 11:10:57 +00:00
|
|
|
RateLimitManager = require('./RateLimitManager')
|
2014-08-07 10:45:19 +00:00
|
|
|
|
2014-08-11 13:16:05 +00:00
|
|
|
module.exports = DispatchManager =
|
2017-05-26 11:10:57 +00:00
|
|
|
createDispatcher: (RateLimiter) ->
|
2019-07-03 09:21:25 +00:00
|
|
|
client = redis.createClient(Settings.redis.documentupdater)
|
2014-08-07 10:45:19 +00:00
|
|
|
worker = {
|
|
|
|
client: client
|
2014-08-11 13:16:05 +00:00
|
|
|
_waitForUpdateThenDispatchWorker: (callback = (error) ->) ->
|
2014-08-07 12:16:11 +00:00
|
|
|
timer = new Metrics.Timer "worker.waiting"
|
2014-08-07 10:45:19 +00:00
|
|
|
worker.client.blpop "pending-updates-list", 0, (error, result) ->
|
2019-02-07 16:27:52 +00:00
|
|
|
logger.log("getting pending-updates-list", error, result)
|
2014-08-07 12:16:11 +00:00
|
|
|
timer.done()
|
2014-08-07 10:45:19 +00:00
|
|
|
return callback(error) if error?
|
|
|
|
return callback() if !result?
|
|
|
|
[list_name, doc_key] = result
|
|
|
|
[project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key)
|
2014-08-11 13:16:05 +00:00
|
|
|
# Dispatch this in the background
|
2017-05-26 11:10:57 +00:00
|
|
|
backgroundTask = (cb) ->
|
|
|
|
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
|
2018-04-27 14:45:28 +00:00
|
|
|
# log everything except OpRangeNotAvailable errors, these are normal
|
2018-04-27 15:03:46 +00:00
|
|
|
if error?
|
|
|
|
# downgrade OpRangeNotAvailable and "Delete component" errors so they are not sent to sentry
|
2019-05-07 13:15:26 +00:00
|
|
|
logAsWarning = (error instanceof Errors.OpRangeNotAvailableError) || (error instanceof Errors.DeleteMismatchError)
|
2018-04-27 15:03:46 +00:00
|
|
|
if logAsWarning
|
|
|
|
logger.warn err: error, project_id: project_id, doc_id: doc_id, "error processing update"
|
|
|
|
else
|
|
|
|
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update"
|
2017-05-26 11:10:57 +00:00
|
|
|
cb()
|
|
|
|
RateLimiter.run backgroundTask, callback
|
2014-08-07 10:45:19 +00:00
|
|
|
|
|
|
|
run: () ->
|
|
|
|
return if Settings.shuttingDown
|
2014-08-11 13:16:05 +00:00
|
|
|
worker._waitForUpdateThenDispatchWorker (error) =>
|
2014-08-07 10:45:19 +00:00
|
|
|
if error?
|
2014-08-11 13:16:05 +00:00
|
|
|
logger.error err: error, "Error in worker process"
|
|
|
|
throw error
|
2014-08-07 10:45:19 +00:00
|
|
|
else
|
|
|
|
worker.run()
|
|
|
|
}
|
|
|
|
|
|
|
|
return worker
|
|
|
|
|
2014-08-11 13:16:05 +00:00
|
|
|
createAndStartDispatchers: (number) ->
|
2017-05-26 11:10:57 +00:00
|
|
|
RateLimiter = new RateLimitManager(number)
|
2014-08-07 10:45:19 +00:00
|
|
|
for i in [1..number]
|
2017-05-26 11:10:57 +00:00
|
|
|
worker = DispatchManager.createDispatcher(RateLimiter)
|
2017-05-24 10:47:06 +00:00
|
|
|
worker.run()
|