overleaf/services/web/public/coffee/editor/sharejs/server/socketio.coffee

337 lines
11 KiB
CoffeeScript
Raw Normal View History

2014-02-12 05:23:40 -05:00
# This implements the socketio-based network API for ShareJS.
#
# This is the frontend used by the javascript socket implementation.
#
# See documentation for this protocol is in doc/protocol.md
# Tests are in test/socketio.coffee
#
# This code will be removed in a future version of sharejs because socket.io is
# too buggy.
socketio = require 'socket.io'
util = require 'util'
hat = require 'hat'
p = ->#util.debug
i = ->#util.inspect
# Attach the streaming protocol to the supplied http.Server.
#
# Options = {}
exports.attach = (server, createClient, options) ->
io = socketio.listen server
io.configure ->
io.set 'log level', 1
for option of options
io.set option, options[option]
authClient = (handshakeData, callback) ->
data =
headers: handshakeData.headers
remoteAddress: handshakeData.address.address
secure: handshakeData.secure
createClient data, (error, client) ->
if error
# Its important that we don't pass the error message to the client here - leaving it as null
# will ensure the client recieves the normal 'not_authorized' message and thus
# emits 'connect_failed' instead of 'error'
callback null, false
else
handshakeData.client = client
callback null, true
io.of('/sjs').authorization(authClient).on 'connection', (socket) ->
client = socket.handshake.client
# There seems to be a bug in socket.io where socket.request isn't set sometimes.
p "New socket connected from #{socket.request.socket.remoteAddress} with id #{socket.id}" if socket.request?
lastSentDoc = null
lastReceivedDoc = null
# Map from docName -> {listener:fn, queue:[msg], busy:bool}
docState = {}
closed = false
# Send a message to the socket.
# msg _must_ have the doc:DOCNAME property set. We'll remove it if its the same as lastReceivedDoc.
send = (msg) ->
if msg.doc == lastSentDoc
delete msg.doc
else
lastSentDoc = msg.doc
p "Sending #{i msg}"
socket.json.send msg
# Open the given document name, at the requested version.
# callback(error, version)
open = (docName, version, callback) ->
callback 'Doc already opened' if docState[docName].listener?
p "Registering listener on #{docName} by #{socket.id} at #{version}"
# This passes op events to the client
docState[docName].listener = listener = (opData) ->
throw new Error 'Consistency violation - doc listener invalid' unless docState[docName].listener == listener
p "listener doc:#{docName} opdata:#{i opData} v:#{version}"
# Skip the op if this socket sent it.
return if opData.meta?.source == client.id
opMsg =
doc: docName
op: opData.op
v: opData.v
meta: opData.meta
send opMsg
# Tell the socket the doc is open at the requested version
client.listen docName, version, listener, (error, v) ->
delete docState[docName].listener if error
callback error, v
# Close the named document.
# callback([error])
close = (docName, callback) ->
p "Closing #{docName}"
listener = docState[docName].listener
return callback 'Doc already closed' unless listener?
client.removeListener docName
docState[docName].listener = null
callback()
# Handles messages with any combination of the open:true, create:true and snapshot:null parameters
handleOpenCreateSnapshot = (query, finished) ->
docName = query.doc
msg = doc:docName
callback = (error) ->
if error
close(docName) if msg.open == true
msg.open = false if query.open == true
msg.snapshot = null if query.snapshot != undefined
delete msg.create
msg.error = error
send msg
finished()
return callback 'No docName specified' unless query.doc?
if query.create == true
if typeof query.type != 'string'
return callback 'create:true requires type specified'
if query.meta != undefined
unless typeof query.meta == 'object' and Array.isArray(query.meta) == false
return callback 'meta must be an object'
docData = undefined
# Technically, we don't need a snapshot if the user called create but not open or createSnapshot,
# but no clients do that yet anyway.
#
# It might be nice to add a 'createOrGet()' method to model / db manager. But most
# of the time clients are opening an existing document rather than creating a new one anyway.
###
model.clientGetSnapshot client, query.doc, (error, data) ->
maybeCreate = (callback) ->
if query.create and error is 'Document does not exist'
model.clientCreate client, docName, query.type, query.meta or {}, callback
else
callback error, data
maybeCreate (error, data) ->
if query.create
msg.create = !!error
if error is 'Document already exists'
msg.create = false
else if error and (!msg.create or error isnt 'Document already exists')
# This is the real final callback, to say an error has occurred.
return callback error
else if query.create or query.snapshot is null
if query.snapshot isnt null
###
# This is implemented with a series of cascading methods for each different type of
# thing this method can handle. This would be so much nicer with an async library. Welcome to
# callback hell.
step1Create = ->
return step2Snapshot() if query.create != true
# The document obviously already exists if we have a snapshot.
if docData
msg.create = false
step2Snapshot()
else
client.create docName, query.type, query.meta || {}, (error) ->
if error is 'Document already exists'
# We've called getSnapshot (-> null), then createClient (-> already exists). Its possible
# another client has called createClient first.
client.getSnapshot docName, (error, data) ->
return callback error if error
docData = data
msg.create = false
step2Snapshot()
else if error
callback error
else
msg.create = !error
step2Snapshot()
# The socket requested a document snapshot
step2Snapshot = ->
# Skip inserting a snapshot if the document was just created.
if query.snapshot != null or msg.create == true
step3Open()
return
if docData
msg.v = docData.v
msg.type = docData.type.name unless query.type == docData.type.name
msg.snapshot = docData.snapshot
else
return callback 'Document does not exist'
step3Open()
# Attempt to open a document with a given name. Version is optional.
# callback(opened at version) or callback(null, errormessage)
step3Open = ->
return callback() if query.open != true
# Verify the type matches
return callback 'Type mismatch' if query.type and docData and query.type != docData.type.name
open docName, query.v, (error, version) ->
return callback error if error
# + Should fail if the type is wrong.
p "Opened #{docName} at #{version} by #{socket.id}"
msg.open = true
msg.v = version
callback()
if query.snapshot == null or (query.open == true and query.type)
client.getSnapshot query.doc, (error, data) ->
return callback error if error and error != 'Document does not exist'
docData = data
step1Create()
else
step1Create()
# The socket closes a document
handleClose = (query, callback) ->
close query.doc, (error) ->
if error
# An error closing still results in the doc being closed.
send {doc:query.doc, open:false, error:error}
else
send {doc:query.doc, open:false}
callback()
# We received an op from the socket
handleOp = (query, callback) ->
throw new Error 'No docName specified' unless query.doc?
throw new Error 'No version specified' unless query.v? or (query.meta?.path? and query.meta?.value?)
op_data = {v:query.v, op:query.op}
op_data.meta = query.meta || {}
op_data.meta.source = socket.id
client.submitOp query.doc, op_data, (error, appliedVersion) ->
msg = if error
p "Sending error to socket: #{error}"
{doc:query.doc, v:null, error:error}
else
{doc:query.doc, v:appliedVersion}
p "sending #{i msg}"
send msg
callback()
flush = (state) ->
return if state.busy || state.queue.length == 0
state.busy = true
query = state.queue.shift()
callback = ->
state.busy = false
flush state
p "processing query #{i query}"
try
if query.open == false
handleClose query, callback
else if query.open != undefined or query.snapshot != undefined or query.create
# You can open, request a snapshot and create all in the same
# request. They're all handled together.
handleOpenCreateSnapshot query, callback
else if query.op? or query.meta? # The socket is applying an op.
handleOp query, callback
else
util.debug "Unknown message received: #{util.inspect query}"
catch error
util.debug error.stack
# ... And disconnect the socket?
callback()
# And now the actual message handler.
messageListener = (query) ->
p "Server recieved message #{i query}"
# There seems to be a bug in socket.io where messages are detected
# after the client disconnects.
if closed
console.warn "WARNING: received query from socket after the socket disconnected."
console.warn socket
return
try
query = JSON.parse query if typeof(query) == 'string'
if query.doc == null
lastReceivedDoc = null
query.doc = hat()
else if query.doc != undefined
lastReceivedDoc = query.doc
else
throw new Error 'msg.doc missing. Probably the client reconnected without telling us - this is a socket.io bug.' unless lastReceivedDoc
query.doc = lastReceivedDoc
catch error
util.debug error.stack
return
docState[query.doc] ||= {listener:null, queue:[], busy:no}
docState[query.doc].queue.push query
flush docState[query.doc]
socket.on 'message', messageListener
socket.on 'disconnect', ->
p "socket #{socket.id} disconnected"
closed = true
for docName, state of docState
state.busy = true
state.queue = []
client.removeListener docName if state.listener?
socket.removeListener 'message', messageListener
docState = {}
server