support scan operations on redis cluster

This commit is contained in:
Brian Gough 2017-05-05 11:30:11 +01:00
parent 238638b906
commit b82567ef79

View file

@ -2,6 +2,7 @@ Settings = require "settings-sharelatex"
redis = require("redis-sharelatex") redis = require("redis-sharelatex")
rclient = redis.createClient(Settings.redis.history) rclient = redis.createClient(Settings.redis.history)
Keys = Settings.redis.history.key_schema Keys = Settings.redis.history.key_schema
async = require "async"
module.exports = RedisManager = module.exports = RedisManager =
@ -33,12 +34,19 @@ module.exports = RedisManager =
rclient.smembers Keys.docsWithHistoryOps({project_id}), callback rclient.smembers Keys.docsWithHistoryOps({project_id}), callback
# iterate over keys asynchronously using redis scan (non-blocking) # iterate over keys asynchronously using redis scan (non-blocking)
# handle all the cluster nodes or single redis server
_getKeys: (pattern, callback) -> _getKeys: (pattern, callback) ->
nodes = rclient.nodes?('master') || [ rclient ];
doKeyLookupForNode = (node, cb) ->
RedisManager._getKeysFromNode node, pattern, cb
async.concatSeries nodes, doKeyLookupForNode, callback
_getKeysFromNode: (node, pattern, callback) ->
cursor = 0 # redis iterator cursor = 0 # redis iterator
keySet = {} # use hash to avoid duplicate results keySet = {} # use hash to avoid duplicate results
# scan over all keys looking for pattern # scan over all keys looking for pattern
doIteration = (cb) -> doIteration = (cb) ->
rclient.scan cursor, "MATCH", pattern, "COUNT", 1000, (error, reply) -> node.scan cursor, "MATCH", pattern, "COUNT", 1000, (error, reply) ->
return callback(error) if error? return callback(error) if error?
[cursor, keys] = reply [cursor, keys] = reply
for key in keys for key in keys
@ -50,21 +58,20 @@ module.exports = RedisManager =
doIteration() doIteration()
# extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b # extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
# or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
_extractIds: (keyList) -> _extractIds: (keyList) ->
ids = (key.split(":")[1] for key in keyList) ids = for key in keyList
m = key.match(/:\{?([0-9a-f]{24})\}?/) # extract object id
m[1]
return ids return ids
# this will only work on single node redis, not redis cluster
getProjectIdsWithHistoryOps: (callback = (error, project_ids) ->) -> getProjectIdsWithHistoryOps: (callback = (error, project_ids) ->) ->
return callback(new Error("not supported")) if rclient.nodes?
RedisManager._getKeys Keys.docsWithHistoryOps({project_id:"*"}), (error, project_keys) -> RedisManager._getKeys Keys.docsWithHistoryOps({project_id:"*"}), (error, project_keys) ->
return callback(error) if error? return callback(error) if error?
project_ids = RedisManager._extractIds project_keys project_ids = RedisManager._extractIds project_keys
callback(error, project_ids) callback(error, project_ids)
# this will only work on single node redis, not redis cluster
getAllDocIdsWithHistoryOps: (callback = (error, doc_ids) ->) -> getAllDocIdsWithHistoryOps: (callback = (error, doc_ids) ->) ->
return callback(new Error("not supported")) if rclient.nodes?
# return all the docids, to find dangling history entries after # return all the docids, to find dangling history entries after
# everything is flushed. # everything is flushed.
RedisManager._getKeys Keys.uncompressedHistoryOps({doc_id:"*"}), (error, doc_keys) -> RedisManager._getKeys Keys.uncompressedHistoryOps({doc_id:"*"}), (error, doc_keys) ->