Explicitly separate calls to web and docupdater redis instances

This commit is contained in:
James Allen 2016-06-17 12:17:22 +01:00
parent 9ff3026807
commit bc00aab7b1
12 changed files with 175 additions and 119 deletions

View file

@ -70,25 +70,6 @@ module.exports = RedisManager =
multi.set keys.docVersion(doc_id:doc_id), version
multi.exec (error, replys) -> callback(error)
getPendingUpdatesForDoc : (doc_id, callback)->
multi = rclient.multi()
multi.lrange keys.pendingUpdates(doc_id:doc_id), 0 , -1
multi.del keys.pendingUpdates(doc_id:doc_id)
multi.exec (error, replys) ->
return callback(error) if error?
jsonUpdates = replys[0]
updates = []
for jsonUpdate in jsonUpdates
try
update = JSON.parse jsonUpdate
catch e
return callback e
updates.push update
callback error, updates
getUpdatesLength: (doc_id, callback)->
rclient.llen keys.pendingUpdates(doc_id:doc_id), callback
getPreviousDocOps: (doc_id, start, end, callback = (error, jsonOps) ->) ->
rclient.llen keys.docOps(doc_id: doc_id), (error, length) ->
return callback(error) if error?

View file

@ -1,9 +1,7 @@
settings = require "settings-sharelatex"
request = require "request"
logger = require "logger-sharelatex"
redis = require("redis-sharelatex")
rclient = redis.createClient(settings.redis.web)
async = require "async"
WebRedisManager = require "./WebRedisManager"
module.exports = TrackChangesManager =
flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
@ -24,13 +22,8 @@ module.exports = TrackChangesManager =
FLUSH_EVERY_N_OPS: 50
pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) ->
jsonOp = JSON.stringify op
multi = rclient.multi()
multi.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp
multi.sadd "DocsWithHistoryOps:#{project_id}", doc_id
multi.exec (error, results) ->
WebRedisManager.pushUncompressedHistoryOp project_id, doc_id, op, (error, length) ->
return callback(error) if error?
[length, _] = results
if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0
# Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update.

View file

@ -1,5 +1,6 @@
LockManager = require "./LockManager"
RedisManager = require "./RedisManager"
WebRedisManager = require "./WebRedisManager"
ShareJsUpdateManager = require "./ShareJsUpdateManager"
Settings = require('settings-sharelatex')
async = require("async")
@ -25,7 +26,7 @@ module.exports = UpdateManager =
UpdateManager.continueProcessingUpdatesWithLock project_id, doc_id, callback
continueProcessingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
RedisManager.getUpdatesLength doc_id, (error, length) =>
WebRedisManager.getUpdatesLength doc_id, (error, length) =>
return callback(error) if error?
if length > 0
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, callback
@ -33,7 +34,7 @@ module.exports = UpdateManager =
callback()
fetchAndApplyUpdates: (project_id, doc_id, callback = (error) ->) ->
RedisManager.getPendingUpdatesForDoc doc_id, (error, updates) =>
WebRedisManager.getPendingUpdatesForDoc doc_id, (error, updates) =>
return callback(error) if error?
if updates.length == 0
return callback()

View file

@ -0,0 +1,33 @@
Settings = require('settings-sharelatex')
rclient = require("redis-sharelatex").createClient(Settings.redis.web)
async = require "async"
module.exports = WebRedisManager =
getPendingUpdatesForDoc : (doc_id, callback)->
multi = rclient.multi()
multi.lrange "PendingUpdates:#{doc_id}", 0 , -1
multi.del "PendingUpdates:#{doc_id}"
multi.exec (error, replys) ->
return callback(error) if error?
jsonUpdates = replys[0]
updates = []
for jsonUpdate in jsonUpdates
try
update = JSON.parse jsonUpdate
catch e
return callback e
updates.push update
callback error, updates
getUpdatesLength: (doc_id, callback)->
rclient.llen "PendingUpdates:#{doc_id}", callback
pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) ->
jsonOp = JSON.stringify op
async.parallel [
(cb) -> rclient.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp, cb
(cb) -> rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, cb
], (error, results) ->
return callback(error) if error?
[length, _] = results
callback(error, length)

View file

@ -20,10 +20,9 @@ module.exports =
port:"6379"
host:"localhost"
password:""
documentupdater: [{
primary: true
port: "6379"
host: "localhost"
documentupdater:
port:"6379"
host:"localhost"
password:""
key_schema:
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
@ -33,20 +32,22 @@ module.exports =
projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
}, {
cluster: [{
port: "7000"
host: "localhost"
}]
key_schema:
blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
docLines: ({doc_id}) -> "doclines:{#{doc_id}}"
docOps: ({doc_id}) -> "DocOps:{#{doc_id}}"
docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
}]
# To use Redis cluster, configure the backend as follows:
# [{
# primary: true
# cluster: [{
# port: "7000"
# host: "localhost"
# }]
# key_schema:
# blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
# docLines: ({doc_id}) -> "doclines:{#{doc_id}}"
# docOps: ({doc_id}) -> "DocOps:{#{doc_id}}"
# docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
# projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
# pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
# docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
# }]
max_doc_length: 2 * 1024 * 1024 # 2mb

