Distribute server side socket.io updates over Redis Pub/Sub

This commit is contained in:
James Allen 2014-11-13 16:03:37 +00:00
parent e769819521
commit cc1c85ebf8
7 changed files with 150 additions and 18 deletions

View file

@ -41,6 +41,9 @@ io.configure ->
Router = require "./app/js/Router"
Router.configure(app, io, sessionSockets)
WebsocketLoadBalancer = require "./app/js/WebsocketLoadBalancer"
WebsocketLoadBalancer.listenForEditorEvents(io)
port = Settings.internal.realTime.port
host = Settings.internal.realTime.host

View file

@ -61,7 +61,6 @@ module.exports =
else
result.connected = true
result.client_id = client_id
console.log "RESULT", result
if result.cursorData?
try
result.cursorData = JSON.parse(result.cursorData)

View file

@ -3,6 +3,7 @@ WebApiManager = require "./WebApiManager"
AuthorizationManager = require "./AuthorizationManager"
DocumentUpdaterManager = require "./DocumentUpdaterManager"
ConnectedUsersManager = require "./ConnectedUsersManager"
WebsocketLoadBalancer = require "./WebsocketLoadBalancer"
Utils = require "./Utils"
module.exports = WebsocketController =
@ -94,8 +95,7 @@ module.exports = WebsocketController =
else
cursorData.name = "Anonymous"
callback()
#EditorRealTimeController.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData)
#callback()
WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData)
getConnectedUsers: (client, callback = (error, users) ->) ->
Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) ->

View file

@ -0,0 +1,30 @@
Settings = require 'settings-sharelatex'
redis = require("redis-sharelatex")
rclientPub = redis.createClient(Settings.redis.web)
rclientSub = redis.createClient(Settings.redis.web)
module.exports = WebsocketLoadBalancer =
rclientPub: rclientPub
rclientSub: rclientSub
emitToRoom: (room_id, message, payload...) ->
@rclientPub.publish "editor-events", JSON.stringify
room_id: room_id
message: message
payload: payload
emitToAll: (message, payload...) ->
@emitToRoom "all", message, payload...
listenForEditorEvents: (io) ->
@rclientSub.subscribe "editor-events"
@rclientSub.on "message", (channel, message) ->
WebsocketLoadBalancer._processEditorEvent io, channel, message
_processEditorEvent: (io, channel, message) ->
message = JSON.parse(message)
if message.room_id == "all"
io.sockets.emit(message.message, message.payload...)
else
io.sockets.in(message.room_id).emit(message.message, message.payload...)

View file

