Merge pull request #3 from sharelatex/bg-auto-doc-flush

add unflushed time to doc in redis; connects to overleaf/sharelatex#190
This commit is contained in:
Brian Gough 2017-10-12 10:55:54 +01:00 committed by GitHub
commit 08f0c67cbf
10 changed files with 180 additions and 79 deletions

View file

@ -38,7 +38,10 @@ app.param 'doc_id', (req, res, next, doc_id) ->
next new Error("invalid doc id")
app.get '/project/:project_id/doc/:doc_id', HttpController.getDoc
app.get '/project/:project_id/doc', HttpController.getProjectDocs
# temporarily keep the GET method for backwards compatibility
app.get '/project/:project_id/doc', HttpController.getProjectDocsAndFlushIfOld
# will migrate to the POST method of get_and_flush_if_old instead
app.post '/project/:project_id/get_and_flush_if_old', HttpController.getProjectDocsAndFlushIfOld
app.post '/project/:project_id/clearState', HttpController.clearProjectState
app.post '/project/:project_id/doc/:doc_id', HttpController.setDoc
app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLoaded

View file

@ -8,14 +8,16 @@ RealTimeRedisManager = require "./RealTimeRedisManager"
Errors = require "./Errors"
RangesManager = require "./RangesManager"
MAX_UNFLUSHED_AGE = 300 * 1000 # 5 mins, document should be flushed to mongo this time after a change
module.exports = DocumentManager =
getDoc: (project_id, doc_id, _callback = (error, lines, version, alreadyLoaded) ->) ->
getDoc: (project_id, doc_id, _callback = (error, lines, version, ranges, unflushedTime, alreadyLoaded) ->) ->
timer = new Metrics.Timer("docManager.getDoc")
callback = (args...) ->
timer.done()
_callback(args...)
RedisManager.getDoc project_id, doc_id, (error, lines, version, ranges) ->
RedisManager.getDoc project_id, doc_id, (error, lines, version, ranges, unflushedTime) ->
return callback(error) if error?
if !lines? or !version?
logger.log {project_id, doc_id}, "doc not in redis so getting from persistence API"
@ -24,9 +26,9 @@ module.exports = DocumentManager =
logger.log {project_id, doc_id, lines, version}, "got doc from persistence API"
RedisManager.putDocInMemory project_id, doc_id, lines, version, ranges, (error) ->
return callback(error) if error?
callback null, lines, version, ranges, false
callback null, lines, version, ranges, null, false
else
callback null, lines, version, ranges, true
callback null, lines, version, ranges, unflushedTime, true
getDocAndRecentOps: (project_id, doc_id, fromVersion, _callback = (error, lines, version, recentOps, ranges) ->) ->
timer = new Metrics.Timer("docManager.getDocAndRecentOps")
@ -53,7 +55,7 @@ module.exports = DocumentManager =
return callback(new Error("No lines were provided to setDoc"))
UpdateManager = require "./UpdateManager"
DocumentManager.getDoc project_id, doc_id, (error, oldLines, version, ranges, alreadyLoaded) ->
DocumentManager.getDoc project_id, doc_id, (error, oldLines, version, ranges, unflushedTime, alreadyLoaded) ->
return callback(error) if error?
if oldLines? and oldLines.length > 0 and oldLines[0].text?
@ -103,7 +105,7 @@ module.exports = DocumentManager =
logger.log project_id: project_id, doc_id: doc_id, version: version, "flushing doc"
PersistenceManager.setDoc project_id, doc_id, lines, version, ranges, (error) ->
return callback(error) if error?
callback null
RedisManager.clearUnflushedTime doc_id, callback
flushAndDeleteDoc: (project_id, doc_id, _callback = (error) ->) ->
timer = new Metrics.Timer("docManager.flushAndDeleteDoc")
@ -156,6 +158,17 @@ module.exports = DocumentManager =
return callback(error) if error?
callback()
getDocAndFlushIfOld: (project_id, doc_id, callback = (error, doc) ->) ->
DocumentManager.getDoc project_id, doc_id, (error, lines, version, ranges, unflushedTime, alreadyLoaded) ->
return callback(error) if error?
# if doc was already loaded see if it needs to be flushed
if alreadyLoaded and unflushedTime? and (Date.now() - unflushedTime) > MAX_UNFLUSHED_AGE
DocumentManager.flushDocIfLoaded project_id, doc_id, (error) ->
return callback(error) if error?
callback(null, lines, version)
else
callback(null, lines, version)
getDocWithLock: (project_id, doc_id, callback = (error, lines, version) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.getDoc, project_id, doc_id, callback
@ -163,7 +176,11 @@ module.exports = DocumentManager =
getDocAndRecentOpsWithLock: (project_id, doc_id, fromVersion, callback = (error, lines, version) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.getDocAndRecentOps, project_id, doc_id, fromVersion, callback
getDocAndFlushIfOldWithLock: (project_id, doc_id, callback = (error, doc) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.getDocAndFlushIfOld, project_id, doc_id, callback
setDocWithLock: (project_id, doc_id, lines, source, user_id, undoing, callback = (error) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.setDoc, project_id, doc_id, lines, source, user_id, undoing, callback

View file

@ -37,7 +37,7 @@ module.exports = HttpController =
size += (line.length + 1)
return size
getProjectDocs: (req, res, next = (error) ->) ->
getProjectDocsAndFlushIfOld: (req, res, next = (error) ->) ->
project_id = req.params.project_id
projectStateHash = req.query?.state
# exclude is string of existing docs "id:version,id:version,..."
@ -49,7 +49,7 @@ module.exports = HttpController =
[id,version] = item?.split(':')
excludeVersions[id] = version
logger.log {project_id: project_id, projectStateHash: projectStateHash, excludeVersions: excludeVersions}, "excluding versions"
ProjectManager.getProjectDocs project_id, projectStateHash, excludeVersions, (error, result) ->
ProjectManager.getProjectDocsAndFlushIfOld project_id, projectStateHash, excludeVersions, (error, result) ->
timer.done()
if error instanceof Errors.ProjectStateChangedError
res.send 409 # conflict

View file

@ -58,51 +58,36 @@ module.exports = ProjectManager =
else
callback(null)
getProjectDocs: (project_id, projectStateHash, excludeVersions = {}, _callback = (error, docs) ->) ->
timer = new Metrics.Timer("projectManager.getProjectDocs")
getProjectDocsAndFlushIfOld: (project_id, projectStateHash, excludeVersions = {}, _callback = (error, docs) ->) ->
timer = new Metrics.Timer("projectManager.getProjectDocsAndFlushIfOld")
callback = (args...) ->
timer.done()
_callback(args...)
RedisManager.checkOrSetProjectState project_id, projectStateHash, (error, projectStateChanged) ->
if error?
logger.error err: error, project_id: project_id, "error getting/setting project state in getProjectDocs"
logger.error err: error, project_id: project_id, "error getting/setting project state in getProjectDocsAndFlushIfOld"
return callback(error)
# we can't return docs if project structure has changed
return callback Errors.ProjectStateChangedError("project state changed") if projectStateChanged
if projectStateChanged
return callback Errors.ProjectStateChangedError("project state changed")
# project structure hasn't changed, return doc content from redis
RedisManager.getDocIdsInProject project_id, (error, doc_ids) ->
if error?
logger.error err: error, project_id: project_id, "error getting doc ids in getProjectDocs"
return callback(error)
jobs = []
docs = []
for doc_id in doc_ids or []
do (doc_id) ->
jobs.push (cb) ->
# check the doc version first
RedisManager.getDocVersion doc_id, (error, version) ->
if error?
logger.error err: error, project_id: project_id, doc_id: doc_id, "error getting project doc version in getProjectDocs"
return cb(error)
# skip getting the doc if we already have that version
if version? and version is excludeVersions[doc_id]
# not currently using excludeVersions so we shouldn't get here!
# change to logger.log when this code path is in use
logger.error err: error, project_id: project_id, doc_id: doc_id, version: version, "skipping doc version in getProjectDocs"
return cb()
# otherwise get the doc lines from redis
RedisManager.getDocLines doc_id, (error, lines) ->
if error?
logger.error err: error, project_id: project_id, doc_id: doc_id, "error getting project doc lines in getProjectDocs"
return cb(error)
try
docs.push {_id: doc_id, lines: JSON.parse(lines), v: version}
catch e
logger.error err: e, project_id: project_id, doc_id: doc_id, lines: lines, version: version, "error parsing doc lines in getProjectDocs"
return cb(e)
cb()
async.series jobs, (error) ->
# get the doc lines from redis
DocumentManager.getDocAndFlushIfOldWithLock project_id, doc_id, (err, lines, version) ->
if err?
logger.error err:err, project_id: project_id, doc_id: doc_id, "error getting project doc lines in getProjectDocsAndFlushIfOld"
return cb(err)
doc = {_id:doc_id, lines:lines, v:version} # create a doc object to return
cb(null, doc)
async.series jobs, (error, docs) ->
return callback(error) if error?
callback(null, docs)

View file

@ -86,6 +86,7 @@ module.exports = RedisManager =
multi.del keys.docVersion(doc_id:doc_id)
multi.del keys.docHash(doc_id:doc_id)
multi.del keys.ranges(doc_id:doc_id)
multi.del keys.unflushedTime(doc_id:doc_id)
multi.exec (error) ->
return callback(error) if error?
multi = rclient.multi()
@ -105,7 +106,7 @@ module.exports = RedisManager =
clearProjectState: (project_id, callback = (error) ->) ->
rclient.del keys.projectState(project_id:project_id), callback
getDoc : (project_id, doc_id, callback = (error, lines, version, ranges) ->)->
getDoc : (project_id, doc_id, callback = (error, lines, version, ranges, unflushedTime) ->)->
timer = new metrics.Timer("redis.get-doc")
multi = rclient.multi()
multi.get keys.docLines(doc_id:doc_id)
@ -113,7 +114,8 @@ module.exports = RedisManager =
multi.get keys.docHash(doc_id:doc_id)
multi.get keys.projectKey(doc_id:doc_id)
multi.get keys.ranges(doc_id:doc_id)
multi.exec (error, [docLines, version, storedHash, doc_project_id, ranges])->
multi.get keys.unflushedTime(doc_id:doc_id)
multi.exec (error, [docLines, version, storedHash, doc_project_id, ranges, unflushedTime])->
timeSpan = timer.done()
return callback(error) if error?
# check if request took too long and bail out. only do this for
@ -149,7 +151,7 @@ module.exports = RedisManager =
return callback(error) if error?
if result isnt 0 # doc should already be in set
logger.error project_id: project_id, doc_id: doc_id, doc_project_id: doc_project_id, "doc missing from docsInProject set"
callback null, docLines, version, ranges
callback null, docLines, version, ranges, unflushedTime
getDocVersion: (doc_id, callback = (error, version) ->) ->
rclient.get keys.docVersion(doc_id: doc_id), (error, version) ->
@ -247,6 +249,10 @@ module.exports = RedisManager =
# expire must come after rpush since before it will be a no-op if the list is empty
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 6
multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7
# Set the unflushed timestamp to the current time if the doc
# hasn't been modified before (the content in mongo has been
# valid up to this point). Otherwise leave it alone ("NX" flag).
multi.set keys.unflushedTime(doc_id: doc_id), Date.now(), "NX"
multi.exec (error, result) ->
return callback(error) if error?
# check the hash computed on the redis server
@ -257,6 +263,9 @@ module.exports = RedisManager =
uncompressedHistoryOpsLength = result?[7]
return callback(null, uncompressedHistoryOpsLength)
clearUnflushedTime: (doc_id, callback = (error) ->) ->
rclient.del keys.unflushedTime(doc_id:doc_id), callback
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
rclient.smembers keys.docsInProject(project_id: project_id), callback

View file

@ -42,6 +42,7 @@ module.exports =
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
ranges: ({doc_id}) -> "Ranges:#{doc_id}"
projectState: ({project_id}) -> "ProjectState:#{project_id}"
unflushedTime: ({doc_id}) -> "UnflushedTime:#{doc_id}"
# cluster: [{
# port: "7000"
# host: "localhost"

View file

@ -4,6 +4,7 @@ should = chai.should()
modulePath = "../../../../app/js/DocumentManager.js"
SandboxedModule = require('sandboxed-module')
Errors = require "../../../../app/js/Errors"
tk = require "timekeeper"
describe "DocumentManager", ->
beforeEach ->
@ -26,6 +27,7 @@ describe "DocumentManager", ->
@lines = ["one", "two", "three"]
@version = 42
@ranges = { comments: "mock", entries: "mock" }
@unflushedTime = Date.now()
describe "flushAndDeleteDoc", ->
describe "successfully", ->
@ -60,6 +62,7 @@ describe "DocumentManager", ->
describe "when the doc is in Redis", ->
beforeEach ->
@RedisManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges)
@RedisManager.clearUnflushedTime = sinon.stub().callsArgWith(1, null)
@PersistenceManager.setDoc = sinon.stub().yields()
@DocumentManager.flushDocIfLoaded @project_id, @doc_id, @callback
@ -147,7 +150,7 @@ describe "DocumentManager", ->
describe "getDoc", ->
describe "when the doc exists in Redis", ->
beforeEach ->
@RedisManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges)
@RedisManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, @unflushedTime)
@DocumentManager.getDoc @project_id, @doc_id, @callback
it "should get the doc from Redis", ->
@ -156,7 +159,7 @@ describe "DocumentManager", ->
.should.equal true
it "should call the callback with the doc info", ->
@callback.calledWith(null, @lines, @version, @ranges, true).should.equal true
@callback.calledWith(null, @lines, @version, @ranges, @unflushedTime, true).should.equal true
it "should time the execution", ->
@Metrics.Timer::done.called.should.equal true
@ -184,7 +187,7 @@ describe "DocumentManager", ->
.should.equal true
it "should call the callback with the doc info", ->
@callback.calledWith(null, @lines, @version, @ranges, false).should.equal true
@callback.calledWith(null, @lines, @version, @ranges, null, false).should.equal true
it "should time the execution", ->
@Metrics.Timer::done.called.should.equal true
@ -195,7 +198,7 @@ describe "DocumentManager", ->
@beforeLines = ["before", "lines"]
@afterLines = ["after", "lines"]
@ops = [{ i: "foo", p: 4 }, { d: "bar", p: 42 }]
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @beforeLines, @version, @ranges, true)
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @beforeLines, @version, @ranges, @unflushedTime, true)
@DiffCodec.diffAsShareJsOp = sinon.stub().callsArgWith(2, null, @ops)
@UpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null)
@DocumentManager.flushDocIfLoaded = sinon.stub().callsArg(2)
@ -246,7 +249,7 @@ describe "DocumentManager", ->
describe "when not already loaded", ->
beforeEach ->
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @beforeLines, @version, false)
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @beforeLines, @version, null, false)
@DocumentManager.setDoc @project_id, @doc_id, @afterLines, @source, @user_id, false, @callback
it "should flush and delete the doc from the doc updater", ->
@ -373,4 +376,65 @@ describe "DocumentManager", ->
it "should call the callback with a not found error", ->
error = new Errors.NotFoundError("document not found: #{@doc_id}")
@callback.calledWith(error).should.equal true
@callback.calledWith(error).should.equal true
describe "getDocAndFlushIfOld", ->
beforeEach ->
tk.freeze(new Date())
@DocumentManager.flushDocIfLoaded = sinon.stub().callsArg(2)
afterEach ->
tk.reset()
describe "when the doc is in Redis", ->
describe "and has changes to be flushed", ->
beforeEach ->
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, Date.now() - 1e9, true)
@DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback
it "should get the doc", ->
@DocumentManager.getDoc
.calledWith(@project_id, @doc_id)
.should.equal true
it "should flush the doc", ->
@DocumentManager.flushDocIfLoaded
.calledWith(@project_id, @doc_id)
.should.equal true
it "should call the callback with the lines and versions", ->
@callback.calledWith(null, @lines, @version).should.equal true
describe "and has only changes that don't need to be flushed", ->
beforeEach ->
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, Date.now() - 100, true)
@DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback
it "should get the doc", ->
@DocumentManager.getDoc
.calledWith(@project_id, @doc_id)
.should.equal true
it "should not flush the doc", ->
@DocumentManager.flushDocIfLoaded
.called.should.equal false
it "should call the callback with the lines and versions", ->
@callback.calledWith(null, @lines, @version).should.equal true
describe "when the doc is not in Redis", ->
beforeEach ->
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, null, false)
@DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback
it "should get the doc", ->
@DocumentManager.getDoc
.calledWith(@project_id, @doc_id)
.should.equal true
it "should not flush the doc", ->
@DocumentManager.flushDocIfLoaded
.called.should.equal false
it "should call the callback with the lines and versions", ->
@callback.calledWith(null, @lines, @version).should.equal true

