Implement object-persistor support

This commit is contained in:
Simon Detheridge 2020-07-23 19:42:49 +01:00
parent 812c996c47
commit 6e04db552a
3 changed files with 187 additions and 300 deletions

View file

@ -1,295 +1,179 @@
/* eslint-disable const { promisify, callbackify } = require('util')
camelcase,
handle-callback-err,
no-useless-escape,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let DocArchive
const MongoManager = require('./MongoManager') const MongoManager = require('./MongoManager')
const Errors = require('./Errors') const Errors = require('./Errors')
const logger = require('logger-sharelatex') const logger = require('logger-sharelatex')
const _ = require('underscore')
const async = require('async')
const settings = require('settings-sharelatex') const settings = require('settings-sharelatex')
const request = require('request')
const crypto = require('crypto') const crypto = require('crypto')
const Streamifier = require('streamifier')
const RangeManager = require('./RangeManager') const RangeManager = require('./RangeManager')
const thirtySeconds = 30 * 1000 const PersistorManager = require('./PersistorManager')
const AsyncPool = require('tiny-async-pool')
module.exports = DocArchive = { const PARALLEL_JOBS = 5
archiveAllDocs(project_id, callback) {
if (callback == null) { module.exports = {
callback = function (err, docs) {} archiveAllDocs: callbackify(archiveAllDocs),
archiveDoc: callbackify(archiveDoc),
unArchiveAllDocs: callbackify(unArchiveAllDocs),
unarchiveDoc: callbackify(unarchiveDoc),
destroyAllDocs: callbackify(destroyAllDocs),
destroyDoc: callbackify(destroyDoc),
promises: {
archiveAllDocs,
archiveDoc,
unArchiveAllDocs,
unarchiveDoc,
destroyAllDocs,
destroyDoc
} }
return MongoManager.getProjectsDocs( }
project_id,
async function archiveAllDocs(projectId) {
const docs = await promisify(MongoManager.getProjectsDocs)(
projectId,
{ include_deleted: true }, { include_deleted: true },
{ lines: true, ranges: true, rev: true, inS3: true }, { lines: true, ranges: true, rev: true, inS3: true }
function (err, docs) {
if (err != null) {
return callback(err)
} else if (docs == null) {
return callback(
new Errors.NotFoundError(`No docs for project ${project_id}`)
) )
}
docs = _.filter(docs, (doc) => doc.inS3 !== true)
const jobs = _.map(docs, (doc) => (cb) =>
DocArchive.archiveDoc(project_id, doc, cb)
)
return async.parallelLimit(jobs, 5, callback)
}
)
},
archiveDoc(project_id, doc, callback) { if (!docs) {
let options throw new Errors.NotFoundError(`No docs for project ${projectId}`)
logger.log({ project_id, doc_id: doc._id }, 'sending doc to s3')
try {
options = DocArchive.buildS3Options(project_id + '/' + doc._id)
} catch (e) {
return callback(e)
} }
return DocArchive._mongoDocToS3Doc(doc, function (error, json_doc) {
if (error != null) {
return callback(error)
}
options.body = json_doc
options.headers = { 'Content-Type': 'application/json' }
return request.put(options, function (err, res) {
if (err != null || res.statusCode !== 200) {
logger.err(
{
err,
res,
project_id,
doc_id: doc._id,
statusCode: res != null ? res.statusCode : undefined
},
'something went wrong archiving doc in aws'
)
return callback(new Error('Error in S3 request'))
}
const md5lines = crypto
.createHash('md5')
.update(json_doc, 'utf8')
.digest('hex')
const md5response = res.headers.etag.toString().replace(/\"/g, '')
if (md5lines !== md5response) {
logger.err(
{
responseMD5: md5response,
linesMD5: md5lines,
project_id,
doc_id: doc != null ? doc._id : undefined
},
'err in response md5 from s3'
)
return callback(new Error('Error in S3 md5 response'))
}
return MongoManager.markDocAsArchived(doc._id, doc.rev, function (err) {
if (err != null) {
return callback(err)
}
return callback()
})
})
})
},
unArchiveAllDocs(project_id, callback) { await AsyncPool(
if (callback == null) { PARALLEL_JOBS,
callback = function (err) {} docs.filter((doc) => !doc.inS3),
} (doc) => archiveDoc(projectId, doc)
return MongoManager.getArchivedProjectDocs(project_id, function (
err,
docs
) {
if (err != null) {
logger.err({ err, project_id }, 'error unarchiving all docs')
return callback(err)
} else if (docs == null) {
return callback(
new Errors.NotFoundError(`No docs for project ${project_id}`)
) )
} }
const jobs = _.map(
docs,
(doc) =>
function (cb) {
if (doc.inS3 == null) {
return cb()
} else {
return DocArchive.unarchiveDoc(project_id, doc._id, cb)
}
}
)
return async.parallelLimit(jobs, 5, callback)
})
},
unarchiveDoc(project_id, doc_id, callback) { async function archiveDoc(projectId, doc) {
let options logger.log(
logger.log({ project_id, doc_id }, 'getting doc from s3') { project_id: projectId, doc_id: doc._id },
try { 'sending doc to persistor'
options = DocArchive.buildS3Options(project_id + '/' + doc_id)
} catch (e) {
return callback(e)
}
options.json = true
return request.get(options, function (err, res, doc) {
if (err != null || res.statusCode !== 200) {
logger.err(
{ err, res, project_id, doc_id },
'something went wrong unarchiving doc from aws'
) )
return callback(new Errors.NotFoundError('Error in S3 request')) const key = `${projectId}/${doc._id}`
}
return DocArchive._s3DocToMongoDoc(doc, function (error, mongo_doc) {
if (error != null) {
return callback(error)
}
return MongoManager.upsertIntoDocCollection(
project_id,
doc_id.toString(),
mongo_doc,
function (err) {
if (err != null) {
return callback(err)
}
logger.log({ project_id, doc_id }, 'deleting doc from s3')
return DocArchive._deleteDocFromS3(project_id, doc_id, callback)
}
)
})
})
},
destroyAllDocs(project_id, callback) {
if (callback == null) {
callback = function (err) {}
}
return MongoManager.getProjectsDocs(
project_id,
{ include_deleted: true },
{ _id: 1 },
function (err, docs) {
if (err != null) {
logger.err({ err, project_id }, "error getting project's docs")
return callback(err)
} else if (docs == null) {
return callback()
}
const jobs = _.map(docs, (doc) => (cb) =>
DocArchive.destroyDoc(project_id, doc._id, cb)
)
return async.parallelLimit(jobs, 5, callback)
}
)
},
destroyDoc(project_id, doc_id, callback) {
logger.log({ project_id, doc_id }, 'removing doc from mongo and s3')
return MongoManager.findDoc(project_id, doc_id, { inS3: 1 }, function (
error,
doc
) {
if (error != null) {
return callback(error)
}
if (doc == null) {
return callback(new Errors.NotFoundError('Doc not found in Mongo'))
}
if (doc.inS3 === true) {
return DocArchive._deleteDocFromS3(project_id, doc_id, function (err) {
if (err != null) {
return err
}
return MongoManager.destroyDoc(doc_id, callback)
})
} else {
return MongoManager.destroyDoc(doc_id, callback)
}
})
},
_deleteDocFromS3(project_id, doc_id, callback) {
let options
try {
options = DocArchive.buildS3Options(project_id + '/' + doc_id)
} catch (e) {
return callback(e)
}
options.json = true
return request.del(options, function (err, res, body) {
if (err != null || res.statusCode !== 204) {
logger.err(
{ err, res, project_id, doc_id },
'something went wrong deleting doc from aws'
)
return callback(new Error('Error in S3 request'))
}
return callback()
})
},
_s3DocToMongoDoc(doc, callback) {
if (callback == null) {
callback = function (error, mongo_doc) {}
}
const mongo_doc = {}
if (doc.schema_v === 1 && doc.lines != null) {
mongo_doc.lines = doc.lines
if (doc.ranges != null) {
mongo_doc.ranges = RangeManager.jsonRangesToMongo(doc.ranges)
}
} else if (doc instanceof Array) {
mongo_doc.lines = doc
} else {
return callback(new Error("I don't understand the doc format in s3"))
}
return callback(null, mongo_doc)
},
_mongoDocToS3Doc(doc, callback) {
if (callback == null) {
callback = function (error, s3_doc) {}
}
if (doc.lines == null) { if (doc.lines == null) {
return callback(new Error('doc has no lines')) throw new Error('doc has no lines')
} }
const json = JSON.stringify({ const json = JSON.stringify({
lines: doc.lines, lines: doc.lines,
ranges: doc.ranges, ranges: doc.ranges,
schema_v: 1 schema_v: 1
}) })
if (json.indexOf('\u0000') !== -1) {
const error = new Error('null bytes detected')
logger.err({ err: error, doc, json }, error.message)
return callback(error)
}
return callback(null, json)
},
buildS3Options(key) { // this should never happen, but protects against memory-corruption errors that
if (settings.docstore.s3 == null) { // have happened in the past
throw new Error('S3 settings are not configured') if (json.indexOf('\u0000') > -1) {
const error = new Error('null bytes detected')
logger.err({ err: error, doc }, error.message)
throw error
} }
return {
aws: { const md5 = crypto.createHash('md5').update(json).digest('hex')
key: settings.docstore.s3.key, const stream = Streamifier.createReadStream(json)
secret: settings.docstore.s3.secret, await PersistorManager.sendStream(settings.docstore.bucket, key, stream, {
bucket: settings.docstore.s3.bucket sourceMd5: md5
}, })
timeout: thirtySeconds, await promisify(MongoManager.markDocAsArchived)(doc._id, doc.rev)
uri: `https://${settings.docstore.s3.bucket}.s3.amazonaws.com/${key}` }
async function unArchiveAllDocs(projectId) {
const docs = await promisify(MongoManager.getArchivedProjectDocs)(projectId)
if (!docs) {
throw new Errors.NotFoundError(`No docs for project ${projectId}`)
} }
await AsyncPool(PARALLEL_JOBS, docs, (doc) =>
unarchiveDoc(projectId, doc._id)
)
}
async function unarchiveDoc(projectId, docId) {
logger.log(
{ project_id: projectId, doc_id: docId },
'getting doc from persistor'
)
const key = `${projectId}/${docId}`
const sourceMd5 = await PersistorManager.getObjectMd5Hash(
settings.docstore.bucket,
key
)
const stream = await PersistorManager.getObjectStream(
settings.docstore.bucket,
key
)
stream.resume()
const json = await _streamToString(stream)
const md5 = crypto.createHash('md5').update(json).digest('hex')
if (sourceMd5 !== md5) {
throw new Errors.Md5MismatchError('md5 mismatch when downloading doc', {
key,
sourceMd5,
md5
})
}
const doc = JSON.parse(json)
const mongoDoc = {}
if (doc.schema_v === 1 && doc.lines != null) {
mongoDoc.lines = doc.lines
if (doc.ranges != null) {
mongoDoc.ranges = RangeManager.jsonRangesToMongo(doc.ranges)
}
} else if (Array.isArray(doc)) {
mongoDoc.lines = doc
} else {
throw new Error("I don't understand the doc format in s3")
}
await promisify(MongoManager.upsertIntoDocCollection)(
projectId,
docId,
mongoDoc
)
await PersistorManager.deleteObject(settings.docstore.bucket, key)
}
async function destroyAllDocs(projectId) {
const docs = await promisify(MongoManager.getProjectsDocs)(
projectId,
{ include_deleted: true },
{ _id: 1 }
)
if (docs) {
await AsyncPool(PARALLEL_JOBS, docs, (doc) =>
destroyDoc(projectId, doc._id)
)
} }
} }
async function destroyDoc(projectId, docId) {
logger.log(
{ project_id: projectId, doc_id: docId },
'removing doc from mongo and persistor'
)
const doc = await promisify(MongoManager.findDoc)(projectId, docId, {
inS3: 1
})
if (!doc) {
throw new Errors.NotFoundError('Doc not found in Mongo')
}
if (doc.inS3) {
await PersistorManager.deleteObject(
settings.docstore.bucket,
`${projectId}/${docId}`
)
}
await promisify(MongoManager.destroyDoc)(docId)
}
async function _streamToString(stream) {
const chunks = []
return new Promise((resolve, reject) => {
stream.on('data', (chunk) => chunks.push(chunk))
stream.on('error', reject)
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')))
})
}

View file

@ -1,16 +1,10 @@
/* eslint-disable // import Errors from object-persistor to pass instanceof checks
no-proto, const OError = require('@overleaf/o-error')
no-unused-vars, const { Errors } = require('@overleaf/object-persistor')
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
let Errors
var NotFoundError = function (message) {
const error = new Error(message)
error.name = 'NotFoundError'
error.__proto__ = NotFoundError.prototype
return error
}
NotFoundError.prototype.__proto__ = Error.prototype
module.exports = Errors = { NotFoundError } class Md5MismatchError extends OError {}
module.exports = {
Md5MismatchError,
...Errors
}

View file

@ -0,0 +1,9 @@
const settings = require('settings-sharelatex')
const persistorSettings = settings.docstore
persistorSettings.Metrics = require('metrics-sharelatex')
const ObjectPersistor = require('@overleaf/object-persistor')
const persistor = ObjectPersistor(persistorSettings)
module.exports = persistor