View file

@ -26,6 +26,7 @@
"bunyan": "~0.22.1",
"chai": "^3.5.0",
"chai-spies": "^0.7.1",
"cluster-key-slot": "^1.0.5",
"grunt": "~0.4.2",
"grunt-available-tasks": "~0.4.1",
"grunt-bunyan": "~0.5.0",

View file

@ -1,4 +1,5 @@
rclient = require("redis").createClient()
Settings = require('settings-sharelatex')
rclient = require("redis").createClient(Settings.redis.web)
request = require("request").defaults(jar: false)
async = require "async"
@ -7,6 +8,11 @@ module.exports = DocUpdaterClient =
chars = for i in [1..24]
Math.random().toString(16)[2]
return chars.join("")
subscribeToAppliedOps: (callback = (message) ->) ->
rclient_sub = require("redis").createClient()
rclient_sub.subscribe "applied-ops"
rclient_sub.on "message", callback
sendUpdate: (project_id, doc_id, update, callback = (error) ->) ->
rclient.rpush "PendingUpdates:#{doc_id}", JSON.stringify(update), (error)->

View file

@ -54,46 +54,6 @@ describe "RedisManager", ->
@callback
.calledWith(null, @lines, @version)
.should.equal true
describe "getPendingUpdatesForDoc", ->
beforeEach ->
@rclient.lrange = sinon.stub()
@rclient.del = sinon.stub()
describe "successfully", ->
beforeEach ->
@updates = [
{ op: [{ i: "foo", p: 4 }] }
{ op: [{ i: "foo", p: 4 }] }
]
@jsonUpdates = @updates.map (update) -> JSON.stringify update
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
@RedisManager.getPendingUpdatesForDoc @doc_id, @callback
it "should get the pending updates", ->
@rclient.lrange
.calledWith("PendingUpdates:#{@doc_id}", 0, -1)
.should.equal true
it "should delete the pending updates", ->
@rclient.del
.calledWith("PendingUpdates:#{@doc_id}")
.should.equal true
it "should call the callback with the updates", ->
@callback.calledWith(null, @updates).should.equal true
describe "when the JSON doesn't parse", ->
beforeEach ->
@jsonUpdates = [
JSON.stringify { op: [{ i: "foo", p: 4 }] }
"broken json"
]
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
@RedisManager.getPendingUpdatesForDoc @doc_id, @callback
it "should return an error to the callback", ->
@callback.calledWith(new Error("JSON parse error")).should.equal true
describe "getPreviousDocOpsTests", ->
describe "with a start and an end value", ->
@ -179,17 +139,6 @@ describe "RedisManager", ->
it "should log out the problem", ->
@logger.warn.called.should.equal true
describe "getUpdatesLength", ->
beforeEach ->
@rclient.llen = sinon.stub().yields(null, @length = 3)
@RedisManager.getUpdatesLength @doc_id, @callback
it "should look up the length", ->
@rclient.llen.calledWith("PendingUpdates:#{@doc_id}").should.equal true
it "should return the length", ->
@callback.calledWith(null, @length).should.equal true
describe "pushDocOp", ->
beforeEach ->
@rclient.rpush = sinon.stub()

View file

@ -7,9 +7,9 @@ describe "TrackChangesManager", ->
beforeEach ->
@TrackChangesManager = SandboxedModule.require modulePath, requires:
"request": @request = {}
"settings-sharelatex": @Settings = { redis: web: {} }
"settings-sharelatex": @Settings = {}
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"redis-sharelatex": createClient: () => @rclient = {}
"./WebRedisManager": @WebRedisManager = {}
@project_id = "mock-project-id"
@doc_id = "mock-doc-id"
@callback = sinon.stub()
@ -42,23 +42,17 @@ describe "TrackChangesManager", ->
describe "pushUncompressedHistoryOp", ->
beforeEach ->
@op = { op: [{ i: "foo", p: 4 }] }
@rclient.multi = sinon.stub().returns(@multi = {})
@multi.rpush = sinon.stub()
@multi.sadd = sinon.stub()
@multi.exec = sinon.stub().yields(null, [@length = 42, "foo"])
@op = "mock-op"
@TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2)
describe "pushing the op", ->
beforeEach ->
@WebRedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(3, null, 1)
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should push the op into redis", ->
@multi.rpush
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify @op)
.should.equal true
@multi.sadd
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
@WebRedisManager.pushUncompressedHistoryOp
.calledWith(@project_id, @doc_id, @op)
.should.equal true
it "should call the callback", ->
@ -69,7 +63,8 @@ describe "TrackChangesManager", ->
describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"])
@WebRedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should tell the track changes api to flush", ->
@ -79,7 +74,8 @@ describe "TrackChangesManager", ->
describe "when TrackChangesManager errors", ->
beforeEach ->
@multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"])
@WebRedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback

View file

