mirror of
synced 2024-11-21 20:47:08 -05:00
Pull out rclient into RedisBackend that supports sending requests to multiple rclients
This commit is contained in:
17 changed files with 201 additions and 243 deletions
@ -1,6 +1,6 @@
Settings = require('settings-sharelatex')
logger = require('logger-sharelatex')
Keys = require('./RedisKeyBuilder')
Keys = require('./UpdateKeys')
redis = require("redis-sharelatex")
UpdateManager = require('./UpdateManager')
@ -1,7 +1,6 @@
metrics = require('./Metrics')
Settings = require('settings-sharelatex')
redis = require("redis-sharelatex")
rclient = redis.createClient(Settings.redis.web)
rclient = require("./RedisBackend").createClient()
keys = require('./RedisKeyBuilder')
logger = require "logger-sharelatex"
os = require "os"
Normal file
Normal file
@ -0,0 +1,84 @@
Settings = require "settings-sharelatex"
redis = require("redis-sharelatex")
async = require "async"
class Client
constructor: (@clients) ->
multi: () ->
return new MultiClient(
@clients.map (client) -> {
rclient: client.rclient.multi()
key_schema: client.key_schema
primary: client.primary
class MultiClient
constructor: (@clients) ->
exec: (callback) ->
jobs = @clients.map (client) ->
(cb) ->
console.error "EXEC", client.rclient.queue
client.rclient.exec (result...) ->
console.error "EXEC RESULT", result
if client.primary
# Return this result as the actual result
# Send the rest through for comparison
async.parallel jobs, (error, results) ->
console.error "EXEC RESULTS", results
"get", "smembers", "set", "srem", "sadd", "del", "lrange",
"llen", "rpush", "expire", "ltrim", "incr"
for command in COMMANDS
do (command) ->
Client.prototype[command] = (key_builder, args..., callback) ->
async.parallel @clients.map (client) ->
(cb) ->
key = key_builder(client.key_schema)
console.error "COMMAND", command, key, args
client.rclient[command] key, args..., (result...) ->
console.log "RESULT", command, result
if client.primary
# Return this result as the actual result
# Send the rest through for comparison
, (error, results) ->
console.log "#{command} RESULTS", results
MultiClient.prototype[command] = (key_builder, args...) ->
for client in @clients
key = key_builder(client.key_schema)
console.error "MULTI COMMAND", command, key, args
client.rclient[command] key, args...
Client::eval = (script, pos, key_builder, args..., callback) ->
async.parallel @clients.map (client) ->
(cb) ->
key = key_builder(client.key_schema)
client.rclient.eval script, pos, key, args..., (result...) ->
if client.primary
# Return this result as the actual result
# Send the rest through for comparison
, (error, results) ->
console.log "#{command} RESULTS", results
module.exports =
createClient: () ->
client_configs = Settings.redis.documentupdater
unless client_configs instanceof Array
client_configs.primary = true
client_configs = [client_configs]
clients = client_configs.map (config) ->
rclient: redis.createClient(config)
key_schema: config.key_schema
primary: config.primary
return new Client(clients)
@ -1,24 +1,40 @@
PROJECTKEY = "ProjectId"
BLOCKINGKEY = "Blocking"
CHANGEQUE = "ChangeQue"
DOCLINES = "doclines"
DOCOPS = "DocOps"
DOCVERSION = "DocVersion"
UNCOMPRESSED_HISTORY_OPS = "UncompressedHistoryOps"
module.exports =
docLines : (op)-> DOCLINES+":"+op.doc_id
docOps : (op)-> DOCOPS+":"+op.doc_id
uncompressedHistoryOp: (op) -> UNCOMPRESSED_HISTORY_OPS + ":" + op.doc_id
docVersion : (op)-> DOCVERSION+":"+op.doc_id
projectKey : (op)-> PROJECTKEY+":"+op.doc_id
blockingKey : (op)-> BLOCKINGKEY+":"+op.doc_id
changeQue : (op)-> CHANGEQUE+":"+op.project_id
docsInProject : (op)-> DOCSINPROJECT+":"+op.project_id
pendingUpdates : (op)-> PENDINGUPDATESKEY+":"+op.doc_id
combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}"
splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":")
docsWithHistoryOps: (op) -> DOCSWITHHISTORYOPS + ":" + op.project_id
# The default key schema looks like:
# doclines:foo
# DocVersion:foo
# but if we use redis cluster, we want all 'foo' keys to map to the same
# node, so we must use:
# doclines:{foo}
# DocVersion:{foo}
# since redis hashes on the contents of {...}.
# To transparently support different key schemas for different clients
# (potential writing/reading to both a cluster and single instance
# while we migrate), instead of keys, we now pass around functions which
# will build the key when passed a schema.
# E.g.
# key_schema = Settings.redis.keys
# key_schema == { docLines: ({doc_id}) -> "doclines:#{doc_id}", ... }
# key_builder = RedisKeyBuilder.docLines({doc_id: "foo"})
# key_builder == (key_schema) -> key_schema.docLines({doc_id: "foo"})
# key = key_builder(key_schema)
# key == "doclines:foo"
module.exports = RedisKeyBuilder =
blockingKey: ({doc_id}) ->
return (key_schema) -> key_schema.blockingKey({doc_id})
docLines: ({doc_id}) ->
return (key_schema) -> key_schema.docLines({doc_id})
docOps: ({doc_id}) ->
return (key_schema) -> key_schema.docOps({doc_id})
docVersion: ({doc_id}) ->
return (key_schema) -> key_schema.docVersion({doc_id})
projectKey: ({doc_id}) ->
return (key_schema) -> key_schema.projectKey({doc_id})
uncompressedHistoryOp: ({doc_id}) ->
return (key_schema) -> key_schema.uncompressedHistoryOp({doc_id})
pendingUpdates: ({doc_id}) ->
return (key_schema) -> key_schema.pendingUpdates({doc_id})
docsInProject: ({project_id}) ->
return (key_schema) -> key_schema.docsInProject({project_id})
docsWithHistoryOps: ({project_id}) ->
return (key_schema) -> key_schema.docsWithHistoryOps({project_id})
@ -1,16 +1,12 @@
Settings = require('settings-sharelatex')
redis = require("redis-sharelatex")
rclient = redis.createClient(Settings.redis.web)
async = require('async')
rclient = require("./RedisBackend").createClient()
_ = require('underscore')
keys = require('./RedisKeyBuilder')
logger = require('logger-sharelatex')
metrics = require('./Metrics')
Errors = require "./Errors"
redisOptions = _.clone(Settings.redis.web)
redisOptions.return_buffers = true
# Make times easy to read
minutes = 60 # seconds for Redis expire
@ -141,15 +137,5 @@ module.exports = RedisManager =
version = parseInt(version, 10)
callback null, version
pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) ->
jsonOp = JSON.stringify op
async.parallel [
(cb) -> rclient.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp, cb
(cb) -> rclient.sadd keys.docsWithHistoryOps(project_id: project_id), doc_id, cb
], (error, results) ->
return callback(error) if error?
[length, _] = results
callback(error, length)
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
rclient.smembers keys.docsInProject(project_id: project_id), callback
@ -1,4 +1,4 @@
Keys = require('./RedisKeyBuilder')
Keys = require('./UpdateKeys')
Settings = require('settings-sharelatex')
DocumentManager = require "./DocumentManager"
RedisManager = require "./RedisManager"
@ -3,7 +3,7 @@ ShareJsDB = require "./ShareJsDB"
async = require "async"
logger = require "logger-sharelatex"
Settings = require('settings-sharelatex')
Keys = require "./RedisKeyBuilder"
Keys = require "./UpdateKeys"
{EventEmitter} = require "events"
util = require "util"
@ -1,8 +1,9 @@
settings = require "settings-sharelatex"
request = require "request"
logger = require "logger-sharelatex"
RedisManager = require "./RedisManager"
crypto = require("crypto")
redis = require("redis-sharelatex")
rclient = redis.createClient(settings.redis.web)
async = require "async"
module.exports = TrackChangesManager =
flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
@ -23,8 +24,13 @@ module.exports = TrackChangesManager =
pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) ->
RedisManager.pushUncompressedHistoryOp project_id, doc_id, op, (error, length) ->
jsonOp = JSON.stringify op
async.parallel [
(cb) -> rclient.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp, cb
(cb) -> rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, cb
], (error, results) ->
return callback(error) if error?
[length, _] = results
if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0
# Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update.
Normal file
Normal file
@ -0,0 +1,3 @@
module.exports =
combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}"
splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":")
@ -20,9 +20,32 @@ module.exports =
minSize: 10*1024
writesEnabled: false
documentupdater: [{
primary: true
port: "6379"
host: "localhost"
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
docLines: ({doc_id}) -> "doclines:#{doc_id}"
docOps: ({doc_id}) -> "DocOps:#{doc_id}"
docVersion: ({doc_id}) -> "DocVersion:#{doc_id}"
projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
}, {
port: "6380"
host: "localhost"
blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
docLines: ({doc_id}) -> "doclines:{#{doc_id}}"
docOps: ({doc_id}) -> "DocOps:{#{doc_id}}"
docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
max_doc_length: 2 * 1024 * 1024 # 2mb
@ -210,7 +210,7 @@ describe "Applying updates to a doc", ->
throw error if error?
DocUpdaterClient.sendUpdates @project_id, @doc_id, updates, (error) =>
throw error if error?
setTimeout done, 200
setTimeout done, 1000
after ->
@ -237,154 +237,3 @@ describe "Applying updates to a doc", ->
DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
doc.lines.should.deep.equal @result
describe "Applying updates to a large doc (uses compression)", ->
MIN_SIZE = 500000
before ->
@lines = ["one", "two", "three"]
while @lines.join('').length < MIN_SIZE
@lines.push "this is a repeated long line which will create a large document which must be compressed #{@lines.length}"
@version = 42
@update =
doc: @doc_id
op: [{
i: "one and a half\n"
p: 4
v: @version
@result = @lines.slice()
@result.splice 1, 0, "one and a half"
describe "when the document is not loaded", ->
before (done) ->
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
sinon.spy MockWebApi, "getDocument"
MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
db.docOps.insert {
doc_id: ObjectId(@doc_id)
version: @version
}, (error) =>
throw error if error?
DocUpdaterClient.sendUpdate @project_id, @doc_id, @update, (error) ->
throw error if error?
setTimeout done, 200
after ->
it "should load the document from the web API", ->
.calledWith(@project_id, @doc_id)
.should.equal true
it "should update the doc", (done) ->
DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
doc.lines.should.deep.equal @result
it "should push the applied updates to the track changes api", (done) ->
rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
throw error if error?
JSON.parse(updates[0]).op.should.deep.equal @update.op
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
throw error if error?
result.should.equal 1
describe "when the document is loaded", ->
before (done) ->
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
db.docOps.insert doc_id: ObjectId(@doc_id), version: @version, (error) =>
throw error if error?
DocUpdaterClient.preloadDoc @project_id, @doc_id, (error) =>
throw error if error?
sinon.spy MockWebApi, "getDocument"
DocUpdaterClient.sendUpdate @project_id, @doc_id, @update, (error) ->
throw error if error?
setTimeout done, 200
after ->
it "should not need to call the web api", ->
MockWebApi.getDocument.called.should.equal false
it "should update the doc", (done) ->
DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
doc.lines.should.deep.equal @result
it "should push the applied updates to the track changes api", (done) ->
rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
JSON.parse(updates[0]).op.should.deep.equal @update.op
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
result.should.equal 1
describe "with a broken update", ->
before (done) ->
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
db.docOps.insert doc_id: ObjectId(@doc_id), version: @version, (error) =>
throw error if error?
DocUpdaterClient.sendUpdate @project_id, @doc_id, @undefined, (error) ->
throw error if error?
setTimeout done, 200
it "should not update the doc", (done) ->
DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
doc.lines.should.deep.equal @lines
describe "with enough updates to flush to the track changes api", ->
before (done) ->
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
updates = []
for v in [0..99] # Should flush after 50 ops
doc_id: @doc_id,
op: [i: v.toString(), p: 0]
v: v
sinon.spy MockTrackChangesApi, "flushDoc"
MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
db.docOps.insert doc_id: ObjectId(@doc_id), version: 0, (error) =>
throw error if error?
DocUpdaterClient.sendUpdates @project_id, @doc_id, updates, (error) =>
throw error if error?
setTimeout done, 500
after ->
it "should flush the doc", ->
MockTrackChangesApi.flushDoc.called.should.equal true
describe "when there is no version in Mongo", ->
before (done) ->
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
MockWebApi.insertDoc @project_id, @doc_id, {
lines: @lines
update =
doc: @doc_id
op: @update.op
v: 0
DocUpdaterClient.sendUpdate @project_id, @doc_id, update, (error) ->
throw error if error?
setTimeout done, 200
it "should update the doc (using version = 0)", (done) ->
DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
doc.lines.should.deep.equal @result
@ -3,7 +3,6 @@ sinon = require('sinon')
assert = require('assert')
path = require('path')
modulePath = path.join __dirname, '../../../../app/js/LockManager.js'
keys = require(path.join __dirname, '../../../../app/js/RedisKeyBuilder.js')
project_id = 1234
doc_id = 5678
blockingKey = "Blocking:#{doc_id}"
@ -15,8 +14,9 @@ describe 'LockManager - checking the lock', ()->
mocks =
"logger-sharelatex": log:->
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
createClient : ()->
exists: existsStub
@ -3,7 +3,6 @@ sinon = require('sinon')
assert = require('assert')
path = require('path')
modulePath = path.join __dirname, '../../../../app/js/LockManager.js'
keys = require(path.join __dirname, '../../../../app/js/RedisKeyBuilder.js')
project_id = 1234
doc_id = 5678
SandboxedModule = require('sandboxed-module')
@ -13,8 +12,9 @@ describe 'LockManager - releasing the lock', ()->
evalStub = sinon.stub().yields(1)
mocks =
"logger-sharelatex": log:->
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
createClient : ()->
eval: evalStub
@ -8,7 +8,9 @@ describe 'LockManager - getting the lock', ->
beforeEach ->
@LockManager = SandboxedModule.require modulePath, requires:
"logger-sharelatex": log:->
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
createClient : () =>
"./Metrics": {inc: () ->}
@ -8,7 +8,9 @@ describe 'LockManager - trying the lock', ->
beforeEach ->
@LockManager = SandboxedModule.require modulePath, requires:
"logger-sharelatex": log:->
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
createClient : () =>
set: @set = sinon.stub()
@ -12,7 +12,15 @@ describe "RedisManager", ->
exec: sinon.stub()
@rclient.multi = () => @rclient
@RedisManager = SandboxedModule.require modulePath, requires:
"redis-sharelatex": createClient: () => @rclient
"./RedisBackend": createClient: () => @rclient
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
docLines: ({doc_id}) -> "doclines:#{doc_id}"
docOps: ({doc_id}) -> "DocOps:#{doc_id}"
docVersion: ({doc_id}) -> "DocVersion:#{doc_id}"
projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
"logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() }
"./Metrics": @metrics =
inc: sinon.stub()
@ -171,28 +179,6 @@ describe "RedisManager", ->
it "should log out the problem", ->
@logger.warn.called.should.equal true
describe "pushUncompressedHistoryOp", ->
beforeEach (done) ->
@op = { op: [{ i: "foo", p: 4 }] }
@rclient.rpush = sinon.stub().yields(null, @length = 42)
@rclient.sadd = sinon.stub().yields()
@RedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, (args...) =>
it "should push the doc op into the doc ops list", ->
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@op))
.should.equal true
it "should add the doc_id to the set of which records the project docs", ->
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
.should.equal true
it "should call the callback with the length", ->
@callback.calledWith(null, @length).should.equal true
describe "getUpdatesLength", ->
beforeEach ->
@rclient.llen = sinon.stub().yields(null, @length = 3)
@ -7,9 +7,9 @@ describe "TrackChangesManager", ->
beforeEach ->
@TrackChangesManager = SandboxedModule.require modulePath, requires:
"request": @request = {}
"settings-sharelatex": @Settings = {}
"settings-sharelatex": @Settings = { redis: web: {} }
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
"./RedisManager": @RedisManager = {}
"redis-sharelatex": createClient: () => @rclient = {}
@project_id = "mock-project-id"
@doc_id = "mock-doc-id"
@callback = sinon.stub()
@ -42,17 +42,21 @@ describe "TrackChangesManager", ->
describe "pushUncompressedHistoryOp", ->
beforeEach ->
@op = "mock-op"
@op = { op: [{ i: "foo", p: 4 }] }
@rclient.rpush = sinon.stub().yields(null, @length = 42)
@rclient.sadd = sinon.stub().yields()
@TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2)
describe "pushing the op", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(3, null, 1)
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should push the op into redis", ->
.calledWith(@project_id, @doc_id, @op)
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify @op)
.should.equal true
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
.should.equal true
it "should call the callback", ->
@ -63,8 +67,7 @@ describe "TrackChangesManager", ->
describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@rclient.rpush = sinon.stub().yields(null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should tell the track changes api to flush", ->
@ -74,8 +77,7 @@ describe "TrackChangesManager", ->
describe "when TrackChangesManager errors", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@rclient.rpush = sinon.stub().yields(null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
Reference in a new issue