From 437e88581218bc0c850d0fd9ac3f2f9a099d7469 Mon Sep 17 00:00:00 2001
From: James Allen
Date: Wed, 8 Jun 2016 16:21:56 +0100
Subject: [PATCH 1/2] Lock down to specific async version

---
 services/document-updater/package.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/document-updater/package.json b/services/document-updater/package.json
index d27b16271c..01c280c843 100644
--- a/services/document-updater/package.json
+++ b/services/document-updater/package.json
@@ -7,7 +7,7 @@
     "url": "https://github.com/sharelatex/document-updater-sharelatex.git"
   },
   "dependencies": {
-    "async": "^2.0.0-rc.5",
+    "async": "2.0.0-rc.5",
     "coffee-script": "1.4.0",
     "express": "3.3.4",
     "logger-sharelatex": "git+https://github.com/sharelatex/logger-sharelatex.git#v1.0.0",

From 48a92b28e5d04c11e2d7d3f6be1ee721a96e007b Mon Sep 17 00:00:00 2001
From: James Allen
Date: Wed, 8 Jun 2016 16:42:09 +0100
Subject: [PATCH 2/2] Don't run redis commands in parallel for easier
 consistency reasoning

---
 .../app/coffee/RedisManager.coffee            | 59 ++++++++-----------
 .../app/coffee/TrackChangesManager.coffee     | 13 ++--
 .../RedisManager/RedisManagerTests.coffee     | 22 -------
 .../TrackChangesManagerTests.coffee           | 25 ++++----
 4 files changed, 46 insertions(+), 73 deletions(-)

diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee
index a300e92fcc..396979e52f 100644
--- a/services/document-updater/app/coffee/RedisManager.coffee
+++ b/services/document-updater/app/coffee/RedisManager.coffee
@@ -15,40 +15,37 @@ redisOptions.return_buffers = true
 minutes = 60 # seconds for Redis expire
 
 module.exports = RedisManager =
-	putDocInMemory : (project_id, doc_id, docLines, version, callback)->
+	putDocInMemory : (project_id, doc_id, docLines, version, _callback)->
 		timer = new metrics.Timer("redis.put-doc")
-		logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis"
-		async.parallel [
-			(cb) ->
-				multi = rclient.multi()
-				multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
-				multi.set keys.projectKey({doc_id:doc_id}), project_id
-				multi.set keys.docVersion(doc_id:doc_id), version
-				multi.exec cb
-			(cb) ->
-				rclient.sadd keys.docsInProject(project_id:project_id), doc_id, cb
-		], (err) ->
+		callback = (error) ->
 			timer.done()
-			callback(err)
+			_callback(error)
+		logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis"
+		multi = rclient.multi()
+		multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
+		multi.set keys.projectKey({doc_id:doc_id}), project_id
+		multi.set keys.docVersion(doc_id:doc_id), version
+		multi.exec (error) ->
+			return callback(error) if error?
+			rclient.sadd keys.docsInProject(project_id:project_id), doc_id, callback
 
-	removeDocFromMemory : (project_id, doc_id, callback)->
+	removeDocFromMemory : (project_id, doc_id, _callback)->
 		logger.log project_id:project_id, doc_id:doc_id, "removing doc from redis"
-		async.parallel [
-			(cb) ->
-				multi = rclient.multi()
-				multi.del keys.docLines(doc_id:doc_id)
-				multi.del keys.projectKey(doc_id:doc_id)
-				multi.del keys.docVersion(doc_id:doc_id)
-				multi.exec cb
-			(cb) ->
-				rclient.srem keys.docsInProject(project_id:project_id), doc_id, cb
-		], (err) ->
+		callback = (err) ->
 			if err?
 				logger.err project_id:project_id, doc_id:doc_id, err:err, "error removing doc from redis"
-				callback(err, null)
+				_callback(err)
 			else
 				logger.log project_id:project_id, doc_id:doc_id, "removed doc from redis"
-				callback()
+				_callback()
+
+		multi = rclient.multi()
+		multi.del keys.docLines(doc_id:doc_id)
+		multi.del keys.projectKey(doc_id:doc_id)
+		multi.del keys.docVersion(doc_id:doc_id)
+		multi.exec (error) ->
+			return callback(error) if error?
+			rclient.srem keys.docsInProject(project_id:project_id), doc_id, callback
 
 	getDoc : (doc_id, callback = (error, lines, version) ->)->
 		timer = new metrics.Timer("redis.get-doc")
@@ -141,15 +138,5 @@ module.exports = RedisManager =
 			version = parseInt(version, 10)
 			callback null, version
 
-	pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) ->
-		jsonOp = JSON.stringify op
-		async.parallel [
-			(cb) -> rclient.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp, cb
-			(cb) -> rclient.sadd keys.docsWithHistoryOps(project_id: project_id), doc_id, cb
-		], (error, results) ->
-			return callback(error) if error?
-			[length, _] = results
-			callback(error, length)
-
 	getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
 		rclient.smembers keys.docsInProject(project_id: project_id), callback