@ -12,6 +12,7 @@ describe "UpdateManager", ->
@UpdateManager = SandboxedModule.require modulePath, requires:
"./LockManager" : @LockManager = {}
"./RedisManager" : @RedisManager = {}
"./WebRedisManager" : @WebRedisManager = {}
"./ShareJsUpdateManager" : @ShareJsUpdateManager = {}
"logger-sharelatex": @logger = { log: sinon.stub() }
"./Metrics": @Metrics =
@ -89,7 +90,7 @@ describe "UpdateManager", ->
describe "continueProcessingUpdatesWithLock", ->
describe "when there are outstanding updates", ->
beforeEach ->
@RedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3)
@WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3)
@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
@UpdateManager.continueProcessingUpdatesWithLock @project_id, @doc_id, @callback
@ -101,7 +102,7 @@ describe "UpdateManager", ->
describe "when there are no outstanding updates", ->
beforeEach ->
@RedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 0)
@WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 0)
@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
@UpdateManager.continueProcessingUpdatesWithLock @project_id, @doc_id, @callback
@ -117,12 +118,12 @@ describe "UpdateManager", ->
@updates = [{p: 1, t: "foo"}]
@updatedDocLines = ["updated", "lines"]
@version = 34
@RedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
@WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
@UpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version)
@UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback
it "should get the pending updates", ->
@RedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true
@WebRedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true
it "should apply the updates", ->
@UpdateManager.applyUpdates
@ -135,7 +136,7 @@ describe "UpdateManager", ->
describe "when there are no updates", ->
beforeEach ->
@updates = []
@RedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
@WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
@UpdateManager.applyUpdates = sinon.stub()
@RedisManager.setDocument = sinon.stub()
@UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback

View file

@ -9,6 +9,7 @@ describe 'UpdateManager - lockUpdatesAndDo', ->
@UpdateManager = SandboxedModule.require modulePath, requires:
"./LockManager" : @LockManager = {}
"./RedisManager" : @RedisManager = {}
"./WebRedisManager" : @WebRedisManager = {}
"./ShareJsUpdateManager" : @ShareJsUpdateManager = {}
"logger-sharelatex": @logger = { log: sinon.stub() }
@project_id = "project-id-123"

View file

@ -0,0 +1,93 @@
sinon = require('sinon')
chai = require('chai')
should = chai.should()
modulePath = "../../../../app/js/WebRedisManager.js"
SandboxedModule = require('sandboxed-module')
Errors = require "../../../../app/js/Errors"
describe "WebRedisManager", ->
beforeEach ->
@rclient =
auth: () ->
exec: sinon.stub()
@rclient.multi = () => @rclient
@WebRedisManager = SandboxedModule.require modulePath, requires:
"redis-sharelatex": createClient: () => @rclient
"settings-sharelatex": redis: web: @settings = {"mock": "settings"}
@doc_id = "doc-id-123"
@project_id = "project-id-123"
@callback = sinon.stub()
describe "getPendingUpdatesForDoc", ->
beforeEach ->
@rclient.lrange = sinon.stub()
@rclient.del = sinon.stub()
describe "successfully", ->
beforeEach ->
@updates = [
{ op: [{ i: "foo", p: 4 }] }
{ op: [{ i: "foo", p: 4 }] }
]
@jsonUpdates = @updates.map (update) -> JSON.stringify update
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
@WebRedisManager.getPendingUpdatesForDoc @doc_id, @callback
it "should get the pending updates", ->
@rclient.lrange
.calledWith("PendingUpdates:#{@doc_id}", 0, -1)
.should.equal true
it "should delete the pending updates", ->
@rclient.del
.calledWith("PendingUpdates:#{@doc_id}")
.should.equal true
it "should call the callback with the updates", ->
@callback.calledWith(null, @updates).should.equal true
describe "when the JSON doesn't parse", ->
beforeEach ->
@jsonUpdates = [
JSON.stringify { op: [{ i: "foo", p: 4 }] }
"broken json"
]
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
@WebRedisManager.getPendingUpdatesForDoc @doc_id, @callback
it "should return an error to the callback", ->
@callback.calledWith(new Error("JSON parse error")).should.equal true
describe "getUpdatesLength", ->
beforeEach ->
@rclient.llen = sinon.stub().yields(null, @length = 3)
@WebRedisManager.getUpdatesLength @doc_id, @callback
it "should look up the length", ->
@rclient.llen.calledWith("PendingUpdates:#{@doc_id}").should.equal true
it "should return the length", ->
@callback.calledWith(null, @length).should.equal true
describe "pushUncompressedHistoryOp", ->
beforeEach (done) ->
@op = { op: [{ i: "foo", p: 4 }] }
@rclient.rpush = sinon.stub().yields(null, @length = 42)
@rclient.sadd = sinon.stub().yields()
@WebRedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, (args...) =>
@callback(args...)
done()
it "should push the doc op into the doc ops list", ->
@rclient.rpush
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@op))
.should.equal true
it "should add the doc_id to the set of which records the project docs", ->
@rclient.sadd
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
.should.equal true
it "should call the callback with the length", ->
@callback.calledWith(null, @length).should.equal true