From 05b09a447e1db8a7dcc29e4fccc69dc76749bbce Mon Sep 17 00:00:00 2001 From: James Allen Date: Wed, 1 Jun 2016 11:40:12 +0100 Subject: [PATCH] Ensure that all multi call keys will hash to the same node in cluster --- .../app/coffee/RedisManager.coffee | 40 +++++++++++------- .../pushUncompressedHistoryOpTests.coffee | 14 +++---- .../RedisManager/putDocInMemoryTests.coffee | 42 ++++++++++--------- .../removeDocFromMemoryTests.coffee | 38 +++++++++-------- 4 files changed, 73 insertions(+), 61 deletions(-) diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 1215471ce1..9f0e1cd7f5 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -18,23 +18,31 @@ module.exports = RedisManager = putDocInMemory : (project_id, doc_id, docLines, version, callback)-> timer = new metrics.Timer("redis.put-doc") logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis" - multi = rclient.multi() - multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines) - multi.set keys.projectKey({doc_id:doc_id}), project_id - multi.set keys.docVersion(doc_id:doc_id), version - multi.sadd keys.docsInProject(project_id:project_id), doc_id - multi.exec (err, replys)-> + async.parallel [ + (cb) -> + multi = rclient.multi() + multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines) + multi.set keys.projectKey({doc_id:doc_id}), project_id + multi.set keys.docVersion(doc_id:doc_id), version + multi.exec cb + (cb) -> + rclient.sadd keys.docsInProject(project_id:project_id), doc_id, cb + ], (err) -> timer.done() callback(err) removeDocFromMemory : (project_id, doc_id, callback)-> logger.log project_id:project_id, doc_id:doc_id, "removing doc from redis" - multi = rclient.multi() - multi.del keys.docLines(doc_id:doc_id) - multi.del keys.projectKey(doc_id:doc_id) - multi.del keys.docVersion(doc_id:doc_id) - multi.srem keys.docsInProject(project_id:project_id), doc_id - multi.exec (err, replys)-> + async.parallel [ + (cb) -> + multi = rclient.multi() + multi.del keys.docLines(doc_id:doc_id) + multi.del keys.projectKey(doc_id:doc_id) + multi.del keys.docVersion(doc_id:doc_id) + multi.exec cb + (cb) -> + rclient.srem keys.docsInProject(project_id:project_id), doc_id, cb + ], (err) -> if err? logger.err project_id:project_id, doc_id:doc_id, err:err, "error removing doc from redis" callback(err, null) @@ -135,10 +143,10 @@ module.exports = RedisManager = pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) -> jsonOp = JSON.stringify op - multi = rclient.multi() - multi.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp - multi.sadd keys.docsWithHistoryOps(project_id: project_id), doc_id - multi.exec (error, results) -> + async.parallel [ + (cb) -> rclient.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp, cb + (cb) -> rclient.sadd keys.docsWithHistoryOps(project_id: project_id), doc_id, cb + ], (error, results) -> return callback(error) if error? [length, _] = results callback(error, length) diff --git a/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee index c7423fcff0..97530a193c 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee @@ -11,19 +11,19 @@ describe "RedisManager.pushUncompressedHistoryOp", -> "redis-sharelatex": createClient: () => @rclient ?= auth: () -> - multi: () => @rclient "logger-sharelatex": @logger = {log: sinon.stub()} @doc_id = "doc-id-123" @project_id = "project-id-123" @callback = sinon.stub() describe "successfully", -> - beforeEach -> + beforeEach (done) -> @op = { op: [{ i: "foo", p: 4 }] } - @rclient.rpush = sinon.stub() - @rclient.sadd = sinon.stub() - @rclient.exec = sinon.stub().callsArgWith(0, null, [@length = 42, "1"]) - @RedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback + @rclient.rpush = sinon.stub().yields(null, @length = 42) + @rclient.sadd = sinon.stub().yields() + @RedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, (args...) => + @callback(args...) + done() it "should push the doc op into the doc ops list", -> @rclient.rpush @@ -36,7 +36,7 @@ describe "RedisManager.pushUncompressedHistoryOp", -> .should.equal true it "should call the callback with the length", -> - @callback.calledWith(null, @length).should.equal true + @callback.calledWith(undefined, @length).should.equal true diff --git a/services/document-updater/test/unit/coffee/RedisManager/putDocInMemoryTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/putDocInMemoryTests.coffee index 5b7c8ef4ce..eea348b49b 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/putDocInMemoryTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/putDocInMemoryTests.coffee @@ -19,30 +19,32 @@ describe 'RedisManager.putDocInMemory', ()-> potentialSAdds = {} potentialSAdds[keys.docsInProject(project_id:project_id)] = doc_id + rclient = + auth:-> + set:(key, value)-> + result = potentialSets[key] + delete potentialSets[key] + if key == keys.docLines(doc_id:doc_id) + value = JSON.parse(value) + assert.deepEqual result, value + incr:()-> + sadd:(key, value, cb)-> + result = potentialSAdds[key] + delete potentialSAdds[key] + assert.equal result, value + cb() + del: (key) -> + result = potentialDels[key] + delete potentialDels[key] + assert.equal result, true + exec:(callback)-> + callback() + rclient.multi = () -> rclient mocks = "./ZipManager": {} "logger-sharelatex": log:-> "redis-sharelatex": - createClient : ()-> - auth:-> - multi: ()-> - set:(key, value)-> - result = potentialSets[key] - delete potentialSets[key] - if key == keys.docLines(doc_id:doc_id) - value = JSON.parse(value) - assert.deepEqual result, value - incr:()-> - sadd:(key, value)-> - result = potentialSAdds[key] - delete potentialSAdds[key] - assert.equal result, value - del: (key) -> - result = potentialDels[key] - delete potentialDels[key] - assert.equal result, true - exec:(callback)-> - callback() + createClient : () -> rclient redisManager = SandboxedModule.require(modulePath, requires: mocks) diff --git a/services/document-updater/test/unit/coffee/RedisManager/removeDocFromMemoryTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/removeDocFromMemoryTests.coffee index 2c5076bb1c..d1750f8fdd 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/removeDocFromMemoryTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/removeDocFromMemoryTests.coffee @@ -19,30 +19,32 @@ describe 'RedisManager.removeDocFromMemory', ()-> self = @ beforeEach (done)-> redisMemory = {} - + rclient = + auth:-> + get:-> + set:(key, value)-> + redisMemory[key] = value + sadd:(key, value, cb)-> + if !redisMemory[key]? + redisMemory[key] = [] + redisMemory[key].push value + cb() + del : (key)-> + delete redisMemory[key] + srem : (key, member, cb)-> + index = redisMemory[key].indexOf(member) + redisMemory[key].splice(index, 1) + cb() + exec:(callback)-> + callback(null, []) + rclient.multi = () -> rclient mocks = "./ZipManager": {} "logger-sharelatex": error:-> log:-> "redis-sharelatex": - createClient : -> - auth:-> - multi: -> - get:-> - set:(key, value)-> - redisMemory[key] = value - sadd:(key, value)-> - if !redisMemory[key]? - redisMemory[key] = [] - redisMemory[key].push value - del : (key)-> - delete redisMemory[key] - srem : (key, member)-> - index = redisMemory[key].indexOf(member) - redisMemory[key].splice(index, 1) - exec:(callback)-> - callback(null, []) + createClient : -> rclient redisManager = SandboxedModule.require(modulePath, requires: mocks) redisManager.putDocInMemory project_id, doc_id1, 0, ["line"], ->