Merge pull request #205 from overleaf/ho-pending-update-shard

Pending update shard
This commit is contained in:
Henry Oswald 2021-02-22 09:08:35 +00:00 committed by GitHub
commit 9247eb4249
4 changed files with 83 additions and 29 deletions

View file

@ -88,6 +88,15 @@ const DocumentUpdaterManager = {
}) })
}, },
_getPendingUpdateListKey() {
const shard = _.random(0, settings.pendingUpdateListShardCount - 1)
if (shard === 0) {
return 'pending-updates-list'
} else {
return `pending-updates-list-${shard}`
}
},
queueChange(project_id, doc_id, change, callback) { queueChange(project_id, doc_id, change, callback) {
const allowedKeys = [ const allowedKeys = [
'doc', 'doc',
@ -123,9 +132,12 @@ const DocumentUpdaterManager = {
error = new OError('error pushing update into redis').withCause(error) error = new OError('error pushing update into redis').withCause(error)
return callback(error) return callback(error)
} }
rclient.rpush('pending-updates-list', doc_key, function (error) { const queueKey = DocumentUpdaterManager._getPendingUpdateListKey()
rclient.rpush(queueKey, doc_key, function (error) {
if (error) { if (error) {
error = new OError('error pushing doc_id into redis').withCause(error) error = new OError('error pushing doc_id into redis')
.withInfo({ queueKey })
.withCause(error)
} }
callback(error) callback(error)
}) })

View file

@ -114,6 +114,12 @@ const settings = {
max_doc_length: 2 * 1024 * 1024, // 2mb max_doc_length: 2 * 1024 * 1024, // 2mb
// should be set to the same same as dispatcherCount in document updater
pendingUpdateListShardCount: parseInt(
process.env.PENDING_UPDATE_LIST_SHARD_COUNT || 10,
10
),
// combine // combine
// max_doc_length (2mb see above) * 2 (delete + insert) // max_doc_length (2mb see above) * 2 (delete + insert)
// max_ranges_size (3mb see MAX_RANGES_SIZE in document-updater) // max_ranges_size (3mb see MAX_RANGES_SIZE in document-updater)

View file

@ -27,7 +27,7 @@ describe('DocumentUpdaterController', function () {
this.io = { mock: 'socket.io' } this.io = { mock: 'socket.io' }
this.rclient = [] this.rclient = []
this.RoomEvents = { on: sinon.stub() } this.RoomEvents = { on: sinon.stub() }
return (this.EditorUpdatesController = SandboxedModule.require(modulePath, { this.EditorUpdatesController = SandboxedModule.require(modulePath, {
requires: { requires: {
'logger-sharelatex': (this.logger = { 'logger-sharelatex': (this.logger = {
error: sinon.stub(), error: sinon.stub(),
@ -46,13 +46,17 @@ describe('DocumentUpdaterController', function () {
pubsub: null pubsub: null
} }
}), }),
'@overleaf/redis-wrapper': (this.redis = { './RedisClientManager': {
createClientList: () => {
this.redis = {
createClient: (name) => { createClient: (name) => {
let rclientStub let rclientStub
this.rclient.push((rclientStub = { name })) this.rclient.push((rclientStub = { name }))
return rclientStub return rclientStub
} }
}), }
}
},
'./SafeJsonParse': (this.SafeJsonParse = { './SafeJsonParse': (this.SafeJsonParse = {
parse: (data, cb) => cb(null, JSON.parse(data)) parse: (data, cb) => cb(null, JSON.parse(data))
}), }),
@ -64,7 +68,7 @@ describe('DocumentUpdaterController', function () {
}), }),
'./ChannelManager': (this.ChannelManager = {}) './ChannelManager': (this.ChannelManager = {})
} }
})) })
}) })
describe('listenForUpdatesFromDocumentUpdater', function () { describe('listenForUpdatesFromDocumentUpdater', function () {
@ -78,22 +82,20 @@ describe('DocumentUpdaterController', function () {
this.rclient[0].on = sinon.stub() this.rclient[0].on = sinon.stub()
this.rclient[1].subscribe = sinon.stub() this.rclient[1].subscribe = sinon.stub()
this.rclient[1].on = sinon.stub() this.rclient[1].on = sinon.stub()
return this.EditorUpdatesController.listenForUpdatesFromDocumentUpdater() this.EditorUpdatesController.listenForUpdatesFromDocumentUpdater()
}) })
it('should subscribe to the doc-updater stream', function () { it('should subscribe to the doc-updater stream', function () {
return this.rclient[0].subscribe this.rclient[0].subscribe.calledWith('applied-ops').should.equal(true)
.calledWith('applied-ops')
.should.equal(true)
}) })
it('should register a callback to handle updates', function () { it('should register a callback to handle updates', function () {
return this.rclient[0].on.calledWith('message').should.equal(true) this.rclient[0].on.calledWith('message').should.equal(true)
}) })
return it('should subscribe to any additional doc-updater stream', function () { it('should subscribe to any additional doc-updater stream', function () {
this.rclient[1].subscribe.calledWith('applied-ops').should.equal(true) this.rclient[1].subscribe.calledWith('applied-ops').should.equal(true)
return this.rclient[1].on.calledWith('message').should.equal(true) this.rclient[1].on.calledWith('message').should.equal(true)
}) })
}) })
@ -110,7 +112,7 @@ describe('DocumentUpdaterController', function () {
) )
}) })
return it('should log an error', function () { it('should log an error', function () {
return this.logger.error.called.should.equal(true) return this.logger.error.called.should.equal(true)
}) })
}) })
@ -129,14 +131,14 @@ describe('DocumentUpdaterController', function () {
) )
}) })
return it('should apply the update', function () { it('should apply the update', function () {
return this.EditorUpdatesController._applyUpdateFromDocumentUpdater return this.EditorUpdatesController._applyUpdateFromDocumentUpdater
.calledWith(this.io, this.doc_id, this.message.op) .calledWith(this.io, this.doc_id, this.message.op)
.should.equal(true) .should.equal(true)
}) })
}) })
return describe('with error', function () { describe('with error', function () {
beforeEach(function () { beforeEach(function () {
this.message = { this.message = {
doc_id: this.doc_id, doc_id: this.doc_id,

View file

@ -15,6 +15,7 @@ const sinon = require('sinon')
const SandboxedModule = require('sandboxed-module') const SandboxedModule = require('sandboxed-module')
const path = require('path') const path = require('path')
const modulePath = '../../../app/js/DocumentUpdaterManager' const modulePath = '../../../app/js/DocumentUpdaterManager'
const _ = require('underscore')
describe('DocumentUpdaterManager', function () { describe('DocumentUpdaterManager', function () {
beforeEach(function () { beforeEach(function () {
@ -34,7 +35,8 @@ describe('DocumentUpdaterManager', function () {
} }
} }
}, },
maxUpdateSize: 7 * 1024 * 1024 maxUpdateSize: 7 * 1024 * 1024,
pendingUpdateListShardCount: 10
} }
this.rclient = { auth() {} } this.rclient = { auth() {} }
@ -256,7 +258,7 @@ describe('DocumentUpdaterManager', function () {
}) })
}) })
return describe('queueChange', function () { describe('queueChange', function () {
beforeEach(function () { beforeEach(function () {
this.change = { this.change = {
doc: '1234567890', doc: '1234567890',
@ -269,7 +271,12 @@ describe('DocumentUpdaterManager', function () {
describe('successfully', function () { describe('successfully', function () {
beforeEach(function () { beforeEach(function () {
return this.DocumentUpdaterManager.queueChange( this.pendingUpdateListKey = `pending-updates-list-key-${Math.random()}`
this.DocumentUpdaterManager._getPendingUpdateListKey = sinon
.stub()
.returns(this.pendingUpdateListKey)
this.DocumentUpdaterManager.queueChange(
this.project_id, this.project_id,
this.doc_id, this.doc_id,
this.change, this.change,
@ -278,7 +285,7 @@ describe('DocumentUpdaterManager', function () {
}) })
it('should push the change', function () { it('should push the change', function () {
return this.rclient.rpush this.rclient.rpush
.calledWith( .calledWith(
`PendingUpdates:${this.doc_id}`, `PendingUpdates:${this.doc_id}`,
JSON.stringify(this.change) JSON.stringify(this.change)
@ -286,10 +293,10 @@ describe('DocumentUpdaterManager', function () {
.should.equal(true) .should.equal(true)
}) })
return it('should notify the doc updater of the change via the pending-updates-list queue', function () { it('should notify the doc updater of the change via the pending-updates-list queue', function () {
return this.rclient.rpush this.rclient.rpush
.calledWith( .calledWith(
'pending-updates-list', this.pendingUpdateListKey,
`${this.project_id}:${this.doc_id}` `${this.project_id}:${this.doc_id}`
) )
.should.equal(true) .should.equal(true)
@ -366,7 +373,7 @@ describe('DocumentUpdaterManager', function () {
}) })
}) })
return describe('with invalid keys', function () { describe('with invalid keys', function () {
beforeEach(function () { beforeEach(function () {
this.change = { this.change = {
op: [{ d: 'test', p: 345 }], op: [{ d: 'test', p: 345 }],
@ -380,7 +387,7 @@ describe('DocumentUpdaterManager', function () {
) )
}) })
return it('should remove the invalid keys from the change', function () { it('should remove the invalid keys from the change', function () {
return this.rclient.rpush return this.rclient.rpush
.calledWith( .calledWith(
`PendingUpdates:${this.doc_id}`, `PendingUpdates:${this.doc_id}`,
@ -390,4 +397,31 @@ describe('DocumentUpdaterManager', function () {
}) })
}) })
}) })
describe('_getPendingUpdateListKey', function () {
beforeEach(function () {
const keys = _.times(
10000,
this.DocumentUpdaterManager._getPendingUpdateListKey
)
this.keys = _.unique(keys)
})
it('should return normal pending updates key', function () {
_.contains(this.keys, 'pending-updates-list').should.equal(true)
})
it('should return pending-updates-list-n keys', function () {
_.contains(this.keys, 'pending-updates-list-1').should.equal(true)
_.contains(this.keys, 'pending-updates-list-3').should.equal(true)
_.contains(this.keys, 'pending-updates-list-9').should.equal(true)
})
it('should not include pending-updates-list-0 key', function () {
_.contains(this.keys, 'pending-updates-list-0').should.equal(false)
})
it('should not include maximum as pendingUpdateListShardCount value', function () {
_.contains(this.keys, 'pending-updates-list-10').should.equal(false)
})
})
}) })