refresh the client list on demand

This commit is contained in:
Brian Gough 2019-08-13 10:39:52 +01:00
parent 49c7bde799
commit 2000f478a7
3 changed files with 27 additions and 6 deletions

View file

@ -10,6 +10,7 @@ ONE_DAY_IN_S = ONE_HOUR_IN_S * 24
FOUR_DAYS_IN_S = ONE_DAY_IN_S * 4 FOUR_DAYS_IN_S = ONE_DAY_IN_S * 4
USER_TIMEOUT_IN_S = ONE_HOUR_IN_S / 4 USER_TIMEOUT_IN_S = ONE_HOUR_IN_S / 4
REFRESH_TIMEOUT_IN_S = 10 # only show clients which have responded to a refresh request in the last 10 seconds
module.exports = module.exports =
# Use the same method for when a user connects, and when a user sends a cursor # Use the same method for when a user connects, and when a user sends a cursor
@ -38,6 +39,16 @@ module.exports =
logger.err err:err, project_id:project_id, client_id:client_id, "problem marking user as connected" logger.err err:err, project_id:project_id, client_id:client_id, "problem marking user as connected"
callback(err) callback(err)
refreshClient: (project_id, client_id, callback = (err) ->) ->
logger.log project_id:project_id, client_id:client_id, "refreshing connected client"
multi = rclient.multi()
multi.hset Keys.connectedUser({project_id, client_id}), "last_updated_at", Date.now()
multi.expire Keys.connectedUser({project_id, client_id}), USER_TIMEOUT_IN_S
multi.exec (err)->
if err?
logger.err err:err, project_id:project_id, client_id:client_id, "problem refreshing connected client"
callback(err)
markUserAsDisconnected: (project_id, client_id, callback)-> markUserAsDisconnected: (project_id, client_id, callback)->
logger.log project_id:project_id, client_id:client_id, "marking user as disconnected" logger.log project_id:project_id, client_id:client_id, "marking user as disconnected"
multi = rclient.multi() multi = rclient.multi()
@ -56,6 +67,7 @@ module.exports =
else else
result.connected = true result.connected = true
result.client_id = client_id result.client_id = client_id
result.client_age = (Date.now() - parseInt(result.last_updated_at,10)) / 1000
if result.cursorData? if result.cursorData?
try try
result.cursorData = JSON.parse(result.cursorData) result.cursorData = JSON.parse(result.cursorData)
@ -74,6 +86,6 @@ module.exports =
async.series jobs, (err, users = [])-> async.series jobs, (err, users = [])->
return callback(err) if err? return callback(err) if err?
users = users.filter (user) -> users = users.filter (user) ->
user?.connected user?.connected && user?.client_age < REFRESH_TIMEOUT_IN_S
callback null, users callback null, users

View file

@ -175,6 +175,7 @@ module.exports = WebsocketController =
}, callback) }, callback)
WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData) WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData)
CLIENT_REFRESH_DELAY: 1000
getConnectedUsers: (client, callback = (error, users) ->) -> getConnectedUsers: (client, callback = (error, users) ->) ->
metrics.inc "editor.get-connected-users" metrics.inc "editor.get-connected-users"
Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) -> Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) ->
@ -183,11 +184,13 @@ module.exports = WebsocketController =
logger.log {user_id, project_id, client_id: client.id}, "getting connected users" logger.log {user_id, project_id, client_id: client.id}, "getting connected users"
AuthorizationManager.assertClientCanViewProject client, (error) -> AuthorizationManager.assertClientCanViewProject client, (error) ->
return callback(error) if error? return callback(error) if error?
WebsocketLoadBalancer.emitToRoom project_id, 'clientTracking.refresh', project_id
setTimeout () ->
ConnectedUsersManager.getConnectedUsers project_id, (error, users) -> ConnectedUsersManager.getConnectedUsers project_id, (error, users) ->
return callback(error) if error? return callback(error) if error?
callback null, users callback null, users
logger.log {user_id, project_id, client_id: client.id}, "got connected users" logger.log {user_id, project_id, client_id: client.id}, "got connected users"
, WebsocketController.CLIENT_REFRESH_DELAY
applyOtUpdate: (client, doc_id, update, callback = (error) ->) -> applyOtUpdate: (client, doc_id, update, callback = (error) ->) ->
Utils.getClientAttributes client, ["user_id", "project_id"], (error, {user_id, project_id}) -> Utils.getClientAttributes client, ["user_id", "project_id"], (error, {user_id, project_id}) ->

View file

@ -6,6 +6,7 @@ EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager" HealthCheckManager = require "./HealthCheckManager"
RoomManager = require "./RoomManager" RoomManager = require "./RoomManager"
ChannelManager = require "./ChannelManager" ChannelManager = require "./ChannelManager"
ConnectedUsersManager = require "./ConnectedUsersManager"
module.exports = WebsocketLoadBalancer = module.exports = WebsocketLoadBalancer =
rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub) rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub)
@ -54,6 +55,11 @@ module.exports = WebsocketLoadBalancer =
return return
if message.room_id == "all" if message.room_id == "all"
io.sockets.emit(message.message, message.payload...) io.sockets.emit(message.message, message.payload...)
else if message.message is 'clientTracking.refresh' && message.room_id?
clientList = io.sockets.clients(message.room_id)
logger.log {channel:channel, message: message.message, room_id: message.room_id, message_id: message._id, socketIoClients: (client.id for client in clientList)}, "refreshing client list"
for client in clientList
ConnectedUsersManager.refreshClient(message.room_id, client.id)
else if message.room_id? else if message.room_id?
if message._id? && Settings.checkEventOrder if message._id? && Settings.checkEventOrder
status = EventLogger.checkEventOrder("editor-events", message._id, message) status = EventLogger.checkEventOrder("editor-events", message._id, message)