diff --git a/services/document-updater/app/coffee/TrackChangesManager.coffee b/services/document-updater/app/coffee/TrackChangesManager.coffee
index 90cba86b36..fcfbcdcc58 100644
--- a/services/document-updater/app/coffee/TrackChangesManager.coffee
+++ b/services/document-updater/app/coffee/TrackChangesManager.coffee
@@ -1,8 +1,9 @@
 settings = require "settings-sharelatex"
 request = require "request"
 logger = require "logger-sharelatex"
-RedisManager = require "./RedisManager"
-crypto = require("crypto")
+redis = require("redis-sharelatex")
+rclient = redis.createClient(settings.redis.web)
+async = require "async"
 
 module.exports = TrackChangesManager =
 	flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
@@ -23,8 +24,13 @@ module.exports = TrackChangesManager =
 
 	FLUSH_EVERY_N_OPS: 50
 	pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) ->
-		RedisManager.pushUncompressedHistoryOp project_id, doc_id, op, (error, length) ->
+		jsonOp = JSON.stringify op
+		multi = rclient.multi()
+		multi.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp
+		multi.sadd "DocsWithHistoryOps:#{project_id}", doc_id
+		multi.exec (error, results) ->
 			return callback(error) if error?
+			[length, _] = results
 			if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0
 				# Do this in the background since it uses HTTP and so may be too
 				# slow to wait for when processing a doc update.
@@ -33,4 +39,3 @@ module.exports = TrackChangesManager =
 					if error?
 						logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api"
 			callback()
-
diff --git a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
index 99daf8f706..09af6781d3 100644
--- a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
@@ -171,28 +171,6 @@ describe "RedisManager", ->
 		it "should log out the problem", ->
 			@logger.warn.called.should.equal true
 
-	describe "pushUncompressedHistoryOp", ->
-		beforeEach (done) ->
-			@op = { op: [{ i: "foo", p: 4 }] }
-			@rclient.rpush = sinon.stub().yields(null, @length = 42)
-			@rclient.sadd = sinon.stub().yields()
-			@RedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, (args...) =>
-				@callback(args...)
-				done()
-
-		it "should push the doc op into the doc ops list", ->
-			@rclient.rpush
-				.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@op))
-				.should.equal true
-
-		it "should add the doc_id to the set of which records the project docs", ->
-			@rclient.sadd
-				.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
-				.should.equal true
-
-		it "should call the callback with the length", ->
-			@callback.calledWith(null, @length).should.equal true
-
 	describe "getUpdatesLength", ->
 		beforeEach ->
 			@rclient.llen = sinon.stub().yields(null, @length = 3)
diff --git a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee
index 8fad5322e2..c2f58abbf5 100644
--- a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee
+++ b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee
@@ -7,9 +7,9 @@ describe "TrackChangesManager", ->
 	beforeEach ->
 		@TrackChangesManager = SandboxedModule.require modulePath, requires:
 			"request": @request = {}
-			"settings-sharelatex": @Settings = {}
+			"settings-sharelatex": @Settings = { redis: web: {} }
 			"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
-			"./RedisManager": @RedisManager = {}
+			"redis-sharelatex": createClient: () => @rclient = {}
 		@project_id = "mock-project-id"
 		@doc_id = "mock-doc-id"
 		@callback = sinon.stub()
@@ -42,17 +42,23 @@ describe "TrackChangesManager", ->
 
 	describe "pushUncompressedHistoryOp", ->
 		beforeEach ->
-			@op = "mock-op"
+			@op = { op: [{ i: "foo", p: 4 }] }
+			@rclient.multi = sinon.stub().returns(@multi = {})
+			@multi.rpush = sinon.stub()
+			@multi.sadd = sinon.stub()
+			@multi.exec = sinon.stub().yields(null, [@length = 42, "foo"])
 			@TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2)
 
 		describe "pushing the op", ->
 			beforeEach ->
-				@RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(3, null, 1)
 				@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
 
 			it "should push the op into redis", ->
-				@RedisManager.pushUncompressedHistoryOp
-					.calledWith(@project_id, @doc_id, @op)
+				@multi.rpush
+					.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify @op)
+					.should.equal true
+				@multi.sadd
+					.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
 					.should.equal true
 
 			it "should call the callback", ->
@@ -63,8 +69,7 @@ describe "TrackChangesManager", ->
 
 		describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", ->
 			beforeEach ->
-				@RedisManager.pushUncompressedHistoryOp =
-					sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
+				@multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"])
 				@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
 
 			it "should tell the track changes api to flush", ->
@@ -74,8 +79,7 @@ describe "TrackChangesManager", ->
 
 		describe "when TrackChangesManager errors", ->
 			beforeEach ->
-				@RedisManager.pushUncompressedHistoryOp =
-					sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
+				@multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"])
 				@TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
 				@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
 
 			it "should log out the error", ->
 				@logger.error
 					.calledWith(
 						err: @error
 						doc_id: @doc_id
 						project_id: @project_id
 						"error flushing doc to track changes api"
 					)
 					.should.equal true
-