add unflushed time to doc in redis

This commit is contained in:
Brian Gough 2017-10-06 12:23:23 +01:00
parent ad745f133d
commit 2bbbf3c005
7 changed files with 151 additions and 54 deletions

View file

@ -8,14 +8,16 @@ RealTimeRedisManager = require "./RealTimeRedisManager"
Errors = require "./Errors"
RangesManager = require "./RangesManager"
MAX_UNFLUSHED_AGE = 300 * 1000 # 5 mins, document should be flushed to mongo this time after a change
module.exports = DocumentManager =
getDoc: (project_id, doc_id, _callback = (error, lines, version, alreadyLoaded) ->) ->
getDoc: (project_id, doc_id, _callback = (error, lines, version, ranges, alreadyLoaded, unflushedTime) ->) ->
timer = new Metrics.Timer("docManager.getDoc")
callback = (args...) ->
timer.done()
_callback(args...)
RedisManager.getDoc project_id, doc_id, (error, lines, version, ranges) ->
RedisManager.getDoc project_id, doc_id, (error, lines, version, ranges, unflushedTime) ->
return callback(error) if error?
if !lines? or !version?
logger.log {project_id, doc_id}, "doc not in redis so getting from persistence API"
@ -24,9 +26,9 @@ module.exports = DocumentManager =
logger.log {project_id, doc_id, lines, version}, "got doc from persistence API"
RedisManager.putDocInMemory project_id, doc_id, lines, version, ranges, (error) ->
return callback(error) if error?
callback null, lines, version, ranges, false
callback null, lines, version, ranges, false, null
else
callback null, lines, version, ranges, true
callback null, lines, version, ranges, true, unflushedTime
getDocAndRecentOps: (project_id, doc_id, fromVersion, _callback = (error, lines, version, recentOps, ranges) ->) ->
timer = new Metrics.Timer("docManager.getDocAndRecentOps")
@ -103,7 +105,7 @@ module.exports = DocumentManager =
logger.log project_id: project_id, doc_id: doc_id, version: version, "flushing doc"
PersistenceManager.setDoc project_id, doc_id, lines, version, ranges, (error) ->
return callback(error) if error?
callback null
RedisManager.clearUnflushedTime doc_id, callback
flushAndDeleteDoc: (project_id, doc_id, _callback = (error) ->) ->
timer = new Metrics.Timer("docManager.flushAndDeleteDoc")
@ -156,6 +158,17 @@ module.exports = DocumentManager =
return callback(error) if error?
callback()
getDocAndFlushIfOld: (project_id, doc_id, callback = (error, doc) ->) ->
DocumentManager.getDoc project_id, doc_id, (error, lines, version, ranges, alreadyLoaded, unflushedTime) ->
return callback(error) if error?
# if doc was already loaded see if it needs to be flushed
if alreadyLoaded and unflushedTime? and (Date.now() - unflushedTime) > MAX_UNFLUSHED_AGE
DocumentManager.flushDocIfLoaded project_id, doc_id, (error) ->
return callback(error) if error?
callback(null, lines, version)
else
callback(null, lines, version)
getDocWithLock: (project_id, doc_id, callback = (error, lines, version) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.getDoc, project_id, doc_id, callback
@ -163,7 +176,11 @@ module.exports = DocumentManager =
getDocAndRecentOpsWithLock: (project_id, doc_id, fromVersion, callback = (error, lines, version) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.getDocAndRecentOps, project_id, doc_id, fromVersion, callback
getDocAndFlushIfOldWithLock: (project_id, doc_id, callback = (error, doc) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.getDocAndFlushIfOld, project_id, doc_id, callback
setDocWithLock: (project_id, doc_id, lines, source, user_id, undoing, callback = (error) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.setDoc, project_id, doc_id, lines, source, user_id, undoing, callback

View file

@ -69,40 +69,25 @@ module.exports = ProjectManager =
logger.error err: error, project_id: project_id, "error getting/setting project state in getProjectDocs"
return callback(error)
# we can't return docs if project structure has changed
return callback Errors.ProjectStateChangedError("project state changed") if projectStateChanged
if projectStateChanged
return callback Errors.ProjectStateChangedError("project state changed")
# project structure hasn't changed, return doc content from redis
RedisManager.getDocIdsInProject project_id, (error, doc_ids) ->
if error?
logger.error err: error, project_id: project_id, "error getting doc ids in getProjectDocs"
return callback(error)
jobs = []
docs = []
for doc_id in doc_ids or []
do (doc_id) ->
jobs.push (cb) ->
# check the doc version first
RedisManager.getDocVersion doc_id, (error, version) ->
if error?
logger.error err: error, project_id: project_id, doc_id: doc_id, "error getting project doc version in getProjectDocs"
return cb(error)
# skip getting the doc if we already have that version
if version? and version is excludeVersions[doc_id]
# not currently using excludeVersions so we shouldn't get here!
# change to logger.log when this code path is in use
logger.error err: error, project_id: project_id, doc_id: doc_id, version: version, "skipping doc version in getProjectDocs"
return cb()
# otherwise get the doc lines from redis
RedisManager.getDocLines doc_id, (error, lines) ->
if error?
logger.error err: error, project_id: project_id, doc_id: doc_id, "error getting project doc lines in getProjectDocs"
return cb(error)
try
docs.push {_id: doc_id, lines: JSON.parse(lines), v: version}
catch e
logger.error err: e, project_id: project_id, doc_id: doc_id, lines: lines, version: version, "error parsing doc lines in getProjectDocs"
return cb(e)
cb()
async.series jobs, (error) ->
# get the doc lines from redis
DocumentManager.getDocAndFlushIfOldWithLock project_id, doc_id, (err, lines, version) ->
if err?
logger.error err:err, project_id: project_id, doc_id: doc_id, "error getting project doc lines in getProjectDocs"
return cb(err)
doc = {_id:doc_id, lines:lines, v:version} # create a doc object to return
cb(null, doc)
async.series jobs, (error, docs) ->
return callback(error) if error?
callback(null, docs)

View file

@ -86,6 +86,7 @@ module.exports = RedisManager =
multi.del keys.docVersion(doc_id:doc_id)
multi.del keys.docHash(doc_id:doc_id)
multi.del keys.ranges(doc_id:doc_id)
multi.del keys.unflushedTime(doc_id:doc_id)
multi.exec (error) ->
return callback(error) if error?
multi = rclient.multi()
@ -105,7 +106,7 @@ module.exports = RedisManager =
clearProjectState: (project_id, callback = (error) ->) ->
rclient.del keys.projectState(project_id:project_id), callback
getDoc : (project_id, doc_id, callback = (error, lines, version, ranges) ->)->
getDoc : (project_id, doc_id, callback = (error, lines, version, ranges, unflushedTime) ->)->
timer = new metrics.Timer("redis.get-doc")
multi = rclient.multi()
multi.get keys.docLines(doc_id:doc_id)
@ -113,7 +114,8 @@ module.exports = RedisManager =
multi.get keys.docHash(doc_id:doc_id)
multi.get keys.projectKey(doc_id:doc_id)
multi.get keys.ranges(doc_id:doc_id)
multi.exec (error, [docLines, version, storedHash, doc_project_id, ranges])->
multi.get keys.unflushedTime(doc_id:doc_id)
multi.exec (error, [docLines, version, storedHash, doc_project_id, ranges, unflushedTime])->
timeSpan = timer.done()
return callback(error) if error?
# check if request took too long and bail out. only do this for
@ -149,7 +151,7 @@ module.exports = RedisManager =
return callback(error) if error?
if result isnt 0 # doc should already be in set
logger.error project_id: project_id, doc_id: doc_id, doc_project_id: doc_project_id, "doc missing from docsInProject set"
callback null, docLines, version, ranges
callback null, docLines, version, ranges, unflushedTime
getDocVersion: (doc_id, callback = (error, version) ->) ->
rclient.get keys.docVersion(doc_id: doc_id), (error, version) ->
@ -247,6 +249,10 @@ module.exports = RedisManager =
# expire must come after rpush since before it will be a no-op if the list is empty
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 6
multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7
# Set the unflushed timestamp to the current time if the doc
# hasn't been modified before (the content in mongo has been
# valid up to this point). Otherwise leave it alone ("NX" flag).
multi.set keys.unflushedTime(doc_id: doc_id), Date.now(), "NX"
multi.exec (error, result) ->
return callback(error) if error?
# check the hash computed on the redis server
@ -257,6 +263,9 @@ module.exports = RedisManager =
uncompressedHistoryOpsLength = result?[7]
return callback(null, uncompressedHistoryOpsLength)
clearUnflushedTime: (doc_id, callback = (error) ->) ->
rclient.del keys.unflushedTime(doc_id:doc_id), callback
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
rclient.smembers keys.docsInProject(project_id: project_id), callback

View file

@ -42,6 +42,7 @@ module.exports =
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
ranges: ({doc_id}) -> "Ranges:#{doc_id}"
projectState: ({project_id}) -> "ProjectState:#{project_id}"
unflushedTime: ({doc_id}) -> "UnflushedTime:#{doc_id}"
# cluster: [{
# port: "7000"
# host: "localhost"

View file

@ -4,6 +4,7 @@ should = chai.should()
modulePath = "../../../../app/js/DocumentManager.js"
SandboxedModule = require('sandboxed-module')
Errors = require "../../../../app/js/Errors"
tk = require "timekeeper"
describe "DocumentManager", ->
beforeEach ->
@ -60,6 +61,7 @@ describe "DocumentManager", ->
describe "when the doc is in Redis", ->
beforeEach ->
@RedisManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges)
@RedisManager.clearUnflushedTime = sinon.stub().callsArgWith(1, null)
@PersistenceManager.setDoc = sinon.stub().yields()
@DocumentManager.flushDocIfLoaded @project_id, @doc_id, @callback
@ -373,4 +375,65 @@ describe "DocumentManager", ->
it "should call the callback with a not found error", ->
error = new Errors.NotFoundError("document not found: #{@doc_id}")
@callback.calledWith(error).should.equal true
@callback.calledWith(error).should.equal true
describe "getDocAndFlushIfOld", ->
beforeEach ->
tk.freeze(new Date())
@DocumentManager.flushDocIfLoaded = sinon.stub().callsArg(2)
afterEach ->
tk.reset()
describe "when the doc is in Redis", ->
describe "and has changes to be flushed", ->
beforeEach ->
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, true, Date.now() - 1e9)
@DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback
it "should get the doc", ->
@DocumentManager.getDoc
.calledWith(@project_id, @doc_id)
.should.equal true
it "should flush the doc", ->
@DocumentManager.flushDocIfLoaded
.calledWith(@project_id, @doc_id)
.should.equal true
it "should call the callback with the lines and versions", ->
@callback.calledWith(null, @lines, @version).should.equal true
describe "and has only changes that don't need to be flushed", ->
beforeEach ->
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, true, Date.now() - 100)
@DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback
it "should get the doc", ->
@DocumentManager.getDoc
.calledWith(@project_id, @doc_id)
.should.equal true
it "should not flush the doc", ->
@DocumentManager.flushDocIfLoaded
.called.should.equal false
it "should call the callback with the lines and versions", ->
@callback.calledWith(null, @lines, @version).should.equal true
describe "when the doc is not in Redis", ->
beforeEach ->
@DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, false)
@DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback
it "should get the doc", ->
@DocumentManager.getDoc
.calledWith(@project_id, @doc_id)
.should.equal true
it "should not flush the doc", ->
@DocumentManager.flushDocIfLoaded
.called.should.equal false
it "should call the callback with the lines and versions", ->
@callback.calledWith(null, @lines, @version).should.equal true

