diff --git a/services/real-time/app.coffee b/services/real-time/app.coffee index f468ad2d49..6161e63a46 100644 --- a/services/real-time/app.coffee +++ b/services/real-time/app.coffee @@ -20,6 +20,7 @@ SessionSockets = require('session.socket.io') CookieParser = require("cookie-parser") DrainManager = require("./app/js/DrainManager") +HealthCheckManager = require("./app/js/HealthCheckManager") # Set up socket.io server app = express() @@ -64,6 +65,10 @@ app.get "/health_check/redis", (req, res, next) -> if error? logger.err {err: error}, "failed redis health check" res.sendStatus 500 + else if HealthCheckManager.isFailing() + status = HealthCheckManager.status() + logger.err {pubSubErrors: status}, "failed pubsub health check" + res.sendStatus 500 else res.sendStatus 200 @@ -130,8 +135,9 @@ if Settings.continualPubsubTraffic pubSubClient = redis.createClient(Settings.redis.documentupdater) publishJob = (channel, cb)-> - json = JSON.stringify({health_check:true, date: new Date().toString()}) + checker = new HealthCheckManager(channel) logger.debug {channel:channel}, "sending pub to keep connection alive" + json = JSON.stringify({health_check:true, key: checker.id, date: new Date().toString()}) pubSubClient.publish channel, json, (err)-> if err? logger.err {err, channel}, "error publishing pubsub traffic to redis" @@ -139,7 +145,7 @@ if Settings.continualPubsubTraffic runPubSubTraffic = -> async.map ["applied-ops", "editor-events"], publishJob, (err)-> - setTimeout(runPubSubTraffic, 1000 * 60) + setTimeout(runPubSubTraffic, 1000 * 20) runPubSubTraffic() diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index c4367cd652..1eb5d21274 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -4,6 +4,7 @@ redis = require("redis-sharelatex") rclient = redis.createClient(settings.redis.documentupdater) SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" +HealthCheckManager = require "./HealthCheckManager" metrics = require "metrics-sharelatex" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb @@ -34,6 +35,7 @@ module.exports = DocumentUpdaterController = DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message) else if message.health_check? logger.debug {message}, "got health check message in applied ops channel" + HealthCheckManager.check channel, message.key _applyUpdateFromDocumentUpdater: (io, doc_id, update) -> clientList = io.sockets.clients(doc_id) diff --git a/services/real-time/app/coffee/HealthCheckManager.coffee b/services/real-time/app/coffee/HealthCheckManager.coffee new file mode 100644 index 0000000000..cd6b617b9b --- /dev/null +++ b/services/real-time/app/coffee/HealthCheckManager.coffee @@ -0,0 +1,49 @@ +metrics = require "metrics-sharelatex" + +os = require "os" +HOST = os.hostname() +PID = process.pid +COUNT = 0 + +CHANNEL_MANAGER = {} # hash of event checkers by channel name +CHANNEL_ERROR = {} # error status by channel name + +module.exports = class HealthCheckManager + # create an instance of this class which checks that an event with a unique + # id is received only once within a timeout + constructor: (@channel, timeout = 1000) -> + # unique event string + @id = "host=#{HOST}:pid=#{PID}:count=#{COUNT++}" + # count of number of times the event is received + @count = 0 + # after a timeout check the status of the count + @handler = setTimeout () => + @setStatus() + , timeout + # use a timer to record the latency of the channel + @timer = new metrics.Timer("event.#{@channel}.latency") + # keep a record of these objects to dispatch on + CHANNEL_MANAGER[@channel] = @ + processEvent: (id) -> + # if this is our event record it + if id == @id + @count++ + @timer?.done() + @timer = null # only time the latency of the first event + setStatus: () -> + # if we saw the event anything other than a single time that is an error + error = (@count != 1) + CHANNEL_ERROR[@channel] = error + + # class methods + @check: (channel, id) -> + # dispatch event to manager for channel + CHANNEL_MANAGER[channel]?.processEvent id + @status: () -> + # return status of all channels for logging + return CHANNEL_ERROR + @isFailing: () -> + # check if any channel status is bad + for channel, error of CHANNEL_ERROR + return true if error is true + return false diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index 8fed080b68..ffe6e820ca 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -5,6 +5,7 @@ SafeJsonParse = require "./SafeJsonParse" rclientPub = redis.createClient(Settings.redis.realtime) rclientSub = redis.createClient(Settings.redis.realtime) EventLogger = require "./EventLogger" +HealthCheckManager = require "./HealthCheckManager" module.exports = WebsocketLoadBalancer = rclientPub: rclientPub @@ -53,4 +54,5 @@ module.exports = WebsocketLoadBalancer = client.emit(message.message, message.payload...) else if message.health_check? logger.debug {message}, "got health check message in editor events channel" + HealthCheckManager.check channel, message.key diff --git a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee index c104646696..5ed274b36c 100644 --- a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee +++ b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee @@ -23,6 +23,7 @@ describe "DocumentUpdaterController", -> "./SafeJsonParse": @SafeJsonParse = parse: (data, cb) => cb null, JSON.parse(data) "./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()} + "./HealthCheckManager": {check: sinon.stub()} "metrics-sharelatex": @metrics = {inc: sinon.stub()} describe "listenForUpdatesFromDocumentUpdater", -> diff --git a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee index 38c317e9fe..e786038ab7 100644 --- a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -13,6 +13,7 @@ describe "WebsocketLoadBalancer", -> "./SafeJsonParse": @SafeJsonParse = parse: (data, cb) => cb null, JSON.parse(data) "./EventLogger": {checkEventOrder: sinon.stub()} + "./HealthCheckManager": {check: sinon.stub()} @io = {} @WebsocketLoadBalancer.rclientPub = publish: sinon.stub() @WebsocketLoadBalancer.rclientSub =