Merge pull request #12 from sharelatex/pack-in-place

Pack in place
This commit is contained in:
Brian Gough 2016-01-20 13:38:04 +00:00
commit b3f0dc4157
8 changed files with 432 additions and 90 deletions

View file

@ -1,20 +1,17 @@
{db, ObjectId} = require "./mongojs"
PackManager = require "./PackManager"
async = require "async"
_ = require "underscore"
module.exports = MongoManager =
getLastCompressedUpdate: (doc_id, callback = (error, update) ->) ->
db.docHistory
.find(doc_id: ObjectId(doc_id.toString()))
.find(doc_id: ObjectId(doc_id.toString()), {pack: {$slice:-1}}) # only return the last entry in a pack
.sort( v: -1 )
.limit(1)
.toArray (error, compressedUpdates) ->
return callback(error) if error?
if compressedUpdates[0]?.pack?
# cannot pop from a pack, throw error
error = new Error("last compressed update is a pack")
return callback error, null
return callback null, compressedUpdates[0] or null
callback null, compressedUpdates[0] or null
peekLastCompressedUpdate: (doc_id, callback = (error, update, version) ->) ->
# under normal use we pass back the last update as
@ -55,7 +52,6 @@ module.exports = MongoManager =
else
callback(err,results)
modifyCompressedUpdate: (lastUpdate, newUpdate, callback = (error) ->) ->
return callback() if not newUpdate?
db.docHistory.findAndModify
@ -140,9 +136,7 @@ module.exports = MongoManager =
# For finding all updates that go into a diff for a doc
db.docHistory.ensureIndex { doc_id: 1, v: 1 }, { background: true }
# For finding all updates that affect a project
db.docHistory.ensureIndex { project_id: 1, "meta.end_ts": 1 }, { background: true }
# For finding all packs that affect a project (use a sparse index so only packs are included)
db.docHistory.ensureIndex { project_id: 1, "pack.0.meta.end_ts": 1, "meta.end_ts": 1}, { background: true, sparse: true }
db.docHistory.ensureIndex { project_id: 1, "meta.end_ts": 1, "meta.start_ts": -1 }, { background: true }
# For finding updates that don't yet have a project_id and need it inserting
db.docHistory.ensureIndex { doc_id: 1, project_id: 1 }, { background: true }
# For finding project meta-data

View file

