Pop and set history in batches for speed

This commit is contained in:
James Allen 2013-08-09 13:27:35 +01:00
parent 3ab45c2a35
commit fe503010aa
2 changed files with 57 additions and 57 deletions

View file

@ -4,39 +4,61 @@ ConcatManager = require "./ConcatManager"
module.exports = ConversionManager = module.exports = ConversionManager =
OPS_TO_LEAVE: 10 OPS_TO_LEAVE: 10
removeLatestCompressedUpdate: (doc_id, callback = (error) ->) -> popLatestCompressedUpdate: (doc_id, callback = (error, update) ->) ->
db.docHistory.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: 1 } }, callback db.docHistory.findAndModify
query: { doc_id: ObjectId(doc_id) }
getLatestCompressedUpdate: (doc_id, callback = (error) ->) -> fields: { docOps: { $slice: -1 } }
db.docHistory.find { doc_id: ObjectId(doc_id) }, { docOps: { $slice: -1 } }, (error, history) -> update: { $pop: { docOps: 1 } }
, (error, history = { docOps: [] }) ->
return callback(error) if error? return callback(error) if error?
history = history[0] or { docOps: [] } callback null, history.docOps[0]
callback null, history.docOps.slice(-1)[0]
insertCompressedUpdates: (doc_id, updates, callback = (error) ->) -> insertCompressedUpdates: (doc_id, updates, callback = (error) ->) ->
db.docHistory.update { doc_id: ObjectId(doc_id) }, { $push: { docOps: { $each: updates } } }, { upsert: true }, callback db.docHistory.update { doc_id: ObjectId(doc_id) }, { $push: { docOps: { $each: updates } } }, { upsert: true }, callback
trimLastRawUpdate: (doc_id, tailVersion, callback = (error) ->) -> popOldRawUpdates: (doc_id, callback = (error, updates) ->) ->
db.docOps.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: -1 }, $set: { tailVersion: tailVersion + 1 } }, callback db.docOps.find { doc_id: ObjectId(doc_id) }, { version: true, tailVersion: true }, (error, docs) ->
getLastRawUpdateAndVersion: (doc_id, callback = (error, update, currentVersion, tailVersion) ->) ->
db.docOps.find { doc_id: ObjectId(doc_id) }, { version: true, tailVersion: true, docOps: { $slice: 1 } }, (error, docs) ->
return callback(error) if error? return callback(error) if error?
return callback(new Error("doc not found")) if docs.length == 0 return callback(new Error("doc not found")) if docs.length == 0
doc = docs[0] doc = docs[0]
callback null, doc.docOps[0], doc.version, doc.tailVersion or 0 currentVersion = doc.version
tailVersion = doc.tailVersion or 0
convertOldestRawUpdate: (doc_id, callback = (error, converted) ->) ->
ConversionManager.getLastRawUpdateAndVersion doc_id, (error, rawUpdate, currentVersion, tailVersion) ->
return callback(error) if error?
if currentVersion - tailVersion > ConversionManager.OPS_TO_LEAVE if currentVersion - tailVersion > ConversionManager.OPS_TO_LEAVE
rawUpdates = ConcatManager.normalizeUpdate(rawUpdate) db.docOps.findAndModify
ConversionManager.getLatestCompressedUpdate doc_id, (error, lastCompressedUpdate) -> query: { doc_id: ObjectId(doc_id), version: currentVersion }
update: {
$push: { docOps: { $each: [], $slice: - ConversionManager.OPS_TO_LEAVE } }
$set: tailVersion: currentVersion - ConversionManager.OPS_TO_LEAVE,
}
fields: { docOps: $slice: currentVersion - tailVersion - ConversionManager.OPS_TO_LEAVE }
, (error, doc) ->
return callback(error) if error?
if !doc?
# Version was modified since so try again
return ConversionManager.popOldRawUpdates doc_id, callback
else
return callback null, doc.docOps
else
callback null, []
convertOldRawUpdates: (doc_id, callback = (error) ->) ->
ConversionManager.popOldRawUpdates doc_id, (error, rawUpdates) ->
return callback(error) if error? return callback(error) if error?
removeAndModifyPreviousCompressedUpdate = (callback, compressedUpdates) -> length = rawUpdates.length
if lastCompressedUpdate?
normalizedRawUpdates = []
for rawUpdate in rawUpdates
normalizedRawUpdates = normalizedRawUpdates.concat ConcatManager.normalizeUpdate(rawUpdate)
rawUpdates = normalizedRawUpdates
ConversionManager.popLatestCompressedUpdate doc_id, (error, lastCompressedUpdate) ->
return callback(error) if error?
if !lastCompressedUpdate?
lastCompressedUpdate = rawUpdates.shift()
compressedUpdates = [lastCompressedUpdate] compressedUpdates = [lastCompressedUpdate]
for rawUpdate in rawUpdates for rawUpdate in rawUpdates
lastCompressedUpdate = compressedUpdates.pop() lastCompressedUpdate = compressedUpdates.pop()
@ -44,30 +66,8 @@ module.exports = ConversionManager =
compressedUpdates = compressedUpdates.concat ConcatManager.concatTwoUpdates lastCompressedUpdate, rawUpdate compressedUpdates = compressedUpdates.concat ConcatManager.concatTwoUpdates lastCompressedUpdate, rawUpdate
else else
compressedUpdates.push rawUpdate compressedUpdates.push rawUpdate
ConversionManager.removeLatestCompressedUpdate doc_id, (error) -> ConversionManager.insertCompressedUpdates doc_id, compressedUpdates, (error) ->
return callback(error) if error? return callback(error) if error?
callback null, compressedUpdates console.log doc_id, "Pushed doc ops", length
else
callback null, rawUpdates
removeAndModifyPreviousCompressedUpdate (error, newCompressedUpdates) ->
return callback(error) if error?
ConversionManager.insertCompressedUpdates doc_id, newCompressedUpdates, (error) ->
return callback(error) if error?
ConversionManager.trimLastRawUpdate doc_id, tailVersion, (error) ->
return callback(error) if error?
console.log doc_id, "Pushed op", tailVersion
callback null, true callback null, true
else
console.log doc_id, "Up to date"
callback null, false
convertAllOldRawUpdates: (doc_id, callback = (error) ->) ->
ConversionManager.convertOldestRawUpdate doc_id, (error, converted) ->
return callback(error) if error?
if converted
# Keep going
ConversionManager.convertAllOldRawUpdates doc_id, callback
else
callback()