View file

@ -29,14 +29,13 @@ describe "ProjectManager - getProjectDocs", ->
]
@RedisManager.checkOrSetProjectState = sinon.stub().callsArgWith(2, null)
@RedisManager.getDocIdsInProject = sinon.stub().callsArgWith(1, null, @doc_ids)
@RedisManager.getDocVersion = sinon.stub()
@RedisManager.getDocVersion.withArgs(@doc_ids[0]).callsArgWith(1, null, @doc_versions[0])
@RedisManager.getDocVersion.withArgs(@doc_ids[1]).callsArgWith(1, null, @doc_versions[1])
@RedisManager.getDocVersion.withArgs(@doc_ids[2]).callsArgWith(1, null, @doc_versions[2])
@RedisManager.getDocLines = sinon.stub()
@RedisManager.getDocLines.withArgs(@doc_ids[0]).callsArgWith(1, null, JSON.stringify(@doc_lines[0]))
@RedisManager.getDocLines.withArgs(@doc_ids[1]).callsArgWith(1, null, JSON.stringify(@doc_lines[1]))
@RedisManager.getDocLines.withArgs(@doc_ids[2]).callsArgWith(1, null, JSON.stringify(@doc_lines[2]))
@DocumentManager.getDocAndFlushIfOldWithLock = sinon.stub()
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[0])
.callsArgWith(2, null, @doc_lines[0], @doc_versions[0])
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[1])
.callsArgWith(2, null, @doc_lines[1], @doc_versions[1])
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[2])
.callsArgWith(2, null, @doc_lines[2], @doc_versions[2])
@ProjectManager.getProjectDocs @project_id, @projectStateHash, @excludeVersions, (error, docs) =>
@callback(error, docs)
done()
@ -81,10 +80,11 @@ describe "ProjectManager - getProjectDocs", ->
@doc_ids = ["doc-id-1", "doc-id-2", "doc-id-3"]
@RedisManager.checkOrSetProjectState = sinon.stub().callsArgWith(2, null)
@RedisManager.getDocIdsInProject = sinon.stub().callsArgWith(1, null, @doc_ids)
@RedisManager.getDocVersion = sinon.stub().callsArgWith(1, null)
@RedisManager.getDocLines = sinon.stub()
@RedisManager.getDocLines.withArgs("doc-id-1").callsArgWith(1, null, JSON.stringify(["test doc content"]))
@RedisManager.getDocLines.withArgs("doc-id-2").callsArgWith(1, @error = new Error("oops")) # trigger an error
@DocumentManager.getDocAndFlushIfOldWithLock = sinon.stub()
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, "doc-id-1")
.callsArgWith(2, null, ["test doc content"], @doc_versions[1])
@DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, "doc-id-2")
.callsArgWith(2, @error = new Error("oops")) # trigger an error
@ProjectManager.getProjectDocs @project_id, @projectStateHash, @excludeVersions, (error, docs) =>
@callback(error)
done()

