add health check to pubsub channels

This commit is contained in:
Brian Gough 2019-04-15 14:05:26 +01:00
parent e72acacf17
commit 2dbdcf5bc8
6 changed files with 63 additions and 2 deletions

View file

@ -20,6 +20,7 @@ SessionSockets = require('session.socket.io')
CookieParser = require("cookie-parser")
DrainManager = require("./app/js/DrainManager")
HealthCheckManager = require("./app/js/HealthCheckManager")
# Set up socket.io server
app = express()
@ -64,6 +65,10 @@ app.get "/health_check/redis", (req, res, next) ->
if error?
logger.err {err: error}, "failed redis health check"
res.sendStatus 500
else if HealthCheckManager.isFailing()
status = HealthCheckManager.status()
logger.err {pubSubErrors: status}, "failed pubsub health check"
res.sendStatus 500
else
res.sendStatus 200
@ -130,8 +135,9 @@ if Settings.continualPubsubTraffic
pubSubClient = redis.createClient(Settings.redis.documentupdater)
publishJob = (channel, cb)->
json = JSON.stringify({health_check:true, date: new Date().toString()})
checker = new HealthCheckManager(channel)
logger.debug {channel:channel}, "sending pub to keep connection alive"
json = JSON.stringify({health_check:true, key: checker.id, date: new Date().toString()})
pubSubClient.publish channel, json, (err)->
if err?
logger.err {err, channel}, "error publishing pubsub traffic to redis"
@ -139,7 +145,7 @@ if Settings.continualPubsubTraffic
runPubSubTraffic = ->
async.map ["applied-ops", "editor-events"], publishJob, (err)->
setTimeout(runPubSubTraffic, 1000 * 60)
setTimeout(runPubSubTraffic, 1000 * 20)
runPubSubTraffic()

View file

@ -4,6 +4,7 @@ redis = require("redis-sharelatex")
rclient = redis.createClient(settings.redis.documentupdater)
SafeJsonParse = require "./SafeJsonParse"
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
metrics = require "metrics-sharelatex"
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
@ -34,6 +35,7 @@ module.exports = DocumentUpdaterController =
DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message)
else if message.health_check?
logger.debug {message}, "got health check message in applied ops channel"
HealthCheckManager.check channel, message.key
_applyUpdateFromDocumentUpdater: (io, doc_id, update) ->
clientList = io.sockets.clients(doc_id)

View file

@ -0,0 +1,49 @@
metrics = require "metrics-sharelatex"
os = require "os"
# HOST, PID and COUNT are combined into the unique id string given to each
# health check event created by this process
HOST = os.hostname()
PID = process.pid
# incremented each time a HealthCheckManager instance is constructed
COUNT = 0
# most recently created event checker for each channel name (a new checker
# replaces the previous entry for its channel)
CHANNEL_MANAGER = {} # hash of event checkers by channel name
# true for a channel whose last completed check did not see its event exactly once
CHANNEL_ERROR = {} # error status by channel name
module.exports = class HealthCheckManager
  # Each instance follows a single health check event for `channel`. It counts
  # how many times the event carrying its unique id is handed back to it and,
  # once `timeout` milliseconds (default 1000) have elapsed, records whether
  # the event was seen exactly once.
  constructor: (@channel, timeout = 1000) ->
    # id string for this event: host name, process id and a running counter
    @id = "host=#{HOST}:pid=#{PID}:count=#{COUNT++}"
    # number of times our own event has been received so far
    @count = 0
    # when the timeout expires, convert the count into a status for the channel
    onTimeout = => @setStatus()
    @handler = setTimeout onTimeout, timeout
    # metrics timer for the channel latency, created here and completed
    # when the event is first received
    @timer = new metrics.Timer("event.#{@channel}.latency")
    # register this instance as the checker that events on the channel go to
    CHANNEL_MANAGER[@channel] = this

  # Record the receipt of an event; ids that are not ours are ignored.
  processEvent: (id) ->
    return unless id is @id
    @count += 1
    # only the first receipt is timed, later receipts find no timer left
    @timer.done() if @timer?
    @timer = null

  # Store the outcome of this check: any count other than exactly one receipt
  # marks the channel as being in error.
  setStatus: () ->
    CHANNEL_ERROR[@channel] = (@count isnt 1)

  # class methods

  # Pass an incoming event id to the checker registered for `channel`;
  # nothing happens when no checker has been registered for it.
  @check: (channel, id) ->
    manager = CHANNEL_MANAGER[channel]
    manager?.processEvent id

  # Error status of every channel checked so far, for logging.
  @status: () ->
    CHANNEL_ERROR

  # True when at least one channel currently has an error status.
  @isFailing: () ->
    for name, failed of CHANNEL_ERROR when failed is true
      return true
    false

View file

@ -5,6 +5,7 @@ SafeJsonParse = require "./SafeJsonParse"
rclientPub = redis.createClient(Settings.redis.realtime)
rclientSub = redis.createClient(Settings.redis.realtime)
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
module.exports = WebsocketLoadBalancer =
rclientPub: rclientPub
@ -53,4 +54,5 @@ module.exports = WebsocketLoadBalancer =
client.emit(message.message, message.payload...)
else if message.health_check?
logger.debug {message}, "got health check message in editor events channel"
HealthCheckManager.check channel, message.key

View file

@ -23,6 +23,7 @@ describe "DocumentUpdaterController", ->
"./SafeJsonParse": @SafeJsonParse =
parse: (data, cb) => cb null, JSON.parse(data)
"./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()}
"./HealthCheckManager": {check: sinon.stub()}
"metrics-sharelatex": @metrics = {inc: sinon.stub()}
describe "listenForUpdatesFromDocumentUpdater", ->

View file

@ -13,6 +13,7 @@ describe "WebsocketLoadBalancer", ->
"./SafeJsonParse": @SafeJsonParse =
parse: (data, cb) => cb null, JSON.parse(data)
"./EventLogger": {checkEventOrder: sinon.stub()}
"./HealthCheckManager": {check: sinon.stub()}
@io = {}
@WebsocketLoadBalancer.rclientPub = publish: sinon.stub()
@WebsocketLoadBalancer.rclientSub =