Merge pull request #109 from overleaf/jpa-batched-archiving-and-un-archiving

[DocArchiveManager] (un-)archive docs in batches and let db filter docs
commit 3c95327b67
Author: Jakob Ackermann
Date: 2021-06-10 11:09:10 +02:00 (committed by GitHub)
4 changed files with 63 additions and 44 deletions

DocArchiveManager.js

@@ -10,6 +10,8 @@ const PersistorManager = require('./PersistorManager')
 const pMap = require('p-map')
 
 const PARALLEL_JOBS = settings.parallelArchiveJobs
+const ARCHIVE_BATCH_SIZE = settings.archiveBatchSize
+const UN_ARCHIVE_BATCH_SIZE = settings.unArchiveBatchSize
 const DESTROY_BATCH_SIZE = settings.destroyBatchSize
 const DESTROY_RETRY_COUNT = settings.destroyRetryCount
@@ -33,20 +35,19 @@
 }
 
 async function archiveAllDocs(projectId) {
-  const docs = await MongoManager.getProjectsDocs(
-    projectId,
-    { include_deleted: true },
-    { lines: true, ranges: true, rev: true, inS3: true }
-  )
-
-  if (!docs) {
-    throw new Errors.NotFoundError(`No docs for project ${projectId}`)
-  }
-
-  const docsToArchive = docs.filter((doc) => !doc.inS3)
-  await pMap(docsToArchive, (doc) => archiveDoc(projectId, doc), {
-    concurrency: PARALLEL_JOBS
-  })
+  while (true) {
+    const docs = await MongoManager.getNonArchivedProjectDocs(
+      projectId,
+      ARCHIVE_BATCH_SIZE
+    )
+    if (!docs || docs.length === 0) {
+      break
+    }
+
+    await pMap(docs, (doc) => archiveDoc(projectId, doc), {
+      concurrency: PARALLEL_JOBS
+    })
+  }
 }
 
 async function archiveDocById(projectId, docId) {
@@ -102,18 +103,26 @@ async function archiveDoc(projectId, doc) {
 }
 
 async function unArchiveAllDocs(projectId) {
-  let docs
-  if (settings.docstore.keepSoftDeletedDocsArchived) {
-    docs = await MongoManager.getNonDeletedArchivedProjectDocs(projectId)
-  } else {
-    docs = await MongoManager.getArchivedProjectDocs(projectId)
-  }
-  if (!docs) {
-    throw new Errors.NotFoundError(`No docs for project ${projectId}`)
-  }
-  await pMap(docs, (doc) => unarchiveDoc(projectId, doc._id), {
-    concurrency: PARALLEL_JOBS
-  })
+  while (true) {
+    let docs
+    if (settings.docstore.keepSoftDeletedDocsArchived) {
+      docs = await MongoManager.getNonDeletedArchivedProjectDocs(
+        projectId,
+        UN_ARCHIVE_BATCH_SIZE
+      )
+    } else {
+      docs = await MongoManager.getArchivedProjectDocs(
+        projectId,
+        UN_ARCHIVE_BATCH_SIZE
+      )
+    }
+    if (!docs || docs.length === 0) {
+      break
+    }
+    await pMap(docs, (doc) => unarchiveDoc(projectId, doc._id), {
+      concurrency: PARALLEL_JOBS
+    })
+  }
 }
 
 async function unarchiveDoc(projectId, docId) {
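
Both functions now follow the same batched-drain pattern: fetch at most one batch of matching docs, process the batch with bounded concurrency, and stop as soon as the query comes back empty. Termination relies on each processed doc no longer matching the query on the next iteration (archiveDoc ends by marking the doc as archived, and unarchiveDoc is expected to clear that state). A minimal sketch of the pattern; processInBatches, fetchBatch and processOne are illustrative names, not part of the diff:

const pMap = require('p-map')

// Generic batch-drain loop: fetchBatch(limit) must return items that stop
// matching once processOne has handled them, otherwise the loop never ends.
async function processInBatches(fetchBatch, processOne, batchSize, concurrency) {
  while (true) {
    const batch = await fetchBatch(batchSize)
    if (!batch || batch.length === 0) {
      break // nothing left that matches the query: done
    }
    // handle one batch with bounded parallelism, like the pMap calls above
    await pMap(batch, processOne, { concurrency })
  }
}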

MongoManager.js

@@ -64,21 +64,33 @@ module.exports = MongoManager = {
     db.docs.find(query, queryOptions).toArray(callback)
   },
 
-  getArchivedProjectDocs(project_id, callback) {
+  getArchivedProjectDocs(project_id, maxResults, callback) {
     const query = {
       project_id: ObjectId(project_id.toString()),
       inS3: true
     }
-    db.docs.find(query).toArray(callback)
+    db.docs
+      .find(query, { projection: { _id: 1 }, limit: maxResults })
+      .toArray(callback)
   },
 
-  getNonDeletedArchivedProjectDocs(project_id, callback) {
+  getNonArchivedProjectDocs(project_id, maxResults, callback) {
+    const query = {
+      project_id: ObjectId(project_id.toString()),
+      inS3: { $ne: true }
+    }
+    db.docs.find(query, { limit: maxResults }).toArray(callback)
+  },
+
+  getNonDeletedArchivedProjectDocs(project_id, maxResults, callback) {
     const query = {
       project_id: ObjectId(project_id.toString()),
       deleted: { $ne: true },
       inS3: true
     }
-    db.docs.find(query).toArray(callback)
+    db.docs
+      .find(query, { projection: { _id: 1 }, limit: maxResults })
+      .toArray(callback)
   },
 
   upsertIntoDocCollection(project_id, doc_id, updates, callback) {
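
Note the asymmetry between the helpers: the two archived-doc queries feed unArchiveAllDocs, which only passes doc._id to unarchiveDoc, so they project _id alone; getNonArchivedProjectDocs returns full documents, presumably because archiveDoc needs the doc content (the old archiveAllDocs fetched lines, ranges and rev for the same reason). All three now take maxResults, capping each batch instead of loading every doc in the project. A hypothetical standalone call to the new helper, matching the callback signature above (countPendingDocs is an illustrative name, not part of the diff):

const MongoManager = require('./MongoManager')

// Illustrative only: count how many docs (up to one batch) still need archiving.
function countPendingDocs(projectId, callback) {
  MongoManager.getNonArchivedProjectDocs(projectId, 50, (err, docs) => {
    if (err) return callback(err)
    callback(null, docs.length) // at most 50, because of the limit
  })
}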

settings.defaults.js

@@ -37,6 +37,8 @@ const Settings = {
   max_doc_length: parseInt(process.env.MAX_DOC_LENGTH) || 2 * 1024 * 1024, // 2mb
 
+  archiveBatchSize: parseInt(process.env.ARCHIVE_BATCH_SIZE, 10) || 50,
+  unArchiveBatchSize: parseInt(process.env.UN_ARCHIVE_BATCH_SIZE, 10) || 50,
   destroyBatchSize: parseInt(process.env.DESTROY_BATCH_SIZE, 10) || 2000,
   destroyRetryCount: parseInt(process.env.DESTROY_RETRY_COUNT || '3', 10),
   parallelArchiveJobs: parseInt(process.env.PARALLEL_ARCHIVE_JOBS, 10) || 5
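
The new batch sizes follow the same parseInt(...) || default idiom as the existing destroy settings, so an unset or non-numeric environment variable falls back to 50. A quick illustration of that fallback (plain Node, not part of the diff):

console.log(parseInt(undefined, 10) || 50) // ARCHIVE_BATCH_SIZE unset -> NaN -> 50
console.log(parseInt('abc', 10) || 50)     // non-numeric value        -> NaN -> 50
console.log(parseInt('200', 10) || 50)     // ARCHIVE_BATCH_SIZE=200   -> 200
console.log(parseInt('0', 10) || 50)       // an explicit 0 also falls back to 50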

DocArchiveManagerTests.js

@@ -116,12 +116,24 @@ describe('DocArchiveManager', function () {
       deleteObject: sinon.stub().resolves()
     }
 
+    const getNonArchivedProjectDocs = sinon.stub()
+    getNonArchivedProjectDocs
+      .onCall(0)
+      .resolves(mongoDocs.filter((doc) => !doc.inS3))
+    getNonArchivedProjectDocs.onCall(1).resolves([])
+
+    const getArchivedProjectDocs = sinon.stub()
+    getArchivedProjectDocs.onCall(0).resolves(archivedDocs)
+    getArchivedProjectDocs.onCall(1).resolves([])
+
     MongoManager = {
       promises: {
         markDocAsArchived: sinon.stub().resolves(),
         upsertIntoDocCollection: sinon.stub().resolves(),
         getProjectsDocs: sinon.stub().resolves(mongoDocs),
-        getArchivedProjectDocs: sinon.stub().resolves(archivedDocs),
+        getNonDeletedArchivedProjectDocs: getArchivedProjectDocs,
+        getNonArchivedProjectDocs,
+        getArchivedProjectDocs,
         findDoc: sinon.stub().rejects(new Errors.NotFoundError()),
         destroyDoc: sinon.stub().resolves()
       }
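
The onCall stubbing mirrors the new loops: the first call yields a full batch, the second yields an empty array, so archiveAllDocs and unArchiveAllDocs run exactly one iteration in these tests before exiting. A minimal sinon sketch of that shape (fetchBatch and the doc names are illustrative only, not part of the test file):

const sinon = require('sinon')

const fetchBatch = sinon.stub()
fetchBatch.onCall(0).resolves(['doc-a', 'doc-b']) // first batch
fetchBatch.onCall(1).resolves([])                 // empty batch ends the loop

async function demo() {
  console.log(await fetchBatch()) // [ 'doc-a', 'doc-b' ]
  console.log(await fetchBatch()) // []
}
demo()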
@@ -519,14 +531,6 @@ describe('DocArchiveManager', function () {
         MongoManager.promises.markDocAsArchived
       ).not.to.have.been.calledWith(mongoDocs[3]._id)
     })
-
-    it('should return error if the project has no docs', async function () {
-      MongoManager.promises.getProjectsDocs.resolves(null)
-
-      await expect(
-        DocArchiveManager.promises.archiveAllDocs(projectId)
-      ).to.eventually.be.rejected.and.be.instanceof(Errors.NotFoundError)
-    })
   })
 
   describe('unArchiveAllDocs', function () {
@@ -545,14 +549,6 @@ describe('DocArchiveManager', function () {
         )
       }
     })
-
-    it('should return error if the project has no docs', async function () {
-      MongoManager.promises.getArchivedProjectDocs.resolves(null)
-
-      await expect(
-        DocArchiveManager.promises.unArchiveAllDocs(projectId)
-      ).to.eventually.be.rejected.and.be.instanceof(Errors.NotFoundError)
-    })
   })
 
   describe('destroyAllDocs', function () {