View file

@ -435,7 +435,7 @@ describe "HttpController", ->
.calledWith(new Error("oops"))
.should.equal true
describe "getProjectDocs", ->
describe "getProjectDocsAndFlushIfOld", ->
beforeEach ->
@state = "01234567890abcdef"
@docs = [{_id: "1234", lines: "hello", v: 23}, {_id: "4567", lines: "world", v: 45}]
@ -447,11 +447,11 @@ describe "HttpController", ->
describe "successfully", ->
beforeEach ->
@ProjectManager.getProjectDocs = sinon.stub().callsArgWith(3,null, @docs)
@HttpController.getProjectDocs(@req, @res, @next)
@ProjectManager.getProjectDocsAndFlushIfOld = sinon.stub().callsArgWith(3,null, @docs)
@HttpController.getProjectDocsAndFlushIfOld(@req, @res, @next)
it "should get docs from the project manager", ->
@ProjectManager.getProjectDocs
@ProjectManager.getProjectDocsAndFlushIfOld
.calledWith(@project_id, @state, {})
.should.equal true
@ -475,8 +475,8 @@ describe "HttpController", ->
describe "when there is a conflict", ->
beforeEach ->
@ProjectManager.getProjectDocs = sinon.stub().callsArgWith(3, new Errors.ProjectStateChangedError("project state changed"))
@HttpController.getProjectDocs(@req, @res, @next)
@ProjectManager.getProjectDocsAndFlushIfOld = sinon.stub().callsArgWith(3, new Errors.ProjectStateChangedError("project state changed"))
@HttpController.getProjectDocsAndFlushIfOld(@req, @res, @next)
it "should return an HTTP 409 Conflict response", ->
@res.send
@ -485,8 +485,8 @@ describe "HttpController", ->
describe "when an error occurs", ->
beforeEach ->
@ProjectManager.getProjectDocs = sinon.stub().callsArgWith(3, new Error("oops"))
@HttpController.getProjectDocs(@req, @res, @next)
@ProjectManager.getProjectDocsAndFlushIfOld = sinon.stub().callsArgWith(3, new Error("oops"))
@HttpController.getProjectDocsAndFlushIfOld(@req, @res, @next)
it "should call next with the error", ->
@next

