2013-08-09 06:10:19 -04:00
|
|
|
{db, ObjectId} = require "./mongojs"
|
|
|
|
ConcatManager = require "./ConcatManager"
|
|
|
|
|
|
|
|
module.exports = ConversionManager =
|
|
|
|
OPS_TO_LEAVE: 10
|
|
|
|
|
|
|
|
removeLatestCompressedUpdate: (doc_id, callback = (error) ->) ->
|
|
|
|
db.docHistory.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: 1 } }, callback
|
|
|
|
|
|
|
|
getLatestCompressedUpdate: (doc_id, callback = (error) ->) ->
|
|
|
|
db.docHistory.find { doc_id: ObjectId(doc_id) }, { docOps: { $slice: -1 } }, (error, history) ->
|
|
|
|
return callback(error) if error?
|
|
|
|
history = history[0] or { docOps: [] }
|
|
|
|
callback null, history.docOps.slice(-1)[0]
|
|
|
|
|
|
|
|
insertCompressedUpdates: (doc_id, updates, callback = (error) ->) ->
|
|
|
|
db.docHistory.update { doc_id: ObjectId(doc_id) }, { $push: { docOps: { $each: updates } } }, { upsert: true }, callback
|
|
|
|
|
|
|
|
trimLastRawUpdate: (doc_id, tailVersion, callback = (error) ->) ->
|
|
|
|
db.docOps.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: -1 }, $set: { tailVersion: tailVersion + 1 } }, callback
|
|
|
|
|
|
|
|
getLastRawUpdateAndVersion: (doc_id, callback = (error, update, currentVersion, tailVersion) ->) ->
|
|
|
|
db.docOps.find { doc_id: ObjectId(doc_id) }, { version: true, tailVersion: true, docOps: { $slice: 1 } }, (error, docs) ->
|
|
|
|
return callback(error) if error?
|
|
|
|
return callback(new Error("doc not found")) if docs.length == 0
|
|
|
|
doc = docs[0]
|
|
|
|
callback null, doc.docOps[0], doc.version, doc.tailVersion or 0
|
|
|
|
|
|
|
|
convertOldestRawUpdate: (doc_id, callback = (error, converted) ->) ->
|
|
|
|
ConversionManager.getLastRawUpdateAndVersion doc_id, (error, rawUpdate, currentVersion, tailVersion) ->
|
|
|
|
return callback(error) if error?
|
|
|
|
|
2013-08-09 06:36:19 -04:00
|
|
|
if currentVersion - tailVersion > ConversionManager.OPS_TO_LEAVE
|
|
|
|
rawUpdates = ConcatManager.normalizeUpdate(rawUpdate)
|
|
|
|
ConversionManager.getLatestCompressedUpdate doc_id, (error, lastCompressedUpdate) ->
|
2013-08-09 06:10:19 -04:00
|
|
|
return callback(error) if error?
|
|
|
|
|
|
|
|
removeAndModifyPreviousCompressedUpdate = (callback, compressedUpdates) ->
|
|
|
|
if lastCompressedUpdate?
|
|
|
|
compressedUpdates = [lastCompressedUpdate]
|
|
|
|
for rawUpdate in rawUpdates
|
|
|
|
lastCompressedUpdate = compressedUpdates.pop()
|
2013-08-09 07:36:17 -04:00
|
|
|
if lastCompressedUpdate?
|
|
|
|
compressedUpdates = compressedUpdates.concat ConcatManager.concatTwoUpdates lastCompressedUpdate, rawUpdate
|
|
|
|
else
|
|
|
|
compressedUpdates.push rawUpdate
|
2013-08-09 06:10:19 -04:00
|
|
|
ConversionManager.removeLatestCompressedUpdate doc_id, (error) ->
|
|
|
|
return callback(error) if error?
|
2013-08-09 06:36:19 -04:00
|
|
|
callback null, compressedUpdates
|
2013-08-09 06:10:19 -04:00
|
|
|
else
|
|
|
|
callback null, rawUpdates
|
|
|
|
|
|
|
|
removeAndModifyPreviousCompressedUpdate (error, newCompressedUpdates) ->
|
|
|
|
return callback(error) if error?
|
|
|
|
ConversionManager.insertCompressedUpdates doc_id, newCompressedUpdates, (error) ->
|
|
|
|
return callback(error) if error?
|
|
|
|
ConversionManager.trimLastRawUpdate doc_id, tailVersion, (error) ->
|
|
|
|
return callback(error) if error?
|
2013-08-09 06:36:19 -04:00
|
|
|
console.log doc_id, "Pushed op", tailVersion
|
2013-08-09 06:10:19 -04:00
|
|
|
callback null, true
|
|
|
|
|
|
|
|
else
|
2013-08-09 06:36:19 -04:00
|
|
|
console.log doc_id, "Up to date"
|
2013-08-09 06:10:19 -04:00
|
|
|
callback null, false
|
2013-08-09 06:36:19 -04:00
|
|
|
|
|
|
|
convertAllOldRawUpdates: (doc_id, callback = (error) ->) ->
|
|
|
|
ConversionManager.convertOldestRawUpdate doc_id, (error, converted) ->
|
|
|
|
return callback(error) if error?
|
|
|
|
if converted
|
|
|
|
# Keep going
|
|
|
|
ConversionManager.convertAllOldRawUpdates doc_id, callback
|
|
|
|
else
|
|
|
|
callback()
|