diff --git a/services/docstore/app/js/DocArchiveManager.js b/services/docstore/app/js/DocArchiveManager.js index f763d081ee..8b9c44bbce 100644 --- a/services/docstore/app/js/DocArchiveManager.js +++ b/services/docstore/app/js/DocArchiveManager.js @@ -1,295 +1,179 @@ -/* eslint-disable - camelcase, - handle-callback-err, - no-useless-escape, -*/ -// TODO: This file was created by bulk-decaffeinate. -// Fix any style issues and re-enable lint. -/* - * decaffeinate suggestions: - * DS102: Remove unnecessary code created because of implicit returns - * DS207: Consider shorter variations of null checks - * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md - */ -let DocArchive +const { promisify, callbackify } = require('util') const MongoManager = require('./MongoManager') const Errors = require('./Errors') const logger = require('logger-sharelatex') -const _ = require('underscore') -const async = require('async') const settings = require('settings-sharelatex') -const request = require('request') const crypto = require('crypto') +const Streamifier = require('streamifier') const RangeManager = require('./RangeManager') -const thirtySeconds = 30 * 1000 +const PersistorManager = require('./PersistorManager') +const AsyncPool = require('tiny-async-pool') -module.exports = DocArchive = { - archiveAllDocs(project_id, callback) { - if (callback == null) { - callback = function (err, docs) {} - } - return MongoManager.getProjectsDocs( - project_id, - { include_deleted: true }, - { lines: true, ranges: true, rev: true, inS3: true }, - function (err, docs) { - if (err != null) { - return callback(err) - } else if (docs == null) { - return callback( - new Errors.NotFoundError(`No docs for project ${project_id}`) - ) - } - docs = _.filter(docs, (doc) => doc.inS3 !== true) - const jobs = _.map(docs, (doc) => (cb) => - DocArchive.archiveDoc(project_id, doc, cb) - ) - return async.parallelLimit(jobs, 5, callback) - } - ) - }, +const PARALLEL_JOBS = 5 - archiveDoc(project_id, doc, callback) { - let options - logger.log({ project_id, doc_id: doc._id }, 'sending doc to s3') - try { - options = DocArchive.buildS3Options(project_id + '/' + doc._id) - } catch (e) { - return callback(e) - } - return DocArchive._mongoDocToS3Doc(doc, function (error, json_doc) { - if (error != null) { - return callback(error) - } - options.body = json_doc - options.headers = { 'Content-Type': 'application/json' } - return request.put(options, function (err, res) { - if (err != null || res.statusCode !== 200) { - logger.err( - { - err, - res, - project_id, - doc_id: doc._id, - statusCode: res != null ? res.statusCode : undefined - }, - 'something went wrong archiving doc in aws' - ) - return callback(new Error('Error in S3 request')) - } - const md5lines = crypto - .createHash('md5') - .update(json_doc, 'utf8') - .digest('hex') - const md5response = res.headers.etag.toString().replace(/\"/g, '') - if (md5lines !== md5response) { - logger.err( - { - responseMD5: md5response, - linesMD5: md5lines, - project_id, - doc_id: doc != null ? doc._id : undefined - }, - 'err in response md5 from s3' - ) - return callback(new Error('Error in S3 md5 response')) - } - return MongoManager.markDocAsArchived(doc._id, doc.rev, function (err) { - if (err != null) { - return callback(err) - } - return callback() - }) - }) - }) - }, - - unArchiveAllDocs(project_id, callback) { - if (callback == null) { - callback = function (err) {} - } - return MongoManager.getArchivedProjectDocs(project_id, function ( - err, - docs - ) { - if (err != null) { - logger.err({ err, project_id }, 'error unarchiving all docs') - return callback(err) - } else if (docs == null) { - return callback( - new Errors.NotFoundError(`No docs for project ${project_id}`) - ) - } - const jobs = _.map( - docs, - (doc) => - function (cb) { - if (doc.inS3 == null) { - return cb() - } else { - return DocArchive.unarchiveDoc(project_id, doc._id, cb) - } - } - ) - return async.parallelLimit(jobs, 5, callback) - }) - }, - - unarchiveDoc(project_id, doc_id, callback) { - let options - logger.log({ project_id, doc_id }, 'getting doc from s3') - try { - options = DocArchive.buildS3Options(project_id + '/' + doc_id) - } catch (e) { - return callback(e) - } - options.json = true - return request.get(options, function (err, res, doc) { - if (err != null || res.statusCode !== 200) { - logger.err( - { err, res, project_id, doc_id }, - 'something went wrong unarchiving doc from aws' - ) - return callback(new Errors.NotFoundError('Error in S3 request')) - } - return DocArchive._s3DocToMongoDoc(doc, function (error, mongo_doc) { - if (error != null) { - return callback(error) - } - return MongoManager.upsertIntoDocCollection( - project_id, - doc_id.toString(), - mongo_doc, - function (err) { - if (err != null) { - return callback(err) - } - logger.log({ project_id, doc_id }, 'deleting doc from s3') - return DocArchive._deleteDocFromS3(project_id, doc_id, callback) - } - ) - }) - }) - }, - - destroyAllDocs(project_id, callback) { - if (callback == null) { - callback = function (err) {} - } - return MongoManager.getProjectsDocs( - project_id, - { include_deleted: true }, - { _id: 1 }, - function (err, docs) { - if (err != null) { - logger.err({ err, project_id }, "error getting project's docs") - return callback(err) - } else if (docs == null) { - return callback() - } - const jobs = _.map(docs, (doc) => (cb) => - DocArchive.destroyDoc(project_id, doc._id, cb) - ) - return async.parallelLimit(jobs, 5, callback) - } - ) - }, - - destroyDoc(project_id, doc_id, callback) { - logger.log({ project_id, doc_id }, 'removing doc from mongo and s3') - return MongoManager.findDoc(project_id, doc_id, { inS3: 1 }, function ( - error, - doc - ) { - if (error != null) { - return callback(error) - } - if (doc == null) { - return callback(new Errors.NotFoundError('Doc not found in Mongo')) - } - if (doc.inS3 === true) { - return DocArchive._deleteDocFromS3(project_id, doc_id, function (err) { - if (err != null) { - return err - } - return MongoManager.destroyDoc(doc_id, callback) - }) - } else { - return MongoManager.destroyDoc(doc_id, callback) - } - }) - }, - - _deleteDocFromS3(project_id, doc_id, callback) { - let options - try { - options = DocArchive.buildS3Options(project_id + '/' + doc_id) - } catch (e) { - return callback(e) - } - options.json = true - return request.del(options, function (err, res, body) { - if (err != null || res.statusCode !== 204) { - logger.err( - { err, res, project_id, doc_id }, - 'something went wrong deleting doc from aws' - ) - return callback(new Error('Error in S3 request')) - } - return callback() - }) - }, - - _s3DocToMongoDoc(doc, callback) { - if (callback == null) { - callback = function (error, mongo_doc) {} - } - const mongo_doc = {} - if (doc.schema_v === 1 && doc.lines != null) { - mongo_doc.lines = doc.lines - if (doc.ranges != null) { - mongo_doc.ranges = RangeManager.jsonRangesToMongo(doc.ranges) - } - } else if (doc instanceof Array) { - mongo_doc.lines = doc - } else { - return callback(new Error("I don't understand the doc format in s3")) - } - return callback(null, mongo_doc) - }, - - _mongoDocToS3Doc(doc, callback) { - if (callback == null) { - callback = function (error, s3_doc) {} - } - if (doc.lines == null) { - return callback(new Error('doc has no lines')) - } - const json = JSON.stringify({ - lines: doc.lines, - ranges: doc.ranges, - schema_v: 1 - }) - if (json.indexOf('\u0000') !== -1) { - const error = new Error('null bytes detected') - logger.err({ err: error, doc, json }, error.message) - return callback(error) - } - return callback(null, json) - }, - - buildS3Options(key) { - if (settings.docstore.s3 == null) { - throw new Error('S3 settings are not configured') - } - return { - aws: { - key: settings.docstore.s3.key, - secret: settings.docstore.s3.secret, - bucket: settings.docstore.s3.bucket - }, - timeout: thirtySeconds, - uri: `https://${settings.docstore.s3.bucket}.s3.amazonaws.com/${key}` - } +module.exports = { + archiveAllDocs: callbackify(archiveAllDocs), + archiveDoc: callbackify(archiveDoc), + unArchiveAllDocs: callbackify(unArchiveAllDocs), + unarchiveDoc: callbackify(unarchiveDoc), + destroyAllDocs: callbackify(destroyAllDocs), + destroyDoc: callbackify(destroyDoc), + promises: { + archiveAllDocs, + archiveDoc, + unArchiveAllDocs, + unarchiveDoc, + destroyAllDocs, + destroyDoc } } + +async function archiveAllDocs(projectId) { + const docs = await promisify(MongoManager.getProjectsDocs)( + projectId, + { include_deleted: true }, + { lines: true, ranges: true, rev: true, inS3: true } + ) + + if (!docs) { + throw new Errors.NotFoundError(`No docs for project ${projectId}`) + } + + await AsyncPool( + PARALLEL_JOBS, + docs.filter((doc) => !doc.inS3), + (doc) => archiveDoc(projectId, doc) + ) +} + +async function archiveDoc(projectId, doc) { + logger.log( + { project_id: projectId, doc_id: doc._id }, + 'sending doc to persistor' + ) + const key = `${projectId}/${doc._id}` + + if (doc.lines == null) { + throw new Error('doc has no lines') + } + + const json = JSON.stringify({ + lines: doc.lines, + ranges: doc.ranges, + schema_v: 1 + }) + + // this should never happen, but protects against memory-corruption errors that + // have happened in the past + if (json.indexOf('\u0000') > -1) { + const error = new Error('null bytes detected') + logger.err({ err: error, doc }, error.message) + throw error + } + + const md5 = crypto.createHash('md5').update(json).digest('hex') + const stream = Streamifier.createReadStream(json) + await PersistorManager.sendStream(settings.docstore.bucket, key, stream, { + sourceMd5: md5 + }) + await promisify(MongoManager.markDocAsArchived)(doc._id, doc.rev) +} + +async function unArchiveAllDocs(projectId) { + const docs = await promisify(MongoManager.getArchivedProjectDocs)(projectId) + if (!docs) { + throw new Errors.NotFoundError(`No docs for project ${projectId}`) + } + await AsyncPool(PARALLEL_JOBS, docs, (doc) => + unarchiveDoc(projectId, doc._id) + ) +} + +async function unarchiveDoc(projectId, docId) { + logger.log( + { project_id: projectId, doc_id: docId }, + 'getting doc from persistor' + ) + const key = `${projectId}/${docId}` + const sourceMd5 = await PersistorManager.getObjectMd5Hash( + settings.docstore.bucket, + key + ) + const stream = await PersistorManager.getObjectStream( + settings.docstore.bucket, + key + ) + stream.resume() + const json = await _streamToString(stream) + const md5 = crypto.createHash('md5').update(json).digest('hex') + if (sourceMd5 !== md5) { + throw new Errors.Md5MismatchError('md5 mismatch when downloading doc', { + key, + sourceMd5, + md5 + }) + } + + const doc = JSON.parse(json) + + const mongoDoc = {} + if (doc.schema_v === 1 && doc.lines != null) { + mongoDoc.lines = doc.lines + if (doc.ranges != null) { + mongoDoc.ranges = RangeManager.jsonRangesToMongo(doc.ranges) + } + } else if (Array.isArray(doc)) { + mongoDoc.lines = doc + } else { + throw new Error("I don't understand the doc format in s3") + } + await promisify(MongoManager.upsertIntoDocCollection)( + projectId, + docId, + mongoDoc + ) + await PersistorManager.deleteObject(settings.docstore.bucket, key) +} + +async function destroyAllDocs(projectId) { + const docs = await promisify(MongoManager.getProjectsDocs)( + projectId, + { include_deleted: true }, + { _id: 1 } + ) + if (docs) { + await AsyncPool(PARALLEL_JOBS, docs, (doc) => + destroyDoc(projectId, doc._id) + ) + } +} + +async function destroyDoc(projectId, docId) { + logger.log( + { project_id: projectId, doc_id: docId }, + 'removing doc from mongo and persistor' + ) + const doc = await promisify(MongoManager.findDoc)(projectId, docId, { + inS3: 1 + }) + if (!doc) { + throw new Errors.NotFoundError('Doc not found in Mongo') + } + + if (doc.inS3) { + await PersistorManager.deleteObject( + settings.docstore.bucket, + `${projectId}/${docId}` + ) + } + await promisify(MongoManager.destroyDoc)(docId) +} + +async function _streamToString(stream) { + const chunks = [] + return new Promise((resolve, reject) => { + stream.on('data', (chunk) => chunks.push(chunk)) + stream.on('error', reject) + stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))) + }) +} diff --git a/services/docstore/app/js/Errors.js b/services/docstore/app/js/Errors.js index a950b6de20..6a74485494 100644 --- a/services/docstore/app/js/Errors.js +++ b/services/docstore/app/js/Errors.js @@ -1,16 +1,10 @@ -/* eslint-disable - no-proto, - no-unused-vars, -*/ -// TODO: This file was created by bulk-decaffeinate. -// Fix any style issues and re-enable lint. -let Errors -var NotFoundError = function (message) { - const error = new Error(message) - error.name = 'NotFoundError' - error.__proto__ = NotFoundError.prototype - return error -} -NotFoundError.prototype.__proto__ = Error.prototype +// import Errors from object-persistor to pass instanceof checks +const OError = require('@overleaf/o-error') +const { Errors } = require('@overleaf/object-persistor') -module.exports = Errors = { NotFoundError } +class Md5MismatchError extends OError {} + +module.exports = { + Md5MismatchError, + ...Errors +} diff --git a/services/docstore/app/js/PersistorManager.js b/services/docstore/app/js/PersistorManager.js new file mode 100644 index 0000000000..b3d194a210 --- /dev/null +++ b/services/docstore/app/js/PersistorManager.js @@ -0,0 +1,9 @@ +const settings = require('settings-sharelatex') + +const persistorSettings = settings.docstore +persistorSettings.Metrics = require('metrics-sharelatex') + +const ObjectPersistor = require('@overleaf/object-persistor') +const persistor = ObjectPersistor(persistorSettings) + +module.exports = persistor