mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
0452b53ce3
increase single and multi-user flush delays GitOrigin-RevId: c9415a093d08c8f6cc403d0ddb6cd313ea99aebe
481 lines
15 KiB
JavaScript
481 lines
15 KiB
JavaScript
/* eslint-disable
|
|
camelcase,
|
|
max-len,
|
|
no-unused-vars,
|
|
*/
|
|
// TODO: This file was created by bulk-decaffeinate.
|
|
// Fix any style issues and re-enable lint.
|
|
/*
|
|
* decaffeinate suggestions:
|
|
* DS001: Remove Babel/TypeScript constructor workaround
|
|
* DS101: Remove unnecessary use of Array.from
|
|
* DS102: Remove unnecessary code created because of implicit returns
|
|
* DS103: Rewrite code to no longer use __guard__
|
|
* DS205: Consider reworking code to avoid use of IIFEs
|
|
* DS206: Consider reworking classes to avoid initClass
|
|
* DS207: Consider shorter variations of null checks
|
|
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
|
*/
|
|
import EventEmitter from '../../utils/EventEmitter'
|
|
import ShareJs from 'libs/sharejs'
|
|
import EditorWatchdogManager from '../connection/EditorWatchdogManager'
|
|
|
|
let ShareJsDoc
|
|
const SINGLE_USER_FLUSH_DELAY = 2000 // ms
|
|
const MULTI_USER_FLUSH_DELAY = 500 // ms
|
|
|
|
export default ShareJsDoc = (function() {
|
|
ShareJsDoc = class ShareJsDoc extends EventEmitter {
|
|
static initClass() {
|
|
this.prototype.INFLIGHT_OP_TIMEOUT = 5000 // Retry sending ops after 5 seconds without an ack
|
|
this.prototype.WAIT_FOR_CONNECTION_TIMEOUT = 500
|
|
|
|
this.prototype.FATAL_OP_TIMEOUT = 30000
|
|
}
|
|
|
|
constructor(
|
|
doc_id,
|
|
docLines,
|
|
version,
|
|
socket,
|
|
globalEditorWatchdogManager
|
|
) {
|
|
super()
|
|
// Dencode any binary bits of data
|
|
// See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html
|
|
this.doc_id = doc_id
|
|
this.socket = socket
|
|
this.type = 'text'
|
|
docLines = Array.from(docLines).map(line =>
|
|
decodeURIComponent(escape(line))
|
|
)
|
|
const snapshot = docLines.join('\n')
|
|
this.track_changes = false
|
|
|
|
this.connection = {
|
|
send: update => {
|
|
this._startInflightOpTimeout(update)
|
|
if (
|
|
window.disconnectOnUpdate != null &&
|
|
Math.random() < window.disconnectOnUpdate
|
|
) {
|
|
sl_console.log('Disconnecting on update', update)
|
|
window._ide.socket.socket.disconnect()
|
|
}
|
|
if (
|
|
window.dropUpdates != null &&
|
|
Math.random() < window.dropUpdates
|
|
) {
|
|
sl_console.log('Simulating a lost update', update)
|
|
return
|
|
}
|
|
if (this.track_changes) {
|
|
if (update.meta == null) {
|
|
update.meta = {}
|
|
}
|
|
update.meta.tc = this.track_changes_id_seeds.inflight
|
|
}
|
|
return this.socket.emit(
|
|
'applyOtUpdate',
|
|
this.doc_id,
|
|
update,
|
|
error => {
|
|
if (error != null) {
|
|
return this._handleError(error)
|
|
}
|
|
}
|
|
)
|
|
},
|
|
state: 'ok',
|
|
id: this.socket.publicId
|
|
}
|
|
|
|
this._doc = new ShareJs.Doc(this.connection, this.doc_id, {
|
|
type: this.type
|
|
})
|
|
this._doc.setFlushDelay(SINGLE_USER_FLUSH_DELAY)
|
|
this._doc.on('change', (...args) => {
|
|
return this.trigger('change', ...Array.from(args))
|
|
})
|
|
this.EditorWatchdogManager = new EditorWatchdogManager({
|
|
parent: globalEditorWatchdogManager
|
|
})
|
|
this._doc.on('acknowledge', () => {
|
|
this.lastAcked = new Date() // note time of last ack from server for an op we sent
|
|
this.EditorWatchdogManager.onAck() // keep track of last ack globally
|
|
return this.trigger('acknowledge')
|
|
})
|
|
this._doc.on('remoteop', (...args) => {
|
|
// As soon as we're working with a collaborator, start sending
|
|
// ops more frequently for low latency.
|
|
this._doc.setFlushDelay(MULTI_USER_FLUSH_DELAY)
|
|
return this.trigger('remoteop', ...Array.from(args))
|
|
})
|
|
this._doc.on('flipped_pending_to_inflight', () => {
|
|
return this.trigger('flipped_pending_to_inflight')
|
|
})
|
|
this._doc.on('saved', () => {
|
|
return this.trigger('saved')
|
|
})
|
|
this._doc.on('error', e => {
|
|
return this._handleError(e)
|
|
})
|
|
|
|
this._bindToDocChanges(this._doc)
|
|
|
|
this.processUpdateFromServer({
|
|
open: true,
|
|
v: version,
|
|
snapshot
|
|
})
|
|
this._removeCarriageReturnCharFromShareJsDoc()
|
|
}
|
|
|
|
_removeCarriageReturnCharFromShareJsDoc() {
|
|
const doc = this._doc
|
|
if (doc.snapshot.indexOf('\r') === -1) {
|
|
return
|
|
}
|
|
window._ide.pushEvent('remove-carriage-return-char', {
|
|
doc_id: this.doc_id
|
|
})
|
|
let nextPos
|
|
while ((nextPos = doc.snapshot.indexOf('\r')) !== -1) {
|
|
sl_console.log('[ShareJsDoc] remove-carriage-return-char', nextPos)
|
|
doc.del(nextPos, 1)
|
|
}
|
|
}
|
|
|
|
submitOp(...args) {
|
|
return this._doc.submitOp(...Array.from(args || []))
|
|
}
|
|
|
|
// The following code puts out of order messages into a queue
|
|
// so that they can be processed in order. This is a workaround
|
|
// for messages being delayed by redis cluster.
|
|
// FIXME: REMOVE THIS WHEN REDIS PUBSUB IS SENDING MESSAGES IN ORDER
|
|
_isAheadOfExpectedVersion(message) {
|
|
return this._doc.version > 0 && message.v > this._doc.version
|
|
}
|
|
|
|
_pushOntoQueue(message) {
|
|
sl_console.log(`[processUpdate] push onto queue ${message.v}`)
|
|
// set a timer so that we never leave messages in the queue indefinitely
|
|
if (!this.queuedMessageTimer) {
|
|
this.queuedMessageTimer = setTimeout(() => {
|
|
sl_console.log(`[processUpdate] queue timeout fired for ${message.v}`)
|
|
// force the message to be processed after the timeout,
|
|
// it will cause an error if the missing update has not arrived
|
|
this.processUpdateFromServer(message)
|
|
}, this.INFLIGHT_OP_TIMEOUT)
|
|
}
|
|
this.queuedMessages.push(message)
|
|
// keep the queue in order, lowest version first
|
|
this.queuedMessages.sort(function(a, b) {
|
|
return a.v - b.v
|
|
})
|
|
}
|
|
|
|
_clearQueue() {
|
|
this.queuedMessages = []
|
|
}
|
|
|
|
_processQueue() {
|
|
if (this.queuedMessages.length > 0) {
|
|
const nextAvailableVersion = this.queuedMessages[0].v
|
|
if (nextAvailableVersion > this._doc.version) {
|
|
// there are updates we still can't apply yet
|
|
} else {
|
|
// there's a version we can accept on the queue, apply it
|
|
sl_console.log(
|
|
`[processUpdate] taken from queue ${nextAvailableVersion}`
|
|
)
|
|
this.processUpdateFromServerInOrder(this.queuedMessages.shift())
|
|
// clear the pending timer if the queue has now been cleared
|
|
if (this.queuedMessages.length === 0 && this.queuedMessageTimer) {
|
|
sl_console.log('[processUpdate] queue is empty, cleared timeout')
|
|
clearTimeout(this.queuedMessageTimer)
|
|
this.queuedMessageTimer = null
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// FIXME: This is the new method which reorders incoming updates if needed
|
|
// called from Document.js
|
|
processUpdateFromServerInOrder(message) {
|
|
// Create an array to hold queued messages
|
|
if (!this.queuedMessages) {
|
|
this.queuedMessages = []
|
|
}
|
|
// Is this update ahead of the next expected update?
|
|
// If so, put it on a queue to be handled later.
|
|
if (this._isAheadOfExpectedVersion(message)) {
|
|
this._pushOntoQueue(message)
|
|
return // defer processing this update for now
|
|
}
|
|
var error = this.processUpdateFromServer(message)
|
|
if (
|
|
error instanceof Error &&
|
|
error.message === 'Invalid version from server'
|
|
) {
|
|
// if there was an error, abandon the queued updates ahead of this one
|
|
this._clearQueue()
|
|
return
|
|
}
|
|
// Do we have any messages queued up?
|
|
// find the next message if available
|
|
this._processQueue()
|
|
}
|
|
|
|
// FIXME: This is the original method. Switch back to this when redis
|
|
// issues are resolved.
|
|
processUpdateFromServer(message) {
|
|
try {
|
|
this._doc._onMessage(message)
|
|
} catch (error) {
|
|
// Version mismatches are thrown as errors
|
|
console.log(error)
|
|
this._handleError(error)
|
|
return error // return the error for queue handling
|
|
}
|
|
|
|
if (
|
|
__guard__(message != null ? message.meta : undefined, x => x.type) ===
|
|
'external'
|
|
) {
|
|
return this.trigger('externalUpdate', message)
|
|
}
|
|
}
|
|
|
|
catchUp(updates) {
|
|
return (() => {
|
|
const result = []
|
|
for (let i = 0; i < updates.length; i++) {
|
|
const update = updates[i]
|
|
update.v = this._doc.version
|
|
update.doc = this.doc_id
|
|
result.push(this.processUpdateFromServer(update))
|
|
}
|
|
return result
|
|
})()
|
|
}
|
|
|
|
getSnapshot() {
|
|
return this._doc.snapshot
|
|
}
|
|
|
|
getVersion() {
|
|
return this._doc.version
|
|
}
|
|
|
|
getType() {
|
|
return this.type
|
|
}
|
|
|
|
clearInflightAndPendingOps() {
|
|
this._clearFatalTimeoutTimer()
|
|
this._doc.inflightOp = null
|
|
this._doc.inflightCallbacks = []
|
|
this._doc.pendingOp = null
|
|
return (this._doc.pendingCallbacks = [])
|
|
}
|
|
|
|
flushPendingOps() {
|
|
// This will flush any ops that are pending.
|
|
// If there is an inflight op it will do nothing.
|
|
return this._doc.flush()
|
|
}
|
|
|
|
updateConnectionState(state) {
|
|
sl_console.log(`[updateConnectionState] Setting state to ${state}`)
|
|
this.connection.state = state
|
|
this.connection.id = this.socket.publicId
|
|
this._doc.autoOpen = false
|
|
this._doc._connectionStateChanged(state)
|
|
return (this.lastAcked = null) // reset the last ack time when connection changes
|
|
}
|
|
|
|
hasBufferedOps() {
|
|
return this._doc.inflightOp != null || this._doc.pendingOp != null
|
|
}
|
|
|
|
getInflightOp() {
|
|
return this._doc.inflightOp
|
|
}
|
|
|
|
getPendingOp() {
|
|
return this._doc.pendingOp
|
|
}
|
|
|
|
getRecentAck() {
|
|
// check if we have received an ack recently (within a factor of two of the single user flush delay)
|
|
return (
|
|
this.lastAcked != null &&
|
|
new Date() - this.lastAcked < 2 * SINGLE_USER_FLUSH_DELAY
|
|
)
|
|
}
|
|
|
|
getOpSize(op) {
|
|
// compute size of an op from its components
|
|
// (total number of characters inserted and deleted)
|
|
let size = 0
|
|
for (let component of Array.from(op || [])) {
|
|
if ((component != null ? component.i : undefined) != null) {
|
|
size += component.i.length
|
|
}
|
|
if ((component != null ? component.d : undefined) != null) {
|
|
size += component.d.length
|
|
}
|
|
}
|
|
return size
|
|
}
|
|
|
|
_attachEditorWatchdogManager(editorName, editor) {
|
|
// end-to-end check for edits -> acks, for this very ShareJsdoc
|
|
// This will catch a broken connection and missing UX-blocker for the
|
|
// user, allowing them to keep editing.
|
|
this._detachEditorWatchdogManager = this.EditorWatchdogManager.attachToEditor(
|
|
editorName,
|
|
editor
|
|
)
|
|
}
|
|
|
|
_attachToEditor(editorName, editor, attachToShareJs) {
|
|
this._attachEditorWatchdogManager(editorName, editor)
|
|
|
|
attachToShareJs()
|
|
}
|
|
|
|
_maybeDetachEditorWatchdogManager() {
|
|
// a failed attach attempt may lead to a missing cleanup handler
|
|
if (this._detachEditorWatchdogManager) {
|
|
this._detachEditorWatchdogManager()
|
|
delete this._detachEditorWatchdogManager
|
|
}
|
|
}
|
|
|
|
attachToAce(ace) {
|
|
this._attachToEditor('Ace', ace, () => {
|
|
this._doc.attach_ace(ace, false, window.maxDocLength)
|
|
})
|
|
}
|
|
|
|
detachFromAce() {
|
|
this._maybeDetachEditorWatchdogManager()
|
|
return typeof this._doc.detach_ace === 'function'
|
|
? this._doc.detach_ace()
|
|
: undefined
|
|
}
|
|
|
|
attachToCM(cm) {
|
|
this._attachToEditor('CM', cm, () => {
|
|
this._doc.attach_cm(cm, false)
|
|
})
|
|
}
|
|
|
|
detachFromCM() {
|
|
this._maybeDetachEditorWatchdogManager()
|
|
return typeof this._doc.detach_cm === 'function'
|
|
? this._doc.detach_cm()
|
|
: undefined
|
|
} // If we're waiting for the project to join, try again in 0.5 seconds
|
|
|
|
_startInflightOpTimeout(update) {
|
|
this._startFatalTimeoutTimer(update)
|
|
var retryOp = () => {
|
|
// Only send the update again if inflightOp is still populated
|
|
// This can be cleared when hard reloading the document in which
|
|
// case we don't want to keep trying to send it.
|
|
sl_console.log('[inflightOpTimeout] Trying op again')
|
|
if (this._doc.inflightOp != null) {
|
|
// When there is a socket.io disconnect, @_doc.inflightSubmittedIds
|
|
// is updated with the socket.io client id of the current op in flight
|
|
// (meta.source of the op).
|
|
// @connection.id is the client id of the current socket.io session.
|
|
// So we need both depending on whether the op was submitted before
|
|
// one or more disconnects, or if it was submitted during the current session.
|
|
update.dupIfSource = [
|
|
this.connection.id,
|
|
...Array.from(this._doc.inflightSubmittedIds)
|
|
]
|
|
|
|
// We must be joined to a project for applyOtUpdate to work on the real-time
|
|
// service, so don't send an op if we're not. Connection state is set to 'ok'
|
|
// when we've joined the project
|
|
if (this.connection.state !== 'ok') {
|
|
let timer
|
|
sl_console.log(
|
|
'[inflightOpTimeout] Not connected, retrying in 0.5s'
|
|
)
|
|
return (timer = setTimeout(
|
|
retryOp,
|
|
this.WAIT_FOR_CONNECTION_TIMEOUT
|
|
))
|
|
} else {
|
|
sl_console.log('[inflightOpTimeout] Sending')
|
|
return this.connection.send(update)
|
|
}
|
|
}
|
|
}
|
|
|
|
const timer = setTimeout(retryOp, this.INFLIGHT_OP_TIMEOUT)
|
|
return this._doc.inflightCallbacks.push(() => {
|
|
this._clearFatalTimeoutTimer()
|
|
return clearTimeout(timer)
|
|
}) // 30 seconds
|
|
}
|
|
|
|
_startFatalTimeoutTimer(update) {
|
|
// If an op doesn't get acked within FATAL_OP_TIMEOUT, something has
|
|
// gone unrecoverably wrong (the op will have been retried multiple times)
|
|
if (this._timeoutTimer != null) {
|
|
return
|
|
}
|
|
return (this._timeoutTimer = setTimeout(() => {
|
|
this._clearFatalTimeoutTimer()
|
|
return this.trigger('op:timeout', update)
|
|
}, this.FATAL_OP_TIMEOUT))
|
|
}
|
|
|
|
_clearFatalTimeoutTimer() {
|
|
if (this._timeoutTimer == null) {
|
|
return
|
|
}
|
|
clearTimeout(this._timeoutTimer)
|
|
return (this._timeoutTimer = null)
|
|
}
|
|
|
|
_handleError(error, meta) {
|
|
if (meta == null) {
|
|
meta = {}
|
|
}
|
|
return this.trigger('error', error, meta)
|
|
}
|
|
|
|
_bindToDocChanges(doc) {
|
|
const { submitOp } = doc
|
|
doc.submitOp = (...args) => {
|
|
this.trigger('op:sent', ...Array.from(args))
|
|
doc.pendingCallbacks.push(() => {
|
|
return this.trigger('op:acknowledged', ...Array.from(args))
|
|
})
|
|
return submitOp.apply(doc, args)
|
|
}
|
|
|
|
const { flush } = doc
|
|
return (doc.flush = (...args) => {
|
|
this.trigger('flush', doc.inflightOp, doc.pendingOp, doc.version)
|
|
return flush.apply(doc, args)
|
|
})
|
|
}
|
|
}
|
|
ShareJsDoc.initClass()
|
|
return ShareJsDoc
|
|
})()
|
|
|
|
function __guard__(value, transform) {
|
|
return typeof value !== 'undefined' && value !== null
|
|
? transform(value)
|
|
: undefined
|
|
}
|