@ -4,6 +4,8 @@ _ = require "underscore"
logger = require "logger-sharelatex"
LockManager = require "./LockManager"
DAYS = 24 * 3600 * 1000 # one day in milliseconds
module.exports = PackManager =
# The following functions implement methods like a mongo find, but
# expands any documents containing a 'pack' field into multiple
@ -86,6 +88,10 @@ module.exports = PackManager =
needMore = false # keep track of whether we need to load more data
updates = [] # used to accumulate the set of results
# FIXME: packs are big so we should accumulate the results
# incrementally instead of using .toArray() to avoid reading all
# of the changes into memory
cursor.toArray (err, result) ->
unpackedSet = PackManager._unpackResults(result)
updates = PackManager._filterAndLimit(updates, unpackedSet, filterFn, limit)
@ -139,6 +145,9 @@ module.exports = PackManager =
updates = [] # used to accumulate the set of results
# FIXME: packs are big so we should accumulate the results
# incrementally instead of using .toArray() to avoid reading all
# of the changes into memory
cursor.toArray (err, result) ->
if err?
return callback err, result
@ -176,20 +185,25 @@ module.exports = PackManager =
.find(tailQuery, projection)
.sort(sort)
# now find any packs that overlap with the time window
# now find any packs that overlap with the time window from outside
# cutoff before
# --|-----wanted-range--|------------------ time=>
# |-------------|pack(end_ts)
#
# these were not picked up by the original query because
# end_ts>before but the beginning of the pack may be in the time range
overlapQuery = _.clone(query)
if before? && cutoff?
overlapQuery['meta.end_ts'] = {"$gte": before}
overlapQuery['pack.0.meta.end_ts'] = {"$lte": before }
overlapQuery['meta.start_ts'] = {"$lte": before }
else if before? && not cutoff?
overlapQuery['meta.end_ts'] = {"$gte": before}
overlapQuery['pack.0.meta.end_ts'] = {"$lte": before }
overlapQuery['meta.start_ts'] = {"$lte": before }
else if not before? && cutoff?
overlapQuery['meta.end_ts'] = {"$gte": cutoff}
overlapQuery['pack.0.meta.end_ts'] = {"$gte": 0 }
overlapQuery['meta.end_ts'] = {"$gte": cutoff} # we already have these??
else if not before? && not cutoff?
overlapQuery['meta.end_ts'] = {"$gte": 0 }
overlapQuery['pack.0.meta.end_ts'] = {"$gte": 0 }
overlapQuery['meta.end_ts'] = {"$gte": 0 } # shouldn't happen??
overlap = collection
.find(overlapQuery, projection)
.sort(sort)
@ -254,16 +268,11 @@ module.exports = PackManager =
return newResults
MAX_SIZE: 1024*1024 # make these configurable parameters
MAX_COUNT: 1024
MIN_COUNT: 100
KEEP_OPS: 100
MAX_COUNT: 512
convertDocsToPacks: (docs, callback) ->
packs = []
top = null
# keep the last KEEP_OPS as individual ops
docs = docs.slice(0,-PackManager.KEEP_OPS)
docs.forEach (d,i) ->
# skip existing packs
if d.pack?
@ -280,22 +289,17 @@ module.exports = PackManager =
top.v_end = d.v
top.meta.end_ts = d.meta.end_ts
return
else if sz < PackManager.MAX_SIZE
else
# create a new pack
top = _.clone(d)
top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ]
top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts }
top.sz = sz
top.v_end = d.v
delete top.op
delete top._id
packs.push top
else
# keep the op
# util.log "keeping large op unchanged (#{sz} bytes)"
# only store packs with a sufficient number of ops, discard others
packs = packs.filter (packObj) ->
packObj.pack.length > PackManager.MIN_COUNT
callback(null, packs)
checkHistory: (docs, callback) ->
@ -405,7 +409,7 @@ module.exports = PackManager =
}, {upsert:true}, () ->
callback null, null
DB_WRITE_DELAY: 2000
DB_WRITE_DELAY: 100
savePacks: (packs, callback) ->
async.eachSeries packs, PackManager.safeInsert, (err, result) ->
@ -451,3 +455,78 @@ module.exports = PackManager =
bulk.execute callback
else
callback()
insertCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
return callback() if newUpdates.length == 0
updatesToFlush = []
updatesRemaining = newUpdates.slice()
n = lastUpdate?.n || 0
sz = lastUpdate?.sz || 0
while updatesRemaining.length and n < PackManager.MAX_COUNT and sz < PackManager.MAX_SIZE
nextUpdate = updatesRemaining[0]
nextUpdateSize = BSON.calculateObjectSize(nextUpdate)
if nextUpdateSize + sz > PackManager.MAX_SIZE and n > 0
break
n++
sz += nextUpdateSize
updatesToFlush.push updatesRemaining.shift()
PackManager.flushCompressedUpdates project_id, doc_id, lastUpdate, updatesToFlush, temporary, (error) ->
return callback(error) if error?
PackManager.insertCompressedUpdates project_id, doc_id, null, updatesRemaining, temporary, callback
flushCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
return callback() if newUpdates.length == 0
if lastUpdate? and not (temporary and ((Date.now() - lastUpdate.meta?.start_ts) > 1 * DAYS))
PackManager.appendUpdatesToExistingPack project_id, doc_id, lastUpdate, newUpdates, temporary, callback
else
PackManager.insertUpdatesIntoNewPack project_id, doc_id, newUpdates, temporary, callback
insertUpdatesIntoNewPack: (project_id, doc_id, newUpdates, temporary, callback = (error) ->) ->
first = newUpdates[0]
last = newUpdates[newUpdates.length - 1]
n = newUpdates.length
sz = BSON.calculateObjectSize(newUpdates)
newPack =
project_id: ObjectId(project_id.toString())
doc_id: ObjectId(doc_id.toString())
pack: newUpdates
n: n
sz: sz
meta:
start_ts: first.meta.start_ts
end_ts: last.meta.end_ts
v: first.v
v_end: last.v
if temporary
newPack.expiresAt = new Date(Date.now() + 7 * DAYS)
logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack"
db.docHistory.insert newPack, callback
appendUpdatesToExistingPack: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
first = newUpdates[0]
last = newUpdates[newUpdates.length - 1]
n = newUpdates.length
sz = BSON.calculateObjectSize(newUpdates)
query =
_id: lastUpdate._id
project_id: ObjectId(project_id.toString())
doc_id: ObjectId(doc_id.toString())
pack: {$exists: true}
update =
$push:
"pack": {$each: newUpdates}
$inc:
"n": n
"sz": sz
$set:
"meta.end_ts": last.meta.end_ts
"v_end": last.v
if lastUpdate.expiresAt and temporary
update.$set.expiresAt = new Date(Date.now() + 7 * DAYS)
logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack"
db.docHistory.findAndModify {query, update}, callback

