[DocArchiveManager] (un-)archive docs in batches and let db filter docs

Also drop the broken 404 logic after switching to db-side filtering.
This commit is contained in:
Jakob Ackermann 2021-06-08 12:59:13 +01:00
parent cee6c0aad9
commit 4996f8bbcd
4 changed files with 59 additions and 44 deletions

View file

@ -10,6 +10,8 @@ const PersistorManager = require('./PersistorManager')
const pMap = require('p-map')
const PARALLEL_JOBS = settings.parallelArchiveJobs
const ARCHIVE_BATCH_SIZE = settings.archiveBatchSize
const UN_ARCHIVE_BATCH_SIZE = settings.unArchiveBatchSize
const DESTROY_BATCH_SIZE = settings.destroyBatchSize
const DESTROY_RETRY_COUNT = settings.destroyRetryCount
@ -33,20 +35,19 @@ module.exports = {
}
async function archiveAllDocs(projectId) {
const docs = await MongoManager.getProjectsDocs(
projectId,
{ include_deleted: true },
{ lines: true, ranges: true, rev: true, inS3: true }
)
while (true) {
const docs = await MongoManager.getNNonArchivedProjectDocs(
projectId,
ARCHIVE_BATCH_SIZE
)
if (!docs || docs.length === 0) {
break
}
if (!docs) {
throw new Errors.NotFoundError(`No docs for project ${projectId}`)
await pMap(docs, (doc) => archiveDoc(projectId, doc), {
concurrency: PARALLEL_JOBS
})
}
const docsToArchive = docs.filter((doc) => !doc.inS3)
await pMap(docsToArchive, (doc) => archiveDoc(projectId, doc), {
concurrency: PARALLEL_JOBS
})
}
async function archiveDocById(projectId, docId) {
@ -102,18 +103,26 @@ async function archiveDoc(projectId, doc) {
}
async function unArchiveAllDocs(projectId) {
let docs
if (settings.docstore.keepSoftDeletedDocsArchived) {
docs = await MongoManager.getNonDeletedArchivedProjectDocs(projectId)
} else {
docs = await MongoManager.getArchivedProjectDocs(projectId)
while (true) {
let docs
if (settings.docstore.keepSoftDeletedDocsArchived) {
docs = await MongoManager.getNNonDeletedArchivedProjectDocs(
projectId,
UN_ARCHIVE_BATCH_SIZE
)
} else {
docs = await MongoManager.getNArchivedProjectDocs(
projectId,
UN_ARCHIVE_BATCH_SIZE
)
}
if (!docs || docs.length === 0) {
break
}
await pMap(docs, (doc) => unarchiveDoc(projectId, doc._id), {
concurrency: PARALLEL_JOBS
})
}
if (!docs) {
throw new Errors.NotFoundError(`No docs for project ${projectId}`)
}
await pMap(docs, (doc) => unarchiveDoc(projectId, doc._id), {
concurrency: PARALLEL_JOBS
})
}
async function unarchiveDoc(projectId, docId) {

View file

@ -64,21 +64,29 @@ module.exports = MongoManager = {
db.docs.find(query, queryOptions).toArray(callback)
},
getArchivedProjectDocs(project_id, callback) {
getNArchivedProjectDocs(project_id, n, callback) {
const query = {
project_id: ObjectId(project_id.toString()),
inS3: true
}
db.docs.find(query).toArray(callback)
db.docs.find(query, { projection: { _id: 1 }, limit: n }).toArray(callback)
},
getNonDeletedArchivedProjectDocs(project_id, callback) {
getNNonArchivedProjectDocs(project_id, n, callback) {
const query = {
project_id: ObjectId(project_id.toString()),
inS3: { $ne: true }
}
db.docs.find(query, { limit: n }).toArray(callback)
},
getNNonDeletedArchivedProjectDocs(project_id, n, callback) {
const query = {
project_id: ObjectId(project_id.toString()),
deleted: { $ne: true },
inS3: true
}
db.docs.find(query).toArray(callback)
db.docs.find(query, { projection: { _id: 1 }, limit: n }).toArray(callback)
},
upsertIntoDocCollection(project_id, doc_id, updates, callback) {

View file

@ -37,6 +37,8 @@ const Settings = {
max_doc_length: parseInt(process.env.MAX_DOC_LENGTH) || 2 * 1024 * 1024, // 2mb
archiveBatchSize: parseInt(process.env.ARCHIVE_BATCH_SIZE, 10) || 50,
unArchiveBatchSize: parseInt(process.env.UN_ARCHIVE_BATCH_SIZE, 10) || 50,
destroyBatchSize: parseInt(process.env.DESTROY_BATCH_SIZE, 10) || 2000,
destroyRetryCount: parseInt(process.env.DESTROY_RETRY_COUNT || '3', 10),
parallelArchiveJobs: parseInt(process.env.PARALLEL_ARCHIVE_JOBS, 10) || 5

View file

@ -116,12 +116,24 @@ describe('DocArchiveManager', function () {
deleteObject: sinon.stub().resolves()
}
const getNNonArchivedProjectDocs = sinon.stub()
getNNonArchivedProjectDocs
.onCall(0)
.resolves(mongoDocs.filter((doc) => !doc.inS3))
getNNonArchivedProjectDocs.onCall(1).resolves([])
const getNArchivedProjectDocs = sinon.stub()
getNArchivedProjectDocs.onCall(0).resolves(archivedDocs)
getNArchivedProjectDocs.onCall(1).resolves([])
MongoManager = {
promises: {
markDocAsArchived: sinon.stub().resolves(),
upsertIntoDocCollection: sinon.stub().resolves(),
getProjectsDocs: sinon.stub().resolves(mongoDocs),
getArchivedProjectDocs: sinon.stub().resolves(archivedDocs),
getNNonDeletedArchivedProjectDocs: getNArchivedProjectDocs,
getNNonArchivedProjectDocs,
getNArchivedProjectDocs,
findDoc: sinon.stub().rejects(new Errors.NotFoundError()),
destroyDoc: sinon.stub().resolves()
}
@ -519,14 +531,6 @@ describe('DocArchiveManager', function () {
MongoManager.promises.markDocAsArchived
).not.to.have.been.calledWith(mongoDocs[3]._id)
})
it('should return error if the project has no docs', async function () {
MongoManager.promises.getProjectsDocs.resolves(null)
await expect(
DocArchiveManager.promises.archiveAllDocs(projectId)
).to.eventually.be.rejected.and.be.instanceof(Errors.NotFoundError)
})
})
describe('unArchiveAllDocs', function () {
@ -545,14 +549,6 @@ describe('DocArchiveManager', function () {
)
}
})
it('should return error if the project has no docs', async function () {
MongoManager.promises.getArchivedProjectDocs.resolves(null)
await expect(
DocArchiveManager.promises.unArchiveAllDocs(projectId)
).to.eventually.be.rejected.and.be.instanceof(Errors.NotFoundError)
})
})
describe('destroyAllDocs', function () {