Mirror of https://github.com/overleaf/overleaf.git, synced 2025-04-09 19:39:06 +00:00
Merge pull request #3 from sharelatex/autopack
Move pack migration logic into PackManager
Commit: bbbe2077f0
4 changed files with 155 additions and 98 deletions
@@ -29,12 +29,19 @@ module.exports = MongoManager =
         else
             callback null, null
 
-    insertCompressedUpdates: (project_id, doc_id, updates, permanent, callback = (error) ->) ->
+    insertCompressedUpdates: (project_id, doc_id, updates, temporary, callback = (error) ->) ->
         jobs = []
         for update in updates
             do (update) ->
-                jobs.push (callback) -> MongoManager.insertCompressedUpdate project_id, doc_id, update, permanent, callback
-        async.series jobs, callback
+                jobs.push (callback) -> MongoManager.insertCompressedUpdate project_id, doc_id, update, temporary, callback
+        async.series jobs, (err, results) ->
+            if not temporary
+                # keep track of updates to be packed
+                db.docHistoryStats.update {doc_id:ObjectId(doc_id)}, {$inc:{updates:updates.length}}, {upsert:true}, () ->
+                    callback(err,results)
+            else
+                callback(err,results)
+
 
     insertCompressedUpdate: (project_id, doc_id, update, temporary, callback = (error) ->) ->
         update = {
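
With this change, every batch of permanent (non-temporary) compressed updates also bumps a per-document counter in docHistoryStats via an upsert, so the first batch creates the stats entry and later batches just increment it. A rough sketch of the effect, not part of the commit, assuming the shared mongojs helper as required from the service root (the doc_id value is a placeholder):

{db, ObjectId} = require "./app/coffee/mongojs"

doc_id = "53f4f4a4c464f44444444444"    # placeholder id

# two batches of 5 and 3 permanent updates for the same document
db.docHistoryStats.update {doc_id: ObjectId(doc_id)}, {$inc: {updates: 5}}, {upsert: true}, ->
    db.docHistoryStats.update {doc_id: ObjectId(doc_id)}, {$inc: {updates: 3}}, {upsert: true}, ->
        db.docHistoryStats.findOne {doc_id: ObjectId(doc_id)}, (err, stats) ->
            console.log stats    # { _id: ..., doc_id: ObjectId(...), updates: 8 }
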
@@ -113,3 +120,6 @@ module.exports = MongoManager =
         db.projectHistoryMetaData.ensureIndex { project_id: 1 }, { background: true }
         # TTL index for auto deleting week old temporary ops
         db.docHistory.ensureIndex { expiresAt: 1 }, { expireAfterSeconds: 0, background: true }
+        # For finding documents which need packing
+        db.docHistoryStats.ensureIndex { doc_id: 1 }, { background: true }
+        db.docHistoryStats.ensureIndex { updates: -1, doc_id: 1 }, { background: true }
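
Two notes on the indexes above: expireAfterSeconds: 0 on expiresAt means mongod deletes each temporary op at the time stored in its expiresAt field (rather than a fixed interval after insertion), and the {updates: -1, doc_id: 1} index presumably exists so a packing job can pick out the documents with the most unpacked updates first. A hedged sketch of such a query; the threshold of 100 mirrors MIN_COUNT in PackManager, but the query itself is not part of this commit:

{db} = require "./app/coffee/mongojs"

# documents that have accumulated enough updates to be worth packing,
# busiest first -- the shape of query the {updates: -1, doc_id: 1} index serves
db.docHistoryStats.find({updates: {$gt: 100}}).sort {updates: -1}, (err, stats) ->
    return console.error(err) if err?
    console.log "#{stats.length} documents are candidates for packing"
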
@@ -1,5 +1,7 @@
 async = require "async"
 _ = require "underscore"
+{db, ObjectId} = require "./mongojs"
+BSON=db.bson.BSON
 
 module.exports = PackManager =
     # The following functions implement methods like a mongo find, but
@@ -249,3 +251,122 @@ module.exports = PackManager =
             return true
         newResults.slice(0, limit) if limit?
         return newResults
+
+    convertDocsToPacks: (docs, callback) ->
+        MAX_SIZE = 1024*1024 # make these configurable parameters
+        MAX_COUNT = 1024
+        MIN_COUNT = 100
+        KEEP_OPS = 100
+        packs = []
+        top = null
+        # keep the last KEEP_OPS as individual ops
+        docs = docs.slice(0,-KEEP_OPS)
+
+        docs.forEach (d,i) ->
+            # skip existing packs
+            if d.pack?
+                top = null
+                return
+            # skip temporary ops (we could pack these into temporary packs in future)
+            if d.expiresAt?
+                top = null
+                return
+            sz = BSON.calculateObjectSize(d)
+            if top? && top.pack.length < MAX_COUNT && top.sz + sz < MAX_SIZE
+                top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id}
+                top.sz += sz
+                top.v_end = d.v
+                top.meta.end_ts = d.meta.end_ts
+                return
+            else if sz < MAX_SIZE
+                # create a new pack
+                top = _.clone(d)
+                top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ]
+                top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts }
+                top.sz = sz
+                delete top.op
+                delete top._id
+                packs.push top
+            else
+                # keep the op
+                # util.log "keeping large op unchanged (#{sz} bytes)"
+
+        # only store packs with a sufficient number of ops, discard others
+        packs = packs.filter (packObj) ->
+            packObj.pack.length > MIN_COUNT
+        callback(null, packs)
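
A rough usage sketch for convertDocsToPacks, not part of the commit: the input is a version-sorted array of docHistory entries (here fabricated in memory) and the output is a list of pack documents. The newest KEEP_OPS ops are always left unpacked, runs of small ops are grouped up to MAX_COUNT ops / MAX_SIZE bytes, and only runs longer than MIN_COUNT ops are kept. Requiring PackManager pulls in the shared mongojs handle for BSON size calculations, so a configured settings/mongo environment is assumed.

PackManager = require "./app/coffee/PackManager.coffee"

# 300 tiny fabricated ops with consecutive versions
now = Date.now()
fakeOps = for v in [1..300]
    {_id: "op-#{v}", doc_id: "doc-1", v: v, op: [{i: "x", p: 0}], meta: {start_ts: now + v, end_ts: now + v}}

PackManager.convertDocsToPacks fakeOps, (err, packs) ->
    # the newest 100 ops (KEEP_OPS) stay unpacked; the older 200 fit in one pack
    console.log packs.length                # 1
    console.log packs[0].pack.length        # 200
    console.log packs[0].v, packs[0].v_end  # 1 200
    console.log packs[0].meta               # { start_ts: now + 1, end_ts: now + 200 }
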
+
+    checkHistory: (docs, callback) ->
+        errors = []
+        prev = null
+        error = (args...) ->
+            errors.push args
+        docs.forEach (d,i) ->
+            if d.pack?
+                n = d.pack.length
+                last = d.pack[n-1]
+                error('bad pack v_end', d) if d.v_end != last.v
+                error('bad pack start_ts', d) if d.meta.start_ts != d.pack[0].meta.start_ts
+                error('bad pack end_ts', d) if d.meta.end_ts != last.meta.end_ts
+                d.pack.forEach (p, i) ->
+                    prev = v
+                    v = p.v
+                    error('bad version', v, 'in', p) if v <= prev
+                    #error('expired op', p, 'in pack') if p.expiresAt?
+            else
+                prev = v
+                v = d.v
+                error('bad version', v, 'in', d) if v <= prev
+        if errors.length
+            callback(errors)
+        else
+            callback()
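
checkHistory reports problems through its callback rather than by throwing: it collects [message, offending doc] tuples and passes the whole list back if anything looks inconsistent, and the callback receives no argument when the history is clean. An illustrative check against a deliberately broken pack, not part of the commit:

PackManager = require "./app/coffee/PackManager.coffee"

badPack =
    doc_id: "doc-1"
    v: 1
    v_end: 3                            # wrong on purpose: the last op below is v 2
    meta: {start_ts: 100, end_ts: 200}
    pack: [
        {v: 1, meta: {start_ts: 100, end_ts: 100}, op: []}
        {v: 2, meta: {start_ts: 200, end_ts: 200}, op: []}
    ]

PackManager.checkHistory [badPack], (err) ->
    console.log err    # [ [ 'bad pack v_end', badPack ] ]
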
+
+    insertPack: (packObj, callback) ->
+        bulk = db.docHistory.initializeOrderedBulkOp()
+        expect_nInserted = 1
+        expect_nRemoved = packObj.pack.length
+        bulk.insert packObj
+        packObj.pack.forEach (op) ->
+            bulk.find({_id:op._id}).removeOne()
+        bulk.execute (err, result) ->
+            if err?
+                callback(err, result)
+            else if result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved
+                callback(new Error(
+                    msg: 'unexpected result'
+                    expected: {expect_nInserted, expect_nRemoved}
+                ), result)
+            else
+                callback(err, result)
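
insertPack does the swap in a single ordered bulk operation: insert the pack document, then delete each source op by _id, and report an error if the resulting nInserted/nRemoved counts are not exactly 1 and pack.length. A sketch of the find → convert → insert sequence this enables, essentially what pack.coffee drives; the doc_id is a placeholder, a configured mongo connection is assumed, and the throttling pack.coffee adds via safeInsert and DB_WRITE_DELAY is omitted:

{db, ObjectId} = require "./app/coffee/mongojs"
PackManager = require "./app/coffee/PackManager.coffee"
async = require "async"

doc_id = "53f4f4a4c464f44444444444"    # placeholder id

db.docHistory.find({doc_id: ObjectId(doc_id)}).sort {v: 1}, (err, docs) ->
    return console.error(err) if err?
    PackManager.convertDocsToPacks docs, (err, packs) ->
        return console.error(err) if err?
        # each insertPack call writes one pack and removes its pack.length source ops
        async.eachSeries packs, PackManager.insertPack, (err) ->
            if err? then console.error(err) else console.log "wrote #{packs.length} pack(s)"
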
+
+    deleteExpiredPackOps: (docs, callback) ->
+        now = Date.now()
+        toRemove = []
+        toUpdate = []
+        docs.forEach (d,i) ->
+            if d.pack?
+                newPack = d.pack.filter (op) ->
+                    if op.expiresAt? then op.expiresAt > now else true
+                if newPack.length == 0
+                    toRemove.push d
+                else if newPack.length < d.pack.length
+                    # adjust the pack properties
+                    d.pack = newPack
+                    first = d.pack[0]
+                    last = d.pack[d.pack.length - 1]
+                    d.v_end = last.v
+                    d.meta.start_ts = first.meta.start_ts
+                    d.meta.end_ts = last.meta.end_ts
+                    toUpdate.push d
+        if toRemove.length or toUpdate.length
+            bulk = db.docHistory.initializeOrderedBulkOp()
+            toRemove.forEach (pack) ->
+                console.log "would remove", pack
+                #bulk.find({_id:pack._id}).removeOne()
+            toUpdate.forEach (pack) ->
+                console.log "would update", pack
+                #bulk.find({_id:pack._id}).updateOne(pack);
+            bulk.execute callback
+        else
+            callback()
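
deleteExpiredPackOps is still effectively a dry run in this commit: the bulk removeOne/updateOne calls are commented out, so it only logs what it would remove or rewrite, and its only caller in pack.coffee is commented out as well. The per-op filter it applies inside a pack is just the following (illustrative, with numbers standing in for the Date values that expiresAt actually holds):

now = Date.now()
keepOp = (op) -> if op.expiresAt? then op.expiresAt > now else true

console.log keepOp {v: 1}                               # true  -- permanent op, no expiry
console.log keepOp {v: 2, expiresAt: now - 60 * 1000}   # false -- already expired, dropped from the pack
console.log keepOp {v: 3, expiresAt: now + 60 * 1000}   # true  -- still inside its TTL window
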
@@ -1,6 +1,6 @@
 Settings = require "settings-sharelatex"
 mongojs = require "mongojs"
-db = mongojs.connect(Settings.mongo.url, ["docHistory", "projectHistoryMetaData"])
+db = mongojs.connect(Settings.mongo.url, ["docHistory", "projectHistoryMetaData", "docHistoryStats"])
 module.exports =
     db: db
     ObjectId: mongojs.ObjectId
@@ -1,12 +1,11 @@
 Settings = require "settings-sharelatex"
 fs = require("fs")
-mongojs = require("mongojs")
-ObjectId = mongojs.ObjectId
-db = mongojs(Settings.mongo.url, ['docHistory'])
+{db, ObjectId} = require "./app/coffee/mongojs"
 async = require("async")
 BSON=db.bson.BSON
 util = require 'util'
 _ = require 'underscore'
+PackManager = require "./app/coffee/PackManager.coffee"
 
 lineReader = require "line-reader"
 cli = require "cli"
@@ -15,11 +14,6 @@ options = cli.parse {
     'fast': [false, 'no delays on writes']
 }
 
-MAX_SIZE = 1024*1024
-MAX_COUNT = 1024
-MIN_COUNT = 100
-KEEP_OPS = 100
-
 DB_WRITE_DELAY = if options.fast then 0 else 2000
 DOCUMENT_PACK_DELAY = if options.fast then 0 else 1000
 
@@ -28,7 +22,7 @@ packDocHistory = (doc_id, callback) ->
     getDocHistory doc_id, (err, docs) ->
         return callback(err) if err?
         origDocs = docs.length
-        convertDocsToPacks docs, (err, packs) ->
+        PackManager.convertDocsToPacks docs, (err, packs) ->
             return callback(err) if err?
             util.log "docs #{origDocs} packs #{packs.length}"
             if packs.length
@@ -45,103 +39,34 @@ packDocHistory = (doc_id, callback) ->
 
 # retrieve document ops/packs and check them
 getDocHistory = (doc_id, callback) ->
-    db.docHistory.find({doc_id:mongojs.ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
+    db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
         return callback(err) if err?
         # for safety, do a consistency check of the history
-        checkHistory doc_id, docs, (err) ->
+        util.log "checking history for #{doc_id}"
+        PackManager.checkHistory docs, (err) ->
             return callback(err) if err?
-            callback err, docs
+            callback(err, docs)
+            #PackManager.deleteExpiredPackOps docs, (err) ->
+            #    return callback(err) if err?
+            #    callback err, docs
 
-convertDocsToPacks = (docs, callback) ->
-    packs = []
-    top = null
-    # keep the last KEEP_OPS as individual ops
-    docs = docs.slice(0,-KEEP_OPS)
-
-    docs.forEach (d,i) ->
-        if d.pack?
-            util.log "skipping existing pack of #{d.pack.length}"
-            top = null if top? # flush the current pack
-            return # and try next
-        sz = BSON.calculateObjectSize(d)
-        if top? && top.pack.length < MAX_COUNT && top.sz + sz < MAX_SIZE
-            top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id}
-            top.sz += sz
-            top.v_end = d.v
-            top.meta.end_ts = d.meta.end_ts
-            return
-        else if sz < MAX_SIZE
-            # create a new pack
-            top = _.clone(d)
-            top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ]
-            top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts }
-            top.sz = sz
-            delete top.op
-            delete top._id
-            packs.push top
-        else
-            # keep the op
-            util.log "keeping large op unchanged (#{sz} bytes)"
-
-    # only store packs with a sufficient number of ops, discard others
-    packs = packs.filter (packObj) ->
-        packObj.pack.length > MIN_COUNT
-    callback(null, packs)
+safeInsert = (packObj, callback) ->
+    if shutdownRequested
+        return callback('shutdown')
+    PackManager.insertPack packObj, (err, result) ->
+        setTimeout () ->
+            callback(err,result)
+        , DB_WRITE_DELAY
 
 savePacks = (packs, callback) ->
-    async.eachSeries packs, insertPack, (err, result) ->
+    async.eachSeries packs, safeInsert, (err, result) ->
         if err?
             console.log err
             util.log "error writing packs"
             callback err, result
         else
             util.log "done writing packs"
             callback()
 
-insertPack = (packObj, callback) ->
-    if shutdownRequested
-        return callback('shutdown')
-    bulk = db.docHistory.initializeOrderedBulkOp();
-    expect_nInserted = 1
-    expect_nRemoved = packObj.pack.length
-    util.log "insert #{expect_nInserted} pack, remove #{expect_nRemoved} ops"
-    bulk.insert packObj
-    packObj.pack.forEach (op) ->
-        bulk.find({_id:op._id}).removeOne()
-    bulk.execute (err, result) ->
-        if err? or result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved
-            console.log err, result
-            console.log 'nInserted', result.nInserted, 'nRemoved', result.nRemoved
-        setTimeout () ->
-            callback(err, result)
-        , DB_WRITE_DELAY
-
-checkHistory = (doc_id, docs, callback) ->
-    util.log "checking history for #{doc_id}"
-    errors = 0
-    prev = null
-    error = (args...) ->
-        errors++
-        console.log.apply(null, args)
-    docs.forEach (d,i) ->
-        if d.pack?
-            n = d.pack.length
-            last = d.pack[n-1]
-            error('bad pack v_end', d) if d.v_end != last.v
-            error('bad pack start_ts', d) if d.meta.start_ts != d.pack[0].meta.start_ts
-            error('bad pack end_ts', d) if d.meta.end_ts != last.meta.end_ts
-            d.pack.forEach (p, i) ->
-                prev = v
-                v = p.v
-                error('bad version', v, 'in', p) if v <= prev
-        else
-            prev = v
-            v = d.v
-            error('bad version', v, 'in', d) if v <= prev
-    if errors
-        callback({errcount: errors})
-    else
-        callback()
 
 readFile = (file, callback) ->
     ids = []
     lineReader.eachLine file, (line) ->
@@ -168,6 +93,7 @@ readFile todoFile, (err, todo) ->
     async.eachSeries pending, (doc_id, callback) ->
         packDocHistory doc_id, (err, result) ->
             if err?
                 console.log "ERROR:", err, result
+                return callback(err)
             else if not options['dry-run']
                 fs.appendFileSync doneFile, doc_id + '\n'