diff --git a/services/track-changes/app/coffee/MongoAWS.coffee b/services/track-changes/app/coffee/MongoAWS.coffee index 96435fd064..d5d243cb2d 100644 --- a/services/track-changes/app/coffee/MongoAWS.coffee +++ b/services/track-changes/app/coffee/MongoAWS.coffee @@ -5,15 +5,92 @@ logger = require "logger-sharelatex" AWS = require 'aws-sdk' fs = require 'fs' S3S = require 's3-streams' +{db, ObjectId} = require "./mongojs" +JSONStream = require "JSONStream" +ReadlineStream = require "readline-stream" module.exports = MongoAWS = + bulkLimit: 10 + archiveDocHistory: (project_id, doc_id, callback = (error) ->) -> + query = { + doc_id: ObjectId(doc_id) + expiresAt: {$exists : false} + } + + AWS.config.update { + accessKeyId: settings.filestore.s3.key + secretAccessKey: settings.filestore.s3.secret + } + + upload = S3S.WriteStream new AWS.S3(), { + "Bucket": settings.filestore.stores.user_files, + "Key": project_id+"/changes-"+doc_id + } + + db.docHistory.find(query) + .pipe JSONStream.stringify() + .pipe upload + .on 'finish', () -> + return callback(null) + + unArchiveDocHistory: (project_id, doc_id, callback = (error) ->) -> + + AWS.config.update { + accessKeyId: settings.filestore.s3.key + secretAccessKey: settings.filestore.s3.secret + } + + download = S3S.ReadStream new AWS.S3(), { + "Bucket": settings.filestore.stores.user_files, + "Key": project_id+"/changes-"+doc_id + }, { + encoding: "utf8" + } + + lineStream = new ReadlineStream(); + ops = [] + + download + .on 'open', (obj) -> + return 1 + .pipe lineStream + .on 'data', (line) -> + if line.length > 2 + ops.push(JSON.parse(line)) + if ops.length > MongoAWS.bulkLimit + MongoAWS.handleBulk ops, () -> + ops.splice(0,ops.length) + .on 'end', () -> + MongoAWS.handleBulk ops, callback + .on 'error', (err) -> + return callback(err) + + handleBulk: (ops, cb) -> + bulk = db.docHistory.initializeUnorderedBulkOp(); + + for op in ops + op._id = ObjectId(op._id) + op.doc_id = ObjectId(op.doc_id) + op.project_id = ObjectId(op.project_id) + bulk.find({_id:op._id}).upsert().updateOne(op) + + bulk.execute (err, result) -> + if err? + logger.error err:err, "error bulking ReadlineStream" + else + logger.log result:result, "bulked ReadlineStream" + cb(err) + + + archiveDocHistoryExternal: (project_id, doc_id, callback = (error) ->) -> MongoAWS.mongoExportDocHistory doc_id, (error, filepath) -> MongoAWS.s3upStream project_id, doc_id, filepath, callback #delete temp file? - unArchiveDocHistory: (project_id, doc_id, callback = (error) ->) -> + + unArchiveDocHistoryExternal: (project_id, doc_id, callback = (error) ->) -> MongoAWS.s3downStream project_id, doc_id, (error, filepath) -> if error == null MongoAWS.mongoImportDocHistory filepath, callback diff --git a/services/track-changes/package.json b/services/track-changes/package.json index f82b618359..4fe112262a 100644 --- a/services/track-changes/package.json +++ b/services/track-changes/package.json @@ -20,7 +20,9 @@ "redis": "~0.10.1", "underscore": "~1.7.0", "mongo-uri": "^0.1.2", - "s3-streams": "^0.3.0" + "s3-streams": "^0.3.0", + "JSONStream": "^1.0.4", + "readline-stream": "^1.0.1" }, "devDependencies": { "chai": "~1.9.0",