Merge pull request #3 from sharelatex/gzip-large-docs

Gzip large docs
Brian Gough 2015-03-31 10:58:18 +01:00
commit ea92fe0e9e
10 changed files with 410 additions and 43 deletions

@@ -6,6 +6,11 @@ _ = require('underscore')
 keys = require('./RedisKeyBuilder')
 logger = require('logger-sharelatex')
 metrics = require('./Metrics')
+ZipManager = require('./ZipManager')
+
+redisOptions = _.clone(Settings.redis.web)
+redisOptions.return_buffers = true
+rclientBuffer = redis.createClient(redisOptions)

 # Make times easy to read
 minutes = 60 # seconds for Redis expire
@@ -13,7 +18,7 @@ minutes = 60 # seconds for Redis expire
 module.exports = RedisManager =
   putDocInMemory : (project_id, doc_id, docLines, version, callback)->
     timer = new metrics.Timer("redis.put-doc")
-    logger.log project_id:project_id, doc_id:doc_id, docLines:docLines, version: version, "putting doc in redis"
+    logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis"
     multi = rclient.multi()
     multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
     multi.set keys.projectKey({doc_id:doc_id}), project_id
@@ -27,7 +32,6 @@ module.exports = RedisManager =
   removeDocFromMemory : (project_id, doc_id, callback)->
     logger.log project_id:project_id, doc_id:doc_id, "removing doc from redis"
     multi = rclient.multi()
-    multi.get keys.docLines(doc_id:doc_id)
     multi.del keys.docLines(doc_id:doc_id)
     multi.del keys.projectKey(doc_id:doc_id)
     multi.del keys.docVersion(doc_id:doc_id)
@@ -38,25 +42,26 @@ module.exports = RedisManager =
         logger.err project_id:project_id, doc_id:doc_id, err:err, "error removing doc from redis"
         callback(err, null)
       else
-        docLines = replys[0]
-        logger.log project_id:project_id, doc_id:doc_id, docLines:docLines, "removed doc from redis"
+        logger.log project_id:project_id, doc_id:doc_id, "removed doc from redis"
         callback()

   getDoc : (doc_id, callback = (error, lines, version) ->)->
     timer = new metrics.Timer("redis.get-doc")
-    multi = rclient.multi()
+    # use Buffer when retrieving data as it may be gzipped
+    multi = rclientBuffer.multi()
     linesKey = keys.docLines(doc_id:doc_id)
     multi.get linesKey
     multi.get keys.docVersion(doc_id:doc_id)
     multi.exec (error, result)->
       timer.done()
       return callback(error) if error?
-      try
-        docLines = JSON.parse result[0]
-      catch e
-        return callback(e)
-      version = parseInt(result[1] or 0, 10)
-      callback null, docLines, version
+      ZipManager.uncompressIfNeeded doc_id, result, (error, result) ->
+        try
+          docLines = JSON.parse result[0]
+        catch e
+          return callback(e)
+        version = parseInt(result[1] or 0, 10)
+        callback null, docLines, version

   getDocVersion: (doc_id, callback = (error, version) ->) ->
     rclient.get keys.docVersion(doc_id: doc_id), (error, version) ->
@@ -70,11 +75,12 @@ module.exports = RedisManager =
       callback null, len

   setDocument : (doc_id, docLines, version, callback = (error) ->)->
-    multi = rclient.multi()
-    multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
-    multi.set keys.docVersion(doc_id:doc_id), version
-    multi.incr keys.now("docsets")
-    multi.exec (error, replys) -> callback(error)
+    ZipManager.compressIfNeeded doc_id, JSON.stringify(docLines), (err, result) ->
+      multi = rclient.multi()
+      multi.set keys.docLines(doc_id:doc_id), result
+      multi.set keys.docVersion(doc_id:doc_id), version
+      multi.incr keys.now("docsets")
+      multi.exec (error, replys) -> callback(error)

   getPendingUpdatesForDoc : (doc_id, callback)->
     multi = rclient.multi()
@@ -170,25 +176,3 @@ module.exports = RedisManager =
   getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
     rclient.smembers keys.docsInProject(project_id: project_id), callback

-getDocumentsProjectId = (doc_id, callback)->
-  rclient.get keys.projectKey({doc_id:doc_id}), (err, project_id)->
-    callback err, {doc_id:doc_id, project_id:project_id}
-
-getAllProjectDocsIds = (project_id, callback)->
-  rclient.SMEMBERS keys.docsInProject(project_id:project_id), (err, doc_ids)->
-    if callback?
-      callback(err, doc_ids)
-
-getDocumentsAndExpire = (doc_ids, callback)->
-  multi = rclient.multi()
-  oneDay = 86400
-  doc_ids.forEach (doc_id)->
-    # rclient.expire keys.docLines(doc_id:doc_id), oneDay, ->
-  doc_ids.forEach (doc_id)->
-    multi.get keys.docLines(doc_id:doc_id)
-  multi.exec (err, docsLines)->
-    callback err, docsLines
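
Note on return_buffers above: by default the redis client decodes replies as UTF-8 strings, which is lossy for gzipped binary data, so getDoc must read through the buffer-mode client. A minimal standalone sketch of the failure mode (illustrative only, not part of this commit):

zlib = require('zlib')

zlib.gzip "some long document text", (err, zipped) ->
  # Simulate what a string-mode client does to binary data: decode the
  # bytes as UTF-8 and re-encode them later. Invalid byte sequences get
  # replaced, so the gzip header does not survive the round trip.
  mangled = new Buffer(zipped.toString('utf8'), 'utf8')
  zlib.gunzip mangled, (err) ->
    console.log "string round trip:", err?.message  # e.g. "incorrect header check"
  zlib.gunzip zipped, (err, out) ->
    console.log "buffer round trip:", out.toString()  # original text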

@@ -0,0 +1,73 @@
+Settings = require('settings-sharelatex')
+logger = require('logger-sharelatex')
+metrics = require('./Metrics')
+zlib = require('zlib')
+
+# Compress and uncompress data sent to Redis using the node 'zlib'
+# module, to reduce load on Redis.
+#
+# Measurements show that most of the load on Redis comes from very
+# large documents. We can shift some of that CPU load from Redis to
+# the docupdaters (which are scalable) by compressing the data in the
+# docupdater first.
+#
+# To avoid overloading the docupdater clients we impose a minimum size
+# on data we will compress, so we only catch the large ones.
+#
+# The optimum size for the cutoff is about 10K; below this we do
+# more work but don't really gain any extra reduction in Redis CPU.
+#
+# |--------------------+-----------+--------------------------|
+# | Compression cutoff | Redis CPU | Extra doc updater CPU(*) |
+# |--------------------+-----------+--------------------------|
+# | N/A                |      100% |                       0% |
+# | 100k               |       80% |                      10% |
+# | 10k                |       50% |                      30% |
+# |--------------------+-----------+--------------------------|
+#
+# (*) percentage of a single core, because node zlib runs in multiple
+# threads.
+
+ZIP_WRITES_ENABLED = Settings.redis.zip?.writesEnabled
+ZIP_MINSIZE = Settings.redis.zip?.minSize || 64*1024
+
+module.exports = ZipManager =
+  uncompressIfNeeded: (doc_id, result, callback) ->
+    # result is an array of [text, version]. Each entry is a node
+    # Buffer object which we need to convert to strings on output
+
+    # first make sure the version (result[1]) is returned as a string
+    if result?[1]?.toString?
+      result[1] = result[1].toString()
+
+    # now uncompress the text (result[0]) if needed
+    buf = result?[0]
+
+    # Check if we have a GZIP file (magic numbers in header)
+    if buf? and buf[0] == 0x1F and buf[1] == 0x8B
+      zlib.gunzip buf, (err, newbuf) ->
+        if err?
+          logger.err doc_id:doc_id, err:err, "error uncompressing doc"
+          callback(err, null)
+        else
+          logger.log doc_id:doc_id, fromBytes: buf.length, toChars: newbuf.length, factor: buf.length/newbuf.length, "uncompressed successfully"
+          result[0] = newbuf.toString()
+          callback(null, result)
+    else
+      # if we don't have a GZIP file it's just a buffer of text, convert it back to a string
+      if buf?.toString?
+        result[0] = buf.toString()
+      callback(null, result)
+
+  compressIfNeeded: (doc_id, text, callback) ->
+    if ZIP_WRITES_ENABLED and ZIP_MINSIZE > 1024 and text.length > ZIP_MINSIZE
+      # N.B. skip files of 1k or less, because gzip increases the size
+      zlib.gzip text, (err, buf) ->
+        if err?
+          logger.err doc_id:doc_id, err:err, "error compressing doc"
+          callback(err, null)
+        else
+          logger.log doc_id:doc_id, fromChars: text.length, toBytes: buf.length, factor: buf.length/text.length, "compressed successfully"
+          callback(null, buf)
+    else
+      callback(null, text)
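
The key trick in uncompressIfNeeded is that no flag is stored to say whether a value was compressed: every gzip stream starts with the magic bytes 0x1F 0x8B, so the reader sniffs the payload itself. A standalone sketch of that dispatch (the readDoc helper is hypothetical, written here only to illustrate the check):

zlib = require('zlib')

# Decide from the first two bytes whether a Redis payload needs gunzip.
readDoc = (raw, callback) ->
  if raw? and raw[0] == 0x1F and raw[1] == 0x8B
    zlib.gunzip raw, (err, buf) ->
      return callback(err) if err?
      callback null, buf.toString()
  else
    callback null, raw.toString()

zlib.gzip '["line one","line two"]', (err, zipped) ->
  readDoc zipped, (err, text) ->
    console.log JSON.parse(text)    # [ 'line one', 'line two' ]
readDoc new Buffer('["plain text doc"]'), (err, text) ->
  console.log JSON.parse(text)      # [ 'plain text doc' ]

This is also what makes writesEnabled safe to toggle: old plain values and new gzipped values can coexist under the same keys.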

@@ -20,6 +20,9 @@ module.exports =
       port:"6379"
       host:"localhost"
       password:""
+    zip:
+      minSize: 10*1024
+      writesEnabled: false

   mongo:
     url: 'mongodb://127.0.0.1/sharelatex'
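
These defaults ship with compression off. A deployment that wants it would override the block in its own settings file; a hypothetical fragment (host and values illustrative):

redis:
  web:
    host: "redis.example.com"
    port: "6379"
    password: ""
  zip:
    minSize: 10*1024      # only compress docs over 10k, the measured sweet spot
    writesEnabled: true   # reads sniff the gzip header either way, so this is safe to flip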

@@ -238,3 +238,153 @@ describe "Applying updates to a doc", ->
         doc.lines.should.deep.equal @result
         done()

+describe "Applying updates to a large doc (uses compression)", ->
+  MIN_SIZE = 500000
+
+  before ->
+    @lines = ["one", "two", "three"]
+    while @lines.join('').length < MIN_SIZE
+      @lines.push "this is a repeated long line which will create a large document which must be compressed #{@lines.length}"
+    @version = 42
+    @update =
+      doc: @doc_id
+      op: [{
+        i: "one and a half\n"
+        p: 4
+      }]
+      v: @version
+    @result = @lines.slice()
+    @result.splice 1, 0, "one and a half"
+
+  describe "when the document is not loaded", ->
+    before (done) ->
+      [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+      sinon.spy MockWebApi, "getDocument"
+      MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
+      db.docOps.insert {
+        doc_id: ObjectId(@doc_id)
+        version: @version
+      }, (error) =>
+        throw error if error?
+        DocUpdaterClient.sendUpdate @project_id, @doc_id, @update, (error) ->
+          throw error if error?
+          setTimeout done, 200
+
+    after ->
+      MockWebApi.getDocument.restore()
+
+    it "should load the document from the web API", ->
+      MockWebApi.getDocument
+        .calledWith(@project_id, @doc_id)
+        .should.equal true
+
+    it "should update the doc", (done) ->
+      DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
+        doc.lines.should.deep.equal @result
+        done()
+
+    it "should push the applied updates to the track changes api", (done) ->
+      rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
+        throw error if error?
+        JSON.parse(updates[0]).op.should.deep.equal @update.op
+        rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
+          throw error if error?
+          result.should.equal 1
+          done()
+
+  describe "when the document is loaded", ->
+    before (done) ->
+      [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+      MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
+      db.docOps.insert doc_id: ObjectId(@doc_id), version: @version, (error) =>
+        throw error if error?
+        DocUpdaterClient.preloadDoc @project_id, @doc_id, (error) =>
+          throw error if error?
+          sinon.spy MockWebApi, "getDocument"
+          DocUpdaterClient.sendUpdate @project_id, @doc_id, @update, (error) ->
+            throw error if error?
+            setTimeout done, 200
+
+    after ->
+      MockWebApi.getDocument.restore()
+
+    it "should not need to call the web api", ->
+      MockWebApi.getDocument.called.should.equal false
+
+    it "should update the doc", (done) ->
+      DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
+        doc.lines.should.deep.equal @result
+        done()
+
+    it "should push the applied updates to the track changes api", (done) ->
+      rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
+        JSON.parse(updates[0]).op.should.deep.equal @update.op
+        rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
+          result.should.equal 1
+          done()
+
+  describe "with a broken update", ->
+    before (done) ->
+      [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+      MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
+      db.docOps.insert doc_id: ObjectId(@doc_id), version: @version, (error) =>
+        throw error if error?
+        DocUpdaterClient.sendUpdate @project_id, @doc_id, @undefined, (error) ->
+          throw error if error?
+          setTimeout done, 200
+
+    it "should not update the doc", (done) ->
+      DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
+        doc.lines.should.deep.equal @lines
+        done()
+
+  describe "with enough updates to flush to the track changes api", ->
+    before (done) ->
+      [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+      updates = []
+      for v in [0..99] # Should flush after 50 ops
+        updates.push
+          doc_id: @doc_id,
+          op: [i: v.toString(), p: 0]
+          v: v
+
+      sinon.spy MockTrackChangesApi, "flushDoc"
+
+      MockWebApi.insertDoc @project_id, @doc_id, lines: @lines
+      db.docOps.insert doc_id: ObjectId(@doc_id), version: 0, (error) =>
+        throw error if error?
+        DocUpdaterClient.sendUpdates @project_id, @doc_id, updates, (error) =>
+          throw error if error?
+          setTimeout done, 200
+
+    after ->
+      MockTrackChangesApi.flushDoc.restore()
+
+    it "should flush the doc twice", ->
+      MockTrackChangesApi.flushDoc.calledTwice.should.equal true
+
+  describe "when there is no version in Mongo", ->
+    before (done) ->
+      [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
+      MockWebApi.insertDoc @project_id, @doc_id, {
+        lines: @lines
+      }
+
+      update =
+        doc: @doc_id
+        op: @update.op
+        v: 0
+      DocUpdaterClient.sendUpdate @project_id, @doc_id, update, (error) ->
+        throw error if error?
+        setTimeout done, 200
+
+    it "should update the doc (using version = 0)", (done) ->
+      DocUpdaterClient.getDoc @project_id, @doc_id, (error, res, doc) =>
+        doc.lines.should.deep.equal @result
+        done()

@@ -11,7 +11,7 @@ describe "RedisManager.clearDocFromPendingUpdatesSet", ->
     @callback = sinon.stub()
     @RedisManager = SandboxedModule.require modulePath, requires:
       "redis-sharelatex" : createClient: () =>
-        @rclient = auth:->
+        @rclient ?= auth:-> # only assign one rclient
       "logger-sharelatex": {}

     @rclient.srem = sinon.stub().callsArg(2)

@@ -9,7 +9,7 @@ describe "RedisManager.getDocsWithPendingUpdates", ->
     @callback = sinon.stub()
     @RedisManager = SandboxedModule.require modulePath, requires:
       "redis-sharelatex" : createClient: () =>
-        @rclient = auth:->
+        @rclient ?= auth:->
       "logger-sharelatex": {}

     @docs = [{

@@ -9,7 +9,7 @@ describe "RedisManager.getPreviousDocOpsTests", ->
     @callback = sinon.stub()
     @RedisManager = SandboxedModule.require modulePath, requires:
       "redis-sharelatex" : createClient: () =>
-        @rclient =
+        @rclient ?=
           auth: ->
           multi: => @rclient
       "logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub() }

@@ -8,7 +8,7 @@ describe "RedisManager.pushDocOp", ->
   beforeEach ->
     @RedisManager = SandboxedModule.require modulePath, requires:
       "redis-sharelatex": createClient: () =>
-        @rclient =
+        @rclient ?=
           auth: () ->
           multi: () => @rclient
       "logger-sharelatex": @logger = {log: sinon.stub()}

@@ -8,7 +8,7 @@ describe "RedisManager.pushUncompressedHistoryOp", ->
   beforeEach ->
     @RedisManager = SandboxedModule.require modulePath, requires:
       "redis-sharelatex": createClient: () =>
-        @rclient =
+        @rclient ?=
           auth: () ->
           multi: () => @rclient
       "logger-sharelatex": @logger = {log: sinon.stub()}

@@ -0,0 +1,157 @@
+sinon = require('sinon')
+chai = require('chai')
+should = chai.should()
+zipModulePath = "../../../../app/js/ZipManager"
+redisModulePath = "../../../../app/js/RedisManager"
+SandboxedModule = require('sandboxed-module')
+
+MIN_SIZE = 9999
+
+describe "ZipManager with RedisManager", ->
+  describe "for a small document (uncompressed)", ->
+    rclient = null
+    beforeEach (done) ->
+      @ZipManager = SandboxedModule.require zipModulePath, requires:
+        "logger-sharelatex": log:->
+        'settings-sharelatex': redis:
+          web:
+            host: 'none'
+            port: 'none'
+          zip:
+            writesEnabled: true
+            minSize: MIN_SIZE
+      @RedisManager = SandboxedModule.require redisModulePath, requires:
+        "./ZipManager" : @ZipManager
+        "redis-sharelatex" : createClient: () =>
+          rclient ?=
+            auth:-> # only assign one rclient
+            multi: () => rclient
+            set: (key, value) => rclient.store[key] = value
+            get: (key) => rclient.results.push rclient.store[key]
+            incr: (key) => rclient.store[key]++
+            exec: (callback) =>
+              callback.apply(null, [null, rclient.results])
+              rclient.results = []
+            store: {}
+            results: []
+        "logger-sharelatex": {}
+
+      @doc_id = "document-id"
+      @version = 123
+      @docLines = ["hello", "world"]
+      @callback = sinon.stub()
+
+      @RedisManager.setDocument @doc_id, @docLines, @version, () =>
+        @callback()
+        done()
+
+    it "should set the document", ->
+      rclient.store['doclines:document-id']
+        .should.equal JSON.stringify(@docLines)
+
+    it "should return the callback", ->
+      @callback.called.should.equal true
+
+    it "should get the document back again", (done) ->
+      @RedisManager.getDoc @doc_id, (err, lines, version) =>
+        @docLines.should.eql lines
+        done()
+
+  describe "for a large document (with compression enabled)", ->
+    rclient = null
+    beforeEach (done) ->
+      @ZipManager = SandboxedModule.require zipModulePath, requires:
+        "logger-sharelatex": log:->
+        'settings-sharelatex': redis:
+          web:
+            host: 'none'
+            port: 'none'
+          zip:
+            writesEnabled: true
+            minSize: MIN_SIZE
+      @RedisManager = SandboxedModule.require redisModulePath, requires:
+        "./ZipManager" : @ZipManager
+        "redis-sharelatex" : createClient: () =>
+          rclient ?=
+            auth:-> # only assign one rclient
+            multi: () => rclient
+            set: (key, value) => rclient.store[key] = value
+            get: (key) => rclient.results.push rclient.store[key]
+            incr: (key) => rclient.store[key]++
+            exec: (callback) =>
+              callback.apply(null, [null, rclient.results])
+              rclient.results = []
+            store: {}
+            results: []
+        "logger-sharelatex": {}
+
+      @doc_id = "document-id"
+      @version = 123
+      @docLines = []
+      while @docLines.join('').length <= MIN_SIZE
+        @docLines.push "this is a long line in a long document"
+      @callback = sinon.stub()
+
+      @RedisManager.setDocument @doc_id, @docLines, @version, () =>
+        @callback()
+        done()
+
+    it "should set the document as a gzipped blob", ->
+      rclient.store['doclines:document-id']
+        .should.not.equal JSON.stringify(@docLines)
+
+    it "should return the callback", ->
+      @callback.called.should.equal true
+
+    it "should get the uncompressed document back again", (done) ->
+      @RedisManager.getDoc @doc_id, (err, lines, version) =>
+        @docLines.should.eql lines
+        done()
+
+  describe "for a large document (with compression disabled)", ->
+    rclient = null
+    beforeEach (done) ->
+      @ZipManager = SandboxedModule.require zipModulePath, requires:
+        "logger-sharelatex": log:->
+        'settings-sharelatex': redis:
+          web:
+            host: 'none'
+            port: 'none'
+          zip:
+            writesEnabled: false
+            minSize: MIN_SIZE
+      @RedisManager = SandboxedModule.require redisModulePath, requires:
+        "./ZipManager" : @ZipManager
+        "redis-sharelatex" : createClient: () =>
+          rclient ?=
+            auth:-> # only assign one rclient
+            multi: () => rclient
+            set: (key, value) => rclient.store[key] = value
+            get: (key) => rclient.results.push rclient.store[key]
+            incr: (key) => rclient.store[key]++
+            exec: (callback) =>
+              callback.apply(null, [null, rclient.results])
+              rclient.results = []
+            store: {}
+            results: []
+        "logger-sharelatex": {}
+
+      @doc_id = "document-id"
+      @version = 123
+      @docLines = []
+      while @docLines.join('').length <= MIN_SIZE
+        @docLines.push "this is a long line in a long document"
+      @callback = sinon.stub()
+
+      @RedisManager.setDocument @doc_id, @docLines, @version, () =>
+        @callback()
+        done()
+
+    it "should set the document", ->
+      rclient.store['doclines:document-id']
+        .should.equal JSON.stringify(@docLines)
+
+    it "should return the callback", ->
+      @callback.called.should.equal true
+
+    it "should get the document back again", (done) ->
+      @RedisManager.getDoc @doc_id, (err, lines, version) =>
+        @docLines.should.eql lines
+        done()