@ -31,7 +31,7 @@ describe "clientTracking", ->
describe "when a client updates its cursor location", ->
before (done) ->
@updates = []
@clientB.on "clientTracking.clientUpdated", (data) ->
@clientB.on "clientTracking.clientUpdated", (data) =>
@updates.push data
@clientA.emit "clientTracking.updatePosition", {
@ -42,7 +42,17 @@ describe "clientTracking", ->
throw error if error?
done()
it "should tell other clients about the update"
it "should tell other clients about the update", ->
@updates.should.deep.equal [
{
row: @row
column: @column
doc_id: @doc_id
id: @clientA.socket.sessionid
user_id: @user_id
name: "Joe Bloggs"
}
]
it "should record the update in getConnectedUsers", (done) ->
@clientB.emit "clientTracking.getConnectedUsers", (error, users) =>

View file

@ -29,6 +29,7 @@ describe 'WebsocketController', ->
"./AuthorizationManager": @AuthorizationManager = {}
"./DocumentUpdaterManager": @DocumentUpdaterManager = {}
"./ConnectedUsersManager": @ConnectedUsersManager = {}
"./WebsocketLoadBalancer": @WebsocketLoadBalancer = {}
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
afterEach ->
@ -219,7 +220,7 @@ describe 'WebsocketController', ->
describe "updateClientPosition", ->
beforeEach ->
#@EditorRealTimeController.emitToRoom = sinon.stub()
@WebsocketLoadBalancer.emitToRoom = sinon.stub()
@ConnectedUsersManager.updateUserPosition = sinon.stub().callsArgWith(4)
@update = {
doc_id: @doc_id = "doc-id-123"
@ -248,8 +249,8 @@ describe 'WebsocketController', ->
email: @email
user_id: @user_id
# it "should send the update to the project room with the user's name", ->
# @EditorRealTimeController.emitToRoom.calledWith(@project_id, "clientTracking.clientUpdated", @populatedCursorData).should.equal true
it "should send the update to the project room with the user's name", ->
@WebsocketLoadBalancer.emitToRoom.calledWith(@project_id, "clientTracking.clientUpdated", @populatedCursorData).should.equal true
it "should send the cursor data to the connected user manager", (done)->
@ConnectedUsersManager.updateUserPosition.calledWith(@project_id, @client.id, {
@ -272,16 +273,16 @@ describe 'WebsocketController', ->
@client.get = (param, callback) => callback null, @clientParams[param]
@WebsocketController.updateClientPosition @client, @update
# it "should send the update to the project room with an anonymous name", ->
# @EditorRealTimeController.emitToRoom
# .calledWith(@project_id, "clientTracking.clientUpdated", {
# doc_id: @doc_id,
# id: @client.id
# name: "Anonymous"
# row: @row
# column: @column
# })
# .should.equal true
it "should send the update to the project room with an anonymous name", ->
@WebsocketLoadBalancer.emitToRoom
.calledWith(@project_id, "clientTracking.clientUpdated", {
doc_id: @doc_id,
id: @client.id
name: "Anonymous"
row: @row
column: @column
})
.should.equal true
it "should not send cursor data to the connected user manager", (done)->
@ConnectedUsersManager.updateUserPosition.called.should.equal false

View file

@ -0,0 +1,89 @@
SandboxedModule = require('sandboxed-module')
sinon = require('sinon')
require('chai').should()
modulePath = require('path').join __dirname, '../../../app/js/WebsocketLoadBalancer'
describe "WebsocketLoadBalancer", ->
beforeEach ->
@WebsocketLoadBalancer = SandboxedModule.require modulePath, requires:
"redis-sharelatex":
createClient: () ->
auth:->
@io = {}
@WebsocketLoadBalancer.rclientPub = publish: sinon.stub()
@WebsocketLoadBalancer.rclientSub =
subscribe: sinon.stub()
on: sinon.stub()
@room_id = "room-id"
@message = "message-to-editor"
@payload = ["argument one", 42]
describe "emitToRoom", ->
beforeEach ->
@WebsocketLoadBalancer.emitToRoom(@room_id, @message, @payload...)
it "should publish the message to redis", ->
@WebsocketLoadBalancer.rclientPub.publish
.calledWith("editor-events", JSON.stringify(
room_id: @room_id,
message: @message
payload: @payload
))
.should.equal true
describe "emitToAll", ->
beforeEach ->
@WebsocketLoadBalancer.emitToRoom = sinon.stub()
@WebsocketLoadBalancer.emitToAll @message, @payload...
it "should emit to the room 'all'", ->
@WebsocketLoadBalancer.emitToRoom
.calledWith("all", @message, @payload...)
.should.equal true
describe "listenForEditorEvents", ->
beforeEach ->
@WebsocketLoadBalancer._processEditorEvent = sinon.stub()
@WebsocketLoadBalancer.listenForEditorEvents()
it "should subscribe to the editor-events channel", ->
@WebsocketLoadBalancer.rclientSub.subscribe
.calledWith("editor-events")
.should.equal true
it "should process the events with _processEditorEvent", ->
@WebsocketLoadBalancer.rclientSub.on
.calledWith("message", sinon.match.func)
.should.equal true
describe "_processEditorEvent", ->
describe "with a designated room", ->
beforeEach ->
@io.sockets =
in: sinon.stub().returns(emit: @emit = sinon.stub())
data = JSON.stringify
room_id: @room_id
message: @message
payload: @payload
@WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data)
it "should send the message to all clients in the room", ->
@io.sockets.in
.calledWith(@room_id)
.should.equal true
@emit.calledWith(@message, @payload...).should.equal true
describe "when emitting to all", ->
beforeEach ->
@io.sockets =
emit: @emit = sinon.stub()
data = JSON.stringify
room_id: "all"
message: @message
payload: @payload
@WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data)
it "should send the message to all clients", ->
@emit.calledWith(@message, @payload...).should.equal true