From 2000f478a7a657b81e31d64064b41e18a7e0b5a2 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 13 Aug 2019 10:39:52 +0100 Subject: [PATCH 1/3] refresh the client list on demand --- .../app/coffee/ConnectedUsersManager.coffee | 14 +++++++++++++- .../app/coffee/WebsocketController.coffee | 13 ++++++++----- .../app/coffee/WebsocketLoadBalancer.coffee | 6 ++++++ 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/services/real-time/app/coffee/ConnectedUsersManager.coffee b/services/real-time/app/coffee/ConnectedUsersManager.coffee index fe7d7efb85..085a47dc28 100644 --- a/services/real-time/app/coffee/ConnectedUsersManager.coffee +++ b/services/real-time/app/coffee/ConnectedUsersManager.coffee @@ -10,6 +10,7 @@ ONE_DAY_IN_S = ONE_HOUR_IN_S * 24 FOUR_DAYS_IN_S = ONE_DAY_IN_S * 4 USER_TIMEOUT_IN_S = ONE_HOUR_IN_S / 4 +REFRESH_TIMEOUT_IN_S = 10 # only show clients which have responded to a refresh request in the last 10 seconds module.exports = # Use the same method for when a user connects, and when a user sends a cursor @@ -38,6 +39,16 @@ module.exports = logger.err err:err, project_id:project_id, client_id:client_id, "problem marking user as connected" callback(err) + refreshClient: (project_id, client_id, callback = (err) ->) -> + logger.log project_id:project_id, client_id:client_id, "refreshing connected client" + multi = rclient.multi() + multi.hset Keys.connectedUser({project_id, client_id}), "last_updated_at", Date.now() + multi.expire Keys.connectedUser({project_id, client_id}), USER_TIMEOUT_IN_S + multi.exec (err)-> + if err? + logger.err err:err, project_id:project_id, client_id:client_id, "problem refreshing connected client" + callback(err) + markUserAsDisconnected: (project_id, client_id, callback)-> logger.log project_id:project_id, client_id:client_id, "marking user as disconnected" multi = rclient.multi() @@ -56,6 +67,7 @@ module.exports = else result.connected = true result.client_id = client_id + result.client_age = (Date.now() - parseInt(result.last_updated_at,10)) / 1000 if result.cursorData? try result.cursorData = JSON.parse(result.cursorData) @@ -74,6 +86,6 @@ module.exports = async.series jobs, (err, users = [])-> return callback(err) if err? users = users.filter (user) -> - user?.connected + user?.connected && user?.client_age < REFRESH_TIMEOUT_IN_S callback null, users diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index fce505c1bc..8a09f534bb 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -175,6 +175,7 @@ module.exports = WebsocketController = }, callback) WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData) + CLIENT_REFRESH_DELAY: 1000 getConnectedUsers: (client, callback = (error, users) ->) -> metrics.inc "editor.get-connected-users" Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) -> @@ -183,11 +184,13 @@ module.exports = WebsocketController = logger.log {user_id, project_id, client_id: client.id}, "getting connected users" AuthorizationManager.assertClientCanViewProject client, (error) -> return callback(error) if error? - ConnectedUsersManager.getConnectedUsers project_id, (error, users) -> - return callback(error) if error? - callback null, users - logger.log {user_id, project_id, client_id: client.id}, "got connected users" - + WebsocketLoadBalancer.emitToRoom project_id, 'clientTracking.refresh', project_id + setTimeout () -> + ConnectedUsersManager.getConnectedUsers project_id, (error, users) -> + return callback(error) if error? + callback null, users + logger.log {user_id, project_id, client_id: client.id}, "got connected users" + , WebsocketController.CLIENT_REFRESH_DELAY applyOtUpdate: (client, doc_id, update, callback = (error) ->) -> Utils.getClientAttributes client, ["user_id", "project_id"], (error, {user_id, project_id}) -> diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index 865249c63e..45a7645005 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -6,6 +6,7 @@ EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" RoomManager = require "./RoomManager" ChannelManager = require "./ChannelManager" +ConnectedUsersManager = require "./ConnectedUsersManager" module.exports = WebsocketLoadBalancer = rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub) @@ -54,6 +55,11 @@ module.exports = WebsocketLoadBalancer = return if message.room_id == "all" io.sockets.emit(message.message, message.payload...) + else if message.message is 'clientTracking.refresh' && message.room_id? + clientList = io.sockets.clients(message.room_id) + logger.log {channel:channel, message: message.message, room_id: message.room_id, message_id: message._id, socketIoClients: (client.id for client in clientList)}, "refreshing client list" + for client in clientList + ConnectedUsersManager.refreshClient(message.room_id, client.id) else if message.room_id? if message._id? && Settings.checkEventOrder status = EventLogger.checkEventOrder("editor-events", message._id, message) From d3171e4e2ea715d2c84badc0f444f1597ac4b951 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 14 Aug 2019 13:03:06 +0100 Subject: [PATCH 2/3] remove unwanted argument --- services/real-time/app/coffee/WebsocketController.coffee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 8a09f534bb..5b03f334d5 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -184,7 +184,7 @@ module.exports = WebsocketController = logger.log {user_id, project_id, client_id: client.id}, "getting connected users" AuthorizationManager.assertClientCanViewProject client, (error) -> return callback(error) if error? - WebsocketLoadBalancer.emitToRoom project_id, 'clientTracking.refresh', project_id + WebsocketLoadBalancer.emitToRoom project_id, 'clientTracking.refresh' setTimeout () -> ConnectedUsersManager.getConnectedUsers project_id, (error, users) -> return callback(error) if error? From d57b229e170615b2f345ecde1d6a7693b20a7799 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 14 Aug 2019 13:03:14 +0100 Subject: [PATCH 3/3] update tests --- .../coffee/ConnectedUsersManagerTests.coffee | 16 ++++++++-------- .../unit/coffee/WebsocketControllerTests.coffee | 14 +++++++++++--- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/services/real-time/test/unit/coffee/ConnectedUsersManagerTests.coffee b/services/real-time/test/unit/coffee/ConnectedUsersManagerTests.coffee index 7d83cf1a0d..e88b1f27ad 100644 --- a/services/real-time/test/unit/coffee/ConnectedUsersManagerTests.coffee +++ b/services/real-time/test/unit/coffee/ConnectedUsersManagerTests.coffee @@ -147,18 +147,18 @@ describe "ConnectedUsersManager", -> describe "getConnectedUsers", -> beforeEach -> - @users = ["1234", "5678", "9123"] + @users = ["1234", "5678", "9123", "8234"] @rClient.smembers.callsArgWith(1, null, @users) @ConnectedUsersManager._getConnectedUser = sinon.stub() - @ConnectedUsersManager._getConnectedUser.withArgs(@project_id, @users[0]).callsArgWith(2, null, {connected:true, client_id:@users[0]}) - @ConnectedUsersManager._getConnectedUser.withArgs(@project_id, @users[1]).callsArgWith(2, null, {connected:false, client_id:@users[1]}) - @ConnectedUsersManager._getConnectedUser.withArgs(@project_id, @users[2]).callsArgWith(2, null, {connected:true, client_id:@users[2]}) + @ConnectedUsersManager._getConnectedUser.withArgs(@project_id, @users[0]).callsArgWith(2, null, {connected:true, client_age: 2, client_id:@users[0]}) + @ConnectedUsersManager._getConnectedUser.withArgs(@project_id, @users[1]).callsArgWith(2, null, {connected:false, client_age: 1, client_id:@users[1]}) + @ConnectedUsersManager._getConnectedUser.withArgs(@project_id, @users[2]).callsArgWith(2, null, {connected:true, client_age: 3, client_id:@users[2]}) + @ConnectedUsersManager._getConnectedUser.withArgs(@project_id, @users[3]).callsArgWith(2, null, {connected:true, client_age: 11, client_id:@users[3]}) # connected but old - - it "should only return the users in the list which are still in redis", (done)-> + it "should only return the users in the list which are still in redis and recently updated", (done)-> @ConnectedUsersManager.getConnectedUsers @project_id, (err, users)=> users.length.should.equal 2 - users[0].should.deep.equal {client_id:@users[0], connected:true} - users[1].should.deep.equal {client_id:@users[2], connected:true} + users[0].should.deep.equal {client_id:@users[0], client_age: 2, connected:true} + users[1].should.deep.equal {client_id:@users[2], client_age: 3, connected:true} done() diff --git a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee index d0dad108e7..edb0055bf5 100644 --- a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee @@ -355,18 +355,26 @@ describe 'WebsocketController', -> beforeEach -> @client.params.project_id = @project_id @users = ["mock", "users"] + @WebsocketLoadBalancer.emitToRoom = sinon.stub() @ConnectedUsersManager.getConnectedUsers = sinon.stub().callsArgWith(1, null, @users) describe "when authorized", -> - beforeEach -> + beforeEach (done) -> @AuthorizationManager.assertClientCanViewProject = sinon.stub().callsArgWith(1, null) - @WebsocketController.getConnectedUsers @client, @callback + @WebsocketController.getConnectedUsers @client, (args...) => + @callback(args...) + done() it "should check that the client is authorized to view the project", -> @AuthorizationManager.assertClientCanViewProject .calledWith(@client) .should.equal true - + + it "should broadcast a request to update the client list", -> + @WebsocketLoadBalancer.emitToRoom + .calledWith(@project_id, "clientTracking.refresh") + .should.equal true + it "should get the connected users for the project", -> @ConnectedUsersManager.getConnectedUsers .calledWith(@project_id)