put all new ops into packs

This commit is contained in:
Brian Gough 2015-12-11 15:56:47 +00:00
parent 184204130c
commit 5e830cbbdb
4 changed files with 122 additions and 43 deletions

View file

@ -1,20 +1,17 @@
{db, ObjectId} = require "./mongojs"
PackManager = require "./PackManager"
async = require "async"
_ = require "underscore"
module.exports = MongoManager =
getLastCompressedUpdate: (doc_id, callback = (error, update) ->) ->
db.docHistory
.find(doc_id: ObjectId(doc_id.toString()))
.find(doc_id: ObjectId(doc_id.toString()), {pack: {$slice:-1}}) # only return the last entry in a pack
.sort( v: -1 )
.limit(1)
.toArray (error, compressedUpdates) ->
return callback(error) if error?
if compressedUpdates[0]?.pack?
# cannot pop from a pack, throw error
error = new Error("last compressed update is a pack")
return callback error, null
return callback null, compressedUpdates[0] or null
callback null, compressedUpdates[0] or null
peekLastCompressedUpdate: (doc_id, callback = (error, update, version) ->) ->
# under normal use we pass back the last update as
@ -55,7 +52,6 @@ module.exports = MongoManager =
else
callback(err,results)
modifyCompressedUpdate: (lastUpdate, newUpdate, callback = (error) ->) ->
return callback() if not newUpdate?
db.docHistory.findAndModify

View file

@ -451,3 +451,74 @@ module.exports = PackManager =
bulk.execute callback
else
callback()
insertCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
return callback() if newUpdates.length == 0
updatesToFlush = []
updatesRemaining = newUpdates.slice()
n = lastUpdate?.n || 0
sz = lastUpdate?.sz || 0
while updatesRemaining.length and n < PackManager.MAX_COUNT and sz < PackManager.MAX_SIZE
nextUpdate = updatesRemaining[0]
nextUpdateSize = BSON.calculateObjectSize(nextUpdate)
if nextUpdateSize + sz > PackManager.MAX_SIZE and n > 0
break
n++
sz += nextUpdateSize
updatesToFlush.push updatesRemaining.shift()
PackManager.flushCompressedUpdates project_id, doc_id, lastUpdate, updatesToFlush, temporary, (error) ->
return callback(error) if error?
PackManager.insertCompressedUpdates project_id, doc_id, null, updatesRemaining, temporary, callback
flushCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
return callback() if newUpdates.length == 0
if lastUpdate?
PackManager.appendUpdatesToExistingPack project_id, doc_id, lastUpdate, newUpdates, temporary, callback
else
PackManager.insertUpdatesIntoNewPack project_id, doc_id, newUpdates, temporary, callback
insertUpdatesIntoNewPack: (project_id, doc_id, newUpdates, temporary, callback = (error) ->) ->
first = newUpdates[0]
last = newUpdates[newUpdates.length - 1]
n = newUpdates.length
sz = BSON.calculateObjectSize(newUpdates)
newPack =
project_id: ObjectId(project_id.toString())
doc_id: ObjectId(doc_id.toString())
pack: newUpdates
n: n
sz: sz
meta:
start_ts: first.meta.start_ts
end_ts: last.meta.end_ts
v: first.v
v_end: last.v
logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack"
db.docHistory.insert newPack, callback
appendUpdatesToExistingPack: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
first = newUpdates[0]
last = newUpdates[newUpdates.length - 1]
n = newUpdates.length
sz = BSON.calculateObjectSize(newUpdates)
query =
_id: lastUpdate._id
project_id: ObjectId(project_id.toString())
doc_id: ObjectId(doc_id.toString())
pack: {$exists: true}
update =
$push:
"pack": {$each: newUpdates}
$inc:
"n": n
"sz": sz
$set:
"meta.end_ts": last.meta.end_ts
"v_end": last.v
logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack"
db.docHistory.findAndModify {query, update}, callback

View file

@ -1,4 +1,5 @@
MongoManager = require "./MongoManager"
PackManager = require "./PackManager"
RedisManager = require "./RedisManager"
UpdateCompressor = require "./UpdateCompressor"
LockManager = require "./LockManager"
@ -43,34 +44,50 @@ module.exports = UpdatesManager =
else
return callback error
compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates
if rawUpdates.length == 0
return callback()
if not lastCompressedUpdate?
# no existing update, insert everything
if not lastCompressedUpdate? or lastCompressedUpdate.pack? # handle pack append as a special case
UpdatesManager._updatePack project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback
else #use the existing op code
UpdatesManager._updateOp project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback
_updatePack: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) ->
compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates
PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result?
callback()
_updateOp: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) ->
compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates
if not lastCompressedUpdate?
# no existing update, insert everything
updateToModify = null
updatesToInsert = compressedUpdates
else
# there are existing updates, see what happens when we
# compress them together with the new ones
[firstUpdate, additionalUpdates...] = compressedUpdates
if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate)
# first update version hasn't changed, skip it and insert remaining updates
# this is an optimisation, we could update the existing op with itself
updateToModify = null
updatesToInsert = compressedUpdates
updatesToInsert = additionalUpdates
else
# there are existing updates, see what happens when we
# compress them together with the new ones
[firstUpdate, additionalUpdates...] = compressedUpdates
# first update version did changed, modify it and insert remaining updates
updateToModify = firstUpdate
updatesToInsert = additionalUpdates
if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate)
# first update version hasn't changed, skip it and insert remaining updates
# this is an optimisation, we could update the existing op with itself
updateToModify = null
updatesToInsert = additionalUpdates
else
# first update version did changed, modify it and insert remaining updates
updateToModify = firstUpdate
updatesToInsert = additionalUpdates
MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) ->
MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result?
MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result?
MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) ->
return callback(error) if error?
logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates"
callback()
logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: rawUpdates.length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates"
callback()
REDIS_READ_BATCH_SIZE: 100
processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) ->

View file

@ -10,6 +10,7 @@ describe "UpdatesManager", ->
@UpdatesManager = SandboxedModule.require modulePath, requires:
"./UpdateCompressor": @UpdateCompressor = {}
"./MongoManager" : @MongoManager = {}
"./PackManager" : @PackManager = {}
"./RedisManager" : @RedisManager = {}
"./LockManager" : @LockManager = {}
"./WebApiManager": @WebApiManager = {}
@ -41,8 +42,7 @@ describe "UpdatesManager", ->
@compressedUpdates = [ { v: 13, op: "compressed-op-12" } ]
@MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null)
@MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2)
@MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4)
@PackManager.insertCompressedUpdates = sinon.stub().callsArg(5)
@UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates)
@UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback
@ -51,16 +51,12 @@ describe "UpdatesManager", ->
.calledWith(@doc_id)
.should.equal true
it "should compress the raw ops", ->
@UpdateCompressor.compressRawUpdates
.calledWith(null, @rawUpdates)
.should.equal true
it "should save the compressed ops", ->
@MongoManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, @compressedUpdates, @temporary)
@PackManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
@ -136,8 +132,7 @@ describe "UpdatesManager", ->
@compressedUpdates = [ { v: 13, op: "compressed-op-12" } ]
@MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null, @lastVersion)
@MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2)
@MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4)
@PackManager.insertCompressedUpdates = sinon.stub().callsArg(5)
@UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates)
describe "when the raw ops start where the existing history ends", ->
@ -156,8 +151,8 @@ describe "UpdatesManager", ->
.should.equal true
it "should save the compressed ops", ->
@MongoManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, @compressedUpdates, @temporary)
@PackManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary)
.should.equal true
it "should call the callback", ->