View file

@ -5,7 +5,7 @@ modulePath = "../../../../app/js/ProjectManager.js"
SandboxedModule = require('sandboxed-module')
Errors = require "../../../../app/js/Errors.js"
describe "ProjectManager - getProjectDocs", ->
describe "ProjectManager - getProjectDocsAndFlushIfOld", ->
beforeEach ->
@ProjectManager = SandboxedModule.require modulePath, requires:
"./RedisManager": @RedisManager = {}
@ -29,15 +29,14 @@ describe "ProjectManager - getProjectDocs", ->
]
@RedisManager.checkOrSetProjectState = sinon.stub().callsArgWith(2, null)
@RedisManager.getDocIdsInProject = sinon.stub().callsArgWith(1, null, @doc_ids)
@RedisManager.getDocVersion = sinon.stub()
@RedisManager.getDocVersion.withArgs(@doc_ids[0]).callsArgWith(1, null, @doc_versions[0])
@RedisManager.getDocVersion.withArgs(@doc_ids[1]).callsArgWith(1, null, @doc_versions[1])
@RedisManager.getDocVersion.withArgs(@doc_ids[2]).callsArgWith(1, null, @doc_versions[2])
@RedisManager.getDocLines = sinon.stub()
@RedisManager.getDocLines.withArgs(@doc_ids[0]).callsArgWith(1, null, JSON.stringify(@doc_lines[0]))
@RedisManager.getDocLines.withArgs(@doc_ids[1]).callsArgWith(1, null, JSON.stringify(@doc_lines[1]))
@RedisManager.getDocLines.withArgs(@doc_ids[2]).callsArgWith(1, null, JSON.stringify(@doc_lines[2]))
@ProjectManager.getProjectDocs @project_id, @projectStateHash, @excludeVersions, (error, docs) =>
@DocumentManager.getDocAndFlushIfOldWithLock = sinon.stub()
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[0])
.callsArgWith(2, null, @doc_lines[0], @doc_versions[0])
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[1])
.callsArgWith(2, null, @doc_lines[1], @doc_versions[1])
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[2])
.callsArgWith(2, null, @doc_lines[2], @doc_versions[2])
@ProjectManager.getProjectDocsAndFlushIfOld @project_id, @projectStateHash, @excludeVersions, (error, docs) =>
@callback(error, docs)
done()
@ -61,7 +60,7 @@ describe "ProjectManager - getProjectDocs", ->
beforeEach (done) ->
@doc_ids = ["doc-id-1", "doc-id-2", "doc-id-3"]
@RedisManager.checkOrSetProjectState = sinon.stub().callsArgWith(2, null, true)
@ProjectManager.getProjectDocs @project_id, @projectStateHash, @excludeVersions, (error, docs) =>
@ProjectManager.getProjectDocsAndFlushIfOld @project_id, @projectStateHash, @excludeVersions, (error, docs) =>
@callback(error, docs)
done()
@ -81,17 +80,18 @@ describe "ProjectManager - getProjectDocs", ->
@doc_ids = ["doc-id-1", "doc-id-2", "doc-id-3"]
@RedisManager.checkOrSetProjectState = sinon.stub().callsArgWith(2, null)
@RedisManager.getDocIdsInProject = sinon.stub().callsArgWith(1, null, @doc_ids)
@RedisManager.getDocVersion = sinon.stub().callsArgWith(1, null)
@RedisManager.getDocLines = sinon.stub()
@RedisManager.getDocLines.withArgs("doc-id-1").callsArgWith(1, null, JSON.stringify(["test doc content"]))
@RedisManager.getDocLines.withArgs("doc-id-2").callsArgWith(1, @error = new Error("oops")) # trigger an error
@ProjectManager.getProjectDocs @project_id, @projectStateHash, @excludeVersions, (error, docs) =>
@DocumentManager.getDocAndFlushIfOldWithLock = sinon.stub()
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, "doc-id-1")
.callsArgWith(2, null, ["test doc content"], @doc_versions[1])
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, "doc-id-2")
.callsArgWith(2, @error = new Error("oops")) # trigger an error
@ProjectManager.getProjectDocsAndFlushIfOld @project_id, @projectStateHash, @excludeVersions, (error, docs) =>
@callback(error)
done()
it "should record the error", ->
@logger.error
.calledWith(err: @error, project_id: @project_id, doc_id: "doc-id-2", "error getting project doc lines in getProjectDocs")
.calledWith(err: @error, project_id: @project_id, doc_id: "doc-id-2", "error getting project doc lines in getProjectDocsAndFlushIfOld")
.should.equal true
it "should call the callback with an error", ->