View file

@ -1,4 +1,5 @@
MongoManager = require "./MongoManager"
PackManager = require "./PackManager"
RedisManager = require "./RedisManager"
UpdateCompressor = require "./UpdateCompressor"
LockManager = require "./LockManager"
@ -43,34 +44,50 @@ module.exports = UpdatesManager =
else
return callback error
compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates
if rawUpdates.length == 0
return callback()
if not lastCompressedUpdate?
# no existing update, insert everything
if not lastCompressedUpdate? or lastCompressedUpdate.pack? # handle pack append as a special case
UpdatesManager._updatePack project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback
else #use the existing op code
UpdatesManager._updateOp project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback
_updatePack: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) ->
compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates
PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result?
callback()
_updateOp: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) ->
compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates
if not lastCompressedUpdate?
# no existing update, insert everything
updateToModify = null
updatesToInsert = compressedUpdates
else
# there are existing updates, see what happens when we
# compress them together with the new ones
[firstUpdate, additionalUpdates...] = compressedUpdates
if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate)
# first update version hasn't changed, skip it and insert remaining updates
# this is an optimisation, we could update the existing op with itself
updateToModify = null
updatesToInsert = compressedUpdates
updatesToInsert = additionalUpdates
else
# there are existing updates, see what happens when we
# compress them together with the new ones
[firstUpdate, additionalUpdates...] = compressedUpdates
# first update version did changed, modify it and insert remaining updates
updateToModify = firstUpdate
updatesToInsert = additionalUpdates
if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate)
# first update version hasn't changed, skip it and insert remaining updates
# this is an optimisation, we could update the existing op with itself
updateToModify = null
updatesToInsert = additionalUpdates
else
# first update version did changed, modify it and insert remaining updates
updateToModify = firstUpdate
updatesToInsert = additionalUpdates
MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) ->
MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result?
MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result?
MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) ->
return callback(error) if error?
logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates"
callback()
logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: rawUpdates.length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates"
callback()
REDIS_READ_BATCH_SIZE: 100
processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) ->

View file

