diff --git a/services/web/public/src/ide/editor/Document.js b/services/web/public/src/ide/editor/Document.js index e7802a49a1..a89895ad3e 100644 --- a/services/web/public/src/ide/editor/Document.js +++ b/services/web/public/src/ide/editor/Document.js @@ -424,7 +424,8 @@ define([ this.ide.pushEvent('received-update:processing', { update }) - this.doc.processUpdateFromServer(update) + // FIXME: change this back to processUpdateFromServer when redis fixed + this.doc.processUpdateFromServerInOrder(update) if (!this.wantToBeJoined) { return this.leave() diff --git a/services/web/public/src/ide/editor/ShareJsDoc.js b/services/web/public/src/ide/editor/ShareJsDoc.js index 96bdf244c4..78ab0b31d0 100644 --- a/services/web/public/src/ide/editor/ShareJsDoc.js +++ b/services/web/public/src/ide/editor/ShareJsDoc.js @@ -137,6 +137,98 @@ define(['utils/EventEmitter', 'libs/sharejs'], function(EventEmitter, ShareJs) { return this._doc.submitOp(...Array.from(args || [])) } + // The following code puts out of order messages into a queue + // so that they can be processed in order. This is a workaround + // for messages being delayed by redis cluster. + // FIXME: REMOVE THIS WHEN REDIS PUBSUB IS SENDING MESSAGES IN ORDER + _isAheadOfExpectedVersion(message) { + return this._doc.version > 0 && message.v > this._doc.version + } + + _pushOntoQueue(message) { + sl_console.log(`[processUpdate] push onto queue ${message.v}`) + // set a timer so that we never leave messages in the queue indefinitely + if (!this.queuedMessageTimer) { + this.queuedMessageTimer = setTimeout(() => { + sl_console.log( + `[processUpdate] queue timeout fired for ${message.v}` + ) + // force the message to be processed after the timeout, + // it will cause an error if the missing update has not arrived + this.processUpdateFromServer(message) + }, this.INFLIGHT_OP_TIMEOUT) + } + this.queuedMessages.push(message) + // keep the queue in order, lowest version first + this.queuedMessages.sort(function(a, b) { + return a.v - b.v + }) + } + + _clearQueue() { + this.queuedMessages = [] + } + + _processQueue() { + while (this.queuedMessages.length > 0) { + nextAvailableVersion = this.queuedMessages[0].v + if (nextAvailableVersion === this._doc.version) { + // if the right version is on the queue, apply it + sl_console.log( + `[processUpdate] taken from queue ${nextAvailableVersion}` + ) + this.processUpdateFromServerInOrder(this.queuedMessages.shift()) + break + } else if (nextAvailableVersion <= this._doc.version) { + // discard old updates if they are in the queue (since we are only + // putting updates ahead of the current version in the queue this + // shouldn't happen, but we handle it anyway for safety.) + sl_console.log( + `[processUpdate] discarded from queue ${nextAvailableVersion}` + ) + this.queuedMessages.shift() + } else { + // there are updates we still can't apply yet + break + } + } + // clear the pending timer if the queue has now been cleared + if (this.queuedMessages.length === 0 && this.queuedMessageTimer) { + sl_console.log('[processUpdate] queue is empty, cleared timeout') + clearTimeout(this.queuedMessageTimer) + this.queuedMessageTimer = null + } + } + + // FIXME: This is the new method which reorders incoming updates if needed + // called from Document.js + processUpdateFromServerInOrder(message) { + // Create an array to hold queued messages + if (!this.queuedMessages) { + this.queuedMessages = [] + } + // Is this update ahead of the next expected update? + // If so, put it on a queue to be handled later. + if (this._isAheadOfExpectedVersion(message)) { + this._pushOntoQueue(message) + return // defer processing this update for now + } + var error = this.processUpdateFromServer(message) + if ( + error instanceof Error && + error.message === 'Invalid version from server' + ) { + // if there was an error, abandon the queued updates ahead of this one + this._clearQueue() + return + } + // Do we have any messages queued up? + // find the next message if available + this._processQueue() + } + + // FIXME: This is the original method. Switch back to this when redis + // issues are resolved. processUpdateFromServer(message) { try { this._doc._onMessage(message) @@ -144,6 +236,7 @@ define(['utils/EventEmitter', 'libs/sharejs'], function(EventEmitter, ShareJs) { // Version mismatches are thrown as errors console.log(error) this._handleError(error) + return error // return the error for queue handling } if (