Listen on queue via BLPOP rather than pub/sub

This commit is contained in:
James Allen 2014-08-07 11:45:19 +01:00
parent 946f451155
commit cab5509076
4 changed files with 123 additions and 4 deletions

View file

@ -3,8 +3,9 @@ http = require("http")
Settings = require('settings-sharelatex') Settings = require('settings-sharelatex')
logger = require('logger-sharelatex') logger = require('logger-sharelatex')
logger.initialize("documentupdater") logger.initialize("documentupdater")
RedisManager = require('./app/js/RedisManager.js') RedisManager = require('./app/js/RedisManager')
UpdateManager = require('./app/js/UpdateManager.js') UpdateManager = require('./app/js/UpdateManager')
WorkersManager = require('./app/js/WorkersManager')
Keys = require('./app/js/RedisKeyBuilder') Keys = require('./app/js/RedisKeyBuilder')
redis = require('redis') redis = require('redis')
Errors = require "./app/js/Errors" Errors = require "./app/js/Errors"
@ -32,7 +33,9 @@ rclient.on "message", (channel, doc_key) ->
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) -> UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error? logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
else else
logger.log project_id: project_id, doc_id: doc_id, "ignoring incoming update" logger.log project_id: project_id, doc_id: doc_id, "ignoring incoming update"
WorkersManager.createAndStartWorkers(Settings.workerCount || 10)
UpdateManager.resumeProcessing() UpdateManager.resumeProcessing()

View file

@ -0,0 +1,43 @@
Settings = require('settings-sharelatex')
logger = require('logger-sharelatex')
Keys = require('./RedisKeyBuilder')
redis = require('redis')
UpdateManager = require('./UpdateManager')
module.exports = WorkersManager =
createWorker: () ->
redisConf = Settings.redis.web
client = redis.createClient(redisConf.port, redisConf.host)
client.auth(redisConf.password)
worker = {
client: client
waitForAndProcessUpdate: (callback = (error) ->) ->
worker.client.blpop "pending-updates-list", 0, (error, result) ->
return callback(error) if error?
return callback() if !result?
[list_name, doc_key] = result
[project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key)
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
return callback(error) if error?
return callback()
run: () ->
return if Settings.shuttingDown
worker.waitForAndProcessUpdate (error) =>
if error?
logger.error err: error, "Error in worker process, waiting 1 second before continuing"
setTimeout () ->
worker.run()
, 1000
else
worker.run()
}
return worker
createAndStartWorkers: (number) ->
for i in [1..number]
worker = WorkersManager.createWorker()
worker.run()

View file

@ -14,7 +14,7 @@ module.exports = DocUpdaterClient =
doc_key = "#{project_id}:#{doc_id}" doc_key = "#{project_id}:#{doc_id}"
rclient.sadd "DocsWithPendingUpdates", doc_key, (error) -> rclient.sadd "DocsWithPendingUpdates", doc_key, (error) ->
return callback(error) if error? return callback(error) if error?
rclient.publish "pending-updates", doc_key, callback rclient.rpush "pending-updates-list", doc_key, callback
sendUpdates: (project_id, doc_id, updates, callback = (error) ->) -> sendUpdates: (project_id, doc_id, updates, callback = (error) ->) ->
DocUpdaterClient.preloadDoc project_id, doc_id, (error) -> DocUpdaterClient.preloadDoc project_id, doc_id, (error) ->

View file

@ -0,0 +1,73 @@
sinon = require('sinon')
chai = require('chai')
should = chai.should()
modulePath = "../../../../app/js/WorkersManager.js"
SandboxedModule = require('sandboxed-module')
describe "WorkersManager", ->
beforeEach ->
@WorkersManager = SandboxedModule.require modulePath, requires:
"./UpdateManager" : @UpdateManager = {}
"logger-sharelatex": @logger = { log: sinon.stub() }
"settings-sharelatex": @settings =
redis:
web: {}
"redis": @redis = {}
@callback = sinon.stub()
describe "each worker", ->
beforeEach ->
@client =
auth: sinon.stub()
@redis.createClient = sinon.stub().returns @client
@worker = @WorkersManager.createWorker()
it "should create a new redis client", ->
@redis.createClient.called.should.equal true
describe "waitForAndProcessUpdate", ->
beforeEach ->
@project_id = "project-id-123"
@doc_id = "doc-id-123"
@doc_key = "#{@project_id}:#{@doc_id}"
@client.blpop = sinon.stub().callsArgWith(2, null, ["pending-updates-list", @doc_key])
@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
@worker.waitForAndProcessUpdate @callback
it "should call redis with BLPOP", ->
@client.blpop
.calledWith("pending-updates-list", 0)
.should.equal true
it "should call processOutstandingUpdatesWithLock", ->
@UpdateManager.processOutstandingUpdatesWithLock
.calledWith(@project_id, @doc_id)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "run", ->
it "should call waitForAndProcessUpdate until shutting down", (done) ->
callCount = 0
@worker.waitForAndProcessUpdate = (callback = (error) ->) =>
callCount++
if callCount == 3
@settings.shuttingDown = true
setTimeout () ->
callback()
, 10
sinon.spy @worker, "waitForAndProcessUpdate"
@worker.run()
setTimeout () =>
@worker.waitForAndProcessUpdate.callCount.should.equal 3
done()
, 100