mirror of
https://github.com/overleaf/overleaf.git
synced 2025-04-11 01:57:23 +00:00
Don't run redis commands in parallel for easier consistency reasoning
This commit is contained in:
parent
437e885812
commit
48a92b28e5
4 changed files with 46 additions and 73 deletions
|
@ -15,40 +15,37 @@ redisOptions.return_buffers = true
|
|||
minutes = 60 # seconds for Redis expire
|
||||
|
||||
module.exports = RedisManager =
|
||||
putDocInMemory : (project_id, doc_id, docLines, version, callback)->
|
||||
putDocInMemory : (project_id, doc_id, docLines, version, _callback)->
|
||||
timer = new metrics.Timer("redis.put-doc")
|
||||
logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis"
|
||||
async.parallel [
|
||||
(cb) ->
|
||||
multi = rclient.multi()
|
||||
multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
|
||||
multi.set keys.projectKey({doc_id:doc_id}), project_id
|
||||
multi.set keys.docVersion(doc_id:doc_id), version
|
||||
multi.exec cb
|
||||
(cb) ->
|
||||
rclient.sadd keys.docsInProject(project_id:project_id), doc_id, cb
|
||||
], (err) ->
|
||||
callback = (error) ->
|
||||
timer.done()
|
||||
callback(err)
|
||||
_callback(error)
|
||||
logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis"
|
||||
multi = rclient.multi()
|
||||
multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
|
||||
multi.set keys.projectKey({doc_id:doc_id}), project_id
|
||||
multi.set keys.docVersion(doc_id:doc_id), version
|
||||
multi.exec (error) ->
|
||||
return callback(error) if error?
|
||||
rclient.sadd keys.docsInProject(project_id:project_id), doc_id, callback
|
||||
|
||||
removeDocFromMemory : (project_id, doc_id, callback)->
|
||||
removeDocFromMemory : (project_id, doc_id, _callback)->
|
||||
logger.log project_id:project_id, doc_id:doc_id, "removing doc from redis"
|
||||
async.parallel [
|
||||
(cb) ->
|
||||
multi = rclient.multi()
|
||||
multi.del keys.docLines(doc_id:doc_id)
|
||||
multi.del keys.projectKey(doc_id:doc_id)
|
||||
multi.del keys.docVersion(doc_id:doc_id)
|
||||
multi.exec cb
|
||||
(cb) ->
|
||||
rclient.srem keys.docsInProject(project_id:project_id), doc_id, cb
|
||||
], (err) ->
|
||||
callback = (err) ->
|
||||
if err?
|
||||
logger.err project_id:project_id, doc_id:doc_id, err:err, "error removing doc from redis"
|
||||
callback(err, null)
|
||||
_callback(err)
|
||||
else
|
||||
logger.log project_id:project_id, doc_id:doc_id, "removed doc from redis"
|
||||
callback()
|
||||
_callback()
|
||||
|
||||
multi = rclient.multi()
|
||||
multi.del keys.docLines(doc_id:doc_id)
|
||||
multi.del keys.projectKey(doc_id:doc_id)
|
||||
multi.del keys.docVersion(doc_id:doc_id)
|
||||
multi.exec (error) ->
|
||||
return callback(error) if error?
|
||||
rclient.srem keys.docsInProject(project_id:project_id), doc_id, callback
|
||||
|
||||
getDoc : (doc_id, callback = (error, lines, version) ->)->
|
||||
timer = new metrics.Timer("redis.get-doc")
|
||||
|
@ -141,15 +138,5 @@ module.exports = RedisManager =
|
|||
version = parseInt(version, 10)
|
||||
callback null, version
|
||||
|
||||
pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) ->
|
||||
jsonOp = JSON.stringify op
|
||||
async.parallel [
|
||||
(cb) -> rclient.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp, cb
|
||||
(cb) -> rclient.sadd keys.docsWithHistoryOps(project_id: project_id), doc_id, cb
|
||||
], (error, results) ->
|
||||
return callback(error) if error?
|
||||
[length, _] = results
|
||||
callback(error, length)
|
||||
|
||||
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
|
||||
rclient.smembers keys.docsInProject(project_id: project_id), callback
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
settings = require "settings-sharelatex"
|
||||
request = require "request"
|
||||
logger = require "logger-sharelatex"
|
||||
RedisManager = require "./RedisManager"
|
||||
crypto = require("crypto")
|
||||
redis = require("redis-sharelatex")
|
||||
rclient = redis.createClient(settings.redis.web)
|
||||
async = require "async"
|
||||
|
||||
module.exports = TrackChangesManager =
|
||||
flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
|
||||
|
@ -23,8 +24,13 @@ module.exports = TrackChangesManager =
|
|||
|
||||
FLUSH_EVERY_N_OPS: 50
|
||||
pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) ->
|
||||
RedisManager.pushUncompressedHistoryOp project_id, doc_id, op, (error, length) ->
|
||||
jsonOp = JSON.stringify op
|
||||
multi = rclient.multi()
|
||||
multi.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp
|
||||
multi.sadd "DocsWithHistoryOps:#{project_id}", doc_id
|
||||
multi.exec (error, results) ->
|
||||
return callback(error) if error?
|
||||
[length, _] = results
|
||||
if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0
|
||||
# Do this in the background since it uses HTTP and so may be too
|
||||
# slow to wait for when processing a doc update.
|
||||
|
@ -33,4 +39,3 @@ module.exports = TrackChangesManager =
|
|||
if error?
|
||||
logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api"
|
||||
callback()
|
||||
|
||||
|
|
|
@ -171,28 +171,6 @@ describe "RedisManager", ->
|
|||
it "should log out the problem", ->
|
||||
@logger.warn.called.should.equal true
|
||||
|
||||
describe "pushUncompressedHistoryOp", ->
|
||||
beforeEach (done) ->
|
||||
@op = { op: [{ i: "foo", p: 4 }] }
|
||||
@rclient.rpush = sinon.stub().yields(null, @length = 42)
|
||||
@rclient.sadd = sinon.stub().yields()
|
||||
@RedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, (args...) =>
|
||||
@callback(args...)
|
||||
done()
|
||||
|
||||
it "should push the doc op into the doc ops list", ->
|
||||
@rclient.rpush
|
||||
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@op))
|
||||
.should.equal true
|
||||
|
||||
it "should add the doc_id to the set of which records the project docs", ->
|
||||
@rclient.sadd
|
||||
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback with the length", ->
|
||||
@callback.calledWith(null, @length).should.equal true
|
||||
|
||||
describe "getUpdatesLength", ->
|
||||
beforeEach ->
|
||||
@rclient.llen = sinon.stub().yields(null, @length = 3)
|
||||
|
|
|
@ -7,9 +7,9 @@ describe "TrackChangesManager", ->
|
|||
beforeEach ->
|
||||
@TrackChangesManager = SandboxedModule.require modulePath, requires:
|
||||
"request": @request = {}
|
||||
"settings-sharelatex": @Settings = {}
|
||||
"settings-sharelatex": @Settings = { redis: web: {} }
|
||||
"logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() }
|
||||
"./RedisManager": @RedisManager = {}
|
||||
"redis-sharelatex": createClient: () => @rclient = {}
|
||||
@project_id = "mock-project-id"
|
||||
@doc_id = "mock-doc-id"
|
||||
@callback = sinon.stub()
|
||||
|
@ -42,17 +42,23 @@ describe "TrackChangesManager", ->
|
|||
|
||||
describe "pushUncompressedHistoryOp", ->
|
||||
beforeEach ->
|
||||
@op = "mock-op"
|
||||
@op = { op: [{ i: "foo", p: 4 }] }
|
||||
@rclient.multi = sinon.stub().returns(@multi = {})
|
||||
@multi.rpush = sinon.stub()
|
||||
@multi.sadd = sinon.stub()
|
||||
@multi.exec = sinon.stub().yields(null, [@length = 42, "foo"])
|
||||
@TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2)
|
||||
|
||||
describe "pushing the op", ->
|
||||
beforeEach ->
|
||||
@RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(3, null, 1)
|
||||
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
|
||||
|
||||
it "should push the op into redis", ->
|
||||
@RedisManager.pushUncompressedHistoryOp
|
||||
.calledWith(@project_id, @doc_id, @op)
|
||||
@multi.rpush
|
||||
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify @op)
|
||||
.should.equal true
|
||||
@multi.sadd
|
||||
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback", ->
|
||||
|
@ -63,8 +69,7 @@ describe "TrackChangesManager", ->
|
|||
|
||||
describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", ->
|
||||
beforeEach ->
|
||||
@RedisManager.pushUncompressedHistoryOp =
|
||||
sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
|
||||
@multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"])
|
||||
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
|
||||
|
||||
it "should tell the track changes api to flush", ->
|
||||
|
@ -74,8 +79,7 @@ describe "TrackChangesManager", ->
|
|||
|
||||
describe "when TrackChangesManager errors", ->
|
||||
beforeEach ->
|
||||
@RedisManager.pushUncompressedHistoryOp =
|
||||
sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
|
||||
@multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"])
|
||||
@TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
|
||||
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
|
||||
|
||||
|
@ -89,4 +93,3 @@ describe "TrackChangesManager", ->
|
|||
)
|
||||
.should.equal true
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue