Merge pull request #1671 from sharelatex/bg-reorder-messages

queue incoming out of order updates to avoid out of sync errors

GitOrigin-RevId: ed666bedea442facc92e88227abe68fdce0aebd8
This commit is contained in:
Brian Gough 2019-03-27 14:06:10 +00:00 committed by sharelatex
parent 7b1832a96e
commit 9a48142919
2 changed files with 95 additions and 1 deletions

View file

@ -424,7 +424,8 @@ define([
this.ide.pushEvent('received-update:processing', { this.ide.pushEvent('received-update:processing', {
update update
}) })
this.doc.processUpdateFromServer(update) // FIXME: change this back to processUpdateFromServer when redis fixed
this.doc.processUpdateFromServerInOrder(update)
if (!this.wantToBeJoined) { if (!this.wantToBeJoined) {
return this.leave() return this.leave()

View file

@ -137,6 +137,98 @@ define(['utils/EventEmitter', 'libs/sharejs'], function(EventEmitter, ShareJs) {
return this._doc.submitOp(...Array.from(args || [])) return this._doc.submitOp(...Array.from(args || []))
} }
// The following code puts out of order messages into a queue
// so that they can be processed in order. This is a workaround
// for messages being delayed by redis cluster.
// FIXME: REMOVE THIS WHEN REDIS PUBSUB IS SENDING MESSAGES IN ORDER
_isAheadOfExpectedVersion(message) {
return this._doc.version > 0 && message.v > this._doc.version
}
_pushOntoQueue(message) {
sl_console.log(`[processUpdate] push onto queue ${message.v}`)
// set a timer so that we never leave messages in the queue indefinitely
if (!this.queuedMessageTimer) {
this.queuedMessageTimer = setTimeout(() => {
sl_console.log(
`[processUpdate] queue timeout fired for ${message.v}`
)
// force the message to be processed after the timeout,
// it will cause an error if the missing update has not arrived
this.processUpdateFromServer(message)
}, this.INFLIGHT_OP_TIMEOUT)
}
this.queuedMessages.push(message)
// keep the queue in order, lowest version first
this.queuedMessages.sort(function(a, b) {
return a.v - b.v
})
}
_clearQueue() {
this.queuedMessages = []
}
_processQueue() {
while (this.queuedMessages.length > 0) {
nextAvailableVersion = this.queuedMessages[0].v
if (nextAvailableVersion === this._doc.version) {
// if the right version is on the queue, apply it
sl_console.log(
`[processUpdate] taken from queue ${nextAvailableVersion}`
)
this.processUpdateFromServerInOrder(this.queuedMessages.shift())
break
} else if (nextAvailableVersion <= this._doc.version) {
// discard old updates if they are in the queue (since we are only
// putting updates ahead of the current version in the queue this
// shouldn't happen, but we handle it anyway for safety.)
sl_console.log(
`[processUpdate] discarded from queue ${nextAvailableVersion}`
)
this.queuedMessages.shift()
} else {
// there are updates we still can't apply yet
break
}
}
// clear the pending timer if the queue has now been cleared
if (this.queuedMessages.length === 0 && this.queuedMessageTimer) {
sl_console.log('[processUpdate] queue is empty, cleared timeout')
clearTimeout(this.queuedMessageTimer)
this.queuedMessageTimer = null
}
}
// FIXME: This is the new method which reorders incoming updates if needed
// called from Document.js
processUpdateFromServerInOrder(message) {
// Create an array to hold queued messages
if (!this.queuedMessages) {
this.queuedMessages = []
}
// Is this update ahead of the next expected update?
// If so, put it on a queue to be handled later.
if (this._isAheadOfExpectedVersion(message)) {
this._pushOntoQueue(message)
return // defer processing this update for now
}
var error = this.processUpdateFromServer(message)
if (
error instanceof Error &&
error.message === 'Invalid version from server'
) {
// if there was an error, abandon the queued updates ahead of this one
this._clearQueue()
return
}
// Do we have any messages queued up?
// find the next message if available
this._processQueue()
}
// FIXME: This is the original method. Switch back to this when redis
// issues are resolved.
processUpdateFromServer(message) { processUpdateFromServer(message) {
try { try {
this._doc._onMessage(message) this._doc._onMessage(message)
@ -144,6 +236,7 @@ define(['utils/EventEmitter', 'libs/sharejs'], function(EventEmitter, ShareJs) {
// Version mismatches are thrown as errors // Version mismatches are thrown as errors
console.log(error) console.log(error)
this._handleError(error) this._handleError(error)
return error // return the error for queue handling
} }
if ( if (