Merge pull request #7869 from overleaf/em-docstore-archive-lock

Add a lock around doc archiving

GitOrigin-RevId: eaf85dbc3b491edd15eeb2c1a84df3a2883fb61d

parent 30b07f3d34
commit de4091f955

11 changed files with 115 additions and 93 deletions
services/docstore
@@ -10,12 +10,10 @@ const PersistorManager = require('./PersistorManager')
 const pMap = require('p-map')
 
 const PARALLEL_JOBS = Settings.parallelArchiveJobs
-const ARCHIVE_BATCH_SIZE = Settings.archiveBatchSize
 const UN_ARCHIVE_BATCH_SIZE = Settings.unArchiveBatchSize
 
 module.exports = {
   archiveAllDocs: callbackify(archiveAllDocs),
-  archiveDocById: callbackify(archiveDocById),
   archiveDoc: callbackify(archiveDoc),
   unArchiveAllDocs: callbackify(unArchiveAllDocs),
   unarchiveDoc: callbackify(unarchiveDoc),
@@ -23,7 +21,6 @@ module.exports = {
   getDoc: callbackify(getDoc),
   promises: {
     archiveAllDocs,
-    archiveDocById,
     archiveDoc,
     unArchiveAllDocs,
     unarchiveDoc,
@@ -33,43 +30,21 @@ module.exports = {
 }
 
 async function archiveAllDocs(projectId) {
-  while (true) {
-    const docs = await MongoManager.getNonArchivedProjectDocs(
-      projectId,
-      ARCHIVE_BATCH_SIZE
-    )
-    if (!docs || docs.length === 0) {
-      break
-    }
-
-    await pMap(docs, doc => archiveDoc(projectId, doc), {
-      concurrency: PARALLEL_JOBS,
-    })
-  }
+  const docIds = await MongoManager.getNonArchivedProjectDocIds(projectId)
+  await pMap(docIds, docId => archiveDoc(projectId, docId), {
+    concurrency: PARALLEL_JOBS,
+  })
 }
 
-async function archiveDocById(projectId, docId) {
-  const doc = await MongoManager.findDoc(projectId, docId, {
-    lines: true,
-    ranges: true,
-    rev: true,
-    inS3: true,
-  })
+async function archiveDoc(projectId, docId) {
+  const doc = await MongoManager.getDocForArchiving(projectId, docId)
 
   if (!doc) {
-    throw new Errors.NotFoundError(
-      `Cannot find doc ${docId} in project ${projectId}`
-    )
-  }
-
-  if (doc.inS3) {
-    // No need to throw an error if the doc is already archived
+    // The doc wasn't found, it was already archived, or the lock couldn't be
+    // acquired. Since we don't know which it is, silently return.
    return
   }
-  await archiveDoc(projectId, doc)
-}
-
-async function archiveDoc(projectId, doc) {
+
   logger.debug(
     { project_id: projectId, doc_id: doc._id },
     'sending doc to persistor'
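Reviewer note: the reworked archiveAllDocs fetches only the ids of unarchived docs and hands them to archiveDoc with bounded concurrency. A minimal standalone sketch of that pattern, assuming p-map (the real dependency above); the helper names fetchUnarchivedIds and archiveOne are illustrative, not from the codebase:

const pMap = require('p-map')

// Sketch: archive every id with at most `concurrency` jobs in flight.
async function archiveAllSketch(fetchUnarchivedIds, archiveOne, concurrency = 5) {
  const ids = await fetchUnarchivedIds() // one id-only query, no batching loop
  await pMap(ids, id => archiveOne(id), { concurrency })
}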
@@ -100,7 +75,7 @@ async function archiveDoc(projectId, doc) {
   await PersistorManager.sendStream(Settings.docstore.bucket, key, stream, {
     sourceMd5: md5,
   })
-  await MongoManager.markDocAsArchived(doc._id, doc.rev)
+  await MongoManager.markDocAsArchived(projectId, docId, doc.rev)
 }
 
 async function unArchiveAllDocs(projectId) {
@@ -364,7 +364,7 @@ module.exports = DocManager = {
 
     if (meta.deleted && Settings.docstore.archiveOnSoftDelete) {
       // The user will not read this doc anytime soon. Flush it out of mongo.
-      DocArchive.archiveDocById(project_id, doc_id, err => {
+      DocArchive.archiveDoc(project_id, doc_id, err => {
         if (err) {
           logger.warn(
             { project_id, doc_id, err },
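Reviewer note: this call stays fire-and-forget, so the soft-delete request neither waits for nor fails on the background archive. A minimal sketch of that pattern; the function and log message here are illustrative, only the shape matches the code above:

// Sketch: run the archive in the background and only log failures.
function flushInBackground(archiveDoc, logger, projectId, docId) {
  archiveDoc(projectId, docId, err => {
    if (err) {
      // Never propagate: a failed background archive must not break the request.
      logger.warn({ projectId, docId, err }, 'background doc archiving failed')
    }
  })
}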
@@ -238,7 +238,7 @@ function archiveAllDocs(req, res, next) {
 function archiveDoc(req, res, next) {
   const { doc_id: docId, project_id: projectId } = req.params
   logger.debug({ projectId, docId }, 'archiving a doc')
-  DocArchive.archiveDocById(projectId, docId, function (error) {
+  DocArchive.archiveDoc(projectId, docId, function (error) {
     if (error) {
       return next(error)
     }
@@ -6,6 +6,8 @@ const OError = require('@overleaf/o-error')
 const Errors = require('./Errors')
 const { promisify } = require('util')
 
+const ARCHIVING_LOCK_DURATION_MS = Settings.archivingLockDurationMs
+
 function findDoc(projectId, docId, filter, callback) {
   db.docs.findOne(
     {
@@ -59,12 +61,17 @@ function getArchivedProjectDocs(projectId, maxResults, callback) {
     .toArray(callback)
 }
 
-function getNonArchivedProjectDocs(projectId, maxResults, callback) {
-  const query = {
-    project_id: ObjectId(projectId.toString()),
-    inS3: { $ne: true },
-  }
-  db.docs.find(query, { limit: maxResults }).toArray(callback)
+function getNonArchivedProjectDocIds(projectId, callback) {
+  db.docs
+    .find(
+      {
+        project_id: ObjectId(projectId),
+        inS3: { $ne: true },
+      },
+      { projection: { _id: 1 } }
+    )
+    .map(doc => doc._id)
+    .toArray(callback)
 }
 
 function getNonDeletedArchivedProjectDocs(projectId, maxResults, callback) {
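Reviewer note: the replacement query requests only _id via a projection and maps the cursor, so document bodies never have to be loaded while listing a project. A sketch of the same pattern against a generic mongodb collection handle (names are illustrative):

// Sketch: id-only listing of unarchived docs for a project.
async function listUnarchivedDocIds(docsCollection, projectId) {
  return docsCollection
    .find(
      { project_id: projectId, inS3: { $ne: true } },
      { projection: { _id: 1 } } // never pull lines/ranges here
    )
    .map(doc => doc._id)
    .toArray()
}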
@@ -108,19 +115,44 @@ function patchDoc(projectId, docId, meta, callback) {
   )
 }
 
-function markDocAsArchived(docId, rev, callback) {
-  const update = {
-    $set: {},
-    $unset: {},
-  }
-  update.$set.inS3 = true
-  update.$unset.lines = true
-  update.$unset.ranges = true
-  const query = {
-    _id: docId,
-    rev,
-  }
-  db.docs.updateOne(query, update, callback)
+/**
+ * Fetch a doc and lock it for archiving
+ *
+ * This will return null if the doc is not found, if it's already archived or
+ * if the lock can't be acquired.
+ */
+function getDocForArchiving(projectId, docId, callback) {
+  const archivingUntil = new Date(Date.now() + ARCHIVING_LOCK_DURATION_MS)
+  db.docs.findOneAndUpdate(
+    {
+      _id: ObjectId(docId),
+      project_id: ObjectId(projectId),
+      inS3: { $ne: true },
+      $or: [{ archivingUntil: null }, { archivingUntil: { $lt: new Date() } }],
+    },
+    { $set: { archivingUntil } },
+    { projection: { lines: 1, ranges: 1, rev: 1 } },
+    (err, result) => {
+      if (err) {
+        return callback(err)
+      }
+      callback(null, result.value)
+    }
+  )
+}
+
+/**
+ * Clear the doc contents from Mongo and release the archiving lock
+ */
+function markDocAsArchived(projectId, docId, rev, callback) {
+  db.docs.updateOne(
+    { _id: ObjectId(docId), rev },
+    {
+      $set: { inS3: true },
+      $unset: { lines: 1, ranges: 1, archivingUntil: 1 },
+    },
+    callback
+  )
 }
 
 /**
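Reviewer note: together these two functions implement a time-limited lock. getDocForArchiving atomically claims a doc that is not yet in S3 and not currently locked (or whose lock has expired), and markDocAsArchived clears the contents and releases the lock in one update guarded by rev. A promise-style sketch of the whole lifecycle, assuming a mongodb driver version where findOneAndUpdate resolves to a { value } result; the helper names and the fixed 60 s duration are illustrative:

// Sketch only: one archiving attempt under the time-limited lock.
async function archiveWithLock(docs, projectId, docId, uploadContents) {
  const lockDurationMs = 60 * 1000 // mirrors the 60000 ms default below
  const now = new Date()
  const archivingUntil = new Date(now.getTime() + lockDurationMs)

  // Claim the doc atomically; any concurrent archiver gets null and backs off.
  const result = await docs.findOneAndUpdate(
    {
      _id: docId,
      project_id: projectId,
      inS3: { $ne: true },
      $or: [{ archivingUntil: null }, { archivingUntil: { $lt: now } }],
    },
    { $set: { archivingUntil } },
    { projection: { lines: 1, ranges: 1, rev: 1 } }
  )
  const doc = result.value
  if (!doc) {
    return false // missing, already archived, or locked by another archiver
  }

  await uploadContents(doc) // e.g. send lines/ranges to object storage

  // Release the lock and drop the contents in one update; matching on rev
  // means a doc edited in the meantime keeps its contents.
  await docs.updateOne(
    { _id: docId, rev: doc.rev },
    {
      $set: { inS3: true },
      $unset: { lines: 1, ranges: 1, archivingUntil: 1 },
    }
  )
  return true
}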
@@ -267,11 +299,12 @@ module.exports = {
   getProjectsDeletedDocs,
   getProjectsDocs,
   getArchivedProjectDocs,
-  getNonArchivedProjectDocs,
+  getNonArchivedProjectDocIds,
   getNonDeletedArchivedProjectDocs,
   upsertIntoDocCollection,
   restoreArchivedDoc,
   patchDoc,
+  getDocForArchiving,
   markDocAsArchived,
   getDocVersion,
   setDocVersion,
@@ -40,9 +40,10 @@ const Settings = {
   maxJsonRequestSize:
     parseInt(process.env.MAX_JSON_REQUEST_SIZE) || 6 * 1024 * 1024, // 6 MB
 
-  archiveBatchSize: parseInt(process.env.ARCHIVE_BATCH_SIZE, 10) || 50,
   unArchiveBatchSize: parseInt(process.env.UN_ARCHIVE_BATCH_SIZE, 10) || 50,
   parallelArchiveJobs: parseInt(process.env.PARALLEL_ARCHIVE_JOBS, 10) || 5,
+  archivingLockDurationMs:
+    parseInt(process.env.ARCHIVING_LOCK_DURATION_MS, 10) || 60000,
 }
 
 if (process.env.MONGO_CONNECTION_STRING) {
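Reviewer note: the new archivingLockDurationMs setting follows the same env-with-fallback idiom as its neighbours: parseInt of an unset variable is NaN, which is falsy, so the 60000 ms default applies unless ARCHIVING_LOCK_DURATION_MS is set. A quick sketch of that behaviour (the example invocation is illustrative):

// Sketch: parseInt(undefined, 10) is NaN, so || falls back to the default.
const archivingLockDurationMs =
  parseInt(process.env.ARCHIVING_LOCK_DURATION_MS, 10) || 60000 // 1 minute
// e.g. running with ARCHIVING_LOCK_DURATION_MS=120000 doubles the lock window.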
@@ -342,7 +342,7 @@ describe('Archiving', function () {
           if (error) {
             return done(error)
           }
-          DocstoreClient.archiveDocById(
+          DocstoreClient.archiveDoc(
             this.project_id,
             this.doc._id,
             (error, res) => {
@@ -35,7 +35,7 @@ describe('Getting A Doc from Archive', function () {
         if (error) {
           return done(error)
         }
-        DocstoreClient.archiveDocById(
+        DocstoreClient.archiveDoc(
           this.project_id,
           this.doc._id,
           (error, res) => {
@@ -163,7 +163,7 @@ module.exports = DocstoreClient = {
     )
   },
 
-  archiveDocById(projectId, docId, callback) {
+  archiveDoc(projectId, docId, callback) {
     request.post(
       {
         url: `http://localhost:${settings.internal.docstore.port}/project/${projectId}/doc/${docId}/archive`,
@@ -118,16 +118,27 @@ describe('DocArchiveManager', function () {
       deleteDirectory: sinon.stub().resolves(),
     }
 
-    const getNonArchivedProjectDocs = sinon.stub()
-    getNonArchivedProjectDocs
+    const getNonArchivedProjectDocIds = sinon.stub()
+    getNonArchivedProjectDocIds
       .onCall(0)
-      .resolves(mongoDocs.filter(doc => !doc.inS3))
-    getNonArchivedProjectDocs.onCall(1).resolves([])
+      .resolves(mongoDocs.filter(doc => !doc.inS3).map(doc => doc._id))
+    getNonArchivedProjectDocIds.onCall(1).resolves([])
 
     const getArchivedProjectDocs = sinon.stub()
     getArchivedProjectDocs.onCall(0).resolves(archivedDocs)
     getArchivedProjectDocs.onCall(1).resolves([])
 
+    const fakeGetDoc = async (_projectId, _docId) => {
+      if (_projectId.equals(projectId)) {
+        for (const mongoDoc of mongoDocs.concat(archivedDocs)) {
+          if (mongoDoc._id.equals(_docId)) {
+            return mongoDoc
+          }
+        }
+      }
+      throw new Errors.NotFoundError()
+    }
+
     MongoManager = {
       promises: {
         markDocAsArchived: sinon.stub().resolves(),
@@ -135,17 +146,13 @@ describe('DocArchiveManager', function () {
         upsertIntoDocCollection: sinon.stub().resolves(),
         getProjectsDocs: sinon.stub().resolves(mongoDocs),
         getNonDeletedArchivedProjectDocs: getArchivedProjectDocs,
-        getNonArchivedProjectDocs,
+        getNonArchivedProjectDocIds,
         getArchivedProjectDocs,
-        findDoc: sinon.stub().rejects(new Errors.NotFoundError()),
+        findDoc: sinon.stub().callsFake(fakeGetDoc),
+        getDocForArchiving: sinon.stub().callsFake(fakeGetDoc),
         destroyProject: sinon.stub().resolves(),
       },
     }
-    for (const mongoDoc of mongoDocs.concat(archivedDocs)) {
-      MongoManager.promises.findDoc
-        .withArgs(projectId, mongoDoc._id, sinon.match.any)
-        .resolves(mongoDoc)
-    }
 
     DocArchiveManager = SandboxedModule.require(modulePath, {
       requires: {
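Reviewer note: the test setup now backs both findDoc and getDocForArchiving with one fake that resolves from in-memory fixtures. A reduced sketch of the sinon pattern; the fixture shape and error class parameter are illustrative:

const sinon = require('sinon')

// Sketch: one callsFake stub doubles for any async doc lookup in tests.
function makeDocLookupStub(fixtureDocs, NotFoundError) {
  return sinon.stub().callsFake(async (_projectId, docId) => {
    const doc = fixtureDocs.find(d => d._id.equals(docId))
    if (!doc) {
      throw new NotFoundError()
    }
    return doc
  })
}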
@@ -163,7 +170,7 @@ describe('DocArchiveManager', function () {
   describe('archiveDoc', function () {
     it('should resolve when passed a valid document', async function () {
       await expect(
-        DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0])
+        DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id)
       ).to.eventually.be.fulfilled
     })
 
@@ -172,26 +179,26 @@ describe('DocArchiveManager', function () {
       doc.lines = null
 
       await expect(
-        DocArchiveManager.promises.archiveDoc(projectId, doc)
+        DocArchiveManager.promises.archiveDoc(projectId, doc._id)
       ).to.eventually.be.rejectedWith('doc has no lines')
     })
 
     it('should add the schema version', async function () {
-      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[1])
+      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[1]._id)
       expect(Streamifier.createReadStream).to.have.been.calledWith(
         sinon.match(/"schema_v":1/)
       )
     })
 
     it('should calculate the hex md5 sum of the content', async function () {
-      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0])
+      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id)
       expect(Crypto.createHash).to.have.been.calledWith('md5')
       expect(HashUpdate).to.have.been.calledWith(archivedDocJson)
       expect(HashDigest).to.have.been.calledWith('hex')
     })
 
     it('should pass the md5 hash to the object persistor for verification', async function () {
-      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0])
+      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id)
 
       expect(PersistorManager.sendStream).to.have.been.calledWith(
         sinon.match.any,
@@ -202,7 +209,7 @@ describe('DocArchiveManager', function () {
     })
 
     it('should pass the correct bucket and key to the persistor', async function () {
-      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0])
+      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id)
 
       expect(PersistorManager.sendStream).to.have.been.calledWith(
         Settings.docstore.bucket,
@@ -211,7 +218,7 @@ describe('DocArchiveManager', function () {
     })
 
     it('should create a stream from the encoded json and send it', async function () {
-      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0])
+      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id)
       expect(Streamifier.createReadStream).to.have.been.calledWith(
         archivedDocJson
       )
@@ -223,8 +230,9 @@ describe('DocArchiveManager', function () {
     })
 
     it('should mark the doc as archived', async function () {
-      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0])
+      await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id)
       expect(MongoManager.promises.markDocAsArchived).to.have.been.calledWith(
+        projectId,
         mongoDocs[0]._id,
         mongoDocs[0].rev
       )
@@ -243,7 +251,7 @@ describe('DocArchiveManager', function () {
 
     it('should return an error', async function () {
       await expect(
-        DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0])
+        DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id)
       ).to.eventually.be.rejectedWith('null bytes detected')
     })
   })
@@ -452,22 +460,25 @@ describe('DocArchiveManager', function () {
       await DocArchiveManager.promises.archiveAllDocs(projectId)
       // not inS3
       expect(MongoManager.promises.markDocAsArchived).to.have.been.calledWith(
+        projectId,
         mongoDocs[0]._id
       )
       expect(MongoManager.promises.markDocAsArchived).to.have.been.calledWith(
+        projectId,
         mongoDocs[1]._id
       )
       expect(MongoManager.promises.markDocAsArchived).to.have.been.calledWith(
+        projectId,
         mongoDocs[4]._id
       )
 
       // inS3
       expect(
         MongoManager.promises.markDocAsArchived
-      ).not.to.have.been.calledWith(mongoDocs[2]._id)
+      ).not.to.have.been.calledWith(projectId, mongoDocs[2]._id)
       expect(
         MongoManager.promises.markDocAsArchived
-      ).not.to.have.been.calledWith(mongoDocs[3]._id)
+      ).not.to.have.been.calledWith(projectId, mongoDocs[3]._id)
     })
   })
 
@@ -379,7 +379,7 @@ describe('DocManager', function () {
         .stub()
         .yields(null, { _id: ObjectId(this.doc_id) })
       this.MongoManager.patchDoc = sinon.stub().yields(null)
-      this.DocArchiveManager.archiveDocById = sinon.stub().yields(null)
+      this.DocArchiveManager.archiveDoc = sinon.stub().yields(null)
       this.meta = {}
     })
 
@@ -429,7 +429,7 @@ describe('DocManager', function () {
       })
 
       it('should not flush the doc out of mongo', function () {
-        expect(this.DocArchiveManager.archiveDocById).to.not.have.been.called
+        expect(this.DocArchiveManager.archiveDoc).to.not.have.been.called
       })
     })
 
@@ -447,7 +447,7 @@ describe('DocManager', function () {
      })
 
      it('should not flush the doc out of mongo', function () {
-        expect(this.DocArchiveManager.archiveDocById).to.not.have.been.called
+        expect(this.DocArchiveManager.archiveDoc).to.not.have.been.called
      })
    })
 
@@ -459,7 +459,7 @@ describe('DocManager', function () {
 
     describe('when the background flush succeeds', function () {
       beforeEach(function (done) {
-        this.DocArchiveManager.archiveDocById = sinon.stub().yields(null)
+        this.DocArchiveManager.archiveDoc = sinon.stub().yields(null)
         this.callback = sinon.stub().callsFake(done)
         this.DocManager.patchDoc(
           this.project_id,
@@ -474,18 +474,17 @@ describe('DocManager', function () {
       })
 
       it('should flush the doc out of mongo', function () {
-        expect(
-          this.DocArchiveManager.archiveDocById
-        ).to.have.been.calledWith(this.project_id, this.doc_id)
+        expect(this.DocArchiveManager.archiveDoc).to.have.been.calledWith(
+          this.project_id,
+          this.doc_id
+        )
       })
     })
 
     describe('when the background flush fails', function () {
       beforeEach(function (done) {
         this.err = new Error('foo')
-        this.DocArchiveManager.archiveDocById = sinon
-          .stub()
-          .yields(this.err)
+        this.DocArchiveManager.archiveDoc = sinon.stub().yields(this.err)
         this.callback = sinon.stub().callsFake(done)
         this.DocManager.patchDoc(
           this.project_id,
@@ -23,7 +23,10 @@ describe('MongoManager', function () {
         ObjectId,
       },
       '@overleaf/metrics': { timeAsyncMethod: sinon.stub() },
-      '@overleaf/settings': { max_deleted_docs: 42 },
+      '@overleaf/settings': {
+        max_deleted_docs: 42,
+        docstore: { archivingLockDurationMs: 5000 },
+      },
       './Errors': Errors,
     },
   })