mirror of
https://github.com/overleaf/overleaf.git
synced 2024-12-26 03:30:49 +00:00
Merge pull request #44 from sharelatex/bg-add-pubsub-health-check
add health check to pubsub channels
This commit is contained in:
commit
6a7679d55c
6 changed files with 63 additions and 2 deletions
|
@ -20,6 +20,7 @@ SessionSockets = require('session.socket.io')
|
|||
CookieParser = require("cookie-parser")
|
||||
|
||||
DrainManager = require("./app/js/DrainManager")
|
||||
HealthCheckManager = require("./app/js/HealthCheckManager")
|
||||
|
||||
# Set up socket.io server
|
||||
app = express()
|
||||
|
@ -64,6 +65,10 @@ app.get "/health_check/redis", (req, res, next) ->
|
|||
if error?
|
||||
logger.err {err: error}, "failed redis health check"
|
||||
res.sendStatus 500
|
||||
else if HealthCheckManager.isFailing()
|
||||
status = HealthCheckManager.status()
|
||||
logger.err {pubSubErrors: status}, "failed pubsub health check"
|
||||
res.sendStatus 500
|
||||
else
|
||||
res.sendStatus 200
|
||||
|
||||
|
@ -130,8 +135,9 @@ if Settings.continualPubsubTraffic
|
|||
pubSubClient = redis.createClient(Settings.redis.documentupdater)
|
||||
|
||||
publishJob = (channel, cb)->
|
||||
json = JSON.stringify({health_check:true, date: new Date().toString()})
|
||||
checker = new HealthCheckManager(channel)
|
||||
logger.debug {channel:channel}, "sending pub to keep connection alive"
|
||||
json = JSON.stringify({health_check:true, key: checker.id, date: new Date().toString()})
|
||||
pubSubClient.publish channel, json, (err)->
|
||||
if err?
|
||||
logger.err {err, channel}, "error publishing pubsub traffic to redis"
|
||||
|
@ -139,7 +145,7 @@ if Settings.continualPubsubTraffic
|
|||
|
||||
runPubSubTraffic = ->
|
||||
async.map ["applied-ops", "editor-events"], publishJob, (err)->
|
||||
setTimeout(runPubSubTraffic, 1000 * 60)
|
||||
setTimeout(runPubSubTraffic, 1000 * 20)
|
||||
|
||||
runPubSubTraffic()
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ redis = require("redis-sharelatex")
|
|||
rclient = redis.createClient(settings.redis.documentupdater)
|
||||
SafeJsonParse = require "./SafeJsonParse"
|
||||
EventLogger = require "./EventLogger"
|
||||
HealthCheckManager = require "./HealthCheckManager"
|
||||
metrics = require "metrics-sharelatex"
|
||||
|
||||
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
|
||||
|
@ -34,6 +35,7 @@ module.exports = DocumentUpdaterController =
|
|||
DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message)
|
||||
else if message.health_check?
|
||||
logger.debug {message}, "got health check message in applied ops channel"
|
||||
HealthCheckManager.check channel, message.key
|
||||
|
||||
_applyUpdateFromDocumentUpdater: (io, doc_id, update) ->
|
||||
clientList = io.sockets.clients(doc_id)
|
||||
|
|
49
services/real-time/app/coffee/HealthCheckManager.coffee
Normal file
49
services/real-time/app/coffee/HealthCheckManager.coffee
Normal file
|
@ -0,0 +1,49 @@
|
|||
metrics = require "metrics-sharelatex"
|
||||
|
||||
os = require "os"
|
||||
HOST = os.hostname()
|
||||
PID = process.pid
|
||||
COUNT = 0
|
||||
|
||||
CHANNEL_MANAGER = {} # hash of event checkers by channel name
|
||||
CHANNEL_ERROR = {} # error status by channel name
|
||||
|
||||
module.exports = class HealthCheckManager
|
||||
# create an instance of this class which checks that an event with a unique
|
||||
# id is received only once within a timeout
|
||||
constructor: (@channel, timeout = 1000) ->
|
||||
# unique event string
|
||||
@id = "host=#{HOST}:pid=#{PID}:count=#{COUNT++}"
|
||||
# count of number of times the event is received
|
||||
@count = 0
|
||||
# after a timeout check the status of the count
|
||||
@handler = setTimeout () =>
|
||||
@setStatus()
|
||||
, timeout
|
||||
# use a timer to record the latency of the channel
|
||||
@timer = new metrics.Timer("event.#{@channel}.latency")
|
||||
# keep a record of these objects to dispatch on
|
||||
CHANNEL_MANAGER[@channel] = @
|
||||
processEvent: (id) ->
|
||||
# if this is our event record it
|
||||
if id == @id
|
||||
@count++
|
||||
@timer?.done()
|
||||
@timer = null # only time the latency of the first event
|
||||
setStatus: () ->
|
||||
# if we saw the event anything other than a single time that is an error
|
||||
error = (@count != 1)
|
||||
CHANNEL_ERROR[@channel] = error
|
||||
|
||||
# class methods
|
||||
@check: (channel, id) ->
|
||||
# dispatch event to manager for channel
|
||||
CHANNEL_MANAGER[channel]?.processEvent id
|
||||
@status: () ->
|
||||
# return status of all channels for logging
|
||||
return CHANNEL_ERROR
|
||||
@isFailing: () ->
|
||||
# check if any channel status is bad
|
||||
for channel, error of CHANNEL_ERROR
|
||||
return true if error is true
|
||||
return false
|
|
@ -5,6 +5,7 @@ SafeJsonParse = require "./SafeJsonParse"
|
|||
rclientPub = redis.createClient(Settings.redis.realtime)
|
||||
rclientSub = redis.createClient(Settings.redis.realtime)
|
||||
EventLogger = require "./EventLogger"
|
||||
HealthCheckManager = require "./HealthCheckManager"
|
||||
|
||||
module.exports = WebsocketLoadBalancer =
|
||||
rclientPub: rclientPub
|
||||
|
@ -53,4 +54,5 @@ module.exports = WebsocketLoadBalancer =
|
|||
client.emit(message.message, message.payload...)
|
||||
else if message.health_check?
|
||||
logger.debug {message}, "got health check message in editor events channel"
|
||||
HealthCheckManager.check channel, message.key
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ describe "DocumentUpdaterController", ->
|
|||
"./SafeJsonParse": @SafeJsonParse =
|
||||
parse: (data, cb) => cb null, JSON.parse(data)
|
||||
"./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()}
|
||||
"./HealthCheckManager": {check: sinon.stub()}
|
||||
"metrics-sharelatex": @metrics = {inc: sinon.stub()}
|
||||
|
||||
describe "listenForUpdatesFromDocumentUpdater", ->
|
||||
|
|
|
@ -13,6 +13,7 @@ describe "WebsocketLoadBalancer", ->
|
|||
"./SafeJsonParse": @SafeJsonParse =
|
||||
parse: (data, cb) => cb null, JSON.parse(data)
|
||||
"./EventLogger": {checkEventOrder: sinon.stub()}
|
||||
"./HealthCheckManager": {check: sinon.stub()}
|
||||
@io = {}
|
||||
@WebsocketLoadBalancer.rclientPub = publish: sinon.stub()
|
||||
@WebsocketLoadBalancer.rclientSub =
|
||||
|
|
Loading…
Reference in a new issue