Brian Gough 62b8c30d0b make pollSavedStatus more robust against failure
check last ack timestamp and size of pending op

provide method to compute sharejs op size so we can check if pending
ops get too big
2016-11-03 11:28:23 +00:00

191 lines
6.2 KiB

define [
], (EventEmitter, ShareJs) ->
class ShareJsDoc extends EventEmitter
# Build a ShareJsDoc for `@doc_id`: decode the incoming lines, set up the
# connection object handed to ShareJs.Doc, create the ShareJs.Doc, and
# re-emit its "change", "acknowledge" and "remoteop" events on this object.
#
# @doc_id   - id passed to ShareJs.Doc and sent with every "applyOtUpdate"
# docLines  - lines of the document; either plain values or objects with a
#             `.text` property (the latter switches @type to "json")
# version   - used as `v` in the open message near the end of the constructor
# @socket   - socket used to emit "applyOtUpdate"; its `socket.sessionid` is
#             used as the connection id
#
# NOTE(review): this copy of the file has lost its indentation and appears to
# be missing some lines (e.g. the `else` keywords between the branches below
# and the call that receives the trailing `open/v/snapshot` object) — confirm
# against version control before relying on the exact control flow.
constructor: (@doc_id, docLines, version, @socket) ->
# Decode any binary bits of data
# See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html
@type = "text"
docLines = for line in docLines
if line.text?
# Object lines carry their content in `.text`; decode it in place
@type = "json"
line.text = decodeURIComponent(escape(line.text))
# NOTE(review): presumably the `else` branch for plain (non-object) lines — confirm
@type = "text"
line = decodeURIComponent(escape(line))
# The initial snapshot shape depends on the detected type
if @type == "text"
snapshot = docLines.join("\n")
else if @type == "json"
snapshot = { lines: docLines }
# NOTE(review): presumably guarded by a final `else` — confirm
throw new Error("Unknown type: #{@type}")
# Connection object given to ShareJs.Doc: `send` emits the update over the socket
@connection = {
send: (update) =>
# Debug hooks: window.disconnectOnUpdate / window.dropUpdates are compared
# against Math.random() to randomly simulate failures
if window.disconnectOnUpdate? and Math.random() < window.disconnectOnUpdate
sl_console.log "Disconnecting on update", update
if window.dropUpdates? and Math.random() < window.dropUpdates
sl_console.log "Simulating a lost update", update
@socket.emit "applyOtUpdate", @doc_id, update, (error) =>
return @_handleError(error) if error?
state: "ok"
id: @socket.socket.sessionid
@_doc = new ShareJs.Doc @connection, @doc_id,
type: @type
@_doc.on "change", () =>
@trigger "change"
@_doc.on "acknowledge", () =>
@lastAcked = new Date() # note time of last ack from server
@trigger "acknowledge"
@_doc.on "remoteop", () =>
# As soon as we're working with a collaborator, start sending
# ops as quickly as possible for low latency.
@trigger "remoteop"
@_doc.on "error", (e) =>
# NOTE(review): the handler body and the call that takes the following
# open/v/snapshot fields are not visible in this copy — confirm
open: true
v: version
snapshot: snapshot
# Forward an op (and any extra arguments) unchanged to the underlying ShareJS doc.
submitOp: (args...) ->
  @_doc.submitOp.apply(@_doc, args)
# Pass a message received from the server to the underlying ShareJS doc via
# `_onMessage`, then emit "externalUpdate" with the message when its
# `meta.type` is "external".
#
# NOTE(review): a `catch` is visible without its `try`, and no statements are
# visible inside the catch — lines appear to be missing from this copy, so how
# errors are handled here cannot be confirmed.
processUpdateFromServer: (message) ->
@_doc._onMessage message
catch error
# Version mismatches are thrown as errors
if message?.meta?.type == "external"
@trigger "externalUpdate", message
# Walk a list of updates, stamping each one with the doc's current version
# (`v`) and this document's id (`doc`).
#
# NOTE(review): nothing visible here applies the stamped update or uses the
# loop index `i`; a line may be missing from this copy — confirm.
catchUp: (updates) ->
for update, i in updates
update.v = @_doc.version
update.doc = @doc_id
# Snapshot currently held by the underlying ShareJS doc.
getSnapshot: ->
  @_doc.snapshot

# Version currently held by the underlying ShareJS doc.
getVersion: ->
  @_doc.version

# Document type string chosen in the constructor ("text" or "json").
getType: ->
  @type
# Drop any op that is in flight or pending on the underlying doc, together
# with the callbacks registered for them.
clearInflightAndPendingOps: ->
  doc = @_doc
  doc.inflightOp = null
  doc.inflightCallbacks = []
  doc.pendingOp = null
  doc.pendingCallbacks = []
# Flush pending ops on the underlying doc.
# NOTE(review): no executable statement is visible in this method in this
# copy of the file — the actual flush call appears to be missing; confirm.
flushPendingOps: () ->
# This will flush any ops that are pending.
# If there is an inflight op it will do nothing.
# Record a new connection state on the connection object, refresh the
# connection id from the socket's current session id, and switch off the
# doc's autoOpen flag.
updateConnectionState: (state) ->
  sl_console.log("[updateConnectionState] Setting state to #{state}")
  conn = @connection
  conn.state = state
  conn.id = @socket.socket.sessionid
  @_doc.autoOpen = false
  # The last ack time is reset whenever the connection changes.
  @lastAcked = null
# True when the underlying doc holds an inflight op or a pending op.
hasBufferedOps: ->
  {inflightOp, pendingOp} = @_doc
  inflightOp? or pendingOp?
# Op currently recorded as in flight on the underlying doc (may be null).
getInflightOp: ->
  @_doc.inflightOp

# Op currently recorded as pending on the underlying doc (may be null).
getPendingOp: ->
  @_doc.pendingOp
# Report whether an ack was received recently, i.e. the time since
# @lastAcked is shorter than the doc's flush delay. Returns false when no
# ack time is recorded.
getRecentAck: ->
  return false unless @lastAcked?
  elapsed = new Date() - @lastAcked
  elapsed < @_doc._flushDelay
# Size of an op: the summed length of every inserted (`i`) and deleted (`d`)
# value across its components. A missing op counts as zero.
getOpSize: (op) ->
  total = 0
  for part in op or []
    # Skip empty slots; they contribute nothing
    continue unless part?
    total += part.i.length if part.i?
    total += part.d.length if part.d?
  total
# Attach the underlying doc to an Ace editor, passing window.maxDocLength through.
attachToAce: (ace) ->
  @_doc.attach_ace(ace, false, window.maxDocLength)

# Detach from Ace; does nothing when the doc has no detach_ace function.
detachFromAce: ->
  @_doc.detach_ace?()
# Timeouts (milliseconds) used by _startInflightOpTimeout below.
INFLIGHT_OP_TIMEOUT: 5000 # Retry sending ops after 5 seconds without an ack
WAIT_FOR_CONNECTION_TIMEOUT: 500 # If we're waiting for the project to join, try again in 0.5 seconds
# Schedule a retry for `update` after INFLIGHT_OP_TIMEOUT and register an
# inflight callback that cancels the scheduled timer.
#
# update - the update being sent; `dupIfSource` is set on it before a retry
#
# NOTE(review): indentation is lost in this copy, and the branch that actually
# re-sends the update (after the "Sending" log line, presumably under an
# `else`) is not visible — confirm against version control.
_startInflightOpTimeout: (update) ->
retryOp = () =>
# Only send the update again if inflightOp is still populated
# This can be cleared when hard reloading the document in which
# case we don't want to keep trying to send it.
sl_console.log "[inflightOpTimeout] Trying op again"
if @_doc.inflightOp?
# When there is a socket.io disconnect, @_doc.inflightSubmittedIds
# is updated with the socket.io client id of the current op in flight
# (meta.source of the op).
# @connection.id is the client id of the current socket.io session.
# So we need both depending on whether the op was submitted before
# one or more disconnects, or if it was submitted during the current session.
update.dupIfSource = [@connection.id, @_doc.inflightSubmittedIds...]
# We must be joined to a project for applyOtUpdate to work on the real-time
# service, so don't send an op if we're not. Connection state is set to 'ok'
# when we've joined the project
if @connection.state != "ok"
sl_console.log "[inflightOpTimeout] Not connected, retrying in 0.5s"
# Reuses the same `timer` variable so the cancel callback below clears the newest timer
timer = setTimeout retryOp, @WAIT_FOR_CONNECTION_TIMEOUT
sl_console.log "[inflightOpTimeout] Sending"
# First attempt at a retry happens after INFLIGHT_OP_TIMEOUT
timer = setTimeout retryOp, @INFLIGHT_OP_TIMEOUT
# Cancel the outstanding retry timer when the inflight callbacks are run
@_doc.inflightCallbacks.push () =>
clearTimeout timer
FATAL_OP_TIMEOUT: 30000 # 30 seconds
# Start the fatal-timeout timer for `update` unless one is already running;
# when it fires, "op:timeout" is emitted with the update.
#
# NOTE(review): the delay argument to setTimeout is not visible in this copy
# (presumably FATAL_OP_TIMEOUT) and lines may be missing — confirm.
_startFatalTimeoutTimer: (update) ->
# If an op doesn't get acked within FATAL_OP_TIMEOUT, something has
# gone unrecoverably wrong (the op will have been retried multiple times)
return if @_timeoutTimer?
@_timeoutTimer = setTimeout () =>
@trigger "op:timeout", update
# Cancel the fatal-timeout timer if one is running and forget its handle.
_clearFatalTimeoutTimer: ->
  if @_timeoutTimer?
    clearTimeout(@_timeoutTimer)
    @_timeoutTimer = null
# Emit an "error" event carrying the error and optional metadata
# (an empty object when none is supplied).
_handleError: (error, meta = {}) ->
  @trigger("error", error, meta)
# Wrap `doc.submitOp` and `doc.flush` so that this object emits events around
# them while still delegating to the original implementations.
_bindToDocChanges: (doc) ->
  originalSubmitOp = doc.submitOp
  doc.submitOp = (opArgs...) =>
    # Announce the op, then queue an acknowledgement event alongside the
    # doc's pending callbacks before delegating to the real submitOp.
    @trigger("op:sent", opArgs...)
    doc.pendingCallbacks.push(=> @trigger("op:acknowledged", opArgs...))
    originalSubmitOp.apply(doc, opArgs)

  originalFlush = doc.flush
  doc.flush = (flushArgs...) =>
    # Report the doc's buffered state just before the real flush runs.
    @trigger("flush", doc.inflightOp, doc.pendingOp, doc.version)
    originalFlush.apply(doc, flushArgs)