This commit is contained in:
Henry Oswald 2014-09-04 12:40:34 +01:00
commit 8b7d92b149
6 changed files with 133 additions and 16 deletions

View file

@ -10,13 +10,5 @@ install:
- npm install
- grunt install
before_script:
- grunt forever:app:start
script:
- grunt test:unit
- grunt test:acceptance
services:
- redis-server
- mongodb

View file

@ -3,8 +3,9 @@ http = require("http")
Settings = require('settings-sharelatex')
logger = require('logger-sharelatex')
logger.initialize("documentupdater")
RedisManager = require('./app/js/RedisManager.js')
UpdateManager = require('./app/js/UpdateManager.js')
RedisManager = require('./app/js/RedisManager')
UpdateManager = require('./app/js/UpdateManager')
DispatchManager = require('./app/js/DispatchManager')
Keys = require('./app/js/RedisKeyBuilder')
redis = require('redis')
Errors = require "./app/js/Errors"
@ -32,7 +33,9 @@ rclient.on "message", (channel, doc_key) ->
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
else
logger.log project_id: project_id, doc_id: doc_id, "ignoring incoming update"
logger.log project_id: project_id, doc_id: doc_id, "ignoring incoming update"
DispatchManager.createAndStartDispatchers(Settings.dispatcherCount || 10)
UpdateManager.resumeProcessing()

View file

@ -0,0 +1,44 @@
Settings = require('settings-sharelatex')
logger = require('logger-sharelatex')
Keys = require('./RedisKeyBuilder')
redis = require('redis')
UpdateManager = require('./UpdateManager')
Metrics = require('./Metrics')
module.exports = DispatchManager =
createDispatcher: () ->
redisConf = Settings.redis.web
client = redis.createClient(redisConf.port, redisConf.host)
client.auth(redisConf.password)
worker = {
client: client
_waitForUpdateThenDispatchWorker: (callback = (error) ->) ->
timer = new Metrics.Timer "worker.waiting"
worker.client.blpop "pending-updates-list", 0, (error, result) ->
timer.done()
return callback(error) if error?
return callback() if !result?
[list_name, doc_key] = result
[project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key)
# Dispatch this in the background
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
callback()
run: () ->
return if Settings.shuttingDown
worker._waitForUpdateThenDispatchWorker (error) =>
if error?
logger.error err: error, "Error in worker process"
throw error
else
worker.run()
}
return worker
createAndStartDispatchers: (number) ->
for i in [1..number]
worker = DispatchManager.createDispatcher()
worker.run()

View file

@ -1,6 +1,11 @@
{
"name": "document-updater-sharelatex",
"version": "0.0.1",
"version": "0.1.0",
"description": "An API for applying incoming updates to documents in real-time",
"repository": {
"type": "git",
"url": "https://github.com/sharelatex/document-updater-sharelatex.git"
},
"dependencies": {
"express": "3.3.4",
"underscore": "1.2.2",
@ -12,9 +17,9 @@
"async": "",
"lynx": "0.0.11",
"coffee-script": "1.4.0",
"settings-sharelatex": "git+https://github.com/sharelatex/settings-sharelatex.git#master",
"logger-sharelatex": "git+https://github.com/sharelatex/logger-sharelatex.git#master",
"metrics-sharelatex": "git+https://github.com/sharelatex/metrics-sharelatex.git#master",
"settings-sharelatex": "git+https://github.com/sharelatex/settings-sharelatex.git#v1.0.0",
"logger-sharelatex": "git+https://github.com/sharelatex/logger-sharelatex.git#v1.0.0",
"metrics-sharelatex": "git+https://github.com/sharelatex/metrics-sharelatex.git#v1.0.0",
"sinon": "~1.5.2",
"mongojs": "0.9.11"
},

View file

@ -14,7 +14,7 @@ module.exports = DocUpdaterClient =
doc_key = "#{project_id}:#{doc_id}"
rclient.sadd "DocsWithPendingUpdates", doc_key, (error) ->
return callback(error) if error?
rclient.publish "pending-updates", doc_key, callback
rclient.rpush "pending-updates-list", doc_key, callback
sendUpdates: (project_id, doc_id, updates, callback = (error) ->) ->
DocUpdaterClient.preloadDoc project_id, doc_id, (error) ->

View file

@ -0,0 +1,73 @@
sinon = require('sinon')
chai = require('chai')
should = chai.should()
modulePath = "../../../../app/js/DispatchManager.js"
SandboxedModule = require('sandboxed-module')
describe "DispatchManager", ->
beforeEach ->
@DispatchManager = SandboxedModule.require modulePath, requires:
"./UpdateManager" : @UpdateManager = {}
"logger-sharelatex": @logger = { log: sinon.stub() }
"settings-sharelatex": @settings =
redis:
web: {}
"redis": @redis = {}
@callback = sinon.stub()
describe "each worker", ->
beforeEach ->
@client =
auth: sinon.stub()
@redis.createClient = sinon.stub().returns @client
@worker = @DispatchManager.createDispatcher()
it "should create a new redis client", ->
@redis.createClient.called.should.equal true
describe "_waitForUpdateThenDispatchWorker", ->
beforeEach ->
@project_id = "project-id-123"
@doc_id = "doc-id-123"
@doc_key = "#{@project_id}:#{@doc_id}"
@client.blpop = sinon.stub().callsArgWith(2, null, ["pending-updates-list", @doc_key])
@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
@worker._waitForUpdateThenDispatchWorker @callback
it "should call redis with BLPOP", ->
@client.blpop
.calledWith("pending-updates-list", 0)
.should.equal true
it "should call processOutstandingUpdatesWithLock", ->
@UpdateManager.processOutstandingUpdatesWithLock
.calledWith(@project_id, @doc_id)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "run", ->
it "should call _waitForUpdateThenDispatchWorker until shutting down", (done) ->
callCount = 0
@worker._waitForUpdateThenDispatchWorker = (callback = (error) ->) =>
callCount++
if callCount == 3
@settings.shuttingDown = true
setTimeout () ->
callback()
, 10
sinon.spy @worker, "_waitForUpdateThenDispatchWorker"
@worker.run()
setTimeout () =>
@worker._waitForUpdateThenDispatchWorker.callCount.should.equal 3
done()
, 100