overleaf/services/document-updater/app/coffee/DocOpsManager.coffee
2014-02-12 10:40:42 +00:00

127 lines
5 KiB
CoffeeScript

RedisManager = require "./RedisManager"
mongojs = require("./mongojs")
db = mongojs.db
ObjectId = mongojs.ObjectId
logger = require "logger-sharelatex"
async = require "async"
Metrics = require("./Metrics")
module.exports = DocOpsManager =
flushDocOpsToMongo: (project_id, doc_id, _callback = (error) ->) ->
timer = new Metrics.Timer("docOpsManager.flushDocOpsToMongo")
callback = (args...) ->
timer.done()
_callback(args...)
DocOpsManager.getDocVersionInMongo doc_id, (error, mongoVersion) ->
return callback(error) if error?
RedisManager.getDocVersion doc_id, (error, redisVersion) ->
return callback(error) if error?
if !mongoVersion? or !redisVersion? or mongoVersion > redisVersion
logger.error doc_id: doc_id, redisVersion: redisVersion, mongoVersion: mongoVersion, "mongo version is ahead of redis"
return callback(new Error("inconsistent versions"))
RedisManager.getPreviousDocOps doc_id, mongoVersion, -1, (error, ops) ->
return callback(error) if error?
if ops.length != redisVersion - mongoVersion
logger.error doc_id: doc_id, redisVersion: redisVersion, mongoVersion: mongoVersion, opsLength: ops.length, "version difference does not match ops length"
return callback(new Error("inconsistent versions"))
logger.log doc_id: doc_id, redisVersion: redisVersion, mongoVersion: mongoVersion, "flushing doc ops to mongo"
DocOpsManager._appendDocOpsInMongo doc_id, ops, redisVersion, (error) ->
return callback(error) if error?
callback null
getPreviousDocOps: (project_id, doc_id, start, end, _callback = (error, ops) ->) ->
timer = new Metrics.Timer("docOpsManager.getPreviousDocOps")
callback = (args...) ->
timer.done()
_callback(args...)
DocOpsManager._ensureOpsAreLoaded project_id, doc_id, start, (error) ->
return callback(error) if error?
RedisManager.getPreviousDocOps doc_id, start, end, (error, ops) ->
return callback(error) if error?
callback null, ops
pushDocOp: (project_id, doc_id, op, callback = (error) ->) ->
RedisManager.pushDocOp doc_id, op, callback
_ensureOpsAreLoaded: (project_id, doc_id, backToVersion, callback = (error) ->) ->
RedisManager.getDocVersion doc_id, (error, redisVersion) ->
return callback(error) if error?
RedisManager.getDocOpsLength doc_id, (error, opsLength) ->
return callback(error) if error?
oldestVersionInRedis = redisVersion - opsLength
if oldestVersionInRedis > backToVersion
# _getDocOpsFromMongo(<id>, 4, 6, ...) will return the ops in positions 4 and 5, but not 6.
logger.log doc_id: doc_id, backToVersion: backToVersion, oldestVersionInRedis: oldestVersionInRedis, "loading old ops from mongo"
DocOpsManager._getDocOpsFromMongo doc_id, backToVersion, oldestVersionInRedis, (error, ops) ->
logger.log doc_id: doc_id, backToVersion: backToVersion, oldestVersionInRedis: oldestVersionInRedis, ops: ops, "loaded old ops from mongo"
return callback(error) if error?
RedisManager.prependDocOps doc_id, ops, (error) ->
return callback(error) if error?
callback null
else
logger.log doc_id: doc_id, backToVersion: backToVersion, oldestVersionInRedis: oldestVersionInRedis, "ops already in redis"
callback()
getDocVersionInMongo: (doc_id, callback = (error, version) ->) ->
t = new Metrics.Timer("mongo-time")
db.docOps.find {
doc_id: ObjectId(doc_id)
}, {
version: 1
}, (error, docs) ->
t.done()
return callback(error) if error?
if docs.length < 1 or !docs[0].version?
return callback null, 0
else
return callback null, docs[0].version
APPEND_OPS_BATCH_SIZE: 100
_appendDocOpsInMongo: (doc_id, docOps, newVersion, callback = (error) ->) ->
currentVersion = newVersion - docOps.length
batchSize = DocOpsManager.APPEND_OPS_BATCH_SIZE
noOfBatches = Math.ceil(docOps.length / batchSize)
if noOfBatches <= 0
return callback()
jobs = []
for batchNo in [0..(noOfBatches-1)]
do (batchNo) ->
jobs.push (callback) ->
batch = docOps.slice(batchNo * batchSize, (batchNo + 1) * batchSize)
currentVersion += batch.length
logger.log doc_id: doc_id, batchNo: batchNo, "appending doc op batch to Mongo"
t = new Metrics.Timer("mongo-time")
db.docOps.update {
doc_id: ObjectId(doc_id)
}, {
$push: docOps: { $each: batch, $slice: -100 }
$set: version: currentVersion
}, {
upsert: true
}, (err)->
t.done()
callback(err)
async.series jobs, (error) -> callback(error)
_getDocOpsFromMongo: (doc_id, start, end, callback = (error, ops) ->) ->
DocOpsManager.getDocVersionInMongo doc_id, (error, version) ->
return callback(error) if error?
offset = - (version - start) # Negative tells mongo to count from the end backwards
limit = end - start
t = new Metrics.Timer("mongo-time")
db.docOps.find {
doc_id: ObjectId(doc_id)
}, {
docOps: $slice: [offset, limit]
}, (error, docs) ->
t.done()
if docs.length < 1 or !docs[0].docOps?
return callback null, []
else
return callback null, docs[0].docOps