diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee
index 75312298bb..226214599e 100644
--- a/services/document-updater/app/coffee/RedisManager.coffee
+++ b/services/document-updater/app/coffee/RedisManager.coffee
@@ -70,25 +70,6 @@ module.exports = RedisManager =
 		multi.set keys.docVersion(doc_id:doc_id), version
 		multi.exec (error, replys) -> callback(error)
 
-	getPendingUpdatesForDoc : (doc_id, callback)->
-		multi = rclient.multi()
-		multi.lrange keys.pendingUpdates(doc_id:doc_id), 0 , -1
-		multi.del keys.pendingUpdates(doc_id:doc_id)
-		multi.exec (error, replys) ->
-			return callback(error) if error?
-			jsonUpdates = replys[0]
-			updates = []
-			for jsonUpdate in jsonUpdates
-				try
-					update = JSON.parse jsonUpdate
-				catch e
-					return callback e
-				updates.push update
-			callback error, updates
-
-	getUpdatesLength: (doc_id, callback)->
-		rclient.llen keys.pendingUpdates(doc_id:doc_id), callback
-
 	getPreviousDocOps: (doc_id, start, end, callback = (error, jsonOps) ->) ->
 		rclient.llen keys.docOps(doc_id: doc_id), (error, length) ->
 			return callback(error) if error?
diff --git a/services/document-updater/app/coffee/TrackChangesManager.coffee b/services/document-updater/app/coffee/TrackChangesManager.coffee
index fcfbcdcc58..cc61bdb0ae 100644
--- a/services/document-updater/app/coffee/TrackChangesManager.coffee
+++ b/services/document-updater/app/coffee/TrackChangesManager.coffee
@@ -1,9 +1,7 @@
 settings = require "settings-sharelatex"
 request = require "request"
 logger = require "logger-sharelatex"
-redis = require("redis-sharelatex")
-rclient = redis.createClient(settings.redis.web)
-async = require "async"
+WebRedisManager = require "./WebRedisManager"
 
 module.exports = TrackChangesManager =
 	flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
@@ -24,13 +22,8 @@ module.exports = TrackChangesManager =
 	FLUSH_EVERY_N_OPS: 50
 
 	pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) ->
-		jsonOp = JSON.stringify op
-		multi = rclient.multi()
-		multi.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp
-		multi.sadd "DocsWithHistoryOps:#{project_id}", doc_id
-		multi.exec (error, results) ->
+		WebRedisManager.pushUncompressedHistoryOp project_id, doc_id, op, (error, length) ->
 			return callback(error) if error?
-			[length, _] = results
 			if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0
 				# Do this in the background since it uses HTTP and so may be too
 				# slow to wait for when processing a doc update.
diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee
index 97c33e8b6f..219c52848b 100644
--- a/services/document-updater/app/coffee/UpdateManager.coffee
+++ b/services/document-updater/app/coffee/UpdateManager.coffee
@@ -1,5 +1,6 @@
 LockManager = require "./LockManager"
 RedisManager = require "./RedisManager"
+WebRedisManager = require "./WebRedisManager"
 ShareJsUpdateManager = require "./ShareJsUpdateManager"
 Settings = require('settings-sharelatex')
 async = require("async")
@@ -25,7 +26,7 @@ module.exports = UpdateManager =
 					UpdateManager.continueProcessingUpdatesWithLock project_id, doc_id, callback
 
 	continueProcessingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
-		RedisManager.getUpdatesLength doc_id, (error, length) =>
+		WebRedisManager.getUpdatesLength doc_id, (error, length) =>
 			return callback(error) if error?
 			if length > 0
 				UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, callback
@@ -33,7 +34,7 @@ module.exports = UpdateManager =
 				callback()
 
 	fetchAndApplyUpdates: (project_id, doc_id, callback = (error) ->) ->
-		RedisManager.getPendingUpdatesForDoc doc_id, (error, updates) =>
+		WebRedisManager.getPendingUpdatesForDoc doc_id, (error, updates) =>
			return callback(error) if error?
			if updates.length == 0
				return callback()
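The three files above complete the call-site half of the change: `RedisManager` loses its `PendingUpdates` helpers, and `TrackChangesManager` and `UpdateManager` now delegate to a new `WebRedisManager` module (added below), which keeps all traffic to the web Redis instance (`Settings.redis.web`) in one place. A minimal usage sketch of the extracted interface, based only on the signatures in this diff (`doc_id` is a placeholder):

```coffee
WebRedisManager = require "./WebRedisManager"

doc_id = "doc-id-123"   # placeholder id, for illustration only

# Fetch and clear everything queued for the doc in one MULTI...
WebRedisManager.getPendingUpdatesForDoc doc_id, (error, updates) ->
	throw error if error?
	console.log "got #{updates.length} pending updates"
	# ...then check whether more updates arrived in the meantime.
	WebRedisManager.getUpdatesLength doc_id, (error, length) ->
		throw error if error?
		console.log "#{length} updates still queued"
```

This is the same sequence `UpdateManager.fetchAndApplyUpdates` and `continueProcessingUpdatesWithLock` perform, minus the locking and the ShareJS application step.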
diff --git a/services/document-updater/app/coffee/WebRedisManager.coffee b/services/document-updater/app/coffee/WebRedisManager.coffee
new file mode 100644
index 0000000000..a14c2d6c86
--- /dev/null
+++ b/services/document-updater/app/coffee/WebRedisManager.coffee
@@ -0,0 +1,33 @@
+Settings = require('settings-sharelatex')
+rclient = require("redis-sharelatex").createClient(Settings.redis.web)
+async = require "async"
+
+module.exports = WebRedisManager =
+	getPendingUpdatesForDoc : (doc_id, callback)->
+		multi = rclient.multi()
+		multi.lrange "PendingUpdates:#{doc_id}", 0 , -1
+		multi.del "PendingUpdates:#{doc_id}"
+		multi.exec (error, replys) ->
+			return callback(error) if error?
+			jsonUpdates = replys[0]
+			updates = []
+			for jsonUpdate in jsonUpdates
+				try
+					update = JSON.parse jsonUpdate
+				catch e
+					return callback e
+				updates.push update
+			callback error, updates
+
+	getUpdatesLength: (doc_id, callback)->
+		rclient.llen "PendingUpdates:#{doc_id}", callback
+
+	pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) ->
+		jsonOp = JSON.stringify op
+		async.parallel [
+			(cb) -> rclient.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp, cb
+			(cb) -> rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, cb
+		], (error, results) ->
+			return callback(error) if error?
+			[length, _] = results
+			callback(error, length)
\ No newline at end of file
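Two details of the new module are worth spelling out. First, `[length, _] = results` works because RPUSH replies with the new list length and SADD with the number of members added, and `async.parallel` returns results in the order the tasks were given, so `length` is exactly the value `TrackChangesManager` checks against `FLUSH_EVERY_N_OPS`. Second, the code removed from `TrackChangesManager` issued the RPUSH and SADD inside a single MULTI, whereas this module sends them in parallel, so the pair is no longer applied atomically; whether that matters for these keys is not addressed by this diff. For reference, a transactional sketch equivalent to the removed code (the helper name `pushUncompressedHistoryOpAtomically` is hypothetical):

```coffee
Settings = require "settings-sharelatex"
rclient = require("redis-sharelatex").createClient(Settings.redis.web)

# Sketch: queue both writes in one MULTI so Redis applies them together,
# matching the behaviour of the code removed from TrackChangesManager above.
pushUncompressedHistoryOpAtomically = (project_id, doc_id, op, callback = (error, length) ->) ->
	multi = rclient.multi()
	multi.rpush "UncompressedHistoryOps:#{doc_id}", JSON.stringify(op)
	multi.sadd "DocsWithHistoryOps:#{project_id}", doc_id
	multi.exec (error, results) ->
		return callback(error) if error?
		[length, _] = results   # MULTI replies come back in command order
		callback null, length
```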
diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee
index 9a5c6d91f2..df2c9758c6 100755
--- a/services/document-updater/config/settings.defaults.coffee
+++ b/services/document-updater/config/settings.defaults.coffee
@@ -20,10 +20,9 @@ module.exports =
 			port:"6379"
 			host:"localhost"
 			password:""
-		documentupdater: [{
-			primary: true
-			port: "6379"
-			host: "localhost"
+		documentupdater:
+			port:"6379"
+			host:"localhost"
 			password:""
 			key_schema:
 				blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
@@ -33,20 +32,22 @@ module.exports =
 				projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
 				pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
 				docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
-		}, {
-			cluster: [{
-				port: "7000"
-				host: "localhost"
-			}]
-			key_schema:
-				blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
-				docLines: ({doc_id}) -> "doclines:{#{doc_id}}"
-				docOps: ({doc_id}) -> "DocOps:{#{doc_id}}"
-				docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
-				projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
-				pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
-				docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
-		}]
+		# To use Redis cluster, configure the backend as follows:
+		# [{
+		# 	primary: true
+		# 	cluster: [{
+		# 		port: "7000"
+		# 		host: "localhost"
+		# 	}]
+		# 	key_schema:
+		# 		blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}"
+		# 		docLines: ({doc_id}) -> "doclines:{#{doc_id}}"
+		# 		docOps: ({doc_id}) -> "DocOps:{#{doc_id}}"
+		# 		docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
+		# 		projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
+		# 		pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
+		# 		docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
+		# }]
 
 	max_doc_length: 2 * 1024 * 1024 # 2mb
diff --git a/services/document-updater/package.json b/services/document-updater/package.json
index 59def540a8..6872d5f332 100644
--- a/services/document-updater/package.json
+++ b/services/document-updater/package.json
@@ -26,6 +26,7 @@
   "bunyan": "~0.22.1",
   "chai": "^3.5.0",
   "chai-spies": "^0.7.1",
+  "cluster-key-slot": "^1.0.5",
   "grunt": "~0.4.2",
   "grunt-available-tasks": "~0.4.1",
   "grunt-bunyan": "~0.5.0",
diff --git a/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee b/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee
index e9fd0b0c34..4f76f4cd6e 100644
--- a/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee
+++ b/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee
@@ -1,4 +1,5 @@
-rclient = require("redis").createClient()
+Settings = require('settings-sharelatex')
+rclient = require("redis").createClient(Settings.redis.web)
 request = require("request").defaults(jar: false)
 async = require "async"
 
@@ -7,6 +8,11 @@ module.exports = DocUpdaterClient =
 		chars = for i in [1..24]
 			Math.random().toString(16)[2]
 		return chars.join("")
+
+	subscribeToAppliedOps: (callback = (message) ->) ->
+		rclient_sub = require("redis").createClient()
+		rclient_sub.subscribe "applied-ops"
+		rclient_sub.on "message", callback
 
 	sendUpdate: (project_id, doc_id, update, callback = (error) ->) ->
 		rclient.rpush "PendingUpdates:#{doc_id}", JSON.stringify(update), (error)->
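The commented-out cluster backend hash-tags every key (for example `"doclines:{#{doc_id}}"`). Redis Cluster hashes only the part of a key between the first `{` and the following `}`, so all keys for one doc map to the same slot and multi-key operations such as the MULTI/EXEC in `RedisManager` keep working. The `cluster-key-slot` package added to `package.json` computes those slots; a small check as a sketch (the id is a placeholder, and the package's default export is assumed to be a `(key) -> slot` function):

```coffee
keySlot = require "cluster-key-slot"   # assumed export: takes a key, returns its slot number

doc_id = "doc-id-123"   # placeholder id

# With hash tags only the "{...}" part is hashed, so every key for this doc
# maps to the same cluster slot and multi-key commands stay on one node.
console.log keySlot("doclines:{#{doc_id}}") == keySlot("DocOps:{#{doc_id}}")   # true

# The un-tagged schema hashes the whole key, so the same pair usually lands
# in different slots.
console.log keySlot("doclines:#{doc_id}") == keySlot("DocOps:#{doc_id}")
```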
diff --git a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
index 338e7cd668..7ee63de648 100644
--- a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
@@ -54,46 +54,6 @@ describe "RedisManager", ->
 				@callback
 					.calledWith(null, @lines, @version)
 					.should.equal true
-
-	describe "getPendingUpdatesForDoc", ->
-		beforeEach ->
-			@rclient.lrange = sinon.stub()
-			@rclient.del = sinon.stub()
-
-		describe "successfully", ->
-			beforeEach ->
-				@updates = [
-					{ op: [{ i: "foo", p: 4 }] }
-					{ op: [{ i: "foo", p: 4 }] }
-				]
-				@jsonUpdates = @updates.map (update) -> JSON.stringify update
-				@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
-				@RedisManager.getPendingUpdatesForDoc @doc_id, @callback
-
-			it "should get the pending updates", ->
-				@rclient.lrange
-					.calledWith("PendingUpdates:#{@doc_id}", 0, -1)
-					.should.equal true
-
-			it "should delete the pending updates", ->
-				@rclient.del
-					.calledWith("PendingUpdates:#{@doc_id}")
-					.should.equal true
-
-			it "should call the callback with the updates", ->
-				@callback.calledWith(null, @updates).should.equal true
-
-		describe "when the JSON doesn't parse", ->
-			beforeEach ->
-				@jsonUpdates = [
-					JSON.stringify { op: [{ i: "foo", p: 4 }] }
-					"broken json"
-				]
-				@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
-				@RedisManager.getPendingUpdatesForDoc @doc_id, @callback
-
-			it "should return an error to the callback", ->
-				@callback.calledWith(new Error("JSON parse error")).should.equal true
 
 	describe "getPreviousDocOpsTests", ->
 		describe "with a start and an end value", ->
@@ -179,17 +139,6 @@ describe "RedisManager", ->
 		it "should log out the problem", ->
 			@logger.warn.called.should.equal true
 
-	describe "getUpdatesLength", ->
-		beforeEach ->
-			@rclient.llen = sinon.stub().yields(null, @length = 3)
-			@RedisManager.getUpdatesLength @doc_id, @callback
-
-		it "should look up the length", ->
-			@rclient.llen.calledWith("PendingUpdates:#{@doc_id}").should.equal true
-
-		it "should return the length", ->
-			@callback.calledWith(null, @length).should.equal true
-
 	describe "pushDocOp", ->
 		beforeEach ->
 			@rclient.rpush = sinon.stub()
diff --git a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee
index c2f58abbf5..574795f3bb 100644
--- a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee
+++ b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee
@@ -7,9 +7,9 @@ describe "TrackChangesManager", ->
 	beforeEach ->
 		@TrackChangesManager = SandboxedModule.require modulePath, requires:
 			"request": @request = {}
-			"settings-sharelatex": @Settings = { redis: web: {} }
+			"settings-sharelatex": @Settings = {}
 			"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
-			"redis-sharelatex": createClient: () => @rclient = {}
+			"./WebRedisManager": @WebRedisManager = {}
 		@project_id = "mock-project-id"
 		@doc_id = "mock-doc-id"
 		@callback = sinon.stub()
@@ -42,23 +42,17 @@ describe "TrackChangesManager", ->
 
 	describe "pushUncompressedHistoryOp", ->
 		beforeEach ->
-			@op = { op: [{ i: "foo", p: 4 }] }
-			@rclient.multi = sinon.stub().returns(@multi = {})
-			@multi.rpush = sinon.stub()
-			@multi.sadd = sinon.stub()
-			@multi.exec = sinon.stub().yields(null, [@length = 42, "foo"])
+			@op = "mock-op"
 			@TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2)
 
 		describe "pushing the op", ->
 			beforeEach ->
+				@WebRedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(3, null, 1)
 				@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
 
 			it "should push the op into redis", ->
-				@multi.rpush
-					.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify @op)
-					.should.equal true
-				@multi.sadd
-					.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
+				@WebRedisManager.pushUncompressedHistoryOp
+					.calledWith(@project_id, @doc_id, @op)
 					.should.equal true
 
 			it "should call the callback", ->
@@ -69,7 +63,8 @@ describe "TrackChangesManager", ->
 
 		describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", ->
 			beforeEach ->
-				@multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"])
+				@WebRedisManager.pushUncompressedHistoryOp =
+					sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
 				@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
 
 			it "should tell the track changes api to flush", ->
@@ -79,7 +74,8 @@ describe "TrackChangesManager", ->
 
 		describe "when TrackChangesManager errors", ->
 			beforeEach ->
-				@multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"])
+				@WebRedisManager.pushUncompressedHistoryOp =
+					sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
 				@TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
 				@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
diff --git a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee
index 249740973f..19094794bb 100644
--- a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee
+++ b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee
@@ -12,6 +12,7 @@ describe "UpdateManager", ->
 		@UpdateManager = SandboxedModule.require modulePath, requires:
 			"./LockManager" : @LockManager = {}
 			"./RedisManager" : @RedisManager = {}
+			"./WebRedisManager" : @WebRedisManager = {}
 			"./ShareJsUpdateManager" : @ShareJsUpdateManager = {}
 			"logger-sharelatex": @logger = { log: sinon.stub() }
 			"./Metrics": @Metrics =
@@ -89,7 +90,7 @@ describe "UpdateManager", ->
 	describe "continueProcessingUpdatesWithLock", ->
 		describe "when there are outstanding updates", ->
 			beforeEach ->
-				@RedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3)
+				@WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3)
 				@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
 				@UpdateManager.continueProcessingUpdatesWithLock @project_id, @doc_id, @callback
 
@@ -101,7 +102,7 @@ describe "UpdateManager", ->
 
 		describe "when there are no outstanding updates", ->
 			beforeEach ->
-				@RedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 0)
+				@WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 0)
 				@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
 				@UpdateManager.continueProcessingUpdatesWithLock @project_id, @doc_id, @callback
 
@@ -117,12 +118,12 @@ describe "UpdateManager", ->
 				@updates = [{p: 1, t: "foo"}]
 				@updatedDocLines = ["updated", "lines"]
 				@version = 34
-				@RedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
+				@WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
 				@UpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version)
 				@UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback
 
 			it "should get the pending updates", ->
-				@RedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true
+				@WebRedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true
 
 			it "should apply the updates", ->
 				@UpdateManager.applyUpdates
@@ -135,7 +136,7 @@ describe "UpdateManager", ->
 		describe "when there are no updates", ->
 			beforeEach ->
 				@updates = []
-				@RedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
+				@WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates)
 				@UpdateManager.applyUpdates = sinon.stub()
 				@RedisManager.setDocument = sinon.stub()
 				@UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback
diff --git a/services/document-updater/test/unit/coffee/UpdateManager/lockUpdatesAndDoTests.coffee b/services/document-updater/test/unit/coffee/UpdateManager/lockUpdatesAndDoTests.coffee
index adba644b27..fa9ca76356 100644
--- a/services/document-updater/test/unit/coffee/UpdateManager/lockUpdatesAndDoTests.coffee
+++ b/services/document-updater/test/unit/coffee/UpdateManager/lockUpdatesAndDoTests.coffee
@@ -9,6 +9,7 @@ describe 'UpdateManager - lockUpdatesAndDo', ->
 		@UpdateManager = SandboxedModule.require modulePath, requires:
 			"./LockManager" : @LockManager = {}
 			"./RedisManager" : @RedisManager = {}
+			"./WebRedisManager" : @WebRedisManager = {}
 			"./ShareJsUpdateManager" : @ShareJsUpdateManager = {}
 			"logger-sharelatex": @logger = { log: sinon.stub() }
 		@project_id = "project-id-123"
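The unit-test edits above all follow one pattern: the module under test is loaded with `SandboxedModule.require`, its `./WebRedisManager` dependency is replaced with a bare object, and each test attaches only the sinon stub it needs, so no real Redis client is ever created. A condensed sketch of that pattern (the requires list is abridged, and the compiled-JS path follows the `modulePath` convention used by the new WebRedisManagerTests below):

```coffee
sinon = require "sinon"
SandboxedModule = require "sandboxed-module"

# Load UpdateManager with its collaborators swapped for empty stub objects.
UpdateManager = SandboxedModule.require "../../../../app/js/UpdateManager.js", requires:
	"./LockManager": LockManager = {}
	"./RedisManager": RedisManager = {}
	"./WebRedisManager": WebRedisManager = {}
	"./ShareJsUpdateManager": ShareJsUpdateManager = {}
	"logger-sharelatex": { log: sinon.stub() }

# A test then stubs only the call it exercises, e.g. three queued updates:
WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3)
```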
diff --git a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee
new file mode 100644
index 0000000000..932cb92e26
--- /dev/null
+++ b/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee
@@ -0,0 +1,93 @@
+sinon = require('sinon')
+chai = require('chai')
+should = chai.should()
+modulePath = "../../../../app/js/WebRedisManager.js"
+SandboxedModule = require('sandboxed-module')
+Errors = require "../../../../app/js/Errors"
+
+describe "WebRedisManager", ->
+	beforeEach ->
+		@rclient =
+			auth: () ->
+			exec: sinon.stub()
+		@rclient.multi = () => @rclient
+		@WebRedisManager = SandboxedModule.require modulePath, requires:
+			"redis-sharelatex": createClient: () => @rclient
+			"settings-sharelatex": redis: web: @settings = {"mock": "settings"}
+		@doc_id = "doc-id-123"
+		@project_id = "project-id-123"
+		@callback = sinon.stub()
+
+	describe "getPendingUpdatesForDoc", ->
+		beforeEach ->
+			@rclient.lrange = sinon.stub()
+			@rclient.del = sinon.stub()
+
+		describe "successfully", ->
+			beforeEach ->
+				@updates = [
+					{ op: [{ i: "foo", p: 4 }] }
+					{ op: [{ i: "foo", p: 4 }] }
+				]
+				@jsonUpdates = @updates.map (update) -> JSON.stringify update
+				@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
+				@WebRedisManager.getPendingUpdatesForDoc @doc_id, @callback
+
+			it "should get the pending updates", ->
+				@rclient.lrange
+					.calledWith("PendingUpdates:#{@doc_id}", 0, -1)
+					.should.equal true
+
+			it "should delete the pending updates", ->
+				@rclient.del
+					.calledWith("PendingUpdates:#{@doc_id}")
+					.should.equal true
+
+			it "should call the callback with the updates", ->
+				@callback.calledWith(null, @updates).should.equal true
+
+		describe "when the JSON doesn't parse", ->
+			beforeEach ->
+				@jsonUpdates = [
+					JSON.stringify { op: [{ i: "foo", p: 4 }] }
+					"broken json"
+				]
+				@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates])
+				@WebRedisManager.getPendingUpdatesForDoc @doc_id, @callback
+
+			it "should return an error to the callback", ->
+				@callback.calledWith(new Error("JSON parse error")).should.equal true
+
+
+	describe "getUpdatesLength", ->
+		beforeEach ->
+			@rclient.llen = sinon.stub().yields(null, @length = 3)
+			@WebRedisManager.getUpdatesLength @doc_id, @callback
+
+		it "should look up the length", ->
+			@rclient.llen.calledWith("PendingUpdates:#{@doc_id}").should.equal true
+
+		it "should return the length", ->
+			@callback.calledWith(null, @length).should.equal true
+
+	describe "pushUncompressedHistoryOp", ->
+		beforeEach (done) ->
+			@op = { op: [{ i: "foo", p: 4 }] }
+			@rclient.rpush = sinon.stub().yields(null, @length = 42)
+			@rclient.sadd = sinon.stub().yields()
+			@WebRedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, (args...) =>
+				@callback(args...)
+				done()
+
+		it "should push the doc op into the doc ops list", ->
+			@rclient.rpush
+				.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@op))
+				.should.equal true
+
+		it "should add the doc_id to the set of which records the project docs", ->
+			@rclient.sadd
+				.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
+				.should.equal true
+
+		it "should call the callback with the length", ->
+			@callback.calledWith(null, @length).should.equal true
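The fake client in these new tests returns itself from `multi()`, so commands queued inside a MULTI (`lrange`, `del`) and plain commands (`llen`, `rpush`, `sadd`) are all asserted on one stub object, with `exec` stubbed to supply the replies. For a quick end-to-end check outside the unit tests, a manual smoke run of the new module might look like the following sketch (the ids are placeholders, and it assumes a local Redis matching `Settings.redis.web` plus compiled output in `app/js`):

```coffee
WebRedisManager = require "./app/js/WebRedisManager"

project_id = "smoke-test-project"   # placeholder ids
doc_id = "smoke-test-doc"

# Push one fake op and report the resulting history-list length.
WebRedisManager.pushUncompressedHistoryOp project_id, doc_id, { op: [{ i: "x", p: 0 }] }, (error, length) ->
	throw error if error?
	console.log "UncompressedHistoryOps:#{doc_id} now has #{length} entries"
	process.exit 0
```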