From 230203eadf09a444f2120bc6c1ff51b693010381 Mon Sep 17 00:00:00 2001 From: James Allen Date: Wed, 19 Nov 2014 11:01:02 +0000 Subject: [PATCH] Add in robust heartbeat driven subscription model --- libraries/redis-wrapper/index.coffee | 38 +++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/libraries/redis-wrapper/index.coffee b/libraries/redis-wrapper/index.coffee index 1135d90c36..a72613a0b6 100644 --- a/libraries/redis-wrapper/index.coffee +++ b/libraries/redis-wrapper/index.coffee @@ -1,8 +1,7 @@ _ = require("underscore") -module.exports = - - createClient: (opts)-> +module.exports = RedisSharelatex = + createClient: (opts = {port: 6379, host: "localhost"})-> if opts.password? opts.auth_pass = opts.password delete opts.password @@ -17,5 +16,38 @@ module.exports = delete standardOpts.host client = require("redis").createClient opts.port, opts.host, standardOpts return client + + createRobustSubscriptionClient: (opts, heartbeatOpts = {}) -> + sub = RedisSharelatex.createClient(opts) + pub = RedisSharelatex.createClient(opts) + + heartbeatInterval = heartbeatOpts.heartbeat_interval or 1000 #ms + reconnectAfter = heartbeatOpts.reconnect_after or 5000 #ms + + id = require("crypto").createHash("md5").update(Math.random().toString()).digest("hex") + heartbeatChannel = "heartbeat-#{id}" + lastHeartbeat = Date.now() + + sub.subscribe heartbeatChannel, (error) -> + if error? + console.error "ERROR: failed to subscribe to #{heartbeatChannel} channel", error + sub.on "message", (channel, message) -> + if channel == heartbeatChannel + lastHeartbeat = Date.now() + + reconnectIfInactive = () -> + timeSinceLastHeartBeat = Date.now() - lastHeartbeat + if timeSinceLastHeartBeat > reconnectAfter + console.warn "No heartbeat for #{timeSinceLastHeartBeat}ms, reconnecting" + sub.connection_gone("no heartbeat for #{timeSinceLastHeartBeat}ms") + + setInterval () -> + pub.publish heartbeatChannel, "PING" + reconnectIfInactive() + , heartbeatInterval + + return sub + +