mirror of
https://github.com/overleaf/overleaf.git
synced 2024-10-31 21:21:03 -04:00
336 lines
11 KiB
CoffeeScript
336 lines
11 KiB
CoffeeScript
# This implements the socketio-based network API for ShareJS.
|
|
#
|
|
# This is the frontend used by the javascript socket implementation.
|
|
#
|
|
# See documentation for this protocol is in doc/protocol.md
|
|
# Tests are in test/socketio.coffee
|
|
#
|
|
# This code will be removed in a future version of sharejs because socket.io is
|
|
# too buggy.
|
|
|
|
socketio = require 'socket.io'
|
|
util = require 'util'
|
|
hat = require 'hat'
|
|
|
|
p = ->#util.debug
|
|
i = ->#util.inspect
|
|
|
|
# Attach the streaming protocol to the supplied http.Server.
|
|
#
|
|
# Options = {}
|
|
exports.attach = (server, createClient, options) ->
|
|
io = socketio.listen server
|
|
|
|
io.configure ->
|
|
io.set 'log level', 1
|
|
for option of options
|
|
io.set option, options[option]
|
|
|
|
authClient = (handshakeData, callback) ->
|
|
data =
|
|
headers: handshakeData.headers
|
|
remoteAddress: handshakeData.address.address
|
|
secure: handshakeData.secure
|
|
|
|
createClient data, (error, client) ->
|
|
if error
|
|
# Its important that we don't pass the error message to the client here - leaving it as null
|
|
# will ensure the client recieves the normal 'not_authorized' message and thus
|
|
# emits 'connect_failed' instead of 'error'
|
|
callback null, false
|
|
else
|
|
handshakeData.client = client
|
|
callback null, true
|
|
|
|
io.of('/sjs').authorization(authClient).on 'connection', (socket) ->
|
|
client = socket.handshake.client
|
|
|
|
# There seems to be a bug in socket.io where socket.request isn't set sometimes.
|
|
p "New socket connected from #{socket.request.socket.remoteAddress} with id #{socket.id}" if socket.request?
|
|
|
|
lastSentDoc = null
|
|
lastReceivedDoc = null
|
|
|
|
# Map from docName -> {listener:fn, queue:[msg], busy:bool}
|
|
docState = {}
|
|
closed = false
|
|
|
|
# Send a message to the socket.
|
|
# msg _must_ have the doc:DOCNAME property set. We'll remove it if its the same as lastReceivedDoc.
|
|
send = (msg) ->
|
|
if msg.doc == lastSentDoc
|
|
delete msg.doc
|
|
else
|
|
lastSentDoc = msg.doc
|
|
|
|
p "Sending #{i msg}"
|
|
socket.json.send msg
|
|
|
|
# Open the given document name, at the requested version.
|
|
# callback(error, version)
|
|
open = (docName, version, callback) ->
|
|
callback 'Doc already opened' if docState[docName].listener?
|
|
p "Registering listener on #{docName} by #{socket.id} at #{version}"
|
|
|
|
# This passes op events to the client
|
|
docState[docName].listener = listener = (opData) ->
|
|
throw new Error 'Consistency violation - doc listener invalid' unless docState[docName].listener == listener
|
|
|
|
p "listener doc:#{docName} opdata:#{i opData} v:#{version}"
|
|
|
|
# Skip the op if this socket sent it.
|
|
return if opData.meta?.source == client.id
|
|
|
|
opMsg =
|
|
doc: docName
|
|
op: opData.op
|
|
v: opData.v
|
|
meta: opData.meta
|
|
|
|
send opMsg
|
|
|
|
# Tell the socket the doc is open at the requested version
|
|
client.listen docName, version, listener, (error, v) ->
|
|
delete docState[docName].listener if error
|
|
callback error, v
|
|
|
|
# Close the named document.
|
|
# callback([error])
|
|
close = (docName, callback) ->
|
|
p "Closing #{docName}"
|
|
listener = docState[docName].listener
|
|
return callback 'Doc already closed' unless listener?
|
|
|
|
client.removeListener docName
|
|
docState[docName].listener = null
|
|
callback()
|
|
|
|
# Handles messages with any combination of the open:true, create:true and snapshot:null parameters
|
|
handleOpenCreateSnapshot = (query, finished) ->
|
|
docName = query.doc
|
|
msg = doc:docName
|
|
|
|
callback = (error) ->
|
|
if error
|
|
close(docName) if msg.open == true
|
|
msg.open = false if query.open == true
|
|
msg.snapshot = null if query.snapshot != undefined
|
|
delete msg.create
|
|
|
|
msg.error = error
|
|
|
|
send msg
|
|
finished()
|
|
|
|
return callback 'No docName specified' unless query.doc?
|
|
|
|
if query.create == true
|
|
if typeof query.type != 'string'
|
|
return callback 'create:true requires type specified'
|
|
|
|
if query.meta != undefined
|
|
unless typeof query.meta == 'object' and Array.isArray(query.meta) == false
|
|
return callback 'meta must be an object'
|
|
|
|
docData = undefined
|
|
# Technically, we don't need a snapshot if the user called create but not open or createSnapshot,
|
|
# but no clients do that yet anyway.
|
|
#
|
|
# It might be nice to add a 'createOrGet()' method to model / db manager. But most
|
|
# of the time clients are opening an existing document rather than creating a new one anyway.
|
|
###
|
|
model.clientGetSnapshot client, query.doc, (error, data) ->
|
|
maybeCreate = (callback) ->
|
|
if query.create and error is 'Document does not exist'
|
|
model.clientCreate client, docName, query.type, query.meta or {}, callback
|
|
else
|
|
callback error, data
|
|
|
|
maybeCreate (error, data) ->
|
|
if query.create
|
|
msg.create = !!error
|
|
if error is 'Document already exists'
|
|
msg.create = false
|
|
else if error and (!msg.create or error isnt 'Document already exists')
|
|
# This is the real final callback, to say an error has occurred.
|
|
return callback error
|
|
else if query.create or query.snapshot is null
|
|
|
|
|
|
if query.snapshot isnt null
|
|
###
|
|
|
|
# This is implemented with a series of cascading methods for each different type of
|
|
# thing this method can handle. This would be so much nicer with an async library. Welcome to
|
|
# callback hell.
|
|
|
|
step1Create = ->
|
|
return step2Snapshot() if query.create != true
|
|
|
|
# The document obviously already exists if we have a snapshot.
|
|
if docData
|
|
msg.create = false
|
|
step2Snapshot()
|
|
else
|
|
client.create docName, query.type, query.meta || {}, (error) ->
|
|
if error is 'Document already exists'
|
|
# We've called getSnapshot (-> null), then createClient (-> already exists). Its possible
|
|
# another client has called createClient first.
|
|
client.getSnapshot docName, (error, data) ->
|
|
return callback error if error
|
|
|
|
docData = data
|
|
msg.create = false
|
|
step2Snapshot()
|
|
else if error
|
|
callback error
|
|
else
|
|
msg.create = !error
|
|
step2Snapshot()
|
|
|
|
# The socket requested a document snapshot
|
|
step2Snapshot = ->
|
|
# Skip inserting a snapshot if the document was just created.
|
|
if query.snapshot != null or msg.create == true
|
|
step3Open()
|
|
return
|
|
|
|
if docData
|
|
msg.v = docData.v
|
|
msg.type = docData.type.name unless query.type == docData.type.name
|
|
msg.snapshot = docData.snapshot
|
|
else
|
|
return callback 'Document does not exist'
|
|
|
|
step3Open()
|
|
|
|
# Attempt to open a document with a given name. Version is optional.
|
|
# callback(opened at version) or callback(null, errormessage)
|
|
step3Open = ->
|
|
return callback() if query.open != true
|
|
|
|
# Verify the type matches
|
|
return callback 'Type mismatch' if query.type and docData and query.type != docData.type.name
|
|
|
|
open docName, query.v, (error, version) ->
|
|
return callback error if error
|
|
|
|
# + Should fail if the type is wrong.
|
|
|
|
p "Opened #{docName} at #{version} by #{socket.id}"
|
|
msg.open = true
|
|
msg.v = version
|
|
callback()
|
|
|
|
if query.snapshot == null or (query.open == true and query.type)
|
|
client.getSnapshot query.doc, (error, data) ->
|
|
return callback error if error and error != 'Document does not exist'
|
|
|
|
docData = data
|
|
step1Create()
|
|
else
|
|
step1Create()
|
|
|
|
# The socket closes a document
|
|
handleClose = (query, callback) ->
|
|
close query.doc, (error) ->
|
|
if error
|
|
# An error closing still results in the doc being closed.
|
|
send {doc:query.doc, open:false, error:error}
|
|
else
|
|
send {doc:query.doc, open:false}
|
|
|
|
callback()
|
|
|
|
# We received an op from the socket
|
|
handleOp = (query, callback) ->
|
|
throw new Error 'No docName specified' unless query.doc?
|
|
throw new Error 'No version specified' unless query.v? or (query.meta?.path? and query.meta?.value?)
|
|
|
|
op_data = {v:query.v, op:query.op}
|
|
op_data.meta = query.meta || {}
|
|
op_data.meta.source = socket.id
|
|
|
|
client.submitOp query.doc, op_data, (error, appliedVersion) ->
|
|
msg = if error
|
|
p "Sending error to socket: #{error}"
|
|
{doc:query.doc, v:null, error:error}
|
|
else
|
|
{doc:query.doc, v:appliedVersion}
|
|
|
|
p "sending #{i msg}"
|
|
send msg
|
|
callback()
|
|
|
|
flush = (state) ->
|
|
return if state.busy || state.queue.length == 0
|
|
state.busy = true
|
|
|
|
query = state.queue.shift()
|
|
|
|
callback = ->
|
|
state.busy = false
|
|
flush state
|
|
|
|
p "processing query #{i query}"
|
|
try
|
|
if query.open == false
|
|
handleClose query, callback
|
|
|
|
else if query.open != undefined or query.snapshot != undefined or query.create
|
|
# You can open, request a snapshot and create all in the same
|
|
# request. They're all handled together.
|
|
handleOpenCreateSnapshot query, callback
|
|
|
|
else if query.op? or query.meta? # The socket is applying an op.
|
|
handleOp query, callback
|
|
|
|
else
|
|
util.debug "Unknown message received: #{util.inspect query}"
|
|
|
|
catch error
|
|
util.debug error.stack
|
|
# ... And disconnect the socket?
|
|
callback()
|
|
|
|
# And now the actual message handler.
|
|
messageListener = (query) ->
|
|
p "Server recieved message #{i query}"
|
|
# There seems to be a bug in socket.io where messages are detected
|
|
# after the client disconnects.
|
|
if closed
|
|
console.warn "WARNING: received query from socket after the socket disconnected."
|
|
console.warn socket
|
|
return
|
|
|
|
try
|
|
query = JSON.parse query if typeof(query) == 'string'
|
|
|
|
if query.doc == null
|
|
lastReceivedDoc = null
|
|
query.doc = hat()
|
|
else if query.doc != undefined
|
|
lastReceivedDoc = query.doc
|
|
else
|
|
throw new Error 'msg.doc missing. Probably the client reconnected without telling us - this is a socket.io bug.' unless lastReceivedDoc
|
|
query.doc = lastReceivedDoc
|
|
catch error
|
|
util.debug error.stack
|
|
return
|
|
|
|
docState[query.doc] ||= {listener:null, queue:[], busy:no}
|
|
docState[query.doc].queue.push query
|
|
flush docState[query.doc]
|
|
|
|
socket.on 'message', messageListener
|
|
socket.on 'disconnect', ->
|
|
p "socket #{socket.id} disconnected"
|
|
closed = true
|
|
for docName, state of docState
|
|
state.busy = true
|
|
state.queue = []
|
|
client.removeListener docName if state.listener?
|
|
socket.removeListener 'message', messageListener
|
|
docState = {}
|
|
|
|
server
|