mirror of
https://github.com/overleaf/overleaf.git
synced 2025-04-20 07:23:40 +00:00
Merge branch 'master' into pr-bulk-actions
This commit is contained in:
commit
05d7d1b8c6
32 changed files with 441 additions and 1044 deletions
|
@ -8,14 +8,9 @@ if Settings.sentry?.dsn?
|
|||
|
||||
RedisManager = require('./app/js/RedisManager')
|
||||
DispatchManager = require('./app/js/DispatchManager')
|
||||
Keys = require('./app/js/RedisKeyBuilder')
|
||||
Errors = require "./app/js/Errors"
|
||||
HttpController = require "./app/js/HttpController"
|
||||
|
||||
redis = require("redis-sharelatex")
|
||||
rclient = redis.createClient(Settings.redis.web)
|
||||
|
||||
|
||||
Path = require "path"
|
||||
Metrics = require "metrics-sharelatex"
|
||||
Metrics.initialize("doc-updater")
|
||||
|
@ -64,15 +59,18 @@ app.get '/status', (req, res)->
|
|||
else
|
||||
res.send('document updater is alive')
|
||||
|
||||
redisCheck = require("redis-sharelatex").activeHealthCheckRedis(Settings.redis.web)
|
||||
app.get "/health_check/redis", (req, res, next)->
|
||||
if redisCheck.isAlive()
|
||||
res.send 200
|
||||
else
|
||||
res.send 500
|
||||
webRedisClient = require("redis-sharelatex").createClient(Settings.redis.realtime)
|
||||
app.get "/health_check/redis", (req, res, next) ->
|
||||
webRedisClient.healthCheck (error) ->
|
||||
if error?
|
||||
logger.err {err: error}, "failed redis health check"
|
||||
res.send 500
|
||||
else
|
||||
res.send 200
|
||||
|
||||
docUpdaterRedisClient = require("redis-sharelatex").createClient(Settings.redis.documentupdater)
|
||||
app.get "/health_check/redis_cluster", (req, res, next) ->
|
||||
RedisManager.rclient.healthCheck (error, alive) ->
|
||||
docUpdaterRedisClient.healthCheck (error) ->
|
||||
if error?
|
||||
logger.err {err: error}, "failed redis cluster health check"
|
||||
res.send 500
|
||||
|
|
|
@ -8,7 +8,7 @@ Metrics = require('./Metrics')
|
|||
|
||||
module.exports = DispatchManager =
|
||||
createDispatcher: () ->
|
||||
client = redis.createClient(Settings.redis.web)
|
||||
client = redis.createClient(Settings.redis.realtime)
|
||||
worker = {
|
||||
client: client
|
||||
_waitForUpdateThenDispatchWorker: (callback = (error) ->) ->
|
||||
|
|
|
@ -4,7 +4,7 @@ DiffCodec = require "./DiffCodec"
|
|||
logger = require "logger-sharelatex"
|
||||
Metrics = require "./Metrics"
|
||||
HistoryManager = require "./HistoryManager"
|
||||
WebRedisManager = require "./WebRedisManager"
|
||||
RealTimeRedisManager = require "./RealTimeRedisManager"
|
||||
Errors = require "./Errors"
|
||||
RangesManager = require "./RangesManager"
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ settings = require "settings-sharelatex"
|
|||
request = require "request"
|
||||
logger = require "logger-sharelatex"
|
||||
async = require "async"
|
||||
WebRedisManager = require "./WebRedisManager"
|
||||
HistoryRedisManager = require "./HistoryRedisManager"
|
||||
|
||||
module.exports = HistoryManager =
|
||||
flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
|
||||
|
@ -21,16 +21,17 @@ module.exports = HistoryManager =
|
|||
error = new Error("track changes api returned a failure status code: #{res.statusCode}")
|
||||
return callback(error)
|
||||
|
||||
FLUSH_EVERY_N_OPS: 50
|
||||
pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) ->
|
||||
FLUSH_EVERY_N_OPS: 100
|
||||
recordAndFlushHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) ->
|
||||
if ops.length == 0
|
||||
return callback()
|
||||
WebRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) ->
|
||||
HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) ->
|
||||
return callback(error) if error?
|
||||
# We want to flush every 50 ops, i.e. 50, 100, 150, etc
|
||||
# Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these
|
||||
# ops. If we've changed, then we've gone over a multiple of 50 and should flush.
|
||||
# (Most of the time, we will only hit 50 and then flushing will put us back to 0)
|
||||
return callback() if not length? # don't flush unless we know the length
|
||||
# We want to flush every 100 ops, i.e. 100, 200, 300, etc
|
||||
# Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these
|
||||
# ops. If we've changed, then we've gone over a multiple of 100 and should flush.
|
||||
# (Most of the time, we will only hit 100 and then flushing will put us back to 0)
|
||||
previousLength = length - ops.length
|
||||
prevBlock = Math.floor(previousLength / HistoryManager.FLUSH_EVERY_N_OPS)
|
||||
newBlock = Math.floor(length / HistoryManager.FLUSH_EVERY_N_OPS)
|
||||
|
@ -41,4 +42,4 @@ module.exports = HistoryManager =
|
|||
HistoryManager.flushDocChanges project_id, doc_id, (error) ->
|
||||
if error?
|
||||
logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api"
|
||||
callback()
|
||||
callback()
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
Settings = require('settings-sharelatex')
|
||||
rclient = require("redis-sharelatex").createClient(Settings.redis.history)
|
||||
Keys = Settings.redis.history.key_schema
|
||||
async = require "async"
|
||||
logger = require('logger-sharelatex')
|
||||
|
||||
module.exports = HistoryRedisManager =
|
||||
recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) ->
|
||||
if ops.length == 0
|
||||
return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush
|
||||
logger.log project_id: project_id, doc_id: doc_id, "marking doc in project for history ops"
|
||||
rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, (error) ->
|
||||
return callback(error) if error?
|
||||
callback()
|
|
@ -1,7 +1,8 @@
|
|||
metrics = require('./Metrics')
|
||||
Settings = require('settings-sharelatex')
|
||||
redis = require("redis-sharelatex")
|
||||
rclient = redis.createClient(Settings.redis.web)
|
||||
rclient = redis.createClient(Settings.redis.lock)
|
||||
keys = Settings.redis.lock.key_schema
|
||||
logger = require "logger-sharelatex"
|
||||
os = require "os"
|
||||
crypto = require "crypto"
|
||||
|
@ -11,9 +12,6 @@ PID = process.pid
|
|||
RND = crypto.randomBytes(4).toString('hex')
|
||||
COUNT = 0
|
||||
|
||||
keys =
|
||||
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
|
||||
|
||||
module.exports = LockManager =
|
||||
LOCK_TEST_INTERVAL: 50 # 50ms between each test of the lock
|
||||
MAX_LOCK_WAIT_TIME: 10000 # 10s maximum time to spend trying to get the lock
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
Settings = require('settings-sharelatex')
|
||||
rclient = require("redis-sharelatex").createClient(Settings.redis.realtime)
|
||||
Keys = Settings.redis.realtime.key_schema
|
||||
logger = require('logger-sharelatex')
|
||||
|
||||
module.exports = RealTimeRedisManager =
|
||||
getPendingUpdatesForDoc : (doc_id, callback)->
|
||||
multi = rclient.multi()
|
||||
multi.lrange Keys.pendingUpdates({doc_id}), 0 , -1
|
||||
multi.del Keys.pendingUpdates({doc_id})
|
||||
multi.exec (error, replys) ->
|
||||
return callback(error) if error?
|
||||
jsonUpdates = replys[0]
|
||||
updates = []
|
||||
for jsonUpdate in jsonUpdates
|
||||
try
|
||||
update = JSON.parse jsonUpdate
|
||||
catch e
|
||||
return callback e
|
||||
updates.push update
|
||||
callback error, updates
|
||||
|
||||
getUpdatesLength: (doc_id, callback)->
|
||||
rclient.llen Keys.pendingUpdates({doc_id}), callback
|
||||
|
||||
sendData: (data) ->
|
||||
rclient.publish "applied-ops", JSON.stringify(data)
|
|
@ -1,206 +0,0 @@
|
|||
Settings = require "settings-sharelatex"
|
||||
async = require "async"
|
||||
_ = require "underscore"
|
||||
logger = require "logger-sharelatex"
|
||||
Metrics = require "metrics-sharelatex"
|
||||
|
||||
class Client
|
||||
constructor: (@clients) ->
|
||||
@SECONDARY_TIMEOUT = 600
|
||||
@HEARTBEAT_TIMEOUT = 2000
|
||||
|
||||
multi: () ->
|
||||
return new MultiClient(
|
||||
@clients.map (client) -> {
|
||||
rclient: client.rclient.multi()
|
||||
key_schema: client.key_schema
|
||||
primary: client.primary
|
||||
driver: client.driver
|
||||
}
|
||||
)
|
||||
|
||||
healthCheck: (callback) ->
|
||||
jobs = @clients.map (client) =>
|
||||
(cb) => @_healthCheckClient(client, cb)
|
||||
async.parallel jobs, callback
|
||||
|
||||
_healthCheckClient: (client, callback) ->
|
||||
if client.driver == "ioredis"
|
||||
@_healthCheckClusterClient(client, callback)
|
||||
else
|
||||
@_healthCheckNodeRedisClient(client, callback)
|
||||
|
||||
_healthCheckNodeRedisClient: (client, callback) ->
|
||||
client.healthCheck ?= require("redis-sharelatex").activeHealthCheckRedis(Settings.redis.web)
|
||||
if client.healthCheck.isAlive()
|
||||
return callback()
|
||||
else
|
||||
return callback(new Error("node-redis client failed health check"))
|
||||
|
||||
_healthCheckClusterClient: (client, callback) ->
|
||||
jobs = client.rclient.nodes("all").map (n) =>
|
||||
(cb) => @_checkNode(n, cb)
|
||||
async.parallel jobs, callback
|
||||
|
||||
_checkNode: (node, _callback) ->
|
||||
callback = (args...) ->
|
||||
_callback(args...)
|
||||
_callback = () ->
|
||||
timer = setTimeout () ->
|
||||
error = new Error("ioredis node ping check timed out")
|
||||
logger.error {err: error, key: node.options.key}, "node timed out"
|
||||
callback(error)
|
||||
, @HEARTBEAT_TIMEOUT
|
||||
node.ping (err) ->
|
||||
clearTimeout timer
|
||||
callback(err)
|
||||
|
||||
class MultiClient
|
||||
constructor: (@clients) ->
|
||||
@SECONDARY_TIMEOUT = 600
|
||||
|
||||
exec: (callback) ->
|
||||
primaryError = null
|
||||
primaryResult = null
|
||||
jobs = @clients.map (client) =>
|
||||
(cb) =>
|
||||
cb = _.once(cb)
|
||||
timer = new Metrics.Timer("redis.#{client.driver}.exec")
|
||||
|
||||
timeout = null
|
||||
if !client.primary
|
||||
timeout = setTimeout () ->
|
||||
logger.error {err: new Error("#{client.driver} backend timed out")}, "backend timed out"
|
||||
cb()
|
||||
, @SECONDARY_TIMEOUT
|
||||
|
||||
client.rclient.exec (error, result) =>
|
||||
timer.done()
|
||||
if client.driver == "ioredis"
|
||||
# ioredis returns an results like:
|
||||
# [ [null, 42], [null, "foo"] ]
|
||||
# where the first entries in each 2-tuple are
|
||||
# presumably errors for each individual command,
|
||||
# and the second entry is the result. We need to transform
|
||||
# this into the same result as the old redis driver:
|
||||
# [ 42, "foo" ]
|
||||
filtered_result = []
|
||||
for entry in result or []
|
||||
if entry[0]?
|
||||
return cb(entry[0])
|
||||
else
|
||||
filtered_result.push entry[1]
|
||||
result = filtered_result
|
||||
|
||||
if client.primary
|
||||
primaryError = error
|
||||
primaryResult = result
|
||||
if timeout?
|
||||
clearTimeout(timeout)
|
||||
cb(error, result)
|
||||
async.parallel jobs, (error, results) ->
|
||||
if error?
|
||||
# suppress logging of errors
|
||||
# logger.error {err: error}, "error in redis backend"
|
||||
else
|
||||
compareResults(results, "exec")
|
||||
callback(primaryError, primaryResult)
|
||||
|
||||
COMMANDS = {
|
||||
"get": 0,
|
||||
"smembers": 0,
|
||||
"set": 0,
|
||||
"srem": 0,
|
||||
"sadd": 0,
|
||||
"del": 0,
|
||||
"lrange": 0,
|
||||
"llen": 0,
|
||||
"rpush": 0,
|
||||
"expire": 0,
|
||||
"ltrim": 0,
|
||||
"incr": 0,
|
||||
"eval": 2
|
||||
}
|
||||
for command, key_pos of COMMANDS
|
||||
do (command, key_pos) ->
|
||||
Client.prototype[command] = (args..., callback) ->
|
||||
primaryError = null
|
||||
primaryResult = []
|
||||
jobs = @clients.map (client) =>
|
||||
(cb) =>
|
||||
cb = _.once(cb)
|
||||
key_builder = args[key_pos]
|
||||
key = key_builder(client.key_schema)
|
||||
args_with_key = args.slice(0)
|
||||
args_with_key[key_pos] = key
|
||||
timer = new Metrics.Timer("redis.#{client.driver}.#{command}")
|
||||
|
||||
timeout = null
|
||||
if !client.primary
|
||||
timeout = setTimeout () ->
|
||||
logger.error {err: new Error("#{client.driver} backend timed out")}, "backend timed out"
|
||||
cb()
|
||||
, @SECONDARY_TIMEOUT
|
||||
|
||||
client.rclient[command] args_with_key..., (error, result...) =>
|
||||
timer.done()
|
||||
if client.primary
|
||||
primaryError = error
|
||||
primaryResult = result
|
||||
if timeout?
|
||||
clearTimeout(timeout)
|
||||
cb(error, result...)
|
||||
async.parallel jobs, (error, results) ->
|
||||
if error?
|
||||
logger.error {err: error}, "error in redis backend"
|
||||
else
|
||||
compareResults(results, command)
|
||||
callback(primaryError, primaryResult...)
|
||||
|
||||
MultiClient.prototype[command] = (args...) ->
|
||||
for client in @clients
|
||||
key_builder = args[key_pos]
|
||||
key = key_builder(client.key_schema)
|
||||
args_with_key = args.slice(0)
|
||||
args_with_key[key_pos] = key
|
||||
client.rclient[command] args_with_key...
|
||||
|
||||
compareResults = (results, command) ->
|
||||
return if results.length < 2
|
||||
first = results[0]
|
||||
if command == "smembers" and first?
|
||||
first = first.slice().sort()
|
||||
for result in results.slice(1)
|
||||
if command == "smembers" and result?
|
||||
result = result.slice().sort()
|
||||
if not _.isEqual(first, result)
|
||||
logger.error results: results, "redis backend conflict"
|
||||
Metrics.inc "backend-conflict"
|
||||
else
|
||||
Metrics.inc "backend-match"
|
||||
|
||||
module.exports =
|
||||
createClient: () ->
|
||||
client_configs = Settings.redis.documentupdater
|
||||
unless client_configs instanceof Array
|
||||
client_configs.primary = true
|
||||
client_configs = [client_configs]
|
||||
clients = client_configs.map (config) ->
|
||||
if config.cluster?
|
||||
Redis = require("ioredis")
|
||||
rclient = new Redis.Cluster(config.cluster)
|
||||
driver = "ioredis"
|
||||
else
|
||||
redis_config = {}
|
||||
for key in ["host", "port", "password", "endpoints", "masterName"]
|
||||
if config[key]?
|
||||
redis_config[key] = config[key]
|
||||
rclient = require("redis-sharelatex").createClient(redis_config)
|
||||
driver = "noderedis"
|
||||
return {
|
||||
rclient: rclient
|
||||
key_schema: config.key_schema
|
||||
primary: config.primary
|
||||
driver: driver
|
||||
}
|
||||
return new Client(clients)
|
|
@ -1,44 +0,0 @@
|
|||
# The default key schema looks like:
|
||||
# doclines:foo
|
||||
# DocVersion:foo
|
||||
# but if we use redis cluster, we want all 'foo' keys to map to the same
|
||||
# node, so we must use:
|
||||
# doclines:{foo}
|
||||
# DocVersion:{foo}
|
||||
# since redis hashes on the contents of {...}.
|
||||
#
|
||||
# To transparently support different key schemas for different clients
|
||||
# (potential writing/reading to both a cluster and single instance
|
||||
# while we migrate), instead of keys, we now pass around functions which
|
||||
# will build the key when passed a schema.
|
||||
#
|
||||
# E.g.
|
||||
# key_schema = Settings.redis.keys
|
||||
# key_schema == { docLines: ({doc_id}) -> "doclines:#{doc_id}", ... }
|
||||
# key_builder = RedisKeyBuilder.docLines({doc_id: "foo"})
|
||||
# key_builder == (key_schema) -> key_schema.docLines({doc_id: "foo"})
|
||||
# key = key_builder(key_schema)
|
||||
# key == "doclines:foo"
|
||||
module.exports = RedisKeyBuilder =
|
||||
blockingKey: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.blockingKey({doc_id})
|
||||
docLines: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.docLines({doc_id})
|
||||
docOps: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.docOps({doc_id})
|
||||
docVersion: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.docVersion({doc_id})
|
||||
docHash: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.docHash({doc_id})
|
||||
projectKey: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.projectKey({doc_id})
|
||||
uncompressedHistoryOp: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.uncompressedHistoryOp({doc_id})
|
||||
pendingUpdates: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.pendingUpdates({doc_id})
|
||||
ranges: ({doc_id}) ->
|
||||
return (key_schema) -> key_schema.ranges({doc_id})
|
||||
docsInProject: ({project_id}) ->
|
||||
return (key_schema) -> key_schema.docsInProject({project_id})
|
||||
docsWithHistoryOps: ({project_id}) ->
|
||||
return (key_schema) -> key_schema.docsWithHistoryOps({project_id})
|
|
@ -1,8 +1,7 @@
|
|||
Settings = require('settings-sharelatex')
|
||||
async = require('async')
|
||||
rclient = require("./RedisBackend").createClient()
|
||||
rclient = require("redis-sharelatex").createClient(Settings.redis.documentupdater)
|
||||
_ = require('underscore')
|
||||
keys = require('./RedisKeyBuilder')
|
||||
logger = require('logger-sharelatex')
|
||||
metrics = require('./Metrics')
|
||||
Errors = require "./Errors"
|
||||
|
@ -25,6 +24,9 @@ logHashWriteErrors = logHashErrors?.write
|
|||
MEGABYTES = 1024 * 1024
|
||||
MAX_RANGES_SIZE = 3 * MEGABYTES
|
||||
|
||||
keys = Settings.redis.documentupdater.key_schema
|
||||
historyKeys = Settings.redis.history.key_schema
|
||||
|
||||
module.exports = RedisManager =
|
||||
rclient: rclient
|
||||
|
||||
|
@ -166,32 +168,37 @@ module.exports = RedisManager =
|
|||
logger.error err: error, doc_id: doc_id, newDocLines: newDocLines, error.message
|
||||
return callback(error)
|
||||
newHash = RedisManager._computeHash(newDocLines)
|
||||
|
||||
logger.log doc_id: doc_id, version: newVersion, hash: newHash, "updating doc in redis"
|
||||
|
||||
|
||||
opVersions = appliedOps.map (op) -> op?.v
|
||||
logger.log doc_id: doc_id, version: newVersion, hash: newHash, op_versions: opVersions, "updating doc in redis"
|
||||
|
||||
RedisManager._serializeRanges ranges, (error, ranges) ->
|
||||
if error?
|
||||
logger.error {err: error, doc_id}, error.message
|
||||
return callback(error)
|
||||
multi = rclient.multi()
|
||||
multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines
|
||||
multi.set keys.docVersion(doc_id:doc_id), newVersion
|
||||
multi.set keys.docHash(doc_id:doc_id), newHash
|
||||
if jsonOps.length > 0
|
||||
multi.rpush keys.docOps(doc_id: doc_id), jsonOps...
|
||||
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL
|
||||
multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1
|
||||
multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines # index 0
|
||||
multi.set keys.docVersion(doc_id:doc_id), newVersion # index 1
|
||||
multi.set keys.docHash(doc_id:doc_id), newHash # index 2
|
||||
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 3
|
||||
multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 # index 4
|
||||
if ranges?
|
||||
multi.set keys.ranges(doc_id:doc_id), ranges
|
||||
multi.set keys.ranges(doc_id:doc_id), ranges # index 5
|
||||
else
|
||||
multi.del keys.ranges(doc_id:doc_id)
|
||||
multi.del keys.ranges(doc_id:doc_id) # also index 5
|
||||
# push the ops last so we can get the lengths at fixed index positions 6 and 7
|
||||
if jsonOps.length > 0
|
||||
multi.rpush keys.docOps(doc_id: doc_id), jsonOps... # index 6
|
||||
multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7
|
||||
multi.exec (error, result) ->
|
||||
return callback(error) if error?
|
||||
# check the hash computed on the redis server
|
||||
writeHash = result?[0]
|
||||
if logHashWriteErrors and writeHash? and writeHash isnt newHash
|
||||
logger.error doc_id: doc_id, writeHash: writeHash, origHash: newHash, docLines:newDocLines, "hash mismatch on updateDocument"
|
||||
return callback()
|
||||
# return length of uncompressedHistoryOps queue (index 7)
|
||||
uncompressedHistoryOpsLength = result?[7]
|
||||
return callback(null, uncompressedHistoryOpsLength)
|
||||
|
||||
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
|
||||
rclient.smembers keys.docsInProject(project_id: project_id), callback
|
||||
|
|
|
@ -6,7 +6,7 @@ Settings = require('settings-sharelatex')
|
|||
Keys = require "./UpdateKeys"
|
||||
{EventEmitter} = require "events"
|
||||
util = require "util"
|
||||
WebRedisManager = require "./WebRedisManager"
|
||||
RealTimeRedisManager = require "./RealTimeRedisManager"
|
||||
|
||||
ShareJsModel:: = {}
|
||||
util.inherits ShareJsModel, EventEmitter
|
||||
|
@ -52,5 +52,5 @@ module.exports = ShareJsUpdateManager =
|
|||
ShareJsUpdateManager._sendOp(project_id, doc_id, opData)
|
||||
|
||||
_sendOp: (project_id, doc_id, op) ->
|
||||
WebRedisManager.sendData {project_id, doc_id, op}
|
||||
RealTimeRedisManager.sendData {project_id, doc_id, op}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
LockManager = require "./LockManager"
|
||||
RedisManager = require "./RedisManager"
|
||||
WebRedisManager = require "./WebRedisManager"
|
||||
RealTimeRedisManager = require "./RealTimeRedisManager"
|
||||
ShareJsUpdateManager = require "./ShareJsUpdateManager"
|
||||
HistoryManager = require "./HistoryManager"
|
||||
Settings = require('settings-sharelatex')
|
||||
|
@ -30,7 +30,7 @@ module.exports = UpdateManager =
|
|||
UpdateManager.continueProcessingUpdatesWithLock project_id, doc_id, callback
|
||||
|
||||
continueProcessingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
|
||||
WebRedisManager.getUpdatesLength doc_id, (error, length) =>
|
||||
RealTimeRedisManager.getUpdatesLength doc_id, (error, length) =>
|
||||
return callback(error) if error?
|
||||
if length > 0
|
||||
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, callback
|
||||
|
@ -38,7 +38,7 @@ module.exports = UpdateManager =
|
|||
callback()
|
||||
|
||||
fetchAndApplyUpdates: (project_id, doc_id, callback = (error) ->) ->
|
||||
WebRedisManager.getPendingUpdatesForDoc doc_id, (error, updates) =>
|
||||
RealTimeRedisManager.getPendingUpdatesForDoc doc_id, (error, updates) =>
|
||||
return callback(error) if error?
|
||||
if updates.length == 0
|
||||
return callback()
|
||||
|
@ -49,7 +49,7 @@ module.exports = UpdateManager =
|
|||
applyUpdate: (project_id, doc_id, update, _callback = (error) ->) ->
|
||||
callback = (error) ->
|
||||
if error?
|
||||
WebRedisManager.sendData {project_id, doc_id, error: error.message || error}
|
||||
RealTimeRedisManager.sendData {project_id, doc_id, error: error.message || error}
|
||||
_callback(error)
|
||||
|
||||
UpdateManager._sanitizeUpdate update
|
||||
|
@ -61,9 +61,9 @@ module.exports = UpdateManager =
|
|||
return callback(error) if error?
|
||||
RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) ->
|
||||
return callback(error) if error?
|
||||
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error) ->
|
||||
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) ->
|
||||
return callback(error) if error?
|
||||
HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback
|
||||
HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback
|
||||
|
||||
lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) ->
|
||||
LockManager.getLock doc_id, (error, lockValue) ->
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
Settings = require('settings-sharelatex')
|
||||
rclient = require("redis-sharelatex").createClient(Settings.redis.web)
|
||||
async = require "async"
|
||||
logger = require('logger-sharelatex')
|
||||
|
||||
module.exports = WebRedisManager =
|
||||
getPendingUpdatesForDoc : (doc_id, callback)->
|
||||
multi = rclient.multi()
|
||||
multi.lrange "PendingUpdates:#{doc_id}", 0 , -1
|
||||
multi.del "PendingUpdates:#{doc_id}"
|
||||
multi.exec (error, replys) ->
|
||||
return callback(error) if error?
|
||||
jsonUpdates = replys[0]
|
||||
updates = []
|
||||
for jsonUpdate in jsonUpdates
|
||||
try
|
||||
update = JSON.parse jsonUpdate
|
||||
catch e
|
||||
return callback e
|
||||
updates.push update
|
||||
callback error, updates
|
||||
|
||||
getUpdatesLength: (doc_id, callback)->
|
||||
rclient.llen "PendingUpdates:#{doc_id}", callback
|
||||
|
||||
pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) ->
|
||||
if ops.length == 0
|
||||
return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush
|
||||
opVersions = ops.map (op) -> op?.v
|
||||
logger.log project_id: project_id, doc_id: doc_id, op_versions: opVersions, "pushing uncompressed history ops"
|
||||
jsonOps = ops.map (op) -> JSON.stringify op
|
||||
async.parallel [
|
||||
(cb) -> rclient.rpush "UncompressedHistoryOps:#{doc_id}", jsonOps..., cb
|
||||
(cb) -> rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, cb
|
||||
], (error, results) ->
|
||||
return callback(error) if error?
|
||||
[length, _] = results
|
||||
callback(error, length)
|
||||
|
||||
sendData: (data) ->
|
||||
rclient.publish "applied-ops", JSON.stringify(data)
|
|
@ -16,15 +16,22 @@ module.exports =
|
|||
url: "http://localhost:3015"
|
||||
|
||||
redis:
|
||||
web:
|
||||
port:"6379"
|
||||
host:"localhost"
|
||||
password:""
|
||||
documentupdater: [{
|
||||
primary: true
|
||||
realtime:
|
||||
port:"6379"
|
||||
host:"localhost"
|
||||
password:""
|
||||
key_schema:
|
||||
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
|
||||
# cluster: [{
|
||||
# port: "7000"
|
||||
# host: "localhost"
|
||||
# }]
|
||||
# key_schema:
|
||||
# pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
|
||||
documentupdater:
|
||||
port: "6379"
|
||||
host: "localhost"
|
||||
password: ""
|
||||
key_schema:
|
||||
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
|
||||
docLines: ({doc_id}) -> "doclines:#{doc_id}"
|
||||
|
@ -34,20 +41,45 @@ module.exports =
|
|||
projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
|
||||
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
|
||||
ranges: ({doc_id}) -> "Ranges:#{doc_id}"
|
||||
# }, {
|
||||
# cluster: [{
|
||||
# port: "7000"
|
||||
# host: "localhost"
|
||||
# }]
|
||||
# key_schema:
|
||||
# blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
|
||||
# docLines: ({doc_id}) -> "doclines:{#{doc_id}}"
|
||||
# docOps: ({doc_id}) -> "DocOps:{#{doc_id}}"
|
||||
# docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
|
||||
# projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
|
||||
# docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
|
||||
# ranges: ({doc_id}) -> "Ranges:{#{doc_id}}"
|
||||
}]
|
||||
# cluster: [{
|
||||
# port: "7000"
|
||||
# host: "localhost"
|
||||
# }]
|
||||
# key_schema:
|
||||
# blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
|
||||
# docLines: ({doc_id}) -> "doclines:{#{doc_id}}"
|
||||
# docOps: ({doc_id}) -> "DocOps:{#{doc_id}}"
|
||||
# docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
|
||||
# docHash: ({doc_id}) -> "DocHash:{#{doc_id}}"
|
||||
# projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
|
||||
# docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
|
||||
# ranges: ({doc_id}) -> "Ranges:{#{doc_id}}"
|
||||
history:
|
||||
port:"6379"
|
||||
host:"localhost"
|
||||
password:""
|
||||
key_schema:
|
||||
uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}"
|
||||
docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}"
|
||||
# cluster: [{
|
||||
# port: "7000"
|
||||
# host: "localhost"
|
||||
# }]
|
||||
# key_schema:
|
||||
# uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:{#{doc_id}}"
|
||||
# docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:{#{project_id}}"
|
||||
lock:
|
||||
port:"6379"
|
||||
host:"localhost"
|
||||
password:""
|
||||
key_schema:
|
||||
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
|
||||
# cluster: [{
|
||||
# port: "7000"
|
||||
# host: "localhost"
|
||||
# }]
|
||||
# key_schema:
|
||||
# blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
|
||||
|
||||
max_doc_length: 2 * 1024 * 1024 # 2mb
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
"logger-sharelatex": "git+https://github.com/sharelatex/logger-sharelatex.git#v1.5.6",
|
||||
"lynx": "0.0.11",
|
||||
"metrics-sharelatex": "git+https://github.com/sharelatex/metrics-sharelatex.git#v1.5.0",
|
||||
"redis-sharelatex": "0.0.9",
|
||||
"redis-sharelatex": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.2",
|
||||
"request": "2.25.0",
|
||||
"sandboxed-module": "~0.2.0",
|
||||
"settings-sharelatex": "git+https://github.com/sharelatex/settings-sharelatex.git#v1.0.0",
|
||||
|
|
|
@ -4,7 +4,10 @@ chai.should()
|
|||
expect = chai.expect
|
||||
async = require "async"
|
||||
Settings = require('settings-sharelatex')
|
||||
rclient = require("redis-sharelatex").createClient(Settings.redis.web)
|
||||
rclient_history = require("redis-sharelatex").createClient(Settings.redis.history)
|
||||
rclient_du = require("redis-sharelatex").createClient(Settings.redis.documentupdater)
|
||||
Keys = Settings.redis.documentupdater.key_schema
|
||||
HistoryKeys = Settings.redis.history.key_schema
|
||||
|
||||
MockTrackChangesApi = require "./helpers/MockTrackChangesApi"
|
||||
MockWebApi = require "./helpers/MockWebApi"
|
||||
|
@ -47,10 +50,10 @@ describe "Applying updates to a doc", ->
|
|||
done()
|
||||
|
||||
it "should push the applied updates to the track changes api", (done) ->
|
||||
rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
|
||||
rclient_history.lrange HistoryKeys.uncompressedHistoryOps({@doc_id}), 0, -1, (error, updates) =>
|
||||
throw error if error?
|
||||
JSON.parse(updates[0]).op.should.deep.equal @update.op
|
||||
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
|
||||
rclient_history.sismember HistoryKeys.docsWithHistoryOps({@project_id}), @doc_id, (error, result) =>
|
||||
throw error if error?
|
||||
result.should.equal 1
|
||||
done()
|
||||
|
@ -80,9 +83,9 @@ describe "Applying updates to a doc", ->
|
|||
done()
|
||||
|
||||
it "should push the applied updates to the track changes api", (done) ->
|
||||
rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
|
||||
rclient_history.lrange HistoryKeys.uncompressedHistoryOps({@doc_id}), 0, -1, (error, updates) =>
|
||||
JSON.parse(updates[0]).op.should.deep.equal @update.op
|
||||
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
|
||||
rclient_history.sismember HistoryKeys.docsWithHistoryOps({@project_id}), @doc_id, (error, result) =>
|
||||
result.should.equal 1
|
||||
done()
|
||||
|
||||
|
@ -125,17 +128,17 @@ describe "Applying updates to a doc", ->
|
|||
done()
|
||||
|
||||
it "should push the applied updates to the track changes api", (done) ->
|
||||
rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
|
||||
rclient_history.lrange HistoryKeys.uncompressedHistoryOps({@doc_id}), 0, -1, (error, updates) =>
|
||||
updates = (JSON.parse(u) for u in updates)
|
||||
for appliedUpdate, i in @updates
|
||||
appliedUpdate.op.should.deep.equal updates[i].op
|
||||
|
||||
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
|
||||
rclient_history.sismember HistoryKeys.docsWithHistoryOps({@project_id}), @doc_id, (error, result) =>
|
||||
result.should.equal 1
|
||||
done()
|
||||
|
||||
it "should store the doc ops in the correct order", (done) ->
|
||||
rclient.lrange "DocOps:#{@doc_id}", 0, -1, (error, updates) =>
|
||||
rclient_du.lrange Keys.docOps({doc_id: @doc_id}), 0, -1, (error, updates) =>
|
||||
updates = (JSON.parse(u) for u in updates)
|
||||
for appliedUpdate, i in @updates
|
||||
appliedUpdate.op.should.deep.equal updates[i].op
|
||||
|
@ -204,7 +207,7 @@ describe "Applying updates to a doc", ->
|
|||
before (done) ->
|
||||
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
|
||||
updates = []
|
||||
for v in [0..99] # Should flush after 50 ops
|
||||
for v in [0..199] # Should flush after 100 ops
|
||||
updates.push
|
||||
doc_id: @doc_id,
|
||||
op: [i: v.toString(), p: 0]
|
||||
|
@ -216,7 +219,7 @@ describe "Applying updates to a doc", ->
|
|||
|
||||
# Send updates in chunks to causes multiple flushes
|
||||
actions = []
|
||||
for i in [0..9]
|
||||
for i in [0..19]
|
||||
do (i) =>
|
||||
actions.push (cb) =>
|
||||
DocUpdaterClient.sendUpdates @project_id, @doc_id, updates.slice(i*10, (i+1)*10), cb
|
||||
|
|
|
@ -3,7 +3,8 @@ chai = require("chai")
|
|||
chai.should()
|
||||
expect = require("chai").expect
|
||||
Settings = require('settings-sharelatex')
|
||||
rclient = require("redis-sharelatex").createClient(Settings.redis.web)
|
||||
rclient_du = require("redis-sharelatex").createClient(Settings.redis.documentupdater)
|
||||
Keys = Settings.redis.documentupdater.key_schema
|
||||
|
||||
MockTrackChangesApi = require "./helpers/MockTrackChangesApi"
|
||||
MockWebApi = require "./helpers/MockWebApi"
|
||||
|
@ -65,7 +66,7 @@ describe "Setting a document", ->
|
|||
done()
|
||||
|
||||
it "should leave the document in redis", (done) ->
|
||||
rclient.get "doclines:#{@doc_id}", (error, lines) =>
|
||||
rclient_du.get Keys.docLines({doc_id: @doc_id}), (error, lines) =>
|
||||
throw error if error?
|
||||
expect(JSON.parse(lines)).to.deep.equal @newLines
|
||||
done()
|
||||
|
@ -90,7 +91,7 @@ describe "Setting a document", ->
|
|||
MockTrackChangesApi.flushDoc.calledWith(@doc_id).should.equal true
|
||||
|
||||
it "should remove the document from redis", (done) ->
|
||||
rclient.get "doclines:#{@doc_id}", (error, lines) =>
|
||||
rclient_du.get Keys.docLines({doc_id: @doc_id}), (error, lines) =>
|
||||
throw error if error?
|
||||
expect(lines).to.not.exist
|
||||
done()
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
Settings = require('settings-sharelatex')
|
||||
rclient = require("redis-sharelatex").createClient(Settings.redis.web)
|
||||
rclient = require("redis-sharelatex").createClient(Settings.redis.realtime)
|
||||
keys = Settings.redis.realtime.key_schema
|
||||
request = require("request").defaults(jar: false)
|
||||
async = require "async"
|
||||
|
||||
rclient_sub = require("redis-sharelatex").createClient(Settings.redis.web)
|
||||
rclient_sub = require("redis-sharelatex").createClient(Settings.redis.realtime)
|
||||
rclient_sub.subscribe "applied-ops"
|
||||
rclient_sub.setMaxListeners(0)
|
||||
|
||||
|
@ -17,7 +18,7 @@ module.exports = DocUpdaterClient =
|
|||
rclient_sub.on "message", callback
|
||||
|
||||
sendUpdate: (project_id, doc_id, update, callback = (error) ->) ->
|
||||
rclient.rpush "PendingUpdates:#{doc_id}", JSON.stringify(update), (error)->
|
||||
rclient.rpush keys.pendingUpdates({doc_id}), JSON.stringify(update), (error)->
|
||||
return callback(error) if error?
|
||||
doc_key = "#{project_id}:#{doc_id}"
|
||||
rclient.sadd "DocsWithPendingUpdates", doc_key, (error) ->
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
redis = require "redis-sharelatex"
|
||||
rclient1 = redis.createClient(cluster: [{
|
||||
port: "7000"
|
||||
host: "localhost"
|
||||
}])
|
||||
|
||||
rclient2 = redis.createClient(cluster: [{
|
||||
port: "7000"
|
||||
host: "localhost"
|
||||
}])
|
||||
|
||||
counter = 0
|
||||
sendPing = (cb = () ->) ->
|
||||
rclient1.rpush "test-blpop", counter, (error) ->
|
||||
console.error "[SENDING ERROR]", error.message if error?
|
||||
if !error?
|
||||
counter += 1
|
||||
cb()
|
||||
|
||||
previous = null
|
||||
listenForPing = (cb) ->
|
||||
rclient2.blpop "test-blpop", 200, (error, result) ->
|
||||
return cb(error) if error?
|
||||
[key, value] = result
|
||||
value = parseInt(value, 10)
|
||||
if value % 10 == 0
|
||||
console.log "."
|
||||
if previous? and value != previous + 1
|
||||
error = new Error("Counter not in order. Got #{value}, expected #{previous + 1}")
|
||||
previous = value
|
||||
return cb(error, value)
|
||||
|
||||
PING_DELAY = 100
|
||||
do sendPings = () ->
|
||||
sendPing () ->
|
||||
setTimeout sendPings, PING_DELAY
|
||||
|
||||
do listenInBackground = (cb = () ->) ->
|
||||
listenForPing (error, value) ->
|
||||
console.error "[RECEIVING ERROR]", error.message if error
|
||||
setTimeout listenInBackground
|
|
@ -0,0 +1,33 @@
|
|||
redis = require "redis-sharelatex"
|
||||
rclient1 = redis.createClient(cluster: [{
|
||||
port: "7000"
|
||||
host: "localhost"
|
||||
}])
|
||||
|
||||
rclient2 = redis.createClient(cluster: [{
|
||||
port: "7000"
|
||||
host: "localhost"
|
||||
}])
|
||||
|
||||
counter = 0
|
||||
sendPing = (cb = () ->) ->
|
||||
rclient1.publish "test-pubsub", counter, (error) ->
|
||||
console.error "[SENDING ERROR]", error.message if error?
|
||||
if !error?
|
||||
counter += 1
|
||||
cb()
|
||||
|
||||
previous = null
|
||||
rclient2.subscribe "test-pubsub"
|
||||
rclient2.on "message", (channel, value) ->
|
||||
value = parseInt(value, 10)
|
||||
if value % 10 == 0
|
||||
console.log "."
|
||||
if previous? and value != previous + 1
|
||||
console.error "[RECEIVING ERROR]", "Counter not in order. Got #{value}, expected #{previous + 1}"
|
||||
previous = value
|
||||
|
||||
PING_DELAY = 100
|
||||
do sendPings = () ->
|
||||
sendPing () ->
|
||||
setTimeout sendPings, PING_DELAY
|
|
@ -11,7 +11,7 @@ describe "DispatchManager", ->
|
|||
"logger-sharelatex": @logger = { log: sinon.stub() }
|
||||
"settings-sharelatex": @settings =
|
||||
redis:
|
||||
web: {}
|
||||
realtime: {}
|
||||
"redis-sharelatex": @redis = {}
|
||||
@callback = sinon.stub()
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ describe "DocumentManager", ->
|
|||
"./Metrics": @Metrics =
|
||||
Timer: class Timer
|
||||
done: sinon.stub()
|
||||
"./WebRedisManager": @WebRedisManager = {}
|
||||
"./RealTimeRedisManager": @RealTimeRedisManager = {}
|
||||
"./DiffCodec": @DiffCodec = {}
|
||||
"./UpdateManager": @UpdateManager = {}
|
||||
"./RangesManager": @RangesManager = {}
|
||||
|
|
|
@ -9,7 +9,7 @@ describe "HistoryManager", ->
|
|||
"request": @request = {}
|
||||
"settings-sharelatex": @Settings = {}
|
||||
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
|
||||
"./WebRedisManager": @WebRedisManager = {}
|
||||
"./HistoryRedisManager": @HistoryRedisManager = {}
|
||||
@project_id = "mock-project-id"
|
||||
@doc_id = "mock-doc-id"
|
||||
@callback = sinon.stub()
|
||||
|
@ -40,18 +40,18 @@ describe "HistoryManager", ->
|
|||
it "should return the callback with an error", ->
|
||||
@callback.calledWith(new Error("track changes api return non-success code: 500")).should.equal true
|
||||
|
||||
describe "pushUncompressedHistoryOps", ->
|
||||
describe "recordAndFlushHistoryOps", ->
|
||||
beforeEach ->
|
||||
@ops = ["mock-ops"]
|
||||
@HistoryManager.flushDocChanges = sinon.stub().callsArg(2)
|
||||
|
||||
describe "pushing the op", ->
|
||||
beforeEach ->
|
||||
@WebRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1)
|
||||
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
|
||||
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null)
|
||||
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 1, @callback
|
||||
|
||||
it "should push the ops into redis", ->
|
||||
@WebRedisManager.pushUncompressedHistoryOps
|
||||
@HistoryRedisManager.recordDocHasHistoryOps
|
||||
.calledWith(@project_id, @doc_id, @ops)
|
||||
.should.equal true
|
||||
|
||||
|
@ -63,9 +63,9 @@ describe "HistoryManager", ->
|
|||
|
||||
describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", ->
|
||||
beforeEach ->
|
||||
@WebRedisManager.pushUncompressedHistoryOps =
|
||||
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS)
|
||||
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
|
||||
@HistoryRedisManager.recordDocHasHistoryOps =
|
||||
sinon.stub().callsArgWith(3, null)
|
||||
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback
|
||||
|
||||
it "should tell the track changes api to flush", ->
|
||||
@HistoryManager.flushDocChanges
|
||||
|
@ -75,9 +75,9 @@ describe "HistoryManager", ->
|
|||
describe "when we go over a multiple of FLUSH_EVERY_N_OPS ops", ->
|
||||
beforeEach ->
|
||||
@ops = ["op1", "op2", "op3"]
|
||||
@WebRedisManager.pushUncompressedHistoryOps =
|
||||
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1)
|
||||
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
|
||||
@HistoryRedisManager.recordDocHasHistoryOps =
|
||||
sinon.stub().callsArgWith(3, null)
|
||||
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback
|
||||
|
||||
it "should tell the track changes api to flush", ->
|
||||
@HistoryManager.flushDocChanges
|
||||
|
@ -86,10 +86,10 @@ describe "HistoryManager", ->
|
|||
|
||||
describe "when HistoryManager errors", ->
|
||||
beforeEach ->
|
||||
@WebRedisManager.pushUncompressedHistoryOps =
|
||||
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS)
|
||||
@HistoryRedisManager.recordDocHasHistoryOps =
|
||||
sinon.stub().callsArgWith(3, null)
|
||||
@HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
|
||||
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
|
||||
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback
|
||||
|
||||
it "should log out the error", ->
|
||||
@logger.error
|
||||
|
@ -103,10 +103,10 @@ describe "HistoryManager", ->
|
|||
|
||||
describe "with no ops", ->
|
||||
beforeEach ->
|
||||
@WebRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1)
|
||||
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], @callback
|
||||
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null)
|
||||
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, [], 1, @callback
|
||||
|
||||
it "should not call WebRedisManager.pushUncompressedHistoryOps", ->
|
||||
@WebRedisManager.pushUncompressedHistoryOps.called.should.equal false
|
||||
it "should not call HistoryRedisManager.recordDocHasHistoryOps", ->
|
||||
@HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
sinon = require('sinon')
|
||||
chai = require('chai')
|
||||
should = chai.should()
|
||||
modulePath = "../../../../app/js/HistoryRedisManager.js"
|
||||
SandboxedModule = require('sandboxed-module')
|
||||
Errors = require "../../../../app/js/Errors"
|
||||
|
||||
describe "HistoryRedisManager", ->
|
||||
beforeEach ->
|
||||
@rclient =
|
||||
auth: () ->
|
||||
exec: sinon.stub()
|
||||
@rclient.multi = () => @rclient
|
||||
@HistoryRedisManager = SandboxedModule.require modulePath, requires:
|
||||
"redis-sharelatex": createClient: () => @rclient
|
||||
"settings-sharelatex":
|
||||
redis:
|
||||
history: @settings =
|
||||
key_schema:
|
||||
uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}"
|
||||
docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}"
|
||||
"logger-sharelatex": { log: () -> }
|
||||
@doc_id = "doc-id-123"
|
||||
@project_id = "project-id-123"
|
||||
@callback = sinon.stub()
|
||||
|
||||
describe "recordDocHasHistoryOps", ->
|
||||
beforeEach ->
|
||||
@ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }]
|
||||
@rclient.sadd = sinon.stub().yields()
|
||||
|
||||
describe "with ops", ->
|
||||
beforeEach (done) ->
|
||||
@HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, @ops, (args...) =>
|
||||
@callback(args...)
|
||||
done()
|
||||
|
||||
it "should add the doc_id to the set of which records the project docs", ->
|
||||
@rclient.sadd
|
||||
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
|
||||
.should.equal true
|
||||
|
||||
describe "with no ops", ->
|
||||
beforeEach (done) ->
|
||||
@HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, [], (args...) =>
|
||||
@callback(args...)
|
||||
done()
|
||||
|
||||
it "should not add the doc_id to the set of which records the project docs", ->
|
||||
@rclient.sadd
|
||||
.called
|
||||
.should.equal false
|
||||
|
||||
it "should call the callback with an error", ->
|
||||
@callback.calledWith(new Error("cannot push no ops")).should.equal true
|
|
@ -19,6 +19,12 @@ describe 'LockManager - releasing the lock', ()->
|
|||
error:->
|
||||
"redis-sharelatex":
|
||||
createClient : () => @client
|
||||
"settings-sharelatex": {
|
||||
redis:
|
||||
lock:
|
||||
key_schema:
|
||||
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
|
||||
}
|
||||
"./Metrics": {inc: () ->}
|
||||
@LockManager = SandboxedModule.require(modulePath, requires: mocks)
|
||||
@lockValue = "lock-value-stub"
|
||||
|
|
|
@ -13,6 +13,12 @@ describe 'LockManager - trying the lock', ->
|
|||
auth:->
|
||||
set: @set = sinon.stub()
|
||||
"./Metrics": {inc: () ->}
|
||||
"settings-sharelatex": {
|
||||
redis:
|
||||
lock:
|
||||
key_schema:
|
||||
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
|
||||
}
|
||||
|
||||
@callback = sinon.stub()
|
||||
@doc_id = "doc-id-123"
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
sinon = require('sinon')
|
||||
chai = require('chai')
|
||||
should = chai.should()
|
||||
modulePath = "../../../../app/js/RealTimeRedisManager.js"
|
||||
SandboxedModule = require('sandboxed-module')
|
||||
Errors = require "../../../../app/js/Errors"
|
||||
|
||||
describe "RealTimeRedisManager", ->
|
||||
beforeEach ->
|
||||
@rclient =
|
||||
auth: () ->
|
||||
exec: sinon.stub()
|
||||
@rclient.multi = () => @rclient
|
||||
@RealTimeRedisManager = SandboxedModule.require modulePath, requires:
|
||||
"redis-sharelatex": createClient: () => @rclient
|
||||
"settings-sharelatex":
|
||||
redis:
|
||||
realtime: @settings =
|
||||
key_schema:
|
||||
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
|
||||
"logger-sharelatex": { log: () -> }
|
||||
@doc_id = "doc-id-123"
|
||||
@project_id = "project-id-123"
|
||||
@callback = sinon.stub()
|
||||
|
||||
describe "getPendingUpdatesForDoc", ->
|
||||
beforeEach ->
|
||||
@rclient.lrange = sinon.stub()
|
||||
@rclient.del = sinon.stub()
|
||||
|
||||
describe "successfully", ->
|
||||
beforeEach ->
|
||||
@updates = [
|
||||
{ op: [{ i: "foo", p: 4 }] }
|
||||
{ op: [{ i: "foo", p: 4 }] }
|
||||
]
|
||||
@jsonUpdates = @updates.map (update) -> JSON.stringify update
|
||||
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
|
||||
@RealTimeRedisManager.getPendingUpdatesForDoc @doc_id, @callback
|
||||
|
||||
it "should get the pending updates", ->
|
||||
@rclient.lrange
|
||||
.calledWith("PendingUpdates:#{@doc_id}", 0, -1)
|
||||
.should.equal true
|
||||
|
||||
it "should delete the pending updates", ->
|
||||
@rclient.del
|
||||
.calledWith("PendingUpdates:#{@doc_id}")
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback with the updates", ->
|
||||
@callback.calledWith(null, @updates).should.equal true
|
||||
|
||||
describe "when the JSON doesn't parse", ->
|
||||
beforeEach ->
|
||||
@jsonUpdates = [
|
||||
JSON.stringify { op: [{ i: "foo", p: 4 }] }
|
||||
"broken json"
|
||||
]
|
||||
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
|
||||
@RealTimeRedisManager.getPendingUpdatesForDoc @doc_id, @callback
|
||||
|
||||
it "should return an error to the callback", ->
|
||||
@callback.calledWith(new Error("JSON parse error")).should.equal true
|
||||
|
||||
|
||||
describe "getUpdatesLength", ->
|
||||
beforeEach ->
|
||||
@rclient.llen = sinon.stub().yields(null, @length = 3)
|
||||
@RealTimeRedisManager.getUpdatesLength @doc_id, @callback
|
||||
|
||||
it "should look up the length", ->
|
||||
@rclient.llen.calledWith("PendingUpdates:#{@doc_id}").should.equal true
|
||||
|
||||
it "should return the length", ->
|
||||
@callback.calledWith(null, @length).should.equal true
|
|
@ -1,504 +0,0 @@
|
|||
sinon = require('sinon')
|
||||
chai = require('chai')
|
||||
should = chai.should()
|
||||
modulePath = "../../../../app/js/RedisBackend.js"
|
||||
SandboxedModule = require('sandboxed-module')
|
||||
RedisKeyBuilder = require "../../../../app/js/RedisKeyBuilder"
|
||||
|
||||
describe "RedisBackend", ->
|
||||
beforeEach ->
|
||||
@Settings =
|
||||
redis:
|
||||
documentupdater: [{
|
||||
primary: true
|
||||
port: "6379"
|
||||
host: "localhost"
|
||||
password: "single-password"
|
||||
key_schema:
|
||||
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
|
||||
docLines: ({doc_id}) -> "doclines:#{doc_id}"
|
||||
docOps: ({doc_id}) -> "DocOps:#{doc_id}"
|
||||
docVersion: ({doc_id}) -> "DocVersion:#{doc_id}"
|
||||
docHash: ({doc_id}) -> "DocHash:#{doc_id}"
|
||||
projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
|
||||
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
|
||||
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
|
||||
}, {
|
||||
cluster: [{
|
||||
port: "7000"
|
||||
host: "localhost"
|
||||
}]
|
||||
password: "cluster-password"
|
||||
key_schema:
|
||||
blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
|
||||
docLines: ({doc_id}) -> "doclines:{#{doc_id}}"
|
||||
docOps: ({doc_id}) -> "DocOps:{#{doc_id}}"
|
||||
docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
|
||||
docHash: ({doc_id}) -> "DocHash:{#{doc_id}}"
|
||||
projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
|
||||
pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
|
||||
docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
|
||||
}]
|
||||
|
||||
test_context = @
|
||||
class Cluster
|
||||
constructor: (@config) ->
|
||||
test_context.rclient_ioredis = @
|
||||
|
||||
nodes: sinon.stub()
|
||||
|
||||
@timer = timer = sinon.stub()
|
||||
class Timer
|
||||
constructor: (args...) -> timer(args...)
|
||||
done: () ->
|
||||
|
||||
@RedisBackend = SandboxedModule.require modulePath, requires:
|
||||
"settings-sharelatex": @Settings
|
||||
"logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() }
|
||||
"redis-sharelatex": @redis =
|
||||
createClient: sinon.stub().returns @rclient_redis = {}
|
||||
activeHealthCheck: sinon.stub()
|
||||
"ioredis": @ioredis =
|
||||
Cluster: Cluster
|
||||
"metrics-sharelatex":
|
||||
@Metrics =
|
||||
inc: sinon.stub()
|
||||
Timer: Timer
|
||||
|
||||
@client = @RedisBackend.createClient()
|
||||
|
||||
@doc_id = "mock-doc-id"
|
||||
@project_id = "mock-project-id"
|
||||
|
||||
it "should create a redis client", ->
|
||||
@redis.createClient
|
||||
.calledWith({
|
||||
port: "6379"
|
||||
host: "localhost"
|
||||
password: "single-password"
|
||||
})
|
||||
.should.equal true
|
||||
|
||||
it "should create an ioredis cluster client", ->
|
||||
@rclient_ioredis.config.should.deep.equal [{
|
||||
port: "7000"
|
||||
host: "localhost"
|
||||
}]
|
||||
|
||||
describe "individual commands", ->
|
||||
describe "with the same results", ->
|
||||
beforeEach (done) ->
|
||||
@content = "bar"
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.get.withArgs("doclines:#{@doc_id}").yields(null, @content)
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(null, @content)
|
||||
@client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) =>
|
||||
setTimeout () -> # Let all background requests complete
|
||||
done(error)
|
||||
|
||||
it "should return the result", ->
|
||||
@result.should.equal @content
|
||||
|
||||
it "should have called the redis client with the appropriate key", ->
|
||||
@rclient_redis.get
|
||||
.calledWith("doclines:#{@doc_id}")
|
||||
.should.equal true
|
||||
|
||||
it "should have called the ioredis cluster client with the appropriate key", ->
|
||||
@rclient_ioredis.get
|
||||
.calledWith("doclines:{#{@doc_id}}")
|
||||
.should.equal true
|
||||
|
||||
it "should send a metric", ->
|
||||
@Metrics.inc
|
||||
.calledWith("backend-match")
|
||||
.should.equal true
|
||||
|
||||
it "should time the commands", ->
|
||||
@timer
|
||||
.calledWith("redis.ioredis.get")
|
||||
.should.equal true
|
||||
@timer
|
||||
.calledWith("redis.noderedis.get")
|
||||
.should.equal true
|
||||
|
||||
describe "with different results", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.get.withArgs("doclines:#{@doc_id}").yields(null, "primary-result")
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(null, "secondary-result")
|
||||
@client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) =>
|
||||
setTimeout () -> # Let all background requests complete
|
||||
done(error)
|
||||
|
||||
it "should return the primary result", ->
|
||||
@result.should.equal "primary-result"
|
||||
|
||||
it "should send a metric", ->
|
||||
@Metrics.inc
|
||||
.calledWith("backend-conflict")
|
||||
.should.equal true
|
||||
|
||||
describe "with differently ordered results from smembers", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.smembers = sinon.stub()
|
||||
@rclient_redis.smembers.withArgs("DocsIn:#{@project_id}").yields(null, ["one", "two"])
|
||||
@rclient_ioredis.smembers = sinon.stub()
|
||||
@rclient_ioredis.smembers.withArgs("DocsIn:{#{@project_id}}").yields(null, ["two", "one"])
|
||||
@client.smembers RedisKeyBuilder.docsInProject({project_id: @project_id}), (error, @result) =>
|
||||
setTimeout () -> # Let all background requests complete
|
||||
done(error)
|
||||
|
||||
it "should return the primary result", ->
|
||||
@result.should.deep.equal ["one", "two"]
|
||||
|
||||
it "should send a metric indicating a match", ->
|
||||
@Metrics.inc
|
||||
.calledWith("backend-match")
|
||||
.should.equal true
|
||||
|
||||
describe "when the secondary errors", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.get.withArgs("doclines:#{@doc_id}").yields(null, "primary-result")
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(@error = new Error("oops"))
|
||||
@client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) =>
|
||||
setTimeout () -> # Let all background requests complete
|
||||
done(error)
|
||||
|
||||
it "should return the primary result", ->
|
||||
@result.should.equal "primary-result"
|
||||
|
||||
it "should log out the secondary error", ->
|
||||
@logger.error
|
||||
.calledWith({
|
||||
err: @error
|
||||
}, "error in redis backend")
|
||||
.should.equal true
|
||||
|
||||
describe "when the primary errors", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.get.withArgs("doclines:#{@doc_id}").yields(@error = new Error("oops"))
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(null, "secondary-result")
|
||||
@client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (@returned_error, @result) =>
|
||||
setTimeout () -> # Let all background requests complete
|
||||
done()
|
||||
|
||||
it "should return the error", ->
|
||||
@returned_error.should.equal @error
|
||||
|
||||
it "should log out the error", ->
|
||||
@logger.error
|
||||
.calledWith({
|
||||
err: @error
|
||||
}, "error in redis backend")
|
||||
.should.equal true
|
||||
|
||||
describe "when the command has the key in a non-zero argument index", ->
|
||||
beforeEach (done) ->
|
||||
@script = "mock-script"
|
||||
@key_count = 1
|
||||
@value = "mock-value"
|
||||
@rclient_redis.eval = sinon.stub()
|
||||
@rclient_redis.eval.withArgs(@script, @key_count, "Blocking:#{@doc_id}", @value).yields(null)
|
||||
@rclient_ioredis.eval = sinon.stub()
|
||||
@rclient_ioredis.eval.withArgs(@script, @key_count, "Blocking:{#{@doc_id}}", @value).yields(null, @content)
|
||||
@client.eval @script, @key_count, RedisKeyBuilder.blockingKey({doc_id: @doc_id}), @value, (error) =>
|
||||
setTimeout () -> # Let all background requests complete
|
||||
done(error)
|
||||
|
||||
it "should have called the redis client with the appropriate key", ->
|
||||
@rclient_redis.eval
|
||||
.calledWith(@script, @key_count, "Blocking:#{@doc_id}", @value)
|
||||
.should.equal true
|
||||
|
||||
it "should have called the ioredis cluster client with the appropriate key", ->
|
||||
@rclient_ioredis.eval
|
||||
.calledWith(@script, @key_count, "Blocking:{#{@doc_id}}", @value)
|
||||
.should.equal true
|
||||
|
||||
describe "when the secondary takes longer than SECONDARY_TIMEOUT", ->
|
||||
beforeEach (done) ->
|
||||
@client.SECONDARY_TIMEOUT = 10
|
||||
@content = "bar"
|
||||
@rclient_redis.get = (key, cb) =>
|
||||
key.should.equal "doclines:#{@doc_id}"
|
||||
setTimeout () =>
|
||||
cb(null, @content)
|
||||
, @client.SECONDARY_TIMEOUT * 3 # If the secondary errors first, don't affect the primary result
|
||||
@rclient_ioredis.get = (key, cb) =>
|
||||
key.should.equal "doclines:{#{@doc_id}}"
|
||||
setTimeout () =>
|
||||
cb(null, @content)
|
||||
, @client.SECONDARY_TIMEOUT * 2
|
||||
@client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) =>
|
||||
done(error)
|
||||
|
||||
it "should log out an error for the backend", ->
|
||||
@logger.error
|
||||
.calledWith({err: new Error("backend timed out")}, "backend timed out")
|
||||
.should.equal true
|
||||
|
||||
it "should return the primary result", ->
|
||||
@result.should.equal @content
|
||||
|
||||
describe "when the primary takes longer than SECONDARY_TIMEOUT", ->
|
||||
beforeEach (done) ->
|
||||
@client.SECONDARY_TIMEOUT = 10
|
||||
@content = "bar"
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(null, @content)
|
||||
@rclient_redis.get = (key, cb) =>
|
||||
key.should.equal "doclines:#{@doc_id}"
|
||||
setTimeout () =>
|
||||
cb(null, @content)
|
||||
, @client.SECONDARY_TIMEOUT * 2
|
||||
@client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) =>
|
||||
done(error)
|
||||
|
||||
it "should not consider this an error", ->
|
||||
@logger.error
|
||||
.called
|
||||
.should.equal false
|
||||
|
||||
describe "multi commands", ->
|
||||
beforeEach ->
|
||||
# We will test with:
|
||||
# rclient.multi()
|
||||
# .get("doclines:foo")
|
||||
# .get("DocVersion:foo")
|
||||
# .exec (...) ->
|
||||
@doclines = "mock-doclines"
|
||||
@version = "42"
|
||||
@rclient_redis.multi = sinon.stub().returns @rclient_redis
|
||||
@rclient_ioredis.multi = sinon.stub().returns @rclient_ioredis
|
||||
|
||||
describe "with the same results", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.exec = sinon.stub().yields(null, [@doclines, @version])
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.exec = sinon.stub().yields(null, [ [null, @doclines], [null, @version] ])
|
||||
|
||||
multi = @client.multi()
|
||||
multi.get RedisKeyBuilder.docLines({doc_id: @doc_id})
|
||||
multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id})
|
||||
multi.exec (error, @result) =>
|
||||
setTimeout () ->
|
||||
done(error)
|
||||
|
||||
it "should return the result", ->
|
||||
@result.should.deep.equal [@doclines, @version]
|
||||
|
||||
it "should have called the redis client with the appropriate keys", ->
|
||||
@rclient_redis.get
|
||||
.calledWith("doclines:#{@doc_id}")
|
||||
.should.equal true
|
||||
@rclient_redis.get
|
||||
.calledWith("DocVersion:#{@doc_id}")
|
||||
.should.equal true
|
||||
@rclient_ioredis.exec
|
||||
.called
|
||||
.should.equal true
|
||||
|
||||
it "should have called the ioredis cluster client with the appropriate keys", ->
|
||||
@rclient_ioredis.get
|
||||
.calledWith("doclines:{#{@doc_id}}")
|
||||
.should.equal true
|
||||
@rclient_ioredis.get
|
||||
.calledWith("DocVersion:{#{@doc_id}}")
|
||||
.should.equal true
|
||||
@rclient_ioredis.exec
|
||||
.called
|
||||
.should.equal true
|
||||
|
||||
it "should send a metric", ->
|
||||
@Metrics.inc
|
||||
.calledWith("backend-match")
|
||||
.should.equal true
|
||||
|
||||
it "should time the exec", ->
|
||||
@timer
|
||||
.calledWith("redis.ioredis.exec")
|
||||
.should.equal true
|
||||
@timer
|
||||
.calledWith("redis.noderedis.exec")
|
||||
.should.equal true
|
||||
|
||||
describe "with different results", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.exec = sinon.stub().yields(null, [@doclines, @version])
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.exec = sinon.stub().yields(null, [ [null, "different-doc-lines"], [null, @version] ])
|
||||
|
||||
multi = @client.multi()
|
||||
multi.get RedisKeyBuilder.docLines({doc_id: @doc_id})
|
||||
multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id})
|
||||
multi.exec (error, @result) =>
|
||||
setTimeout () ->
|
||||
done(error)
|
||||
|
||||
it "should return the primary result", ->
|
||||
@result.should.deep.equal [@doclines, @version]
|
||||
|
||||
it "should send a metric", ->
|
||||
@Metrics.inc
|
||||
.calledWith("backend-conflict")
|
||||
.should.equal true
|
||||
|
||||
describe "when the secondary errors", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.exec = sinon.stub().yields(null, [@doclines, @version])
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.exec = sinon.stub().yields(@error = new Error("oops"))
|
||||
|
||||
multi = @client.multi()
|
||||
multi.get RedisKeyBuilder.docLines({doc_id: @doc_id})
|
||||
multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id})
|
||||
multi.exec (error, @result) =>
|
||||
setTimeout () ->
|
||||
done(error)
|
||||
|
||||
it "should return the primary result", ->
|
||||
@result.should.deep.equal [@doclines, @version]
|
||||
|
||||
describe "when the primary errors", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.exec = sinon.stub().yields(@error = new Error("oops"))
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.exec = sinon.stub().yields([ [null, @doclines], [null, @version] ])
|
||||
|
||||
multi = @client.multi()
|
||||
multi.get RedisKeyBuilder.docLines({doc_id: @doc_id})
|
||||
multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id})
|
||||
multi.exec (@returned_error) =>
|
||||
setTimeout () -> done()
|
||||
|
||||
it "should return the error", ->
|
||||
@returned_error.should.equal @error
|
||||
|
||||
describe "when the secondary takes longer than SECONDARY_TIMEOUT", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.exec = (cb) =>
|
||||
setTimeout () =>
|
||||
cb(null, [@doclines, @version])
|
||||
, 30 # If secondary errors first, don't affect the primary result
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.exec = (cb) =>
|
||||
setTimeout () =>
|
||||
cb(null, [ [null, @doclines], [null, @version] ])
|
||||
, 20
|
||||
|
||||
multi = @client.multi()
|
||||
multi.SECONDARY_TIMEOUT = 10
|
||||
multi.get RedisKeyBuilder.docLines({doc_id: @doc_id})
|
||||
multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id})
|
||||
multi.exec (error, @result) =>
|
||||
done(error)
|
||||
|
||||
it "should log out an error for the backend", ->
|
||||
@logger.error
|
||||
.calledWith({err: new Error("backend timed out")}, "backend timed out")
|
||||
.should.equal true
|
||||
|
||||
it "should return the primary result", ->
|
||||
@result.should.deep.equal [@doclines, @version]
|
||||
|
||||
describe "when the primary takes longer than SECONDARY_TIMEOUT", ->
|
||||
beforeEach (done) ->
|
||||
@rclient_redis.get = sinon.stub()
|
||||
@rclient_redis.exec = (cb) =>
|
||||
setTimeout () =>
|
||||
cb(null, [@doclines, @version])
|
||||
, 20
|
||||
@rclient_ioredis.get = sinon.stub()
|
||||
@rclient_ioredis.exec = sinon.stub().yields(null, [ [null, @doclines], [null, @version] ])
|
||||
|
||||
multi = @client.multi()
|
||||
multi.SECONDARY_TIMEOUT = 10
|
||||
multi.get RedisKeyBuilder.docLines({doc_id: @doc_id})
|
||||
multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id})
|
||||
multi.exec (error, @result) =>
|
||||
done(error)
|
||||
|
||||
it "should not consider this an error", ->
|
||||
@logger.error
|
||||
.called
|
||||
.should.equal false
|
||||
|
||||
describe "_healthCheckNodeRedisClient", ->
|
||||
beforeEach ->
|
||||
@redis.activeHealthCheckRedis = sinon.stub().returns @healthCheck = {
|
||||
isAlive: sinon.stub()
|
||||
}
|
||||
|
||||
describe "successfully", ->
|
||||
beforeEach (done) ->
|
||||
@healthCheck.isAlive.returns true
|
||||
@redis_client = {}
|
||||
@client._healthCheckNodeRedisClient(@redis_client, done)
|
||||
|
||||
it "should check the status of the node redis client", ->
|
||||
@healthCheck.isAlive.called.should.equal true
|
||||
|
||||
it "should only create one health check when called multiple times", (done) ->
|
||||
@client._healthCheckNodeRedisClient @redis_client, () =>
|
||||
@redis.activeHealthCheckRedis.calledOnce.should.equal true
|
||||
@healthCheck.isAlive.calledTwice.should.equal true
|
||||
done()
|
||||
|
||||
describe "when failing", ->
|
||||
beforeEach ->
|
||||
@healthCheck.isAlive.returns false
|
||||
@redis_client = {}
|
||||
|
||||
it "should return an error", (done) ->
|
||||
@client._healthCheckNodeRedisClient @redis_client, (error) ->
|
||||
error.message.should.equal "node-redis client failed health check"
|
||||
done()
|
||||
|
||||
describe "_healthCheckClusterClient", ->
|
||||
beforeEach ->
|
||||
@client.HEARTBEAT_TIMEOUT = 10
|
||||
@nodes = [{
|
||||
options: key: "node-0"
|
||||
stream: destroy: sinon.stub()
|
||||
}, {
|
||||
options: key: "node-1"
|
||||
stream: destroy: sinon.stub()
|
||||
}]
|
||||
@rclient_ioredis.nodes = sinon.stub().returns(@nodes)
|
||||
|
||||
describe "when both clients are successful", ->
|
||||
beforeEach (done) ->
|
||||
@nodes[0].ping = sinon.stub().yields()
|
||||
@nodes[1].ping = sinon.stub().yields()
|
||||
@client._healthCheckClusterClient({ rclient: @rclient_ioredis }, done)
|
||||
|
||||
it "should get all cluster nodes", ->
|
||||
@rclient_ioredis.nodes
|
||||
.calledWith("all")
|
||||
.should.equal true
|
||||
|
||||
it "should ping each cluster node", ->
|
||||
for node in @nodes
|
||||
node.ping.called.should.equal true
|
||||
|
||||
describe "when ping fails to a node", ->
|
||||
beforeEach ->
|
||||
@nodes[0].ping = (cb) -> cb()
|
||||
@nodes[1].ping = (cb) -> # Just hang
|
||||
|
||||
it "should return an error", (done) ->
|
||||
@client._healthCheckClusterClient { rclient: @rclient_ioredis }, (error) ->
|
||||
error.message.should.equal "ioredis node ping check timed out"
|
||||
done()
|
|
@ -14,20 +14,28 @@ describe "RedisManager", ->
|
|||
@rclient.multi = () => @rclient
|
||||
@RedisManager = SandboxedModule.require modulePath,
|
||||
requires:
|
||||
"./RedisBackend":
|
||||
createClient: () => @rclient
|
||||
"./RedisKeyBuilder":
|
||||
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
|
||||
docLines: ({doc_id}) -> "doclines:#{doc_id}"
|
||||
docOps: ({doc_id}) -> "DocOps:#{doc_id}"
|
||||
docVersion: ({doc_id}) -> "DocVersion:#{doc_id}"
|
||||
docHash: ({doc_id}) -> "DocHash:#{doc_id}"
|
||||
projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
|
||||
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
|
||||
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
|
||||
ranges: ({doc_id}) -> "Ranges:#{doc_id}"
|
||||
"logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() }
|
||||
"settings-sharelatex": {documentupdater: {logHashErrors: {write:true, read:true}}}
|
||||
"settings-sharelatex": {
|
||||
documentupdater: {logHashErrors: {write:true, read:true}}
|
||||
redis:
|
||||
documentupdater:
|
||||
key_schema:
|
||||
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
|
||||
docLines: ({doc_id}) -> "doclines:#{doc_id}"
|
||||
docOps: ({doc_id}) -> "DocOps:#{doc_id}"
|
||||
docVersion: ({doc_id}) -> "DocVersion:#{doc_id}"
|
||||
docHash: ({doc_id}) -> "DocHash:#{doc_id}"
|
||||
projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
|
||||
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
|
||||
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
|
||||
ranges: ({doc_id}) -> "Ranges:#{doc_id}"
|
||||
history:
|
||||
key_schema:
|
||||
uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}"
|
||||
docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}"
|
||||
}
|
||||
"redis-sharelatex":
|
||||
createClient: () => @rclient
|
||||
"./Metrics": @metrics =
|
||||
inc: sinon.stub()
|
||||
Timer: class Timer
|
||||
|
|
|
@ -17,7 +17,7 @@ describe "ShareJsUpdateManager", ->
|
|||
"./ShareJsDB" : @ShareJsDB = { mockDB: true }
|
||||
"redis-sharelatex" : createClient: () => @rclient = auth:->
|
||||
"logger-sharelatex": @logger = { log: sinon.stub() }
|
||||
"./WebRedisManager": @WebRedisManager = {}
|
||||
"./RealTimeRedisManager": @RealTimeRedisManager = {}
|
||||
globals:
|
||||
clearTimeout: @clearTimeout = sinon.stub()
|
||||
|
||||
|
@ -105,11 +105,11 @@ describe "ShareJsUpdateManager", ->
|
|||
@opData =
|
||||
op: {t: "foo", p: 1}
|
||||
meta: source: "bar"
|
||||
@WebRedisManager.sendData = sinon.stub()
|
||||
@RealTimeRedisManager.sendData = sinon.stub()
|
||||
@callback("#{@project_id}:#{@doc_id}", @opData)
|
||||
|
||||
it "should publish the op to redis", ->
|
||||
@WebRedisManager.sendData
|
||||
@RealTimeRedisManager.sendData
|
||||
.calledWith({project_id: @project_id, doc_id: @doc_id, op: @opData})
|
||||
.should.equal true
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ describe "UpdateManager", ->
|
|||
@UpdateManager = SandboxedModule.require modulePath, requires:
|
||||
"./LockManager" : @LockManager = {}
|
||||
"./RedisManager" : @RedisManager = {}
|
||||
"./WebRedisManager" : @WebRedisManager = {}
|
||||
"./RealTimeRedisManager" : @RealTimeRedisManager = {}
|
||||
"./ShareJsUpdateManager" : @ShareJsUpdateManager = {}
|
||||
"./HistoryManager" : @HistoryManager = {}
|
||||
"logger-sharelatex": @logger = { log: sinon.stub() }
|
||||
|
@ -94,7 +94,7 @@ describe "UpdateManager", ->
|
|||
describe "continueProcessingUpdatesWithLock", ->
|
||||
describe "when there are outstanding updates", ->
|
||||
beforeEach ->
|
||||
@WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3)
|
||||
@RealTimeRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3)
|
||||
@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
|
||||
@UpdateManager.continueProcessingUpdatesWithLock @project_id, @doc_id, @callback
|
||||
|
||||
|
@ -106,7 +106,7 @@ describe "UpdateManager", ->
|
|||
|
||||
describe "when there are no outstanding updates", ->
|
||||
beforeEach ->
|
||||
@WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 0)
|
||||
@RealTimeRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 0)
|
||||
@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
|
||||
@UpdateManager.continueProcessingUpdatesWithLock @project_id, @doc_id, @callback
|
||||
|
||||
|
@ -122,12 +122,12 @@ describe "UpdateManager", ->
|
|||
@updates = [{p: 1, t: "foo"}]
|
||||
@updatedDocLines = ["updated", "lines"]
|
||||
@version = 34
|
||||
@WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
|
||||
@RealTimeRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
|
||||
@UpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version)
|
||||
@UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback
|
||||
|
||||
it "should get the pending updates", ->
|
||||
@WebRedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true
|
||||
@RealTimeRedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true
|
||||
|
||||
it "should apply the updates", ->
|
||||
for update in @updates
|
||||
|
@ -141,7 +141,7 @@ describe "UpdateManager", ->
|
|||
describe "when there are no updates", ->
|
||||
beforeEach ->
|
||||
@updates = []
|
||||
@WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
|
||||
@RealTimeRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
|
||||
@UpdateManager.applyUpdate = sinon.stub()
|
||||
@RedisManager.setDocument = sinon.stub()
|
||||
@UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback
|
||||
|
@ -165,8 +165,8 @@ describe "UpdateManager", ->
|
|||
@RangesManager.applyUpdate = sinon.stub().yields(null, @updated_ranges)
|
||||
@ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps)
|
||||
@RedisManager.updateDocument = sinon.stub().yields()
|
||||
@WebRedisManager.sendData = sinon.stub()
|
||||
@HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3)
|
||||
@RealTimeRedisManager.sendData = sinon.stub()
|
||||
@HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(4)
|
||||
|
||||
describe "normally", ->
|
||||
beforeEach ->
|
||||
|
@ -188,7 +188,7 @@ describe "UpdateManager", ->
|
|||
.should.equal true
|
||||
|
||||
it "should push the applied ops into the history queue", ->
|
||||
@HistoryManager.pushUncompressedHistoryOps
|
||||
@HistoryManager.recordAndFlushHistoryOps
|
||||
.calledWith(@project_id, @doc_id, @appliedOps)
|
||||
.should.equal true
|
||||
|
||||
|
@ -214,8 +214,8 @@ describe "UpdateManager", ->
|
|||
@ShareJsUpdateManager.applyUpdate = sinon.stub().yields(@error)
|
||||
@UpdateManager.applyUpdate @project_id, @doc_id, @update, @callback
|
||||
|
||||
it "should call WebRedisManager.sendData with the error", ->
|
||||
@WebRedisManager.sendData
|
||||
it "should call RealTimeRedisManager.sendData with the error", ->
|
||||
@RealTimeRedisManager.sendData
|
||||
.calledWith({
|
||||
project_id: @project_id,
|
||||
doc_id: @doc_id,
|
||||
|
|
|
@ -1,115 +0,0 @@
|
|||
sinon = require('sinon')
|
||||
chai = require('chai')
|
||||
should = chai.should()
|
||||
modulePath = "../../../../app/js/WebRedisManager.js"
|
||||
SandboxedModule = require('sandboxed-module')
|
||||
Errors = require "../../../../app/js/Errors"
|
||||
|
||||
describe "WebRedisManager", ->
|
||||
beforeEach ->
|
||||
@rclient =
|
||||
auth: () ->
|
||||
exec: sinon.stub()
|
||||
@rclient.multi = () => @rclient
|
||||
@WebRedisManager = SandboxedModule.require modulePath, requires:
|
||||
"redis-sharelatex": createClient: () => @rclient
|
||||
"settings-sharelatex": redis: web: @settings = {"mock": "settings"}
|
||||
@doc_id = "doc-id-123"
|
||||
@project_id = "project-id-123"
|
||||
@callback = sinon.stub()
|
||||
|
||||
describe "getPendingUpdatesForDoc", ->
|
||||
beforeEach ->
|
||||
@rclient.lrange = sinon.stub()
|
||||
@rclient.del = sinon.stub()
|
||||
|
||||
describe "successfully", ->
|
||||
beforeEach ->
|
||||
@updates = [
|
||||
{ op: [{ i: "foo", p: 4 }] }
|
||||
{ op: [{ i: "foo", p: 4 }] }
|
||||
]
|
||||
@jsonUpdates = @updates.map (update) -> JSON.stringify update
|
||||
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
|
||||
@WebRedisManager.getPendingUpdatesForDoc @doc_id, @callback
|
||||
|
||||
it "should get the pending updates", ->
|
||||
@rclient.lrange
|
||||
.calledWith("PendingUpdates:#{@doc_id}", 0, -1)
|
||||
.should.equal true
|
||||
|
||||
it "should delete the pending updates", ->
|
||||
@rclient.del
|
||||
.calledWith("PendingUpdates:#{@doc_id}")
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback with the updates", ->
|
||||
@callback.calledWith(null, @updates).should.equal true
|
||||
|
||||
describe "when the JSON doesn't parse", ->
|
||||
beforeEach ->
|
||||
@jsonUpdates = [
|
||||
JSON.stringify { op: [{ i: "foo", p: 4 }] }
|
||||
"broken json"
|
||||
]
|
||||
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
|
||||
@WebRedisManager.getPendingUpdatesForDoc @doc_id, @callback
|
||||
|
||||
it "should return an error to the callback", ->
|
||||
@callback.calledWith(new Error("JSON parse error")).should.equal true
|
||||
|
||||
|
||||
describe "getUpdatesLength", ->
|
||||
beforeEach ->
|
||||
@rclient.llen = sinon.stub().yields(null, @length = 3)
|
||||
@WebRedisManager.getUpdatesLength @doc_id, @callback
|
||||
|
||||
it "should look up the length", ->
|
||||
@rclient.llen.calledWith("PendingUpdates:#{@doc_id}").should.equal true
|
||||
|
||||
it "should return the length", ->
|
||||
@callback.calledWith(null, @length).should.equal true
|
||||
|
||||
describe "pushUncompressedHistoryOps", ->
|
||||
beforeEach ->
|
||||
@ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }]
|
||||
@rclient.rpush = sinon.stub().yields(null, @length = 42)
|
||||
@rclient.sadd = sinon.stub().yields()
|
||||
|
||||
describe "with ops", ->
|
||||
beforeEach (done) ->
|
||||
@WebRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, (args...) =>
|
||||
@callback(args...)
|
||||
done()
|
||||
|
||||
it "should push the doc op into the doc ops list as JSON", ->
|
||||
@rclient.rpush
|
||||
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1]))
|
||||
.should.equal true
|
||||
|
||||
it "should add the doc_id to the set of which records the project docs", ->
|
||||
@rclient.sadd
|
||||
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback with the length", ->
|
||||
@callback.calledWith(null, @length).should.equal true
|
||||
|
||||
describe "with no ops", ->
|
||||
beforeEach (done) ->
|
||||
@WebRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, [], (args...) =>
|
||||
@callback(args...)
|
||||
done()
|
||||
|
||||
it "should not push the doc op into the doc ops list as JSON", ->
|
||||
@rclient.rpush
|
||||
.called
|
||||
.should.equal false
|
||||
|
||||
it "should not add the doc_id to the set of which records the project docs", ->
|
||||
@rclient.sadd
|
||||
.called
|
||||
.should.equal false
|
||||
|
||||
it "should call the callback with an error", ->
|
||||
@callback.calledWith(new Error("cannot push no ops")).should.equal true
|
Loading…
Add table
Reference in a new issue