View file

@ -5,6 +5,7 @@ modulePath = "../../../../app/js/RedisManager.js"
SandboxedModule = require('sandboxed-module')
Errors = require "../../../../app/js/Errors"
crypto = require "crypto"
tk = require "timekeeper"
describe "RedisManager", ->
beforeEach ->
@ -30,6 +31,7 @@ describe "RedisManager", ->
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
ranges: ({doc_id}) -> "Ranges:#{doc_id}"
projectState: ({project_id}) -> "ProjectState:#{project_id}"
unflushedTime: ({doc_id}) -> "UnflushedTime:#{doc_id}"
history:
key_schema:
uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}"
@ -61,8 +63,9 @@ describe "RedisManager", ->
@hash = crypto.createHash('sha1').update(@jsonlines,'utf8').digest('hex')
@ranges = { comments: "mock", entries: "mock" }
@json_ranges = JSON.stringify @ranges
@unflushed_time = 12345
@rclient.get = sinon.stub()
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @hash, @project_id, @json_ranges])
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @hash, @project_id, @json_ranges, @unflushed_time])
@rclient.sadd = sinon.stub().yields(null, 0)
describe "successfully", ->
@ -89,6 +92,11 @@ describe "RedisManager", ->
.calledWith("Ranges:#{@doc_id}")
.should.equal true
it "should get the unflushed time", ->
@rclient.get
.calledWith("UnflushedTime:#{@doc_id}")
.should.equal true
it "should check if the document is in the DocsIn set", ->
@rclient.sadd
.calledWith("DocsIn:#{@project_id}")
@ -96,7 +104,7 @@ describe "RedisManager", ->
it 'should return the document', ->
@callback
.calledWith(null, @lines, @version, @ranges)
.calledWithExactly(null, @lines, @version, @ranges, @unflushed_time)
.should.equal true
it 'should not log any errors', ->
@ -116,7 +124,7 @@ describe "RedisManager", ->
it 'should return an empty result', ->
@callback
.calledWith(null, null, 0, {})
.calledWithExactly(null, null, 0, {})
.should.equal true
it 'should not log any errors', ->
@ -134,7 +142,7 @@ describe "RedisManager", ->
it 'should return the document', ->
@callback
.calledWith(null, @lines, @version, @ranges)
.calledWithExactly(null, @lines, @version, @ranges, @unflushed_time)
.should.equal true
describe "with a corrupted document", ->
@ -155,11 +163,11 @@ describe "RedisManager", ->
describe "with a slow request to redis", ->
beforeEach ->
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @badHash, @project_id, @json_ranges])
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @badHash, @project_id, @json_ranges, @unflushed_time])
@clock = sinon.useFakeTimers();
@rclient.exec = (cb) =>
@clock.tick(6000);
cb(null, [@jsonlines, @version, @another_project_id, @json_ranges])
cb(null, [@jsonlines, @version, @another_project_id, @json_ranges, @unflushed_time])
@RedisManager.getDoc @project_id, @doc_id, @callback
@ -174,7 +182,7 @@ describe "RedisManager", ->
describe "getDoc with an invalid project id", ->
beforeEach ->
@another_project_id = "project-id-456"
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @another_project_id, @json_ranges])
@rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @another_project_id, @json_ranges, @unflushed_time])
@RedisManager.getDoc @project_id, @doc_id, @callback
it 'should return an error', ->
@ -317,6 +325,10 @@ describe "RedisManager", ->
beforeEach ->
@RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length)
@RedisManager.updateDocument @doc_id, @lines, @version, @ops, @ranges, @callback
tk.freeze(new Date())
afterEach ->
tk.reset()
it "should get the current doc version to check for consistency", ->
@RedisManager.getDocVersion
@ -343,6 +355,11 @@ describe "RedisManager", ->
.calledWith("Ranges:#{@doc_id}", JSON.stringify(@ranges))
.should.equal true
it "should set the unflushed time", ->
@rclient.set
.calledWith("UnflushedTime:#{@doc_id}", Date.now(), "NX")
.should.equal true
it "should push the doc op into the doc ops list", ->
@rclient.rpush
.calledWith("DocOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1]))
@ -570,6 +587,11 @@ describe "RedisManager", ->
@rclient.del
.calledWith("DocHash:#{@doc_id}")
.should.equal true
it "should delete the unflushed time", ->
@rclient.del
.calledWith("UnflushedTime:#{@doc_id}")
.should.equal true
it "should delete the project_id for the doc", ->
@rclient.del