View file

@ -9,7 +9,7 @@ db.docOps.find { }, { doc_id: true }, (error, docs) ->
do (doc) -> do (doc) ->
jobs.push (callback) -> jobs.push (callback) ->
doc_id = doc.doc_id.toString() doc_id = doc.doc_id.toString()
ConversionManager.convertAllOldRawUpdates doc_id, (error) -> ConversionManager.convertOldRawUpdates doc_id, (error) ->
return callback(error) if error? return callback(error) if error?
console.log doc_id, "DONE" console.log doc_id, "DONE"
db.docHistory.find { doc_id: ObjectId(doc_id) }, (error, docs) -> db.docHistory.find { doc_id: ObjectId(doc_id) }, (error, docs) ->
@ -17,10 +17,10 @@ db.docOps.find { }, { doc_id: true }, (error, docs) ->
doc = docs[0] doc = docs[0]
if doc? if doc?
for update in doc.docOps for update in doc.docOps
op = update.op[0] op = update?.op[0]
if op.i? if op?.i?
console.log doc_id, update.meta.start_ts, update.meta.end_ts, update.meta.user_id, "INSERT", op.p, op.i console.log doc_id, update.meta.start_ts, update.meta.end_ts, update.meta.user_id, "INSERT", op.p, op.i
else if op.d? else if op?.d?
console.log doc_id, update.meta.start_ts, update.meta.end_ts, update.meta.user_id, "DELETE", op.p, op.d console.log doc_id, update.meta.start_ts, update.meta.end_ts, update.meta.user_id, "DELETE", op.p, op.d
else else
console.log doc_id, "NO HISTORY" console.log doc_id, "NO HISTORY"