From 11c8cfc9396ad205496b725b93547ff9f9976f1c Mon Sep 17 00:00:00 2001 From: Henry Oswald Date: Tue, 2 Feb 2021 15:10:04 +0000 Subject: [PATCH 1/6] shard the pending-updates-list queue --- services/document-updater/app.js | 2 +- .../app/js/DispatchManager.js | 29 +++++++++---------- .../config/settings.defaults.js | 2 +- .../acceptance/js/helpers/DocUpdaterClient.js | 17 ++++++++++- .../DispatchManager/DispatchManagerTests.js | 27 +++++++++++++++-- 5 files changed, 57 insertions(+), 20 deletions(-) diff --git a/services/document-updater/app.js b/services/document-updater/app.js index 6d1cc43b82..d8b67dd31e 100644 --- a/services/document-updater/app.js +++ b/services/document-updater/app.js @@ -34,7 +34,7 @@ app.use(Metrics.http.monitor(logger)) app.use(bodyParser.json({ limit: Settings.maxJsonRequestSize })) Metrics.injectMetricsRoute(app) -DispatchManager.createAndStartDispatchers(Settings.dispatcherCount || 10) +DispatchManager.createAndStartDispatchers(Settings.dispatcherCount) app.param('project_id', (req, res, next, projectId) => { if (projectId != null && projectId.match(/^[0-9a-f]{24}$/)) { diff --git a/services/document-updater/app/js/DispatchManager.js b/services/document-updater/app/js/DispatchManager.js index aa7c4f1f0e..97bd7eafbd 100644 --- a/services/document-updater/app/js/DispatchManager.js +++ b/services/document-updater/app/js/DispatchManager.js @@ -20,13 +20,21 @@ const logger = require('logger-sharelatex') const Keys = require('./UpdateKeys') const redis = require('@overleaf/redis-wrapper') const Errors = require('./Errors') +const _ = require('lodash') const UpdateManager = require('./UpdateManager') const Metrics = require('./Metrics') const RateLimitManager = require('./RateLimitManager') module.exports = DispatchManager = { - createDispatcher(RateLimiter) { + createDispatcher(RateLimiter, queueShardNumber) { + let pendingListKey + if (queueShardNumber === 0) { + pendingListKey = 'pending-updates-list' + } else { + pendingListKey = `pending-updates-list-${queueShardNumber}` + } + const client = redis.createClient(Settings.redis.documentupdater) var worker = { client, @@ -35,11 +43,8 @@ module.exports = DispatchManager = { callback = function (error) {} } const timer = new Metrics.Timer('worker.waiting') - return worker.client.blpop('pending-updates-list', 0, function ( - error, - result - ) { - logger.log('getting pending-updates-list', error, result) + return worker.client.blpop(pendingListKey, 0, function (error, result) { + logger.log(`getting ${queueShardNumber}`, error, result) timer.done() if (error != null) { return callback(error) @@ -103,15 +108,9 @@ module.exports = DispatchManager = { createAndStartDispatchers(number) { const RateLimiter = new RateLimitManager(number) return (() => { - const result = [] - for ( - let i = 1, end = number, asc = end >= 1; - asc ? i <= end : i >= end; - asc ? i++ : i-- - ) { - const worker = DispatchManager.createDispatcher(RateLimiter) - result.push(worker.run()) - } + const result = _.times(number, function (shardNumber) { + return DispatchManager.createDispatcher(RateLimiter, shardNumber).run() + }) return result })() } diff --git a/services/document-updater/config/settings.defaults.js b/services/document-updater/config/settings.defaults.js index 0228941382..67be229eef 100755 --- a/services/document-updater/config/settings.defaults.js +++ b/services/document-updater/config/settings.defaults.js @@ -171,7 +171,7 @@ module.exports = { maxJsonRequestSize: parseInt(process.env.MAX_JSON_REQUEST_SIZE, 10) || 8 * 1024 * 1024, - dispatcherCount: process.env.DISPATCHER_COUNT, + dispatcherCount: process.env.DISPATCHER_COUNT || 10, mongo: { options: { diff --git a/services/document-updater/test/acceptance/js/helpers/DocUpdaterClient.js b/services/document-updater/test/acceptance/js/helpers/DocUpdaterClient.js index 7156da0c26..d4efa453ec 100644 --- a/services/document-updater/test/acceptance/js/helpers/DocUpdaterClient.js +++ b/services/document-updater/test/acceptance/js/helpers/DocUpdaterClient.js @@ -1,5 +1,6 @@ let DocUpdaterClient const Settings = require('settings-sharelatex') +const _ = require('lodash') const rclient = require('@overleaf/redis-wrapper').createClient( Settings.redis.documentupdater ) @@ -26,6 +27,15 @@ module.exports = DocUpdaterClient = { rclientSub.on('message', callback) }, + _getPendingUpdateListKey() { + const shard = _.random(0, Settings.dispatcherCount) + if (shard === 0) { + return 'pending-updates-list' + } else { + return `pending-updates-list-${shard}` + } + }, + sendUpdate(projectId, docId, update, callback) { rclient.rpush( keys.pendingUpdates({ doc_id: docId }), @@ -39,7 +49,12 @@ module.exports = DocUpdaterClient = { if (error) { return callback(error) } - rclient.rpush('pending-updates-list', docKey, callback) + + rclient.rpush( + DocUpdaterClient._getPendingUpdateListKey(), + docKey, + callback + ) }) } ) diff --git a/services/document-updater/test/unit/js/DispatchManager/DispatchManagerTests.js b/services/document-updater/test/unit/js/DispatchManager/DispatchManagerTests.js index 0907b14e57..5610c4abc1 100644 --- a/services/document-updater/test/unit/js/DispatchManager/DispatchManagerTests.js +++ b/services/document-updater/test/unit/js/DispatchManager/DispatchManagerTests.js @@ -64,7 +64,8 @@ describe('DispatchManager', function () { this.client = { auth: sinon.stub() } this.redis.createClient = sinon.stub().returns(this.client) return (this.worker = this.DispatchManager.createDispatcher( - this.RateLimiter + this.RateLimiter, + 0 )) }) @@ -129,7 +130,7 @@ describe('DispatchManager', function () { }) }) - return describe("with a 'Delete component' error", function () { + describe("with a 'Delete component' error", function () { beforeEach(function () { this.UpdateManager.processOutstandingUpdatesWithLock = sinon .stub() @@ -145,6 +146,28 @@ describe('DispatchManager', function () { return this.callback.called.should.equal(true) }) }) + + describe('pending updates list with shard key', function () { + beforeEach(function (done) { + this.client = { + auth: sinon.stub(), + blpop: sinon.stub().callsArgWith(2) + } + this.redis.createClient = sinon.stub().returns(this.client) + this.queueShardNumber = 7 + this.worker = this.DispatchManager.createDispatcher( + this.RateLimiter, + this.queueShardNumber + ) + this.worker._waitForUpdateThenDispatchWorker(done) + }) + + it('should call redis with BLPOP with the correct key', function () { + this.client.blpop + .calledWith(`pending-updates-list-${this.queueShardNumber}`, 0) + .should.equal(true) + }) + }) }) return describe('run', function () { From 40de9997664c275bfae57d11322064700f67fb9a Mon Sep 17 00:00:00 2001 From: Henry Oswald Date: Thu, 4 Feb 2021 09:30:35 +0000 Subject: [PATCH 2/6] Update config/settings.defaults.js parseint on dispatcher count Co-authored-by: John Lees-Miller --- services/document-updater/config/settings.defaults.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/document-updater/config/settings.defaults.js b/services/document-updater/config/settings.defaults.js index 67be229eef..37c23792f6 100755 --- a/services/document-updater/config/settings.defaults.js +++ b/services/document-updater/config/settings.defaults.js @@ -171,7 +171,7 @@ module.exports = { maxJsonRequestSize: parseInt(process.env.MAX_JSON_REQUEST_SIZE, 10) || 8 * 1024 * 1024, - dispatcherCount: process.env.DISPATCHER_COUNT || 10, + dispatcherCount: parseInt(process.env.DISPATCHER_COUNT || 10, 10) mongo: { options: { From bcfc7e66fc51deefcba21269fc718538be30ef3f Mon Sep 17 00:00:00 2001 From: Henry Oswald Date: Tue, 9 Feb 2021 10:32:16 +0000 Subject: [PATCH 3/6] add missing comma in settings file --- services/document-updater/config/settings.defaults.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/document-updater/config/settings.defaults.js b/services/document-updater/config/settings.defaults.js index 37c23792f6..bac86ff55f 100755 --- a/services/document-updater/config/settings.defaults.js +++ b/services/document-updater/config/settings.defaults.js @@ -171,7 +171,7 @@ module.exports = { maxJsonRequestSize: parseInt(process.env.MAX_JSON_REQUEST_SIZE, 10) || 8 * 1024 * 1024, - dispatcherCount: parseInt(process.env.DISPATCHER_COUNT || 10, 10) + dispatcherCount: parseInt(process.env.DISPATCHER_COUNT || 10, 10), mongo: { options: { From 0cdeffae6cb91c32325466bd1764d36ae38696c3 Mon Sep 17 00:00:00 2001 From: Henry Oswald Date: Tue, 9 Feb 2021 10:50:37 +0000 Subject: [PATCH 4/6] fix off by 1 error in Doc updater client helper file --- .../test/acceptance/js/helpers/DocUpdaterClient.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/document-updater/test/acceptance/js/helpers/DocUpdaterClient.js b/services/document-updater/test/acceptance/js/helpers/DocUpdaterClient.js index d4efa453ec..719df741c3 100644 --- a/services/document-updater/test/acceptance/js/helpers/DocUpdaterClient.js +++ b/services/document-updater/test/acceptance/js/helpers/DocUpdaterClient.js @@ -28,7 +28,7 @@ module.exports = DocUpdaterClient = { }, _getPendingUpdateListKey() { - const shard = _.random(0, Settings.dispatcherCount) + const shard = _.random(0, Settings.dispatcherCount - 1) if (shard === 0) { return 'pending-updates-list' } else { From 854e24bb5784d0a8e8c0d0c1f11f811c132bb5c5 Mon Sep 17 00:00:00 2001 From: Henry Oswald Date: Mon, 15 Feb 2021 14:12:28 +0000 Subject: [PATCH 5/6] remove unneeded anonymous func --- services/document-updater/app/js/DispatchManager.js | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/services/document-updater/app/js/DispatchManager.js b/services/document-updater/app/js/DispatchManager.js index 97bd7eafbd..c600adec7b 100644 --- a/services/document-updater/app/js/DispatchManager.js +++ b/services/document-updater/app/js/DispatchManager.js @@ -107,11 +107,8 @@ module.exports = DispatchManager = { createAndStartDispatchers(number) { const RateLimiter = new RateLimitManager(number) - return (() => { - const result = _.times(number, function (shardNumber) { - return DispatchManager.createDispatcher(RateLimiter, shardNumber).run() - }) - return result - })() + _.times(number, function (shardNumber) { + return DispatchManager.createDispatcher(RateLimiter, shardNumber).run() + }) } } From c7e57cd28fb4ae307e64c3b4ff40513a7efdc226 Mon Sep 17 00:00:00 2001 From: Henry Oswald Date: Mon, 15 Feb 2021 14:16:45 +0000 Subject: [PATCH 6/6] add Dispatchers running on old queue while we migrate revert once migrated --- services/document-updater/app/js/DispatchManager.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/document-updater/app/js/DispatchManager.js b/services/document-updater/app/js/DispatchManager.js index c600adec7b..19942b67cf 100644 --- a/services/document-updater/app/js/DispatchManager.js +++ b/services/document-updater/app/js/DispatchManager.js @@ -110,5 +110,10 @@ module.exports = DispatchManager = { _.times(number, function (shardNumber) { return DispatchManager.createDispatcher(RateLimiter, shardNumber).run() }) + + // run extra dispatchers on old queue while we migrate + _.times(number, function () { + return DispatchManager.createDispatcher(RateLimiter, 0).run() + }) } }