diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee
index 69d917815e..d280de1cea 100644
--- a/services/document-updater/app/coffee/RedisManager.coffee
+++ b/services/document-updater/app/coffee/RedisManager.coffee
@@ -6,6 +6,11 @@ _ = require('underscore')
 keys = require('./RedisKeyBuilder')
 logger = require('logger-sharelatex')
 metrics = require('./Metrics')
+ZipManager = require('./ZipManager')
+
+redisOptions = _.clone(Settings.redis.web)
+redisOptions.return_buffers = true
+rclientBuffer = redis.createClient(redisOptions)
 
 # Make times easy to read
 minutes = 60 # seconds for Redis expire
@@ -13,7 +18,7 @@ minutes = 60 # seconds for Redis expire
 module.exports = RedisManager =
 	putDocInMemory : (project_id, doc_id, docLines, version, callback)->
 		timer = new metrics.Timer("redis.put-doc")
-		logger.log project_id:project_id, doc_id:doc_id, docLines:docLines, version: version, "putting doc in redis"
+		logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis"
 		multi = rclient.multi()
 		multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
 		multi.set keys.projectKey({doc_id:doc_id}), project_id
@@ -27,7 +32,6 @@ module.exports = RedisManager =
 	removeDocFromMemory : (project_id, doc_id, callback)->
 		logger.log project_id:project_id, doc_id:doc_id, "removing doc from redis"
 		multi = rclient.multi()
-		multi.get keys.docLines(doc_id:doc_id)
 		multi.del keys.docLines(doc_id:doc_id)
 		multi.del keys.projectKey(doc_id:doc_id)
 		multi.del keys.docVersion(doc_id:doc_id)
@@ -38,25 +42,26 @@ module.exports = RedisManager =
 				logger.err project_id:project_id, doc_id:doc_id, err:err, "error removing doc from redis"
 				callback(err, null)
 			else
-				docLines = replys[0]
-				logger.log project_id:project_id, doc_id:doc_id, docLines:docLines, "removed doc from redis"
+				logger.log project_id:project_id, doc_id:doc_id, "removed doc from redis"
 				callback()
 
 	getDoc : (doc_id, callback = (error, lines, version) ->)->
 		timer = new metrics.Timer("redis.get-doc")
-		multi = rclient.multi()
+		# use Buffer when retrieving data as it may be gzipped
+		multi = rclientBuffer.multi()
 		linesKey = keys.docLines(doc_id:doc_id)
 		multi.get linesKey
 		multi.get keys.docVersion(doc_id:doc_id)
 		multi.exec (error, result)->
 			timer.done()
 			return callback(error) if error?
-			try
-				docLines = JSON.parse result[0]
-			catch e
-				return callback(e)
-			version = parseInt(result[1] or 0, 10)
-			callback null, docLines, version
+			ZipManager.uncompressIfNeeded doc_id, result, (error, result) ->
+				try
+					docLines = JSON.parse result[0]
+				catch e
+					return callback(e)
+				version = parseInt(result[1] or 0, 10)
+				callback null, docLines, version
 
 	getDocVersion: (doc_id, callback = (error, version) ->) ->
 		rclient.get keys.docVersion(doc_id: doc_id), (error, version) ->
@@ -70,11 +75,12 @@ module.exports = RedisManager =
 			callback null, len
 
 	setDocument : (doc_id, docLines, version, callback = (error) ->)->
-		multi = rclient.multi()
-		multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
-		multi.set keys.docVersion(doc_id:doc_id), version
-		multi.incr keys.now("docsets")
-		multi.exec (error, replys) -> callback(error)
+		ZipManager.compressIfNeeded doc_id, JSON.stringify(docLines), (err, result) ->
+			multi = rclient.multi()
+			multi.set keys.docLines(doc_id:doc_id), result
+			multi.set keys.docVersion(doc_id:doc_id), version
+			multi.incr keys.now("docsets")
+			multi.exec (error, replys) -> callback(error)
 
 	getPendingUpdatesForDoc : (doc_id, callback)->
 		multi = rclient.multi()
@@ -170,25 +176,3 @@ module.exports = RedisManager =
 
 	getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
 		rclient.smembers keys.docsInProject(project_id: project_id), callback
-
-
-getDocumentsProjectId = (doc_id, callback)->
-	rclient.get keys.projectKey({doc_id:doc_id}), (err, project_id)->
-		callback err, {doc_id:doc_id, project_id:project_id}
-
-getAllProjectDocsIds = (project_id, callback)->
-	rclient.SMEMBERS keys.docsInProject(project_id:project_id), (err, doc_ids)->
-		if callback?
-			callback(err, doc_ids)
-
-getDocumentsAndExpire = (doc_ids, callback)->
-	multi = rclient.multi()
-	oneDay = 86400
-	doc_ids.forEach (doc_id)->
-		# rclient.expire keys.docLines(doc_id:doc_id), oneDay, ->
-	doc_ids.forEach (doc_id)->
-		multi.get keys.docLines(doc_id:doc_id)
-	multi.exec (err, docsLines)->
-		callback err, docsLines
-
-
diff --git a/services/document-updater/app/coffee/ZipManager.coffee b/services/document-updater/app/coffee/ZipManager.coffee
new file mode 100644
index 0000000000..18482a5c69
--- /dev/null
+++ b/services/document-updater/app/coffee/ZipManager.coffee
@@ -0,0 +1,73 @@
+Settings = require('settings-sharelatex')
+logger = require('logger-sharelatex')
+metrics = require('./Metrics')
+zlib = require('zlib')
+
+# Compress and uncompress data sent to Redis using the node 'zlib'
+# module, to reduce load on Redis.
+#
+# Measurements show that most of the load on Redis comes from very
+# large documents. We can shift some of that CPU load from Redis to
+# the docupdaters (which are scalable) by compressing the data in the
+# docupdater first.
+#
+# To avoid overloading the docupdater clients we impose a minimum size
+# on data we will compress, so we only catch the large ones.
+#
+# The optimum size for the cutoff is about 10K; below this we do
+# more work but don't really gain any extra reduction in Redis CPU.
+#
+# |--------------------+-----------+--------------------------|
+# | Compression cutoff | Redis CPU | Extra doc updater CPU(*) |
+# |--------------------+-----------+--------------------------|
+# | N/A                | 100%      | 0%                       |
+# | 100k               | 80%       | 10%                      |
+# | 10k                | 50%       | 30%                      |
+# |--------------------+-----------+--------------------------|
+#
+# (*) percentage of a single core, because node zlib runs in multiple
+# threads.
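Note: the design comment above describes the whole scheme. As a minimal standalone sketch (CoffeeScript, not part of the changeset; the cutoff value and helper names are illustrative):

    zlib = require('zlib')

    CUTOFF = 10 * 1024  # illustrative cutoff, matching settings.defaults

    # Write path: gzip only documents larger than the cutoff; smaller
    # payloads are stored as-is because gzip would grow them.
    maybeCompress = (text, callback) ->
        return callback(null, text) if text.length <= CUTOFF
        zlib.gzip text, callback  # yields (err, Buffer)

    # Read path: gzip output always starts with the bytes 0x1F 0x8B, so
    # sniffing two bytes distinguishes compressed from plain values
    # without storing any extra state in Redis.
    maybeUncompress = (buf, callback) ->
        if buf[0] == 0x1F and buf[1] == 0x8B
            zlib.gunzip buf, (err, out) ->
                return callback(err) if err?
                callback null, out.toString()
        else
            callback null, buf.toString()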
+
+ZIP_WRITES_ENABLED = Settings.redis.zip?.writesEnabled
+ZIP_MINSIZE = Settings.redis.zip?.minSize || 64*1024
+
+module.exports = ZipManager =
+	uncompressIfNeeded: (doc_id, result, callback) ->
+		# result is an array of [text, version]. Each entry is a node
+		# Buffer object which we need to convert to strings on output
+
+		# first make sure the version (result[1]) is returned as a string
+		if result?[1]?.toString?
+			result[1] = result[1].toString()
+
+		# now uncompress the text (result[0]) if needed
+		buf = result?[0]
+
+		# Check if we have a GZIP file (magic numbers in header)
+		if buf? and buf[0] == 0x1F and buf[1] == 0x8B
+			zlib.gunzip buf, (err, newbuf) ->
+				if err?
+					logger.err doc_id:doc_id, err:err, "error uncompressing doc"
+					callback(err, null)
+				else
+					logger.log doc_id:doc_id, fromBytes: buf.length, toChars: newbuf.length, factor: buf.length/newbuf.length, "uncompressed successfully"
+					result[0] = newbuf.toString()
+					callback(null, result)
+		else
+			# if we don't have a GZIP file it's just a buffer of text, convert it back to a string
+			if buf?.toString?
+				result[0] = buf.toString()
+			callback(null, result)
+
+	compressIfNeeded: (doc_id, text, callback) ->
+		if ZIP_WRITES_ENABLED and ZIP_MINSIZE > 1024 and text.length > ZIP_MINSIZE
+			# N.B. skip files of 1k or less, because gzip increases the size
+			zlib.gzip text, (err, buf) ->
+				if err?
+					logger.err doc_id:doc_id, err:err, "error compressing doc"
+					callback(err, null)
+				else
+					logger.log doc_id:doc_id, fromChars: text.length, toBytes: buf.length, factor: buf.length/text.length, "compressed successfully"
+					callback(null, buf)
+		else
+			callback(null, text)
diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee
index b4f12ed81c..babdaafdc3 100755
--- a/services/document-updater/config/settings.defaults.coffee
+++ b/services/document-updater/config/settings.defaults.coffee
@@ -20,6 +20,9 @@ module.exports =
 			port:"6379"
 			host:"localhost"
 			password:""
+		zip:
+			minSize: 10*1024
+			writesEnabled: false
 
 	mongo:
 		url: 'mongodb://127.0.0.1/sharelatex'
diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee
index 1e9c2e2689..f213458168 100644
--- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee
+++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee
@@ -238,3 +238,153 @@ describe "Applying updates to a doc", ->
 				doc.lines.should.deep.equal @result
 				done()
 
+
+
+describe "Applying updates to a large doc (uses compression)", ->
+	MIN_SIZE = 500000
+	before ->
+		@lines = ["one", "two", "three"]
+		while @lines.join('').length < MIN_SIZE
+			@lines.push "this is a repeated long line which will create a large document which must be compressed #{@lines.length}"
+		@version = 42
+		@update =
+			doc: @doc_id
+			op: [{
+				i: "one and a half\n"
+				p: 4
+			}]
+			v: @version
+		@result = @lines.slice()
+		@result.splice 1, 0, "one and a half"
+
+	describe "when the document is not loaded", ->
+		before (done) ->
+			[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+			sinon.spy MockWebApi, "getDocument"
+
+			MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
+			db.docOps.insert {
+				doc_id: ObjectId(@doc_id)
+				version: @version
+			}, (error) =>
+				throw error if error?
+				DocUpdaterClient.sendUpdate @project_id, @doc_id, @update, (error) ->
+					throw error if error?
+					setTimeout done, 200
+
+		after ->
+			MockWebApi.getDocument.restore()
+
+		it "should load the document from the web API", ->
+			MockWebApi.getDocument
+				.calledWith(@project_id, @doc_id)
+				.should.equal true
+
+		it "should update the doc", (done) ->
+			DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
+				doc.lines.should.deep.equal @result
+				done()
+
+		it "should push the applied updates to the track changes api", (done) ->
+			rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
+				throw error if error?
+				JSON.parse(updates[0]).op.should.deep.equal @update.op
+				rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
+					throw error if error?
+					result.should.equal 1
+					done()
+
+
+	describe "when the document is loaded", ->
+		before (done) ->
+			[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+
+			MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
+			db.docOps.insert doc_id: ObjectId(@doc_id), version: @version, (error) =>
+				throw error if error?
+				DocUpdaterClient.preloadDoc @project_id, @doc_id, (error) =>
+					throw error if error?
+					sinon.spy MockWebApi, "getDocument"
+					DocUpdaterClient.sendUpdate @project_id, @doc_id, @update, (error) ->
+						throw error if error?
+						setTimeout done, 200
+
+		after ->
+			MockWebApi.getDocument.restore()
+
+		it "should not need to call the web api", ->
+			MockWebApi.getDocument.called.should.equal false
+
+		it "should update the doc", (done) ->
+			DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
+				doc.lines.should.deep.equal @result
+				done()
+
+		it "should push the applied updates to the track changes api", (done) ->
+			rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
+				JSON.parse(updates[0]).op.should.deep.equal @update.op
+				rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
+					result.should.equal 1
+					done()
+
+	describe "with a broken update", ->
+		before (done) ->
+			[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+			MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
+			db.docOps.insert doc_id: ObjectId(@doc_id), version: @version, (error) =>
+				throw error if error?
+				DocUpdaterClient.sendUpdate @project_id, @doc_id, @undefined, (error) ->
+					throw error if error?
+					setTimeout done, 200
+
+		it "should not update the doc", (done) ->
+			DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
+				doc.lines.should.deep.equal @lines
+				done()
+
+	describe "with enough updates to flush to the track changes api", ->
+		before (done) ->
+			[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+			updates = []
+			for v in [0..99] # Should flush after 50 ops
+				updates.push
+					doc_id: @doc_id,
+					op: [i: v.toString(), p: 0]
+					v: v
+
+			sinon.spy MockTrackChangesApi, "flushDoc"
+
+			MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
+			db.docOps.insert doc_id: ObjectId(@doc_id), version: 0, (error) =>
+				throw error if error?
+				DocUpdaterClient.sendUpdates @project_id, @doc_id, updates, (error) =>
+					throw error if error?
+					setTimeout done, 200
+
+		after ->
+			MockTrackChangesApi.flushDoc.restore()
+
+		it "should flush the doc twice", ->
+			MockTrackChangesApi.flushDoc.calledTwice.should.equal true
+
+	describe "when there is no version in Mongo", ->
+		before (done) ->
+			[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+			MockWebApi.insertDoc @project_id, @doc_id, {
+				lines: @lines
+			}
+
+			update =
+				doc: @doc_id
+				op: @update.op
+				v: 0
+			DocUpdaterClient.sendUpdate @project_id, @doc_id, update, (error) ->
+				throw error if error?
+				setTimeout done, 200
+
+		it "should update the doc (using version = 0)", (done) ->
+			DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
+				doc.lines.should.deep.equal @result
+				done()
+
diff --git a/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee
index 81eb0bfefe..86ab837a2f 100644
--- a/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee
@@ -11,7 +11,7 @@ describe "RedisManager.clearDocFromPendingUpdatesSet", ->
 		@callback = sinon.stub()
 		@RedisManager = SandboxedModule.require modulePath, requires:
 			"redis-sharelatex" : createClient: () =>
-				@rclient = auth:->
+				@rclient ?= auth:-> # only assign one rclient
 			"logger-sharelatex": {}
 
 		@rclient.srem = sinon.stub().callsArg(2)
diff --git a/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee
index 5bbb93a723..2f54ba171e 100644
--- a/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee
@@ -9,7 +9,7 @@ describe "RedisManager.getDocsWithPendingUpdates", ->
 		@callback = sinon.stub()
 		@RedisManager = SandboxedModule.require modulePath, requires:
 			"redis-sharelatex" : createClient: () =>
-				@rclient = auth:->
+				@rclient ?= auth:->
 			"logger-sharelatex": {}
 
 		@docs = [{
diff --git a/services/document-updater/test/unit/coffee/RedisManager/getPreviousDocOpsTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/getPreviousDocOpsTests.coffee
index 6cd4980fd8..4a6d42c1ab 100644
--- a/services/document-updater/test/unit/coffee/RedisManager/getPreviousDocOpsTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisManager/getPreviousDocOpsTests.coffee
@@ -9,7 +9,7 @@ describe "RedisManager.getPreviousDocOpsTests", ->
 		@callback = sinon.stub()
 		@RedisManager = SandboxedModule.require modulePath, requires:
 			"redis-sharelatex" : createClient: () =>
-				@rclient =
+				@rclient ?=
 					auth: ->
 					multi: => @rclient
 			"logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub() }
diff --git a/services/document-updater/test/unit/coffee/RedisManager/pushDocOpTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/pushDocOpTests.coffee
index 71a36bb4f3..a90b20bced 100644
--- a/services/document-updater/test/unit/coffee/RedisManager/pushDocOpTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisManager/pushDocOpTests.coffee
@@ -8,7 +8,7 @@ describe "RedisManager.pushDocOp", ->
 	beforeEach ->
 		@RedisManager = SandboxedModule.require modulePath, requires:
 			"redis-sharelatex": createClient: () =>
-				@rclient =
+				@rclient ?=
 					auth: () ->
 					multi: () => @rclient
 			"logger-sharelatex": @logger = {log: sinon.stub()}
diff --git a/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee
index 621a3b1a3b..82b28a25d2 100644
--- a/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee
@@ -8,7 +8,7 @@ describe "RedisManager.pushUncompressedHistoryOp", ->
 	beforeEach ->
 		@RedisManager = SandboxedModule.require modulePath, requires:
 			"redis-sharelatex": createClient: () =>
-				@rclient =
+				@rclient ?=
 					auth: () ->
 					multi: () => @rclient
 			"logger-sharelatex": @logger = {log: sinon.stub()}
diff --git a/services/document-updater/test/unit/coffee/ZipManager/ZipManager.coffee b/services/document-updater/test/unit/coffee/ZipManager/ZipManager.coffee
new file mode 100644
index 0000000000..0ae53ebc19
--- /dev/null
+++ b/services/document-updater/test/unit/coffee/ZipManager/ZipManager.coffee
@@ -0,0 +1,157 @@
+sinon = require('sinon')
+chai = require('chai')
+should = chai.should()
+zipModulePath = "../../../../app/js/ZipManager"
+redisModulePath = "../../../../app/js/RedisManager"
+SandboxedModule = require('sandboxed-module')
+
+MIN_SIZE = 9999
+
+describe "ZipManager with RedisManager", ->
+	describe "for a small document (uncompressed)", ->
+		rclient = null
+		beforeEach (done) ->
+			@ZipManager = SandboxedModule.require zipModulePath, requires:
+				"logger-sharelatex": log:->
+				'settings-sharelatex': redis:
+					web:
+						host: 'none'
+						port: 'none'
+					zip:
+						writesEnabled: true
+						minSize: MIN_SIZE
+			@RedisManager = SandboxedModule.require redisModulePath, requires:
+				"./ZipManager" : @ZipManager
+				"redis-sharelatex" : createClient: () =>
+					rclient ?=
+						auth:-> # only assign one rclient
+						multi: () => rclient
+						set: (key, value) => rclient.store[key] = value
+						get: (key) => rclient.results.push rclient.store[key]
+						incr: (key) => rclient.store[key]++
+						exec: (callback) =>
+							callback.apply(null, [null, rclient.results])
+							rclient.results = []
+						store: {}
+						results: []
+				"logger-sharelatex": {}
+			@doc_id = "document-id"
+			@version = 123
+
+			@docLines = ["hello", "world"]
+			@callback = sinon.stub()
+
+			@RedisManager.setDocument @doc_id, @docLines, @version, () =>
+				@callback()
+				done()
+
+		it "should set the document", ->
+			rclient.store['doclines:document-id']
+				.should.equal JSON.stringify(@docLines)
+
+		it "should return the callback", ->
+			@callback.called.should.equal true
+
+		it "should get the document back again", (done) ->
+			@RedisManager.getDoc @doc_id, (err, lines, version) =>
+				@docLines.should.eql lines
+				done()
+
+	describe "for a large document (with compression enabled)", ->
+		rclient = null
+		beforeEach (done) ->
+			@ZipManager = SandboxedModule.require zipModulePath, requires:
+				"logger-sharelatex": log:->
+				'settings-sharelatex': redis:
+					web:
+						host: 'none'
+						port: 'none'
+					zip:
+						writesEnabled: true
+						minSize: MIN_SIZE
+			@RedisManager = SandboxedModule.require redisModulePath, requires:
+				"./ZipManager" : @ZipManager
+				"redis-sharelatex" : createClient: () =>
+					rclient ?=
+						auth:-> # only assign one rclient
+						multi: () => rclient
+						set: (key, value) => rclient.store[key] = value
+						get: (key) => rclient.results.push rclient.store[key]
+						incr: (key) => rclient.store[key]++
+						exec: (callback) =>
+							callback.apply(null, [null, rclient.results])
+							rclient.results = []
+						store: {}
+						results: []
+				"logger-sharelatex": {}
+			@doc_id = "document-id"
+			@version = 123
+
+			@docLines = []
+			while @docLines.join('').length <= MIN_SIZE
+				@docLines.push "this is a long line in a long document"
+			@callback = sinon.stub()
+			@RedisManager.setDocument @doc_id, @docLines, @version, () =>
+				@callback()
+				done()
+
+		it "should set the document as a gzipped blob", ->
+			rclient.store['doclines:document-id']
+				.should.not.equal JSON.stringify(@docLines)
+
+		it "should return the callback", ->
+			@callback.called.should.equal true
+
+		it "should get the uncompressed document back again", (done) ->
+			@RedisManager.getDoc @doc_id, (err, lines, version) =>
+				@docLines.should.eql lines
+				done()
+
+	describe "for a large document (with compression disabled)", ->
+		rclient = null
+		beforeEach (done) ->
+			@ZipManager = SandboxedModule.require zipModulePath, requires:
+				"logger-sharelatex": log:->
+				'settings-sharelatex': redis:
+					web:
+						host: 'none'
+						port: 'none'
+					zip:
+						writesEnabled: false
+						minSize: MIN_SIZE
+			@RedisManager = SandboxedModule.require redisModulePath, requires:
+				"./ZipManager" : @ZipManager
+				"redis-sharelatex" : createClient: () =>
+					rclient ?=
+						auth:-> # only assign one rclient
+						multi: () => rclient
+						set: (key, value) => rclient.store[key] = value
+						get: (key) => rclient.results.push rclient.store[key]
+						incr: (key) => rclient.store[key]++
+						exec: (callback) =>
+							callback.apply(null, [null, rclient.results])
+							rclient.results = []
+						store: {}
+						results: []
+				"logger-sharelatex": {}
+			@doc_id = "document-id"
+			@version = 123
+			@docLines = []
+			while @docLines.join('').length <= MIN_SIZE
+				@docLines.push "this is a long line in a long document"
+			@callback = sinon.stub()
+			@RedisManager.setDocument @doc_id, @docLines, @version, () =>
+				@callback()
+				done()
+
+		it "should set the document", ->
+			rclient.store['doclines:document-id']
+				.should.equal JSON.stringify(@docLines)
+
+		it "should return the callback", ->
+			@callback.called.should.equal true
+
+		it "should get the document back again", (done) ->
+			@RedisManager.getDoc @doc_id, (err, lines, version) =>
+				@docLines.should.eql lines
+				done()
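Note on the reader side: the patch creates `rclientBuffer` with `return_buffers: true` because gzipped bytes are not valid UTF-8, so a default node_redis client would decode replies to strings and corrupt them before the magic-byte check in `uncompressIfNeeded` ever runs. A small sketch of the idea (connection details and key name are illustrative; the patch itself builds the client from `Settings.redis.web` via `redis.createClient(redisOptions)`):

    redis = require('redis')

    # return_buffers: true makes every reply a Buffer, so the gzip magic
    # bytes survive instead of being mangled by UTF-8 string decoding.
    rclientBuffer = redis.createClient(6379, 'localhost', {return_buffers: true})

    rclientBuffer.get 'doclines:some-doc-id', (err, buf) ->
        throw err if err?
        console.log Buffer.isBuffer(buf) # true, so buf[0] and buf[1] are safe to test

Only the read path needs this: writes go through the ordinary `rclient`, since node_redis accepts a Buffer value for `set` as-is, and readers cannot know in advance whether a given key holds compressed or plain data.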