mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
Merge pull request #156 from overleaf/ho-shard-pending-queue
shard the pending-updates-list queue
This commit is contained in:
commit
34bfe9d3b1
5 changed files with 62 additions and 23 deletions
|
@ -34,7 +34,7 @@ app.use(Metrics.http.monitor(logger))
|
|||
app.use(bodyParser.json({ limit: Settings.maxJsonRequestSize }))
|
||||
Metrics.injectMetricsRoute(app)
|
||||
|
||||
DispatchManager.createAndStartDispatchers(Settings.dispatcherCount || 10)
|
||||
DispatchManager.createAndStartDispatchers(Settings.dispatcherCount)
|
||||
|
||||
app.param('project_id', (req, res, next, projectId) => {
|
||||
if (projectId != null && projectId.match(/^[0-9a-f]{24}$/)) {
|
||||
|
|
|
@ -20,13 +20,21 @@ const logger = require('logger-sharelatex')
|
|||
const Keys = require('./UpdateKeys')
|
||||
const redis = require('@overleaf/redis-wrapper')
|
||||
const Errors = require('./Errors')
|
||||
const _ = require('lodash')
|
||||
|
||||
const UpdateManager = require('./UpdateManager')
|
||||
const Metrics = require('./Metrics')
|
||||
const RateLimitManager = require('./RateLimitManager')
|
||||
|
||||
module.exports = DispatchManager = {
|
||||
createDispatcher(RateLimiter) {
|
||||
createDispatcher(RateLimiter, queueShardNumber) {
|
||||
let pendingListKey
|
||||
if (queueShardNumber === 0) {
|
||||
pendingListKey = 'pending-updates-list'
|
||||
} else {
|
||||
pendingListKey = `pending-updates-list-${queueShardNumber}`
|
||||
}
|
||||
|
||||
const client = redis.createClient(Settings.redis.documentupdater)
|
||||
var worker = {
|
||||
client,
|
||||
|
@ -35,11 +43,8 @@ module.exports = DispatchManager = {
|
|||
callback = function (error) {}
|
||||
}
|
||||
const timer = new Metrics.Timer('worker.waiting')
|
||||
return worker.client.blpop('pending-updates-list', 0, function (
|
||||
error,
|
||||
result
|
||||
) {
|
||||
logger.log('getting pending-updates-list', error, result)
|
||||
return worker.client.blpop(pendingListKey, 0, function (error, result) {
|
||||
logger.log(`getting ${queueShardNumber}`, error, result)
|
||||
timer.done()
|
||||
if (error != null) {
|
||||
return callback(error)
|
||||
|
@ -102,17 +107,13 @@ module.exports = DispatchManager = {
|
|||
|
||||
createAndStartDispatchers(number) {
|
||||
const RateLimiter = new RateLimitManager(number)
|
||||
return (() => {
|
||||
const result = []
|
||||
for (
|
||||
let i = 1, end = number, asc = end >= 1;
|
||||
asc ? i <= end : i >= end;
|
||||
asc ? i++ : i--
|
||||
) {
|
||||
const worker = DispatchManager.createDispatcher(RateLimiter)
|
||||
result.push(worker.run())
|
||||
}
|
||||
return result
|
||||
})()
|
||||
_.times(number, function (shardNumber) {
|
||||
return DispatchManager.createDispatcher(RateLimiter, shardNumber).run()
|
||||
})
|
||||
|
||||
// run extra dispatchers on old queue while we migrate
|
||||
_.times(number, function () {
|
||||
return DispatchManager.createDispatcher(RateLimiter, 0).run()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,7 +171,7 @@ module.exports = {
|
|||
maxJsonRequestSize:
|
||||
parseInt(process.env.MAX_JSON_REQUEST_SIZE, 10) || 8 * 1024 * 1024,
|
||||
|
||||
dispatcherCount: process.env.DISPATCHER_COUNT,
|
||||
dispatcherCount: parseInt(process.env.DISPATCHER_COUNT || 10, 10),
|
||||
|
||||
mongo: {
|
||||
options: {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
let DocUpdaterClient
|
||||
const Settings = require('settings-sharelatex')
|
||||
const _ = require('lodash')
|
||||
const rclient = require('@overleaf/redis-wrapper').createClient(
|
||||
Settings.redis.documentupdater
|
||||
)
|
||||
|
@ -26,6 +27,15 @@ module.exports = DocUpdaterClient = {
|
|||
rclientSub.on('message', callback)
|
||||
},
|
||||
|
||||
_getPendingUpdateListKey() {
|
||||
const shard = _.random(0, Settings.dispatcherCount - 1)
|
||||
if (shard === 0) {
|
||||
return 'pending-updates-list'
|
||||
} else {
|
||||
return `pending-updates-list-${shard}`
|
||||
}
|
||||
},
|
||||
|
||||
sendUpdate(projectId, docId, update, callback) {
|
||||
rclient.rpush(
|
||||
keys.pendingUpdates({ doc_id: docId }),
|
||||
|
@ -39,7 +49,12 @@ module.exports = DocUpdaterClient = {
|
|||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
rclient.rpush('pending-updates-list', docKey, callback)
|
||||
|
||||
rclient.rpush(
|
||||
DocUpdaterClient._getPendingUpdateListKey(),
|
||||
docKey,
|
||||
callback
|
||||
)
|
||||
})
|
||||
}
|
||||
)
|
||||
|
|
|
@ -64,7 +64,8 @@ describe('DispatchManager', function () {
|
|||
this.client = { auth: sinon.stub() }
|
||||
this.redis.createClient = sinon.stub().returns(this.client)
|
||||
return (this.worker = this.DispatchManager.createDispatcher(
|
||||
this.RateLimiter
|
||||
this.RateLimiter,
|
||||
0
|
||||
))
|
||||
})
|
||||
|
||||
|
@ -129,7 +130,7 @@ describe('DispatchManager', function () {
|
|||
})
|
||||
})
|
||||
|
||||
return describe("with a 'Delete component' error", function () {
|
||||
describe("with a 'Delete component' error", function () {
|
||||
beforeEach(function () {
|
||||
this.UpdateManager.processOutstandingUpdatesWithLock = sinon
|
||||
.stub()
|
||||
|
@ -145,6 +146,28 @@ describe('DispatchManager', function () {
|
|||
return this.callback.called.should.equal(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('pending updates list with shard key', function () {
|
||||
beforeEach(function (done) {
|
||||
this.client = {
|
||||
auth: sinon.stub(),
|
||||
blpop: sinon.stub().callsArgWith(2)
|
||||
}
|
||||
this.redis.createClient = sinon.stub().returns(this.client)
|
||||
this.queueShardNumber = 7
|
||||
this.worker = this.DispatchManager.createDispatcher(
|
||||
this.RateLimiter,
|
||||
this.queueShardNumber
|
||||
)
|
||||
this.worker._waitForUpdateThenDispatchWorker(done)
|
||||
})
|
||||
|
||||
it('should call redis with BLPOP with the correct key', function () {
|
||||
this.client.blpop
|
||||
.calledWith(`pending-updates-list-${this.queueShardNumber}`, 0)
|
||||
.should.equal(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
return describe('run', function () {
|
||||
|
|
Loading…
Reference in a new issue