Put doc_ids with history changes into project level set

This commit is contained in:
James Allen 2014-03-21 12:41:05 +00:00
parent 2d28f1903f
commit c0be3ef37b
11 changed files with 94 additions and 159 deletions

View file

@ -8,6 +8,7 @@ DOCLINES = "doclines"
DOCOPS = "DocOps"
DOCVERSION = "DocVersion"
DOCIDSWITHPENDINGUPDATES = "DocsWithPendingUpdates"
DOCSWITHHISTORYOPS = "DocsWithHistoryOps"
UNCOMPRESSED_HISTORY_OPS = "UncompressedHistoryOps"
module.exports =
@ -25,7 +26,7 @@ module.exports =
docsWithPendingUpdates : DOCIDSWITHPENDINGUPDATES
combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}"
splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":")
historyLoadManagerThreshold: "HistoryLoadManagerThreshold"
docsWithHistoryOps: (op) -> DOCSWITHHISTORYOPS + ":" + op.project_id
now : (key)->
d = new Date()
d.getDate()+":"+(d.getMonth()+1)+":"+d.getFullYear()+":"+key

View file

@ -155,9 +155,15 @@ module.exports =
jsonOps = ops.map (op) -> JSON.stringify op
rclient.lpush keys.docOps(doc_id: doc_id), jsonOps.reverse(), callback
pushUncompressedHistoryOp: (doc_id, op, callback = (error) ->) ->
pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) ->
jsonOp = JSON.stringify op
rclient.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp, callback
multi = rclient.multi()
multi.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp
multi.sadd keys.docsWithHistoryOps(project_id: project_id), doc_id
multi.exec (error, results) ->
return callback(error) if error?
[length, _] = results
callback(error, length)
getDocOpsLength: (doc_id, callback = (error, length) ->) ->
rclient.llen keys.docOps(doc_id: doc_id), callback
@ -165,12 +171,6 @@ module.exports =
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
rclient.smembers keys.docsInProject(project_id: project_id), callback
getHistoryLoadManagerThreshold: (callback = (error, threshold) ->) ->
rclient.get keys.historyLoadManagerThreshold, (error, value) ->
return callback(error) if error?
return callback null, 0 if !value?
callback null, parseInt(value, 10)
getDocumentsProjectId = (doc_id, callback)->
rclient.get keys.projectKey({doc_id:doc_id}), (err, project_id)->

View file

@ -23,22 +23,15 @@ module.exports = TrackChangesManager =
FLUSH_EVERY_N_OPS: 50
pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) ->
RedisManager.getHistoryLoadManagerThreshold (error, threshold) ->
logger.log project_id: project_id, doc_id: doc_id, "pushing history op"
RedisManager.pushUncompressedHistoryOp project_id, doc_id, op, (error, length) ->
return callback(error) if error?
if TrackChangesManager.getLoadManagerBucket(doc_id) < threshold
RedisManager.pushUncompressedHistoryOp doc_id, op, (error, length) ->
return callback(error) if error?
if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0
# Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update.
logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api"
TrackChangesManager.flushDocChanges project_id, doc_id, (error) ->
if error?
logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api"
callback()
else
callback()
if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0
# Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update.
logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api"
TrackChangesManager.flushDocChanges project_id, doc_id, (error) ->
if error?
logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api"
callback()
getLoadManagerBucket: (doc_id) ->
hash = crypto.createHash("md5").update(doc_id).digest("hex")
return parseInt(hash.slice(0,4), 16) % 100

View file

