diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index 494ed04b6e..efcefe9286 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ -53,25 +53,58 @@ module.exports = MongoManager = # may need to roll over a pack here if we are inserting packs db.docHistory.insert update, callback + # The following function implements a method like a mongo find, but + # which expands any documents containing a 'pack' field into + # multiple values + # + # e.g. a single update looks like + # + # { + # "doc_id" : 549dae9e0a2a615c0c7f0c98, + # "project_id" : 549dae9c0a2a615c0c7f0c8c, + # "op" : [ {"p" : 6981, "d" : "?" } ], + # "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 }, + # "v" : 17082 + # } + # + # and a pack looks like this + # + # { + # "doc_id" : 549dae9e0a2a615c0c7f0c98, + # "project_id" : 549dae9c0a2a615c0c7f0c8c, + # "pack" : [ D1, D2, D3, ....], + # "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 }, + # "v" : 17082 + # } + # + # where D1, D2, D3, .... are single updates stripped of their + # doc_id and project_id fields (which are the same for all the + # updates in the pack). The meta and v fields of the pack itself + # are those of the first entry in the pack D1 (this makes it + # possible to treat packs and single updates in the same way). + _findResults: (param, query, limit, callback) -> + # param - the field used to select and sort ops within a range, + # either 'v' or 'meta.end_ts' + # query - the mongo query selector, includes both the doc_id/project_id and + # the range on v or meta.end_ts + # limit - the mongo limit, we need to apply it after unpacking any + # packs + sort = {} sort[param] = -1; cursor = db.docHistory .find( query ) .sort( sort ) - + # if we have packs, we will trim the results more later after expanding them if limit? cursor.limit(limit) + # take the part of the query which selects the range over the parameter rangeQuery = query[param] - extraQuery = _.clone(query) - if rangeQuery?['$gte']? - extraQuery[param] = {'$lt' : rangeQuery['$gte']} - else if rangeQuery?['$gt'] - extraQuery[param] = {'$lte' : rangeQuery['$gt']} - else - delete extraQuery[param] + # helper function to check if an item from a pack is inside the + # desired range filterFn = (item) -> return false if rangeQuery?['$gte']? && item[param] < rangeQuery['$gte'] return false if rangeQuery?['$lte']? && item[param] > rangeQuery['$lte'] @@ -79,11 +112,28 @@ module.exports = MongoManager = return false if rangeQuery?['$gt']? && item[param] <= rangeQuery['$gt'] return true - needMore = false + # create a query which can be used to select the entries BEFORE + # the range because we sometimes need to find extra ones (when the + # boundary falls in the middle of a pack) + extraQuery = _.clone(query) + # The pack uses its first entry for its metadata and v, so the + # only queries where we might not get all the packs are those for + # $gt and $gte (i.e. we need to find packs which start before our + # range but end in it) + if rangeQuery?['$gte']? + extraQuery[param] = {'$lt' : rangeQuery['$gte']} + else if rangeQuery?['$gt'] + extraQuery[param] = {'$lte' : rangeQuery['$gt']} + else + delete extraQuery[param] + + needMore = false # keep track of whether we need to load more data + updates = [] # used to accumulate the set of results cursor.toArray (err, result) -> unpackedSet = MongoManager._unpackResults(result) - updates = unpackedSet.filter(filterFn) - updates = updates.slice(0, limit) if limit? + MongoManager._filterAndLimit(updates, unpackedSet, filterFn, limit) + # check if we need to retrieve more data, because there is a + # pack that crosses into our range last = if unpackedSet.length then unpackedSet[unpackedSet.length-1] else null if limit? && updates.length == limit needMore = false @@ -92,7 +142,7 @@ module.exports = MongoManager = else if extraQuery[param]? && updates.length == 0 needMore = true if needMore - # need an extra result set + # we do need an extra result set extra = db.docHistory .find(extraQuery) .sort(sort) @@ -101,9 +151,8 @@ module.exports = MongoManager = if err? return callback err, updates else - extraSet = MongoManager._unpackResults(result2).filter(filterFn) - updates = updates.concat extraSet - updates = updates.slice(0, limit) if limit? + extraSet = MongoManager._unpackResults(result2) + MongoManager._filterAndLimit(updates, extraSet, filterFn, limit) callback err, updates return if err? @@ -112,9 +161,9 @@ module.exports = MongoManager = callback err, updates _unpackResults: (updates) -> + # iterate over the updates, if there's a pack, expand it into ops and + # insert it into the array at that point result = [] - # iterate over the updates - # if it's a pack, expand it into ops and insert it into the array at that point updates.forEach (item) -> if item.pack? all = MongoManager._explodePackToOps item @@ -124,6 +173,7 @@ module.exports = MongoManager = return result _explodePackToOps: (packObj) -> + # convert a pack into an array of ops doc_id = packObj.doc_id project_id = packObj.project_id result = packObj.pack.map (item) -> @@ -132,6 +182,13 @@ module.exports = MongoManager = item return result.reverse() + _filterAndLimit: (results, extra, filterFn, limit) + # update results with extra docs, after filtering and limiting + filtered = extra.filter(filterFn) + newResults = results.concat filtered + newResults.slice(0, limit) if limit? + results = newResults + getDocUpdates:(doc_id, options = {}, callback = (error, updates) ->) -> query = doc_id: ObjectId(doc_id.toString())