View file

@ -5,6 +5,7 @@ modulePath = "../../../../app/js/RedisManager.js"
SandboxedModule = require('sandboxed-module')
Errors = require "../../../../app/js/Errors"
crypto = require "crypto"
tk = require "timekeeper"
describe "RedisManager", ->
beforeEach ->
@ -30,6 +31,7 @@ describe "RedisManager", ->
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
ranges: ({doc_id}) -> "Ranges:#{doc_id}"
projectState: ({project_id}) -> "ProjectState:#{project_id}"
unflushedTime: ({doc_id}) -> "UnflushedTime:#{doc_id}"
history:
key_schema:
uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}"
@ -61,8 +63,9 @@ describe "RedisManager", ->
@hash = crypto.createHash('sha1').update(@jsonlines,'utf8').digest('hex')
@ranges = { comments: "mock", entries: "mock" }
@json_ranges = JSON.stringify @ranges
@unflushed_time = 12345
@rclient.get = sinon.stub()
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @hash, @project_id, @json_ranges])
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @hash, @project_id, @json_ranges, @unflushed_time])
@rclient.sadd = sinon.stub().yields(null, 0)
describe "successfully", ->
@ -89,6 +92,11 @@ describe "RedisManager", ->
.calledWith("Ranges:#{@doc_id}")
.should.equal true
it "should get the unflushed time", ->
@rclient.get
.calledWith("UnflushedTime:#{@doc_id}")
.should.equal true
it "should check if the document is in the DocsIn set", ->
@rclient.sadd
.calledWith("DocsIn:#{@project_id}")
@ -96,7 +104,7 @@ describe "RedisManager", ->
it 'should return the document', ->
@callback
.calledWith(null, @lines, @version, @ranges)
.calledWithExactly(null, @lines, @version, @ranges, @unflushed_time)
.should.equal true
it 'should not log any errors', ->
@ -116,7 +124,7 @@ describe "RedisManager", ->
it 'should return an empty result', ->
@callback
.calledWith(null, null, 0, {})
.calledWithExactly(null, null, 0, {})
.should.equal true
it 'should not log any errors', ->
@ -134,7 +142,7 @@ describe "RedisManager", ->
it 'should return the document', ->
@callback
.calledWith(null, @lines, @version, @ranges)
.calledWithExactly(null, @lines, @version, @ranges, @unflushed_time)
.should.equal true
describe "with a corrupted document", ->
@ -155,11 +163,11 @@ describe "RedisManager", ->
describe "with a slow request to redis", ->
beforeEach ->
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @badHash, @project_id, @json_ranges])
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @badHash, @project_id, @json_ranges, @unflushed_time])
@clock = sinon.useFakeTimers();
@rclient.exec = (cb) =>
@clock.tick(6000);
cb(null, [@jsonlines, @version, @another_project_id, @json_ranges])
cb(null, [@jsonlines, @version, @another_project_id, @json_ranges, @unflushed_time])
@RedisManager.getDoc @project_id, @doc_id, @callback
@ -174,7 +182,7 @@ describe "RedisManager", ->
describe "getDoc with an invalid project id", ->
beforeEach ->
@another_project_id = "project-id-456"
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @another_project_id, @json_ranges])
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @another_project_id, @json_ranges, @unflushed_time])
@RedisManager.getDoc @project_id, @doc_id, @callback
it 'should return an error', ->
@ -317,6 +325,10 @@ describe "RedisManager", ->
beforeEach ->
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length)
@RedisManager.updateDocument @doc_id, @lines, @version, @ops, @ranges, @callback
tk.freeze(new Date())
afterEach ->
tk.reset()
it "should get the current doc version to check for consistency", ->
@RedisManager.getDocVersion
@ -343,6 +355,11 @@ describe "RedisManager", ->
.calledWith("Ranges:#{@doc_id}", JSON.stringify(@ranges))
.should.equal true
it "should set the unflushed time", ->
@rclient.set
.calledWith("UnflushedTime:#{@doc_id}", Date.now(), "NX")
.should.equal true
it "should push the doc op into the doc ops list", ->
@rclient.rpush
.calledWith("DocOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1]))
@ -570,6 +587,11 @@ describe "RedisManager", ->
@rclient.del
.calledWith("DocHash:#{@doc_id}")
.should.equal true
it "should delete the unflushed time", ->
@rclient.del
.calledWith("UnflushedTime:#{@doc_id}")
.should.equal true
it "should delete the project_id for the doc", ->
@rclient.del