@ -12,7 +12,7 @@ MockWebApi = require "./helpers/MockWebApi"
DocUpdaterClient = require "./helpers/DocUpdaterClient"
describe "Applying updates to a doc", ->
before (done) ->
before ->
@lines = ["one", "two", "three"]
@update =
doc: @doc_id
@ -22,8 +22,6 @@ describe "Applying updates to a doc", ->
}]
v: 0
@result = ["one", "one and a half", "two", "three"]
rclient.set "HistoryLoadManagerThreshold", 100, (error) =>
done()
describe "when the document is not loaded", ->
before (done) ->
@ -51,8 +49,13 @@ describe "Applying updates to a doc", ->
it "should push the applied updates to the track changes api", (done) ->
rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
throw error if error?
JSON.parse(updates[0]).op.should.deep.equal @update.op
done()
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
throw error if error?
result.should.equal 1
done()
describe "when the document is loaded", ->
before (done) ->
@ -81,7 +84,9 @@ describe "Applying updates to a doc", ->
it "should push the applied updates to the track changes api", (done) ->
rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) =>
JSON.parse(updates[0]).op.should.deep.equal @update.op
done()
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
result.should.equal 1
done()
describe "when the document has been deleted", ->
describe "when the ops come in a single linear order", ->
@ -131,7 +136,10 @@ describe "Applying updates to a doc", ->
updates = (JSON.parse(u) for u in updates)
for appliedUpdate, i in @updates
appliedUpdate.op.should.deep.equal updates[i].op
done()
rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) =>
result.should.equal 1
done()
describe "when older ops come in after the delete", ->
before ->
@ -235,7 +243,7 @@ describe "Applying updates to a doc", ->
done()
describe "with enough updates to flush to the track changes api", ->
beforeEach ->
beforeEach (done) ->
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
MockWebApi.insertDoc @project_id, @doc_id, {
lines: @lines
@ -249,27 +257,13 @@ describe "Applying updates to a doc", ->
sinon.spy MockTrackChangesApi, "flushDoc"
DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) =>
throw error if error?
setTimeout done, 200
afterEach ->
MockTrackChangesApi.flushDoc.restore()
describe "when under the load manager threshold", ->
beforeEach (done) ->
rclient.set "HistoryLoadManagerThreshold", 100, (error) =>
throw error if error?
DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) =>
throw error if error?
setTimeout done, 200
it "should flush the doc twice", ->
MockTrackChangesApi.flushDoc.calledTwice.should.equal true
it "should flush the doc twice", ->
MockTrackChangesApi.flushDoc.calledTwice.should.equal true
describe "when over the load manager threshold", ->
beforeEach (done) ->
rclient.set "HistoryLoadManagerThreshold", 0, (error) =>
throw error if error?
DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) =>
throw error if error?
setTimeout done, 200
it "should not flush the doc", ->
MockTrackChangesApi.flushDoc.called.should.equal false

View file

@ -12,6 +12,7 @@ describe "RedisManager.clearDocFromPendingUpdatesSet", ->
@RedisManager = SandboxedModule.require modulePath, requires:
"redis" : createClient: () =>
@rclient = auth:->
"logger-sharelatex": {}
@rclient.srem = sinon.stub().callsArg(2)
@RedisManager.clearDocFromPendingUpdatesSet(@project_id, @doc_id, @callback)

View file

