overleaf/services/web/public/coffee/editor/sharejs/server/browserchannel.coffee
2014-02-12 10:23:40 +00:00

333 lines
12 KiB
CoffeeScript

# This implements the network API for ShareJS.
#
# The wire protocol is speccced out here:
# https://github.com/josephg/ShareJS/wiki/Wire-Protocol
#
# When a client connects the server first authenticates it and sends:
#
# S: {auth:<agent session id>}
# or
# S: {auth:null, error:'forbidden'}
#
# After that, the client can open documents:
#
# C: {doc:'foo', open:true, snapshot:null, create:true, type:'text'}
# S: {doc:'foo', open:true, snapshot:{snapshot:'hi there', v:5, meta:{}}, create:false}
#
# ...
#
# The client can send open requests as soon as the socket has opened - it doesn't need to
# wait for auth.
#
# The wire protocol is documented here:
# https://github.com/josephg/ShareJS/wiki/Wire-Protocol
browserChannel = require('browserchannel').server
util = require 'util'
hat = require 'hat'
syncQueue = require './syncqueue'
# Attach the streaming protocol to the supplied http.Server.
#
# Options = {}
module.exports = (createAgent, options) ->
options or= {}
browserChannel options, (session) ->
#console.log "New BC session from #{session.address} with id #{session.id}"
data =
headers: session.headers
remoteAddress: session.address
# This is the user agent through which a connecting client acts. It is set when the
# session is authenticated. The agent is responsible for making sure client requests are
# properly authorized, and metadata is kept up to date.
agent = null
# To save on network traffic, the agent & server can leave out the docName with each message to mean
# 'same as the last message'
lastSentDoc = null
lastReceivedDoc = null
# Map from docName -> {queue, listener if open}
docState = {}
# We'll only handle one message from each client at a time.
handleMessage = (query) ->
#console.log "Message from #{session.id}", query
error = null
error = 'Invalid docName' unless query.doc is null or typeof query.doc is 'string' or (query.doc is undefined and lastReceivedDoc)
error = "'create' must be true or missing" unless query.create in [true, undefined]
error = "'open' must be true, false or missing" unless query.open in [true, false, undefined]
error = "'snapshot' must be null or missing" unless query.snapshot in [null, undefined]
error = "'type' invalid" unless query.type is undefined or typeof query.type is 'string'
error = "'v' invalid" unless query.v is undefined or (typeof query.v is 'number' and query.v >= 0)
if error
console.warn "Invalid query #{JSON.stringify query} from #{agent.sessionId}: #{error}"
session.abort()
return callback()
# The agent can specify null as the docName to get a random doc name.
if query.doc is null
query.doc = lastReceivedDoc = hat()
else if query.doc != undefined
lastReceivedDoc = query.doc
else
unless lastReceivedDoc
console.warn "msg.doc missing in query #{JSON.stringify query} from #{agent.sessionId}"
# The disconnect handler will be called when we do this, which will clean up the open docs.
return session.abort()
query.doc = lastReceivedDoc
docState[query.doc] or= queue: syncQueue (query, callback) ->
# When the session is closed, we'll nuke docState. When that happens, no more messages
# should be handled.
return callback() unless docState
# Close messages are {open:false}
if query.open == false
handleClose query, callback
# Open messages are {open:true}. There's a lot of shared logic with getting snapshots
# and creating documents. These operations can be done together; and I'll handle them
# together.
else if query.open or query.snapshot is null or query.create
# You can open, request a snapshot and create all in the same
# request. They're all handled together.
handleOpenCreateSnapshot query, callback
# The socket is submitting an op.
else if query.op? or query.meta?.path?
handleOp query, callback
else
console.warn "Invalid query #{JSON.stringify query} from #{agent.sessionId}"
session.abort()
callback()
# ... And add the message to the queue.
docState[query.doc].queue query
# # Some utility methods for message handlers
# Send a message to the socket.
# msg _must_ have the doc:DOCNAME property set. We'll remove it if its the same as lastReceivedDoc.
send = (response) ->
if response.doc is lastSentDoc
delete response.doc
else
lastSentDoc = response.doc
# Its invalid to send a message to a closed session. We'll silently drop messages if the
# session has closed.
if session.state isnt 'closed'
#console.log "Sending", response
session.send response
# Open the given document name, at the requested version.
# callback(error, version)
open = (docName, version, callback) ->
return callback 'Session closed' unless docState
return callback 'Document already open' if docState[docName].listener
#p "Registering listener on #{docName} by #{socket.id} at #{version}"
docState[docName].listener = listener = (opData) ->
throw new Error 'Consistency violation - doc listener invalid' unless docState[docName].listener == listener
#p "listener doc:#{docName} opdata:#{i opData} v:#{version}"
# Skip the op if this socket sent it.
return if opData.meta.source is agent.sessionId
opMsg =
doc: docName
op: opData.op
v: opData.v
meta: opData.meta
send opMsg
# Tell the socket the doc is open at the requested version
agent.listen docName, version, listener, (error, v) ->
delete docState[docName].listener if error
callback error, v
# Close the named document.
# callback([error])
close = (docName, callback) ->
#p "Closing #{docName}"
return callback 'Session closed' unless docState
listener = docState[docName].listener
return callback 'Doc already closed' unless listener?
agent.removeListener docName
delete docState[docName].listener
callback()
# Handles messages with any combination of the open:true, create:true and snapshot:null parameters
handleOpenCreateSnapshot = (query, finished) ->
docName = query.doc
msg = doc:docName
callback = (error) ->
if error
close(docName) if msg.open == true
msg.open = false if query.open == true
msg.snapshot = null if query.snapshot != undefined
delete msg.create
msg.error = error
send msg
finished()
return callback 'No docName specified' unless query.doc?
if query.create == true
if typeof query.type != 'string'
return callback 'create:true requires type specified'
if query.meta != undefined
unless typeof query.meta == 'object' and Array.isArray(query.meta) == false
return callback 'meta must be an object'
docData = undefined
# This is implemented with a series of cascading methods for each different type of
# thing this method can handle. This would be so much nicer with an async library. Welcome to
# callback hell.
step1Create = ->
return step2Snapshot() if query.create != true
# The document obviously already exists if we have a snapshot.
if docData
msg.create = false
step2Snapshot()
else
agent.create docName, query.type, query.meta || {}, (error) ->
if error is 'Document already exists'
# We've called getSnapshot (-> null), then create (-> already exists). Its possible
# another agent has called create() between our getSnapshot and create() calls.
agent.getSnapshot docName, (error, data) ->
return callback error if error
docData = data
msg.create = false
step2Snapshot()
else if error
callback error
else
msg.create = true
step2Snapshot()
# The socket requested a document snapshot
step2Snapshot = ->
# if query.create or query.open or query.snapshot == null
# msg.meta = docData.meta
# Skip inserting a snapshot if the document was just created.
if query.snapshot != null or msg.create == true
step3Open()
return
if docData
msg.v = docData.v
msg.type = docData.type.name unless query.type == docData.type.name
msg.snapshot = docData.snapshot
else
return callback 'Document does not exist'
step3Open()
# Attempt to open a document with a given name. Version is optional.
# callback(opened at version) or callback(null, errormessage)
step3Open = ->
return callback() if query.open != true
# Verify the type matches
return callback 'Type mismatch' if query.type and docData and query.type != docData.type.name
open docName, query.v, (error, version) ->
return callback error if error
# + Should fail if the type is wrong.
#p "Opened #{docName} at #{version} by #{socket.id}"
msg.open = true
msg.v = version
callback()
# Technically, we don't need a snapshot if the user called create but not open or createSnapshot,
# but no clients do that yet anyway.
if query.snapshot == null or query.open == true #and query.type
agent.getSnapshot query.doc, (error, data) ->
return callback error if error and error != 'Document does not exist'
docData = data
step1Create()
else
step1Create()
# The socket closes a document
handleClose = (query, callback) ->
close query.doc, (error) ->
if error
# An error closing still results in the doc being closed.
send {doc:query.doc, open:false, error:error}
else
send {doc:query.doc, open:false}
callback()
# We received an op from the socket
handleOp = (query, callback) ->
# ...
#throw new Error 'No version specified' unless query.v?
opData = {v:query.v, op:query.op, meta:query.meta, dupIfSource:query.dupIfSource}
# If it's a metaOp don't send a response
agent.submitOp query.doc, opData, if (not opData.op? and opData.meta?.path?) then callback else (error, appliedVersion) ->
msg = if error
#p "Sending error to socket: #{error}"
{doc:query.doc, v:null, error:error}
else
{doc:query.doc, v:appliedVersion}
send msg
callback()
# We don't process any messages from the agent until they've authorized. Instead,
# they are stored in this buffer.
buffer = []
session.on 'message', bufferMsg = (msg) -> buffer.push msg
createAgent data, (error, agent_) ->
if error
# The client is not authorized, so they shouldn't try and reconnect.
session.send {auth:null, error}
session.stop()
else
agent = agent_
session.send auth:agent.sessionId
# Ok. Now we can handle all the messages in the buffer. They'll go straight to
# handleMessage from now on.
session.removeListener 'message', bufferMsg
handleMessage msg for msg in buffer
buffer = null
session.on 'message', handleMessage
session.on 'close', ->
return unless agent
#console.log "Client #{agent.sessionId} disconnected"
for docName, {listener} of docState
agent.removeListener docName if listener
docState = null