From fe503010aaee6efb1666f8febd077792f0363505 Mon Sep 17 00:00:00 2001 From: James Allen Date: Fri, 9 Aug 2013 13:27:35 +0100 Subject: [PATCH] Pop and set history in batches for speed --- .../app/coffee/ConversionManager.coffee | 106 +++++++++--------- services/track-changes/compressHistory.coffee | 8 +- 2 files changed, 57 insertions(+), 57 deletions(-) diff --git a/services/track-changes/app/coffee/ConversionManager.coffee b/services/track-changes/app/coffee/ConversionManager.coffee index fef745b220..5dca40a476 100644 --- a/services/track-changes/app/coffee/ConversionManager.coffee +++ b/services/track-changes/app/coffee/ConversionManager.coffee @@ -4,70 +4,70 @@ ConcatManager = require "./ConcatManager" module.exports = ConversionManager = OPS_TO_LEAVE: 10 - removeLatestCompressedUpdate: (doc_id, callback = (error) ->) -> - db.docHistory.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: 1 } }, callback - - getLatestCompressedUpdate: (doc_id, callback = (error) ->) -> - db.docHistory.find { doc_id: ObjectId(doc_id) }, { docOps: { $slice: -1 } }, (error, history) -> + popLatestCompressedUpdate: (doc_id, callback = (error, update) ->) -> + db.docHistory.findAndModify + query: { doc_id: ObjectId(doc_id) } + fields: { docOps: { $slice: -1 } } + update: { $pop: { docOps: 1 } } + , (error, history = { docOps: [] }) -> return callback(error) if error? - history = history[0] or { docOps: [] } - callback null, history.docOps.slice(-1)[0] + callback null, history.docOps[0] insertCompressedUpdates: (doc_id, updates, callback = (error) ->) -> db.docHistory.update { doc_id: ObjectId(doc_id) }, { $push: { docOps: { $each: updates } } }, { upsert: true }, callback - trimLastRawUpdate: (doc_id, tailVersion, callback = (error) ->) -> - db.docOps.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: -1 }, $set: { tailVersion: tailVersion + 1 } }, callback - - getLastRawUpdateAndVersion: (doc_id, callback = (error, update, currentVersion, tailVersion) ->) -> - db.docOps.find { doc_id: ObjectId(doc_id) }, { version: true, tailVersion: true, docOps: { $slice: 1 } }, (error, docs) -> + popOldRawUpdates: (doc_id, callback = (error, updates) ->) -> + db.docOps.find { doc_id: ObjectId(doc_id) }, { version: true, tailVersion: true }, (error, docs) -> return callback(error) if error? return callback(new Error("doc not found")) if docs.length == 0 doc = docs[0] - callback null, doc.docOps[0], doc.version, doc.tailVersion or 0 - - convertOldestRawUpdate: (doc_id, callback = (error, converted) ->) -> - ConversionManager.getLastRawUpdateAndVersion doc_id, (error, rawUpdate, currentVersion, tailVersion) -> - return callback(error) if error? - + currentVersion = doc.version + tailVersion = doc.tailVersion or 0 if currentVersion - tailVersion > ConversionManager.OPS_TO_LEAVE - rawUpdates = ConcatManager.normalizeUpdate(rawUpdate) - ConversionManager.getLatestCompressedUpdate doc_id, (error, lastCompressedUpdate) -> + db.docOps.findAndModify + query: { doc_id: ObjectId(doc_id), version: currentVersion } + update: { + $push: { docOps: { $each: [], $slice: - ConversionManager.OPS_TO_LEAVE } } + $set: tailVersion: currentVersion - ConversionManager.OPS_TO_LEAVE, + } + fields: { docOps: $slice: currentVersion - tailVersion - ConversionManager.OPS_TO_LEAVE } + , (error, doc) -> return callback(error) if error? - - removeAndModifyPreviousCompressedUpdate = (callback, compressedUpdates) -> - if lastCompressedUpdate? - compressedUpdates = [lastCompressedUpdate] - for rawUpdate in rawUpdates - lastCompressedUpdate = compressedUpdates.pop() - if lastCompressedUpdate? - compressedUpdates = compressedUpdates.concat ConcatManager.concatTwoUpdates lastCompressedUpdate, rawUpdate - else - compressedUpdates.push rawUpdate - ConversionManager.removeLatestCompressedUpdate doc_id, (error) -> - return callback(error) if error? - callback null, compressedUpdates - else - callback null, rawUpdates + if !doc? + # Version was modified since so try again + return ConversionManager.popOldRawUpdates doc_id, callback + else + return callback null, doc.docOps - removeAndModifyPreviousCompressedUpdate (error, newCompressedUpdates) -> - return callback(error) if error? - ConversionManager.insertCompressedUpdates doc_id, newCompressedUpdates, (error) -> - return callback(error) if error? - ConversionManager.trimLastRawUpdate doc_id, tailVersion, (error) -> - return callback(error) if error? - console.log doc_id, "Pushed op", tailVersion - callback null, true - else - console.log doc_id, "Up to date" - callback null, false + callback null, [] - convertAllOldRawUpdates: (doc_id, callback = (error) ->) -> - ConversionManager.convertOldestRawUpdate doc_id, (error, converted) -> + convertOldRawUpdates: (doc_id, callback = (error) ->) -> + ConversionManager.popOldRawUpdates doc_id, (error, rawUpdates) -> return callback(error) if error? - if converted - # Keep going - ConversionManager.convertAllOldRawUpdates doc_id, callback - else - callback() + + length = rawUpdates.length + + normalizedRawUpdates = [] + for rawUpdate in rawUpdates + normalizedRawUpdates = normalizedRawUpdates.concat ConcatManager.normalizeUpdate(rawUpdate) + rawUpdates = normalizedRawUpdates + + ConversionManager.popLatestCompressedUpdate doc_id, (error, lastCompressedUpdate) -> + return callback(error) if error? + + if !lastCompressedUpdate? + lastCompressedUpdate = rawUpdates.shift() + + compressedUpdates = [lastCompressedUpdate] + for rawUpdate in rawUpdates + lastCompressedUpdate = compressedUpdates.pop() + if lastCompressedUpdate? + compressedUpdates = compressedUpdates.concat ConcatManager.concatTwoUpdates lastCompressedUpdate, rawUpdate + else + compressedUpdates.push rawUpdate + ConversionManager.insertCompressedUpdates doc_id, compressedUpdates, (error) -> + return callback(error) if error? + console.log doc_id, "Pushed doc ops", length + callback null, true + diff --git a/services/track-changes/compressHistory.coffee b/services/track-changes/compressHistory.coffee index 95679a4dcd..c76988526f 100644 --- a/services/track-changes/compressHistory.coffee +++ b/services/track-changes/compressHistory.coffee @@ -9,7 +9,7 @@ db.docOps.find { }, { doc_id: true }, (error, docs) -> do (doc) -> jobs.push (callback) -> doc_id = doc.doc_id.toString() - ConversionManager.convertAllOldRawUpdates doc_id, (error) -> + ConversionManager.convertOldRawUpdates doc_id, (error) -> return callback(error) if error? console.log doc_id, "DONE" db.docHistory.find { doc_id: ObjectId(doc_id) }, (error, docs) -> @@ -17,10 +17,10 @@ db.docOps.find { }, { doc_id: true }, (error, docs) -> doc = docs[0] if doc? for update in doc.docOps - op = update.op[0] - if op.i? + op = update?.op[0] + if op?.i? console.log doc_id, update.meta.start_ts, update.meta.end_ts, update.meta.user_id, "INSERT", op.p, op.i - else if op.d? + else if op?.d? console.log doc_id, update.meta.start_ts, update.meta.end_ts, update.meta.user_id, "DELETE", op.p, op.d else console.log doc_id, "NO HISTORY"