@ -37,7 +37,7 @@ describe "Appending doc ops to the history", ->
done()
it "should insert the compressed op into mongo", ->
expect(@updates[0].op).to.deep.equal [{
expect(@updates[0].pack[0].op).to.deep.equal [{
p: 3, i: "foo"
}]
@ -99,13 +99,13 @@ describe "Appending doc ops to the history", ->
throw error if error?
done()
it "should combine all the updates into one", ->
expect(@updates[0].op).to.deep.equal [{
p: 3, i: "foobar"
it "should combine all the updates into one pack", ->
expect(@updates[0].pack[1].op).to.deep.equal [{
p: 6, i: "bar"
}]
it "should insert the correct version number into mongo", ->
expect(@updates[0].v).to.equal 8
expect(@updates[0].v_end).to.equal 8
describe "when the updates are far apart", ->
@ -129,11 +129,11 @@ describe "Appending doc ops to the history", ->
throw error if error?
done()
it "should keep the updates separate", ->
expect(@updates[0].op).to.deep.equal [{
it "should combine the updates into one pack", ->
expect(@updates[0].pack[0].op).to.deep.equal [{
p: 3, i: "foo"
}]
expect(@updates[1].op).to.deep.equal [{
expect(@updates[0].pack[1].op).to.deep.equal [{
p: 6, i: "bar"
}]
@ -160,10 +160,10 @@ describe "Appending doc ops to the history", ->
done()
it "should concat the compressed op into mongo", ->
expect(@updates[0].op).to.deep.equal @expectedOp
expect(@updates[0].pack.length).to.deep.equal 3 # batch size is 100
it "should insert the correct version number into mongo", ->
expect(@updates[0].v).to.equal 250
expect(@updates[0].v_end).to.equal 250
describe "when there are multiple ops in each update", ->
@ -188,16 +188,16 @@ describe "Appending doc ops to the history", ->
done()
it "should insert the compressed ops into mongo", ->
expect(@updates[0].op).to.deep.equal [{
expect(@updates[0].pack[0].op).to.deep.equal [{
p: 3, i: "foo"
}]
expect(@updates[1].op).to.deep.equal [{
expect(@updates[0].pack[1].op).to.deep.equal [{
p: 6, i: "bar"
}]
it "should insert the correct version numbers into mongo", ->
expect(@updates[0].v).to.equal 3
expect(@updates[1].v).to.equal 4
expect(@updates[0].pack[0].v).to.equal 3
expect(@updates[0].pack[1].v).to.equal 4
describe "when there is a no-op update", ->
before (done) ->
@ -221,17 +221,17 @@ describe "Appending doc ops to the history", ->
done()
it "should insert the compressed no-op into mongo", ->
expect(@updates[0].op).to.deep.equal []
expect(@updates[0].pack[0].op).to.deep.equal []
it "should insert the compressed next update into mongo", ->
expect(@updates[1].op).to.deep.equal [{
expect(@updates[0].pack[1].op).to.deep.equal [{
p: 3, i: "foo"
}]
it "should insert the correct version numbers into mongo", ->
expect(@updates[0].v).to.equal 3
expect(@updates[1].v).to.equal 4
expect(@updates[0].pack[0].v).to.equal 3
expect(@updates[0].pack[1].v).to.equal 4
describe "when the project has versioning enabled", ->
before (done) ->

View file

@ -89,9 +89,10 @@ describe "Archiving updates", ->
doc.lastVersion.should.equal 20
done()
it "should store twenty doc changes in S3", (done) ->
it "should store twenty doc changes in S3 in one pack", (done) ->
TrackChangesClient.getS3Doc @project_id, @doc_id, (error, res, doc) =>
doc.length.should.equal 20
doc.length.should.equal 1
doc[0].pack.length.should.equal 20
done()
describe "unarchiving a doc's updates", ->
@ -103,7 +104,7 @@ describe "Archiving updates", ->
it "should restore doc changes", (done) ->
db.docHistory.count { doc_id: ObjectId(@doc_id) }, (error, count) ->
throw error if error?
count.should.equal 20
count.should.equal 1
done()
it "should remove doc marked as inS3", (done) ->

View file

@ -31,7 +31,7 @@ describe "Flushing updates", ->
it "should flush the op into mongo", (done) ->
TrackChangesClient.getCompressedUpdates @doc_id, (error, updates) ->
expect(updates[0].op).to.deep.equal [{
expect(updates[0].pack[0].op).to.deep.equal [{
p: 3, i: "f"
}]
done()

View file

@ -0,0 +1,232 @@
sinon = require('sinon')
chai = require('chai')
should = chai.should()
expect = chai.expect
modulePath = "../../../../app/js/PackManager.js"
SandboxedModule = require('sandboxed-module')
{ObjectId} = require("mongojs")
bson = require("bson")
BSON = new bson.BSONPure()
tk = require "timekeeper"
describe "PackManager", ->
beforeEach ->
tk.freeze(new Date())
@PackManager = SandboxedModule.require modulePath, requires:
"./mongojs" : { db: @db = {}, ObjectId: ObjectId, BSON: BSON }
"./LockManager" : {}
"logger-sharelatex": { log: sinon.stub(), error: sinon.stub() }
@callback = sinon.stub()
@doc_id = ObjectId().toString()
@project_id = ObjectId().toString()
afterEach ->
tk.reset()
describe "insertCompressedUpdates", ->
beforeEach ->
@lastUpdate = {
_id: "12345"
pack: [
{ op: "op-1", meta: "meta-1", v: 1},
{ op: "op-2", meta: "meta-2", v: 2}
]
n : 2
sz : 100
}
@newUpdates = [
{ op: "op-3", meta: "meta-3", v: 3},
{ op: "op-4", meta: "meta-4", v: 4}
]
@db.docHistory =
insert: sinon.stub().callsArg(1)
findAndModify: sinon.stub().callsArg(1)
describe "with no last update", ->
beforeEach ->
@PackManager.insertUpdatesIntoNewPack = sinon.stub().callsArg(4)
@PackManager.insertCompressedUpdates @project_id, @doc_id, null, @newUpdates, true, @callback
describe "for a small update", ->
it "should insert the update into a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates, true).should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "for many small updates", ->
beforeEach ->
@newUpdates = ({ op: "op-#{i}", meta: "meta-#{i}", v: i} for i in [0..2048])
@PackManager.insertCompressedUpdates @project_id, @doc_id, null, @newUpdates, false, @callback
it "should append the initial updates to the existing pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[0...512], false).should.equal true
it "should insert the first set remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[512...1024], false).should.equal true
it "should insert the second set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1024...1536], false).should.equal true
it "should insert the third set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1536...2048], false).should.equal true
it "should insert the final set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[2048..2048], false).should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "with an existing pack as the last update", ->
beforeEach ->
@PackManager.appendUpdatesToExistingPack = sinon.stub().callsArg(5)
@PackManager.insertUpdatesIntoNewPack = sinon.stub().callsArg(4)
@PackManager.insertCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, false, @callback
describe "for a small update", ->
it "should append the update to the existing pack", ->
@PackManager.appendUpdatesToExistingPack.calledWith(@project_id, @doc_id, @lastUpdate, @newUpdates, false).should.equal true
it "should not insert any new packs", ->
@PackManager.insertUpdatesIntoNewPack.called.should.equal false
it "should call the callback", ->
@callback.called.should.equal true
describe "for many small updates", ->
beforeEach ->
@newUpdates = ({ op: "op-#{i}", meta: "meta-#{i}", v: i} for i in [0..2048])
@PackManager.insertCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, false, @callback
it "should append the initial updates to the existing pack", ->
@PackManager.appendUpdatesToExistingPack.calledWith(@project_id, @doc_id, @lastUpdate, @newUpdates[0...510], false).should.equal true
it "should insert the first set remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[510...1022], false).should.equal true
it "should insert the second set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1022...1534], false).should.equal true
it "should insert the third set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1534...2046], false).should.equal true
it "should insert the final set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[2046..2048], false).should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "for many big updates", ->
beforeEach ->
longString = ("a" for [0 .. (0.75*@PackManager.MAX_SIZE)]).join("")
@newUpdates = ({ op: "op-#{i}-#{longString}", meta: "meta-#{i}", v: i} for i in [0..4])
@PackManager.insertCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, false, @callback
it "should append the initial updates to the existing pack", ->
@PackManager.appendUpdatesToExistingPack.calledWith(@project_id, @doc_id, @lastUpdate, @newUpdates[0..0], false).should.equal true
it "should insert the first set remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1..1], false).should.equal true
it "should insert the second set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[2..2], false).should.equal true
it "should insert the third set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[3..3], false).should.equal true
it "should insert the final set of remaining updates as a new pack", ->
@PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[4..4], false).should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "flushCompressedUpdates", ->
describe "when there is no previous update", ->
beforeEach ->
@PackManager.flushCompressedUpdates @project_id, @doc_id, null, @newUpdates, true, @callback
describe "for a small update that will expire", ->
it "should insert the update into mongo", ->
@db.docHistory.insert.calledWithMatch({
pack: @newUpdates,
project_id: ObjectId(@project_id),
doc_id: ObjectId(@doc_id)
n: @newUpdates.length
v: @newUpdates[0].v
v_end: @newUpdates[@newUpdates.length-1].v
}).should.equal true
it "should set an expiry time in the future", ->
@db.docHistory.insert.calledWithMatch({
expiresAt: new Date(Date.now() + 7 * 24 * 3600 * 1000)
}).should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "when there is a recent previous update in mongo", ->
beforeEach ->
@lastUpdate = {
_id: "12345"
pack: [
{ op: "op-1", meta: "meta-1", v: 1},
{ op: "op-2", meta: "meta-2", v: 2}
]
n : 2
sz : 100
expiresAt: new Date(Date.now())
}
@PackManager.flushCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, true, @callback
describe "for a small update that will expire", ->
it "should append the update in mongo", ->
@db.docHistory.findAndModify.calledWithMatch({
query: {_id: @lastUpdate._id}
update: { $push: {"pack" : {$each: @newUpdates}}, $set: {v_end: @newUpdates[@newUpdates.length-1].v}}
}).should.equal true
it "should set an expiry time in the future", ->
@db.docHistory.findAndModify.calledWithMatch({
update: {$set: {expiresAt: new Date(Date.now() + 7 * 24 * 3600 * 1000)}}
}).should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "when there is an old previous update in mongo", ->
beforeEach ->
@lastUpdate = {
_id: "12345"
pack: [
{ op: "op-1", meta: "meta-1", v: 1},
{ op: "op-2", meta: "meta-2", v: 2}
]
n : 2
sz : 100
meta: {start_ts: Date.now() - 30 * 24 * 3600 * 1000}
expiresAt: new Date(Date.now() - 30 * 24 * 3600 * 1000)
}
@PackManager.flushCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, true, @callback
describe "for a small update that will expire", ->
it "should insert the update into mongo", ->
@db.docHistory.insert.calledWithMatch({
pack: @newUpdates,
project_id: ObjectId(@project_id),
doc_id: ObjectId(@doc_id)
n: @newUpdates.length
v: @newUpdates[0].v
v_end: @newUpdates[@newUpdates.length-1].v
}).should.equal true
it "should set an expiry time in the future", ->
@db.docHistory.insert.calledWithMatch({
expiresAt: new Date(Date.now() + 7 * 24 * 3600 * 1000)
}).should.equal true
it "should call the callback", ->
@callback.called.should.equal true

View file

@ -10,6 +10,7 @@ describe "UpdatesManager", ->
@UpdatesManager = SandboxedModule.require modulePath, requires:
"./UpdateCompressor": @UpdateCompressor = {}
"./MongoManager" : @MongoManager = {}
"./PackManager" : @PackManager = {}
"./RedisManager" : @RedisManager = {}
"./LockManager" : @LockManager = {}
"./WebApiManager": @WebApiManager = {}
@ -41,8 +42,7 @@ describe "UpdatesManager", ->
@compressedUpdates = [ { v: 13, op: "compressed-op-12" } ]
@MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null)
@MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2)
@MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4)
@PackManager.insertCompressedUpdates = sinon.stub().callsArg(5)
@UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates)
@UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback
@ -51,14 +51,9 @@ describe "UpdatesManager", ->
.calledWith(@doc_id)
.should.equal true
it "should compress the raw ops", ->
@UpdateCompressor.compressRawUpdates
.calledWith(null, @rawUpdates)
.should.equal true
it "should save the compressed ops", ->
@MongoManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, @compressedUpdates, @temporary)
it "should save the compressed ops as a pack", ->
@PackManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary)
.should.equal true
it "should call the callback", ->
@ -72,6 +67,7 @@ describe "UpdatesManager", ->
@MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate, @lastCompressedUpdate.v)
@MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2)
@MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4)
@PackManager.insertCompressedUpdates = sinon.stub().callsArg(5)
@UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates)
describe "when the raw ops start where the existing history ends", ->
@ -102,6 +98,30 @@ describe "UpdatesManager", ->
it "should call the callback", ->
@callback.called.should.equal true
describe "when the raw ops start where the existing history ends and the history is in a pack", ->
beforeEach ->
@lastCompressedUpdate = {pack: [{ v: 11, op: "compressed-op-11" }], v:11}
@rawUpdates = [{ v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }]
@MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate, @lastCompressedUpdate.v)
@UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback
it "should look at the last compressed op", ->
@MongoManager.peekLastCompressedUpdate
.calledWith(@doc_id)
.should.equal true
it "should defer the compression of raw ops until they are written in a new pack", ->
@UpdateCompressor.compressRawUpdates
.should.not.be.called
it "should save the new compressed ops into a pack", ->
@PackManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, @lastCompressedUpdate, @compressedUpdates, @temporary)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "when some raw ops are passed that have already been compressed", ->
beforeEach ->
@rawUpdates = [{ v: 10, op: "mock-op-10" }, { v: 11, op: "mock-op-11"}, { v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }]
@ -136,8 +156,7 @@ describe "UpdatesManager", ->
@compressedUpdates = [ { v: 13, op: "compressed-op-12" } ]
@MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null, @lastVersion)
@MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2)
@MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4)
@PackManager.insertCompressedUpdates = sinon.stub().callsArg(5)
@UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates)
describe "when the raw ops start where the existing history ends", ->
@ -156,8 +175,8 @@ describe "UpdatesManager", ->
.should.equal true
it "should save the compressed ops", ->
@MongoManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, @compressedUpdates, @temporary)
@PackManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary)
.should.equal true
it "should call the callback", ->