overleaf/services/track-changes/app/js/PackManager.js

1252 lines
34 KiB
JavaScript
Raw Normal View History

/* eslint-disable
camelcase,
no-unused-vars,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS205: Consider reworking code to avoid use of IIFEs
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let PackManager
const async = require('async')
const _ = require('underscore')
2020-09-10 09:23:04 -04:00
const Bson = require('bson')
const BSON = new Bson()
const { db, ObjectId } = require('./mongodb')
const logger = require('@overleaf/logger')
const LockManager = require('./LockManager')
const MongoAWS = require('./MongoAWS')
const Metrics = require('@overleaf/metrics')
const ProjectIterator = require('./ProjectIterator')
const DocIterator = require('./DocIterator')
const Settings = require('@overleaf/settings')
const util = require('util')
const keys = Settings.redis.lock.key_schema
// Sharejs operations are stored in a 'pack' object
//
// e.g. a single sharejs update looks like
//
// {
// "doc_id" : 549dae9e0a2a615c0c7f0c98,
// "project_id" : 549dae9c0a2a615c0c7f0c8c,
// "op" : [ {"p" : 6981, "d" : "?" } ],
// "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 },
// "v" : 17082
// }
//
// and a pack looks like this
//
// {
// "doc_id" : 549dae9e0a2a615c0c7f0c98,
// "project_id" : 549dae9c0a2a615c0c7f0c8c,
// "pack" : [ U1, U2, U3, ...., UN],
// "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 },
// "v" : 17082
// "v_end" : ...
// }
//
// where U1, U2, U3, .... are single updates stripped of their
// doc_id and project_id fields (which are the same for all the
// updates in the pack).
//
// The pack itself has v and meta fields, this makes it possible to
// treat packs and single updates in a similar way.
//
// The v field of the pack itself is from the first entry U1, the
// v_end field from UN. The meta.end_ts field of the pack itself is
// from the last entry UN, the meta.start_ts field from U1.
const DAYS = 24 * 3600 * 1000 // one day in milliseconds
module.exports = PackManager = {
MAX_SIZE: 1024 * 1024, // make these configurable parameters
MAX_COUNT: 1024,
insertCompressedUpdates(
project_id,
doc_id,
lastUpdate,
newUpdates,
temporary,
callback
) {
if (callback == null) {
callback = function () {}
}
if (newUpdates.length === 0) {
return callback()
}
// never append permanent ops to a pack that will expire
if (
(lastUpdate != null ? lastUpdate.expiresAt : undefined) != null &&
!temporary
) {
lastUpdate = null
}
const updatesToFlush = []
const updatesRemaining = newUpdates.slice()
let n = (lastUpdate != null ? lastUpdate.n : undefined) || 0
let sz = (lastUpdate != null ? lastUpdate.sz : undefined) || 0
while (
updatesRemaining.length &&
n < PackManager.MAX_COUNT &&
sz < PackManager.MAX_SIZE
) {
const nextUpdate = updatesRemaining[0]
const nextUpdateSize = BSON.calculateObjectSize(nextUpdate)
if (nextUpdateSize + sz > PackManager.MAX_SIZE && n > 0) {
break
}
n++
sz += nextUpdateSize
updatesToFlush.push(updatesRemaining.shift())
}
return PackManager.flushCompressedUpdates(
project_id,
doc_id,
lastUpdate,
updatesToFlush,
temporary,
2020-06-04 04:24:21 -04:00
function (error) {
if (error != null) {
return callback(error)
}
return PackManager.insertCompressedUpdates(
project_id,
doc_id,
null,
updatesRemaining,
temporary,
callback
)
}
)
},
flushCompressedUpdates(
project_id,
doc_id,
lastUpdate,
newUpdates,
temporary,
callback
) {
if (callback == null) {
callback = function () {}
}
if (newUpdates.length === 0) {
return callback()
}
let canAppend = false
// check if it is safe to append to an existing pack
if (lastUpdate != null) {
if (!temporary && lastUpdate.expiresAt == null) {
// permanent pack appends to permanent pack
canAppend = true
}
const age =
Date.now() -
(lastUpdate.meta != null ? lastUpdate.meta.start_ts : undefined)
if (temporary && lastUpdate.expiresAt != null && age < 1 * DAYS) {
// temporary pack appends to temporary pack if same day
canAppend = true
}
}
if (canAppend) {
return PackManager.appendUpdatesToExistingPack(
project_id,
doc_id,
lastUpdate,
newUpdates,
temporary,
callback
)
} else {
return PackManager.insertUpdatesIntoNewPack(
project_id,
doc_id,
newUpdates,
temporary,
callback
)
}
},
insertUpdatesIntoNewPack(
project_id,
doc_id,
newUpdates,
temporary,
callback
) {
if (callback == null) {
callback = function () {}
}
const first = newUpdates[0]
const last = newUpdates[newUpdates.length - 1]
const n = newUpdates.length
const sz = BSON.calculateObjectSize(newUpdates)
const newPack = {
project_id: ObjectId(project_id.toString()),
doc_id: ObjectId(doc_id.toString()),
pack: newUpdates,
n,
sz,
meta: {
start_ts: first.meta.start_ts,
2021-07-13 07:04:43 -04:00
end_ts: last.meta.end_ts,
},
v: first.v,
v_end: last.v,
2021-07-13 07:04:43 -04:00
temporary,
}
if (temporary) {
newPack.expiresAt = new Date(Date.now() + 7 * DAYS)
newPack.last_checked = new Date(Date.now() + 30 * DAYS) // never check temporary packs
}
logger.debug(
{ project_id, doc_id, newUpdates },
'inserting updates into new pack'
)
return db.docHistory.insertOne(newPack, function (err) {
if (err != null) {
return callback(err)
}
Metrics.inc(`insert-pack-${temporary ? 'temporary' : 'permanent'}`)
if (temporary) {
return callback()
} else {
return PackManager.updateIndex(project_id, doc_id, callback)
}
})
},
appendUpdatesToExistingPack(
project_id,
doc_id,
lastUpdate,
newUpdates,
temporary,
callback
) {
if (callback == null) {
callback = function () {}
}
const first = newUpdates[0]
const last = newUpdates[newUpdates.length - 1]
const n = newUpdates.length
const sz = BSON.calculateObjectSize(newUpdates)
const query = {
_id: lastUpdate._id,
project_id: ObjectId(project_id.toString()),
doc_id: ObjectId(doc_id.toString()),
2021-07-13 07:04:43 -04:00
pack: { $exists: true },
}
const update = {
$push: {
2021-07-13 07:04:43 -04:00
pack: { $each: newUpdates },
},
$inc: {
n,
sz,
},
$set: {
'meta.end_ts': last.meta.end_ts,
2021-07-13 07:04:43 -04:00
v_end: last.v,
},
}
if (lastUpdate.expiresAt && temporary) {
update.$set.expiresAt = new Date(Date.now() + 7 * DAYS)
}
logger.debug(
{ project_id, doc_id, lastUpdate, newUpdates },
'appending updates to existing pack'
)
Metrics.inc(`append-pack-${temporary ? 'temporary' : 'permanent'}`)
return db.docHistory.updateOne(query, update, callback)
},
// Retrieve all changes for a document
getOpsByVersionRange(project_id, doc_id, fromVersion, toVersion, callback) {
if (callback == null) {
callback = function () {}
}
return PackManager.loadPacksByVersionRange(
project_id,
doc_id,
fromVersion,
toVersion,
2020-06-04 04:24:21 -04:00
function (error) {
if (error) return callback(error)
const query = { doc_id: ObjectId(doc_id.toString()) }
if (toVersion != null) {
query.v = { $lte: toVersion }
}
if (fromVersion != null) {
query.v_end = { $gte: fromVersion }
}
// console.log "query:", query
2020-06-04 04:24:21 -04:00
return db.docHistory
.find(query)
.sort({ v: -1 })
.toArray(function (err, result) {
2020-06-04 04:24:21 -04:00
if (err != null) {
return callback(err)
}
2020-06-04 04:24:21 -04:00
// console.log "getOpsByVersionRange:", err, result
const updates = []
const opInRange = function (op, from, to) {
if (fromVersion != null && op.v < fromVersion) {
return false
}
if (toVersion != null && op.v > toVersion) {
return false
}
return true
}
2020-06-04 04:24:21 -04:00
for (const docHistory of Array.from(result)) {
// console.log 'adding', docHistory.pack
for (const op of Array.from(docHistory.pack.reverse())) {
if (opInRange(op, fromVersion, toVersion)) {
op.project_id = docHistory.project_id
op.doc_id = docHistory.doc_id
// console.log "added op", op.v, fromVersion, toVersion
updates.push(op)
}
}
}
2020-06-04 04:24:21 -04:00
return callback(null, updates)
})
}
)
},
loadPacksByVersionRange(
project_id,
doc_id,
fromVersion,
toVersion,
callback
) {
2020-06-04 04:24:21 -04:00
return PackManager.getIndex(doc_id, function (err, indexResult) {
let pack
if (err != null) {
return callback(err)
}
const indexPacks =
(indexResult != null ? indexResult.packs : undefined) || []
2020-06-04 04:24:21 -04:00
const packInRange = function (pack, from, to) {
if (fromVersion != null && pack.v_end < fromVersion) {
return false
}
if (toVersion != null && pack.v > toVersion) {
return false
}
return true
}
const neededIds = (() => {
const result = []
for (pack of Array.from(indexPacks)) {
if (packInRange(pack, fromVersion, toVersion)) {
result.push(pack._id)
}
}
return result
})()
if (neededIds.length) {
return PackManager.fetchPacksIfNeeded(
project_id,
doc_id,
neededIds,
callback
)
} else {
return callback()
}
})
},
fetchPacksIfNeeded(project_id, doc_id, pack_ids, callback) {
let id
return db.docHistory
.find(
{ _id: { $in: pack_ids.map(ObjectId) } },
{ projection: { _id: 1 } }
)
.toArray(function (err, loadedPacks) {
if (err != null) {
return callback(err)
}
const allPackIds = (() => {
const result1 = []
for (id of Array.from(pack_ids)) {
result1.push(id.toString())
}
return result1
})()
2021-07-13 07:04:43 -04:00
const loadedPackIds = Array.from(loadedPacks).map(pack =>
pack._id.toString()
)
const packIdsToFetch = _.difference(allPackIds, loadedPackIds)
logger.debug(
{ project_id, doc_id, loadedPackIds, allPackIds, packIdsToFetch },
'analysed packs'
)
if (packIdsToFetch.length === 0) {
return callback()
}
return async.eachLimit(
packIdsToFetch,
4,
(pack_id, cb) =>
MongoAWS.unArchivePack(project_id, doc_id, pack_id, cb),
2020-06-04 04:24:21 -04:00
function (err) {
if (err != null) {
return callback(err)
}
logger.debug({ project_id, doc_id }, 'done unarchiving')
return callback()
}
)
})
},
findAllDocsInProject(project_id, callback) {
const docIdSet = new Set()
async.series(
[
cb => {
db.docHistory
.find(
{ project_id: ObjectId(project_id) },
{ projection: { pack: false } }
)
.toArray((err, packs) => {
if (err) return callback(err)
packs.forEach(pack => {
docIdSet.add(pack.doc_id.toString())
})
return cb()
})
},
cb => {
db.docHistoryIndex
.find({ project_id: ObjectId(project_id) })
.toArray((err, indexes) => {
if (err) return callback(err)
indexes.forEach(index => {
docIdSet.add(index._id.toString())
})
return cb()
})
},
],
err => {
if (err) return callback(err)
callback(null, [...docIdSet])
}
)
},
// rewrite any query using doc_id to use _id instead
// (because docHistoryIndex uses the doc_id)
_rewriteQueryForIndex(query) {
const indexQuery = _.omit(query, 'doc_id')
if ('doc_id' in query) {
indexQuery._id = query.doc_id
}
return indexQuery
},
// Retrieve all changes across a project
_findPacks(query, sortKeys, callback) {
// get all the docHistory Entries
return db.docHistory
.find(query, { projection: { pack: false } })
.sort(sortKeys)
.toArray(function (err, packs) {
let pack
if (err != null) {
return callback(err)
}
const allPacks = []
const seenIds = {}
for (pack of Array.from(packs)) {
allPacks.push(pack)
seenIds[pack._id] = true
}
const indexQuery = PackManager._rewriteQueryForIndex(query)
return db.docHistoryIndex
.find(indexQuery)
.toArray(function (err, indexes) {
if (err != null) {
return callback(err)
}
for (const index of Array.from(indexes)) {
for (pack of Array.from(index.packs)) {
if (!seenIds[pack._id]) {
pack.project_id = index.project_id
pack.doc_id = index._id
pack.fromIndex = true
allPacks.push(pack)
seenIds[pack._id] = true
}
}
}
return callback(null, allPacks)
})
})
},
makeProjectIterator(project_id, before, callback) {
PackManager._findPacks(
{ project_id: ObjectId(project_id) },
{ 'meta.end_ts': -1 },
function (err, allPacks) {
if (err) return callback(err)
callback(
null,
new ProjectIterator(allPacks, before, PackManager.getPackById)
)
}
)
},
makeDocIterator(doc_id, callback) {
PackManager._findPacks(
{ doc_id: ObjectId(doc_id) },
{ v: -1 },
function (err, allPacks) {
if (err) return callback(err)
callback(null, new DocIterator(allPacks, PackManager.getPackById))
}
)
},
getPackById(project_id, doc_id, pack_id, callback) {
2020-06-04 04:24:21 -04:00
return db.docHistory.findOne({ _id: pack_id }, function (err, pack) {
if (err != null) {
return callback(err)
}
if (pack == null) {
return MongoAWS.unArchivePack(project_id, doc_id, pack_id, callback)
} else if (pack.expiresAt != null && pack.temporary === false) {
// we only need to touch the TTL when listing the changes in the project
// because diffs on individual documents are always done after that
return PackManager.increaseTTL(pack, callback)
// only do this for cached packs, not temporary ones to avoid older packs
// being kept longer than newer ones (which messes up the last update version)
} else {
return callback(null, pack)
}
})
},
increaseTTL(pack, callback) {
if (pack.expiresAt < new Date(Date.now() + 6 * DAYS)) {
// update cache expiry since we are using this pack
return db.docHistory.updateOne(
{ _id: pack._id },
{ $set: { expiresAt: new Date(Date.now() + 7 * DAYS) } },
2021-07-13 07:04:43 -04:00
err => callback(err, pack)
)
} else {
return callback(null, pack)
}
},
// Manage docHistoryIndex collection
getIndex(doc_id, callback) {
return db.docHistoryIndex.findOne(
{ _id: ObjectId(doc_id.toString()) },
callback
)
},
getPackFromIndex(doc_id, pack_id, callback) {
return db.docHistoryIndex.findOne(
{ _id: ObjectId(doc_id.toString()), 'packs._id': pack_id },
{ projection: { 'packs.$': 1 } },
callback
)
},
getLastPackFromIndex(doc_id, callback) {
return db.docHistoryIndex.findOne(
{ _id: ObjectId(doc_id.toString()) },
{ projection: { packs: { $slice: -1 } } },
2020-06-04 04:24:21 -04:00
function (err, indexPack) {
if (err != null) {
return callback(err)
}
if (indexPack == null) {
return callback()
}
return callback(null, indexPack[0])
}
)
},
getIndexWithKeys(doc_id, callback) {
2020-06-04 04:24:21 -04:00
return PackManager.getIndex(doc_id, function (err, index) {
if (err != null) {
return callback(err)
}
if (index == null) {
return callback()
}
for (const pack of Array.from(
(index != null ? index.packs : undefined) || []
)) {
index[pack._id] = pack
}
return callback(null, index)
})
},
initialiseIndex(project_id, doc_id, callback) {
2021-07-13 07:04:43 -04:00
return PackManager.findCompletedPacks(
project_id,
doc_id,
function (err, packs) {
// console.log 'err', err, 'packs', packs, packs?.length
if (err != null) {
return callback(err)
}
if (packs == null) {
return callback()
}
return PackManager.insertPacksIntoIndexWithLock(
project_id,
doc_id,
packs,
callback
)
}
2021-07-13 07:04:43 -04:00
)
},
updateIndex(project_id, doc_id, callback) {
// find all packs prior to current pack
2021-07-13 07:04:43 -04:00
return PackManager.findUnindexedPacks(
project_id,
doc_id,
function (err, newPacks) {
if (err != null) {
return callback(err)
}
if (newPacks == null || newPacks.length === 0) {
return callback()
}
2021-07-13 07:04:43 -04:00
return PackManager.insertPacksIntoIndexWithLock(
project_id,
doc_id,
newPacks,
function (err) {
if (err != null) {
return callback(err)
}
logger.debug(
2021-07-13 07:04:43 -04:00
{ project_id, doc_id, newPacks },
'added new packs to index'
)
return callback()
}
)
}
)
},
findCompletedPacks(project_id, doc_id, callback) {
const query = {
doc_id: ObjectId(doc_id.toString()),
2021-07-13 07:04:43 -04:00
expiresAt: { $exists: false },
}
return db.docHistory
.find(query, { projection: { pack: false } })
.sort({ v: 1 })
.toArray(function (err, packs) {
if (err != null) {
return callback(err)
}
if (packs == null) {
return callback()
}
if (!(packs != null ? packs.length : undefined)) {
return callback()
}
const last = packs.pop() // discard the last pack, if it's still in progress
if (last.finalised) {
packs.push(last)
} // it's finalised so we push it back to archive it
return callback(null, packs)
})
},
findPacks(project_id, doc_id, callback) {
const query = {
doc_id: ObjectId(doc_id.toString()),
2021-07-13 07:04:43 -04:00
expiresAt: { $exists: false },
}
return db.docHistory
.find(query, { projection: { pack: false } })
.sort({ v: 1 })
.toArray(function (err, packs) {
if (err != null) {
return callback(err)
}
if (packs == null) {
return callback()
}
if (!(packs != null ? packs.length : undefined)) {
return callback()
}
return callback(null, packs)
})
},
findUnindexedPacks(project_id, doc_id, callback) {
2020-06-04 04:24:21 -04:00
return PackManager.getIndexWithKeys(doc_id, function (err, indexResult) {
if (err != null) {
return callback(err)
}
2021-07-13 07:04:43 -04:00
return PackManager.findCompletedPacks(
project_id,
doc_id,
function (err, historyPacks) {
let pack
if (err != null) {
return callback(err)
}
2021-07-13 07:04:43 -04:00
if (historyPacks == null) {
return callback()
}
// select only the new packs not already in the index
let newPacks = (() => {
const result = []
for (pack of Array.from(historyPacks)) {
if (
(indexResult != null ? indexResult[pack._id] : undefined) ==
null
) {
result.push(pack)
}
}
return result
})()
newPacks = (() => {
const result1 = []
for (pack of Array.from(newPacks)) {
result1.push(
_.omit(
pack,
'doc_id',
'project_id',
'n',
'sz',
'last_checked',
'finalised'
)
)
2021-07-13 07:04:43 -04:00
}
return result1
})()
if (newPacks.length) {
logger.debug(
2021-07-13 07:04:43 -04:00
{ project_id, doc_id, n: newPacks.length },
'found new packs'
)
}
2021-07-13 07:04:43 -04:00
return callback(null, newPacks)
}
2021-07-13 07:04:43 -04:00
)
})
},
insertPacksIntoIndexWithLock(project_id, doc_id, newPacks, callback) {
return LockManager.runWithLock(
keys.historyIndexLock({ doc_id }),
2021-07-13 07:04:43 -04:00
releaseLock =>
PackManager._insertPacksIntoIndex(
project_id,
doc_id,
newPacks,
releaseLock
),
callback
)
},
_insertPacksIntoIndex(project_id, doc_id, newPacks, callback) {
return db.docHistoryIndex.updateOne(
{ _id: ObjectId(doc_id.toString()) },
{
$setOnInsert: { project_id: ObjectId(project_id.toString()) },
$push: {
2021-07-13 07:04:43 -04:00
packs: { $each: newPacks, $sort: { v: 1 } },
},
},
{
2021-07-13 07:04:43 -04:00
upsert: true,
},
callback
)
},
// Archiving packs to S3
archivePack(project_id, doc_id, pack_id, callback) {
2020-06-04 04:24:21 -04:00
const clearFlagOnError = function (err, cb) {
if (err != null) {
// clear the inS3 flag on error
return PackManager.clearPackAsArchiveInProgress(
project_id,
doc_id,
pack_id,
2020-06-04 04:24:21 -04:00
function (err2) {
if (err2 != null) {
return cb(err2)
}
return cb(err)
}
)
} else {
return cb()
}
}
return async.series(
[
2021-07-13 07:04:43 -04:00
cb =>
PackManager.checkArchiveNotInProgress(
project_id,
doc_id,
pack_id,
cb
),
2021-07-13 07:04:43 -04:00
cb =>
PackManager.markPackAsArchiveInProgress(
project_id,
doc_id,
pack_id,
cb
),
2021-07-13 07:04:43 -04:00
cb =>
MongoAWS.archivePack(project_id, doc_id, pack_id, err =>
clearFlagOnError(err, cb)
),
2021-07-13 07:04:43 -04:00
cb =>
PackManager.checkArchivedPack(project_id, doc_id, pack_id, err =>
clearFlagOnError(err, cb)
),
2021-07-13 07:04:43 -04:00
cb => PackManager.markPackAsArchived(project_id, doc_id, pack_id, cb),
cb =>
PackManager.setTTLOnArchivedPack(
project_id,
doc_id,
pack_id,
callback
2021-07-13 07:04:43 -04:00
),
],
callback
)
},
checkArchivedPack(project_id, doc_id, pack_id, callback) {
2020-06-04 04:24:21 -04:00
return db.docHistory.findOne({ _id: pack_id }, function (err, pack) {
if (err != null) {
return callback(err)
}
if (pack == null) {
return callback(new Error('pack not found'))
}
2021-07-13 07:04:43 -04:00
return MongoAWS.readArchivedPack(
project_id,
doc_id,
pack_id,
function (err, result) {
if (err) return callback(err)
2021-07-13 07:04:43 -04:00
delete result.last_checked
delete pack.last_checked
// need to compare ids as ObjectIds with .equals()
for (const key of ['_id', 'project_id', 'doc_id']) {
if (result[key].equals(pack[key])) {
result[key] = pack[key]
}
}
2021-07-13 07:04:43 -04:00
for (let i = 0; i < result.pack.length; i++) {
const op = result.pack[i]
if (op._id != null && op._id.equals(pack.pack[i]._id)) {
op._id = pack.pack[i]._id
}
}
if (_.isEqual(pack, result)) {
return callback()
} else {
logger.err(
{
pack,
result,
jsondiff: JSON.stringify(pack) === JSON.stringify(result),
},
'difference when comparing packs'
)
return callback(
new Error('pack retrieved from s3 does not match pack in mongo')
)
}
}
2021-07-13 07:04:43 -04:00
)
})
},
// Extra methods to test archive/unarchive for a doc_id
pushOldPacks(project_id, doc_id, callback) {
2020-06-04 04:24:21 -04:00
return PackManager.findPacks(project_id, doc_id, function (err, packs) {
if (err != null) {
return callback(err)
}
if (!(packs != null ? packs.length : undefined)) {
return callback()
}
return PackManager.processOldPack(
project_id,
doc_id,
packs[0]._id,
callback
)
})
},
pullOldPacks(project_id, doc_id, callback) {
return PackManager.loadPacksByVersionRange(
project_id,
doc_id,
null,
null,
callback
)
},
// Processing old packs via worker
processOldPack(project_id, doc_id, pack_id, callback) {
2021-07-13 07:04:43 -04:00
const markAsChecked = err =>
PackManager.markPackAsChecked(
project_id,
doc_id,
pack_id,
function (err2) {
if (err2 != null) {
return callback(err2)
}
return callback(err)
}
2021-07-13 07:04:43 -04:00
)
logger.debug({ project_id, doc_id }, 'processing old packs')
2020-06-04 04:24:21 -04:00
return db.docHistory.findOne({ _id: pack_id }, function (err, pack) {
if (err != null) {
return markAsChecked(err)
}
if (pack == null) {
return markAsChecked()
}
if (pack.expiresAt != null) {
return callback()
} // return directly
return PackManager.finaliseIfNeeded(
project_id,
doc_id,
pack._id,
pack,
2020-06-04 04:24:21 -04:00
function (err) {
if (err != null) {
return markAsChecked(err)
}
2021-07-13 07:04:43 -04:00
return PackManager.updateIndexIfNeeded(
project_id,
doc_id,
function (err) {
if (err != null) {
return markAsChecked(err)
}
return PackManager.findUnarchivedPacks(
project_id,
doc_id,
function (err, unarchivedPacks) {
if (err != null) {
return markAsChecked(err)
}
if (
!(unarchivedPacks != null
? unarchivedPacks.length
: undefined)
) {
logger.debug(
2021-07-13 07:04:43 -04:00
{ project_id, doc_id },
'no packs need archiving'
)
2020-06-04 04:24:21 -04:00
return markAsChecked()
}
2021-07-13 07:04:43 -04:00
return async.eachSeries(
unarchivedPacks,
(pack, cb) =>
PackManager.archivePack(project_id, doc_id, pack._id, cb),
function (err) {
if (err != null) {
return markAsChecked(err)
}
logger.debug({ project_id, doc_id }, 'done processing')
2021-07-13 07:04:43 -04:00
return markAsChecked()
}
)
}
)
}
)
}
)
})
},
finaliseIfNeeded(project_id, doc_id, pack_id, pack, callback) {
const sz = pack.sz / (1024 * 1024) // in fractions of a megabyte
const n = pack.n / 1024 // in fraction of 1024 ops
const age = (Date.now() - pack.meta.end_ts) / DAYS
if (age < 30) {
// always keep if less than 1 month old
logger.debug(
{ project_id, doc_id, pack_id, age },
'less than 30 days old'
)
return callback()
}
// compute an archiving threshold which decreases for each month of age
const archive_threshold = 30 / age
if (sz > archive_threshold || n > archive_threshold || age > 90) {
logger.debug(
{ project_id, doc_id, pack_id, age, archive_threshold, sz, n },
'meets archive threshold'
)
return PackManager.markPackAsFinalisedWithLock(
project_id,
doc_id,
pack_id,
callback
)
} else {
logger.debug(
{ project_id, doc_id, pack_id, age, archive_threshold, sz, n },
'does not meet archive threshold'
)
return callback()
}
},
markPackAsFinalisedWithLock(project_id, doc_id, pack_id, callback) {
return LockManager.runWithLock(
keys.historyLock({ doc_id }),
2021-07-13 07:04:43 -04:00
releaseLock =>
PackManager._markPackAsFinalised(
project_id,
doc_id,
pack_id,
releaseLock
),
callback
)
},
_markPackAsFinalised(project_id, doc_id, pack_id, callback) {
logger.debug({ project_id, doc_id, pack_id }, 'marking pack as finalised')
return db.docHistory.updateOne(
{ _id: pack_id },
{ $set: { finalised: true } },
callback
)
},
updateIndexIfNeeded(project_id, doc_id, callback) {
logger.debug({ project_id, doc_id }, 'archiving old packs')
2020-06-04 04:24:21 -04:00
return PackManager.getIndexWithKeys(doc_id, function (err, index) {
if (err != null) {
return callback(err)
}
if (index == null) {
return PackManager.initialiseIndex(project_id, doc_id, callback)
} else {
return PackManager.updateIndex(project_id, doc_id, callback)
}
})
},
markPackAsChecked(project_id, doc_id, pack_id, callback) {
logger.debug({ project_id, doc_id, pack_id }, 'marking pack as checked')
return db.docHistory.updateOne(
{ _id: pack_id },
{ $currentDate: { last_checked: true } },
callback
)
},
findUnarchivedPacks(project_id, doc_id, callback) {
2020-06-04 04:24:21 -04:00
return PackManager.getIndex(doc_id, function (err, indexResult) {
if (err != null) {
return callback(err)
}
const indexPacks =
(indexResult != null ? indexResult.packs : undefined) || []
const unArchivedPacks = (() => {
const result = []
for (const pack of Array.from(indexPacks)) {
if (pack.inS3 == null) {
result.push(pack)
}
}
return result
})()
if (unArchivedPacks.length) {
logger.debug(
{ project_id, doc_id, n: unArchivedPacks.length },
'find unarchived packs'
)
}
return callback(null, unArchivedPacks)
})
},
// Archive locking flags
checkArchiveNotInProgress(project_id, doc_id, pack_id, callback) {
logger.debug(
{ project_id, doc_id, pack_id },
'checking if archive in progress'
)
2021-07-13 07:04:43 -04:00
return PackManager.getPackFromIndex(
doc_id,
pack_id,
function (err, result) {
if (err != null) {
return callback(err)
}
if (result == null) {
return callback(new Error('pack not found in index'))
}
if (result.inS3) {
return callback(new Error('pack archiving already done'))
} else if (result.inS3 != null) {
return callback(new Error('pack archiving already in progress'))
} else {
return callback()
}
}
2021-07-13 07:04:43 -04:00
)
},
markPackAsArchiveInProgress(project_id, doc_id, pack_id, callback) {
logger.debug(
{ project_id, doc_id },
'marking pack as archive in progress status'
)
return db.docHistoryIndex.findOneAndUpdate(
{
_id: ObjectId(doc_id.toString()),
2021-07-13 07:04:43 -04:00
packs: { $elemMatch: { _id: pack_id, inS3: { $exists: false } } },
},
{ $set: { 'packs.$.inS3': false } },
{ projection: { 'packs.$': 1 } },
2020-06-04 04:24:21 -04:00
function (err, result) {
if (err != null) {
return callback(err)
}
if (!result.value) {
return callback(new Error('archive is already in progress'))
}
logger.debug(
{ project_id, doc_id, pack_id },
'marked as archive in progress'
)
return callback()
}
)
},
clearPackAsArchiveInProgress(project_id, doc_id, pack_id, callback) {
logger.debug(
{ project_id, doc_id, pack_id },
'clearing as archive in progress'
)
return db.docHistoryIndex.updateOne(
{
_id: ObjectId(doc_id.toString()),
2021-07-13 07:04:43 -04:00
packs: { $elemMatch: { _id: pack_id, inS3: false } },
},
{ $unset: { 'packs.$.inS3': true } },
callback
)
},
markPackAsArchived(project_id, doc_id, pack_id, callback) {
logger.debug({ project_id, doc_id, pack_id }, 'marking pack as archived')
return db.docHistoryIndex.findOneAndUpdate(
{
_id: ObjectId(doc_id.toString()),
2021-07-13 07:04:43 -04:00
packs: { $elemMatch: { _id: pack_id, inS3: false } },
},
{ $set: { 'packs.$.inS3': true } },
{ projection: { 'packs.$': 1 } },
2020-06-04 04:24:21 -04:00
function (err, result) {
if (err != null) {
return callback(err)
}
if (!result.value) {
return callback(new Error('archive is not marked as progress'))
}
logger.debug({ project_id, doc_id, pack_id }, 'marked as archived')
return callback()
}
)
},
setTTLOnArchivedPack(project_id, doc_id, pack_id, callback) {
return db.docHistory.updateOne(
{ _id: pack_id },
{ $set: { expiresAt: new Date(Date.now() + 1 * DAYS) } },
2020-06-04 04:24:21 -04:00
function (err) {
if (err) {
return callback(err)
}
logger.debug({ project_id, doc_id, pack_id }, 'set expiry on pack')
return callback()
}
)
2021-07-13 07:04:43 -04:00
},
}
// Promise-returning variants (util.promisify) of the callback methods listed
// below, exposed as `module.exports.promises`.
module.exports.promises = [
  'getOpsByVersionRange',
  'findAllDocsInProject',
  'makeDocIterator',
].reduce((promises, methodName) => {
  promises[methodName] = util.promisify(PackManager[methodName])
  return promises
}, {})
// _getOneDayInFutureWithRandomDelay: ->
// thirtyMins = 1000 * 60 * 30
// randomThirtyMinMax = Math.ceil(Math.random() * thirtyMins)
// return new Date(Date.now() + randomThirtyMinMax + 1*DAYS)