diff --git a/services/real-time/app/js/DocumentUpdaterManager.js b/services/real-time/app/js/DocumentUpdaterManager.js index 925d9f38d8..39bef96ebd 100644 --- a/services/real-time/app/js/DocumentUpdaterManager.js +++ b/services/real-time/app/js/DocumentUpdaterManager.js @@ -88,6 +88,15 @@ const DocumentUpdaterManager = { }) }, + _getPendingUpdateListKey() { + const shard = _.random(0, settings.pendingUpdateListShardCount - 1) + if (shard === 0) { + return 'pending-updates-list' + } else { + return `pending-updates-list-${shard}` + } + }, + queueChange(project_id, doc_id, change, callback) { const allowedKeys = [ 'doc', @@ -123,9 +132,12 @@ const DocumentUpdaterManager = { error = new OError('error pushing update into redis').withCause(error) return callback(error) } - rclient.rpush('pending-updates-list', doc_key, function (error) { + const queueKey = DocumentUpdaterManager._getPendingUpdateListKey() + rclient.rpush(queueKey, doc_key, function (error) { if (error) { - error = new OError('error pushing doc_id into redis').withCause(error) + error = new OError('error pushing doc_id into redis') + .withInfo({ queueKey }) + .withCause(error) } callback(error) }) diff --git a/services/real-time/config/settings.defaults.js b/services/real-time/config/settings.defaults.js index 5c0d501b50..486b686083 100644 --- a/services/real-time/config/settings.defaults.js +++ b/services/real-time/config/settings.defaults.js @@ -114,6 +114,12 @@ const settings = { max_doc_length: 2 * 1024 * 1024, // 2mb + // should be set to the same same as dispatcherCount in document updater + pendingUpdateListShardCount: parseInt( + process.env.PENDING_UPDATE_LIST_SHARD_COUNT || 10, + 10 + ), + // combine // max_doc_length (2mb see above) * 2 (delete + insert) // max_ranges_size (3mb see MAX_RANGES_SIZE in document-updater) diff --git a/services/real-time/test/unit/js/DocumentUpdaterControllerTests.js b/services/real-time/test/unit/js/DocumentUpdaterControllerTests.js index 832ad9490a..0aabc51de7 100644 --- a/services/real-time/test/unit/js/DocumentUpdaterControllerTests.js +++ b/services/real-time/test/unit/js/DocumentUpdaterControllerTests.js @@ -27,7 +27,7 @@ describe('DocumentUpdaterController', function () { this.io = { mock: 'socket.io' } this.rclient = [] this.RoomEvents = { on: sinon.stub() } - return (this.EditorUpdatesController = SandboxedModule.require(modulePath, { + this.EditorUpdatesController = SandboxedModule.require(modulePath, { requires: { 'logger-sharelatex': (this.logger = { error: sinon.stub(), @@ -46,13 +46,17 @@ describe('DocumentUpdaterController', function () { pubsub: null } }), - '@overleaf/redis-wrapper': (this.redis = { - createClient: (name) => { - let rclientStub - this.rclient.push((rclientStub = { name })) - return rclientStub + './RedisClientManager': { + createClientList: () => { + this.redis = { + createClient: (name) => { + let rclientStub + this.rclient.push((rclientStub = { name })) + return rclientStub + } + } } - }), + }, './SafeJsonParse': (this.SafeJsonParse = { parse: (data, cb) => cb(null, JSON.parse(data)) }), @@ -64,7 +68,7 @@ describe('DocumentUpdaterController', function () { }), './ChannelManager': (this.ChannelManager = {}) } - })) + }) }) describe('listenForUpdatesFromDocumentUpdater', function () { @@ -78,22 +82,20 @@ describe('DocumentUpdaterController', function () { this.rclient[0].on = sinon.stub() this.rclient[1].subscribe = sinon.stub() this.rclient[1].on = sinon.stub() - return this.EditorUpdatesController.listenForUpdatesFromDocumentUpdater() + this.EditorUpdatesController.listenForUpdatesFromDocumentUpdater() }) it('should subscribe to the doc-updater stream', function () { - return this.rclient[0].subscribe - .calledWith('applied-ops') - .should.equal(true) + this.rclient[0].subscribe.calledWith('applied-ops').should.equal(true) }) it('should register a callback to handle updates', function () { - return this.rclient[0].on.calledWith('message').should.equal(true) + this.rclient[0].on.calledWith('message').should.equal(true) }) - return it('should subscribe to any additional doc-updater stream', function () { + it('should subscribe to any additional doc-updater stream', function () { this.rclient[1].subscribe.calledWith('applied-ops').should.equal(true) - return this.rclient[1].on.calledWith('message').should.equal(true) + this.rclient[1].on.calledWith('message').should.equal(true) }) }) @@ -110,7 +112,7 @@ describe('DocumentUpdaterController', function () { ) }) - return it('should log an error', function () { + it('should log an error', function () { return this.logger.error.called.should.equal(true) }) }) @@ -129,14 +131,14 @@ describe('DocumentUpdaterController', function () { ) }) - return it('should apply the update', function () { + it('should apply the update', function () { return this.EditorUpdatesController._applyUpdateFromDocumentUpdater .calledWith(this.io, this.doc_id, this.message.op) .should.equal(true) }) }) - return describe('with error', function () { + describe('with error', function () { beforeEach(function () { this.message = { doc_id: this.doc_id, diff --git a/services/real-time/test/unit/js/DocumentUpdaterManagerTests.js b/services/real-time/test/unit/js/DocumentUpdaterManagerTests.js index d6da97ddab..2a5fdf530f 100644 --- a/services/real-time/test/unit/js/DocumentUpdaterManagerTests.js +++ b/services/real-time/test/unit/js/DocumentUpdaterManagerTests.js @@ -15,6 +15,7 @@ const sinon = require('sinon') const SandboxedModule = require('sandboxed-module') const path = require('path') const modulePath = '../../../app/js/DocumentUpdaterManager' +const _ = require('underscore') describe('DocumentUpdaterManager', function () { beforeEach(function () { @@ -34,7 +35,8 @@ describe('DocumentUpdaterManager', function () { } } }, - maxUpdateSize: 7 * 1024 * 1024 + maxUpdateSize: 7 * 1024 * 1024, + pendingUpdateListShardCount: 10 } this.rclient = { auth() {} } @@ -256,7 +258,7 @@ describe('DocumentUpdaterManager', function () { }) }) - return describe('queueChange', function () { + describe('queueChange', function () { beforeEach(function () { this.change = { doc: '1234567890', @@ -269,7 +271,12 @@ describe('DocumentUpdaterManager', function () { describe('successfully', function () { beforeEach(function () { - return this.DocumentUpdaterManager.queueChange( + this.pendingUpdateListKey = `pending-updates-list-key-${Math.random()}` + + this.DocumentUpdaterManager._getPendingUpdateListKey = sinon + .stub() + .returns(this.pendingUpdateListKey) + this.DocumentUpdaterManager.queueChange( this.project_id, this.doc_id, this.change, @@ -278,7 +285,7 @@ describe('DocumentUpdaterManager', function () { }) it('should push the change', function () { - return this.rclient.rpush + this.rclient.rpush .calledWith( `PendingUpdates:${this.doc_id}`, JSON.stringify(this.change) @@ -286,10 +293,10 @@ describe('DocumentUpdaterManager', function () { .should.equal(true) }) - return it('should notify the doc updater of the change via the pending-updates-list queue', function () { - return this.rclient.rpush + it('should notify the doc updater of the change via the pending-updates-list queue', function () { + this.rclient.rpush .calledWith( - 'pending-updates-list', + this.pendingUpdateListKey, `${this.project_id}:${this.doc_id}` ) .should.equal(true) @@ -366,7 +373,7 @@ describe('DocumentUpdaterManager', function () { }) }) - return describe('with invalid keys', function () { + describe('with invalid keys', function () { beforeEach(function () { this.change = { op: [{ d: 'test', p: 345 }], @@ -380,7 +387,7 @@ describe('DocumentUpdaterManager', function () { ) }) - return it('should remove the invalid keys from the change', function () { + it('should remove the invalid keys from the change', function () { return this.rclient.rpush .calledWith( `PendingUpdates:${this.doc_id}`, @@ -390,4 +397,31 @@ describe('DocumentUpdaterManager', function () { }) }) }) + + describe('_getPendingUpdateListKey', function () { + beforeEach(function () { + const keys = _.times( + 10000, + this.DocumentUpdaterManager._getPendingUpdateListKey + ) + this.keys = _.unique(keys) + }) + it('should return normal pending updates key', function () { + _.contains(this.keys, 'pending-updates-list').should.equal(true) + }) + + it('should return pending-updates-list-n keys', function () { + _.contains(this.keys, 'pending-updates-list-1').should.equal(true) + _.contains(this.keys, 'pending-updates-list-3').should.equal(true) + _.contains(this.keys, 'pending-updates-list-9').should.equal(true) + }) + + it('should not include pending-updates-list-0 key', function () { + _.contains(this.keys, 'pending-updates-list-0').should.equal(false) + }) + + it('should not include maximum as pendingUpdateListShardCount value', function () { + _.contains(this.keys, 'pending-updates-list-10').should.equal(false) + }) + }) })