@ -10,6 +10,7 @@ describe "RedisManager.getDocsWithPendingUpdates", ->
@RedisManager = SandboxedModule.require modulePath, requires:
"redis" : createClient: () =>
@rclient = auth:->
"logger-sharelatex": {}
@docs = [{
doc_id: "doc-id-1"

View file

@ -1,43 +0,0 @@
sinon = require('sinon')
chai = require('chai')
should = chai.should()
modulePath = "../../../../app/js/RedisManager.js"
SandboxedModule = require('sandboxed-module')
describe "RedisManager.getHistoryLoadManagerThreshold", ->
beforeEach ->
@RedisManager = SandboxedModule.require modulePath, requires:
"redis": createClient: () =>
@rclient =
auth: () ->
"logger-sharelatex": @logger = {log: sinon.stub()}
@callback = sinon.stub()
describe "with no value", ->
beforeEach ->
@rclient.get = sinon.stub().callsArgWith(1, null, null)
@RedisManager.getHistoryLoadManagerThreshold @callback
it "should get the value", ->
@rclient.get
.calledWith("HistoryLoadManagerThreshold")
.should.equal true
it "should call the callback with 0", ->
@callback.calledWith(null, 0).should.equal true
describe "with a value", ->
beforeEach ->
@rclient.get = sinon.stub().callsArgWith(1, null, "42")
@RedisManager.getHistoryLoadManagerThreshold @callback
it "should get the value", ->
@rclient.get
.calledWith("HistoryLoadManagerThreshold")
.should.equal true
it "should call the callback with the numeric value", ->
@callback.calledWith(null, 42).should.equal true

View file

@ -4,13 +4,14 @@ should = chai.should()
modulePath = "../../../../app/js/RedisManager"
SandboxedModule = require('sandboxed-module')
describe "RedisManager.clearDocFromPendingUpdatesSet", ->
describe "RedisManager.prependDocOps", ->
beforeEach ->
@doc_id = "document-id"
@callback = sinon.stub()
@RedisManager = SandboxedModule.require modulePath, requires:
"redis" : createClient: () =>
@rclient = auth:->
"logger-sharelatex": {}
@rclient.lpush = sinon.stub().callsArg(2)
@ops = [

View file

@ -4,7 +4,7 @@ should = chai.should()
modulePath = "../../../../app/js/RedisManager"
SandboxedModule = require('sandboxed-module')
describe "RedisManager.getPreviousDocOpsTests", ->
describe "RedisManager.pushDocOp", ->
beforeEach ->
@callback = sinon.stub()
@RedisManager = SandboxedModule.require modulePath, requires:
@ -12,6 +12,7 @@ describe "RedisManager.getPreviousDocOpsTests", ->
@rclient =
auth: ->
multi: => @rclient
"logger-sharelatex": {}
@doc_id = "doc-id-123"
beforeEach ->

View file

@ -13,20 +13,27 @@ describe "RedisManager.pushUncompressedHistoryOp", ->
multi: () => @rclient
"logger-sharelatex": @logger = {log: sinon.stub()}
@doc_id = "doc-id-123"
@project_id = "project-id-123"
@callback = sinon.stub()
@rclient.rpush = sinon.stub()
describe "successfully", ->
beforeEach ->
@op = { op: [{ i: "foo", p: 4 }] }
@rclient.rpush = sinon.stub().callsArgWith(2, null, @length = 42)
@RedisManager.pushUncompressedHistoryOp @doc_id, @op, @callback
@rclient.rpush = sinon.stub()
@rclient.sadd = sinon.stub()
@rclient.exec = sinon.stub().callsArgWith(0, null, [@length = 42, "1"])
@RedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should push the doc op into the doc ops list", ->
@rclient.rpush
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@op))
.should.equal true
it "should add the doc_id to the set of which records the project docs", ->
@rclient.sadd
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
.should.equal true
it "should call the callback with the length", ->
@callback.calledWith(null, @length).should.equal true

View file

@ -45,69 +45,48 @@ describe "TrackChangesManager", ->
@op = "mock-op"
@TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2)
describe "when the doc is under the load manager threshold", ->
describe "pushing the op", ->
beforeEach ->
@RedisManager.getHistoryLoadManagerThreshold = sinon.stub().callsArgWith(0, null, 40)
@TrackChangesManager.getLoadManagerBucket = sinon.stub().returns(30)
describe "pushing the op", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1)
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should push the op into redis", ->
@RedisManager.pushUncompressedHistoryOp
.calledWith(@doc_id, @op)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
it "should not try to flush the op", ->
@TrackChangesManager.flushDocChanges.called.should.equal false
describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(2, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should tell the track changes api to flush", ->
@TrackChangesManager.flushDocChanges
.calledWith(@project_id, @doc_id)
.should.equal true
describe "when TrackChangesManager errors", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(2, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should log out the error", ->
@logger.error
.calledWith(
err: @error
doc_id: @doc_id
project_id: @project_id
"error flushing doc to track changes api"
)
.should.equal true
describe "when the doc is over the load manager threshold", ->
beforeEach ->
@RedisManager.getHistoryLoadManagerThreshold = sinon.stub().callsArgWith(0, null, 40)
@TrackChangesManager.getLoadManagerBucket = sinon.stub().returns(50)
@RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1)
@RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(3, null, 1)
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should not push the op", ->
@RedisManager.pushUncompressedHistoryOp.called.should.equal false
it "should not try to flush the op", ->
@TrackChangesManager.flushDocChanges.called.should.equal false
it "should push the op into redis", ->
@RedisManager.pushUncompressedHistoryOp
.calledWith(@project_id, @doc_id, @op)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
it "should not try to flush the op", ->
@TrackChangesManager.flushDocChanges.called.should.equal false
describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should tell the track changes api to flush", ->
@TrackChangesManager.flushDocChanges
.calledWith(@project_id, @doc_id)
.should.equal true
describe "when TrackChangesManager errors", ->
beforeEach ->
@RedisManager.pushUncompressedHistoryOp =
sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS)
@TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
@TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback
it "should log out the error", ->
@logger.error
.calledWith(
err: @error
doc_id: @doc_id
project_id: @project_id
"error flushing doc to track changes api"
)
.should.equal true