overleaf/services/track-changes/app/coffee/MongoAWS.coffee

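# Archives a doc's history ops from Mongo out to S3, and restores them
# back into Mongo when the history is needed again.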
settings = require "settings-sharelatex"
logger = require "logger-sharelatex"
AWS = require 'aws-sdk'
S3S = require 's3-streams'
{db, ObjectId} = require "./mongojs"
JSONStream = require "JSONStream"
ReadlineStream = require "byline"
module.exports = MongoAWS =
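	# While unarchiving, flush buffered ops to Mongo once either limit is reached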
MAX_SIZE: 1024*1024 # almost max size
MAX_COUNT: 512 # almost max count
archiveDocHistory: (project_id, doc_id, update, _callback = (error) ->) ->
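		# wrap the callback so overlapping stream error/finish events only invoke it once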
callback = (args...) ->
_callback(args...)
_callback = () ->
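		# all ops for this doc up to the archived version that are not already marked to expire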
query = {
doc_id: ObjectId(doc_id)
v: {$lte: update.v}
expiresAt: {$exists : false}
}
AWS.config.update {
accessKeyId: settings.filestore.s3.key
secretAccessKey: settings.filestore.s3.secret
}
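		# stream the matching ops straight from Mongo into S3 as a JSON array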
logger.log {project_id, doc_id}, "uploading data to s3"
upload = S3S.WriteStream new AWS.S3(), {
"Bucket": settings.filestore.stores.user_files,
"Key": project_id+"/changes-"+doc_id
}
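		# pipe the Mongo cursor through JSONStream into the S3 write stream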
db.docHistory.find(query)
.on 'error', (err) ->
callback(err)
.pipe JSONStream.stringify()
.pipe upload
.on 'error', (err) ->
callback(err)
.on 'finish', () ->
return callback(null)
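	# Stream the archived JSON back from S3 and bulk-upsert the ops into Mongo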
unArchiveDocHistory: (project_id, doc_id, _callback = (error) ->) ->
callback = (args...) ->
_callback(args...)
_callback = () ->
AWS.config.update {
accessKeyId: settings.filestore.s3.key
secretAccessKey: settings.filestore.s3.secret
}
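		# open a read stream on the archived changes object for this doc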
logger.log {project_id, doc_id}, "downloading data from s3"
download = S3S.ReadStream new AWS.S3(), {
"Bucket": settings.filestore.stores.user_files,
"Key": project_id+"/changes-"+doc_id
}, {
encoding: "utf8"
}
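		# JSONStream.stringify wrote one op per line, so split the download back into lines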
		lineStream = new ReadlineStream()
ops = []
sz = 0
inputStream = download
.on 'open', (obj) ->
return 1
.on 'error', (err) ->
callback(err)
.pipe lineStream
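		# buffer each parsed op, skipping the short array delimiter lines ("[", ",", "]")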
inputStream.on 'data', (line) ->
if line.length > 2
try
ops.push(JSON.parse(line))
catch err
return callback(err)
sz += line.length
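			# batch full: pause the download, flush the buffered ops, then resume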
if ops.length >= MongoAWS.MAX_COUNT || sz >= MongoAWS.MAX_SIZE
inputStream.pause()
MongoAWS.handleBulk ops.slice(0), sz, () ->
inputStream.resume()
ops.splice(0,ops.length)
sz = 0
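		# write out whatever is left in the buffer when the download finishes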
.on 'end', () ->
MongoAWS.handleBulk ops, sz, callback
.on 'error', (err) ->
return callback(err)
handleBulk: (ops, size, cb) ->
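		# upsert each op by _id, converting the archived string ids back to ObjectIds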
		bulk = db.docHistory.initializeUnorderedBulkOp()
for op in ops
op._id = ObjectId(op._id)
op.doc_id = ObjectId(op.doc_id)
op.project_id = ObjectId(op.project_id)
bulk.find({_id:op._id}).upsert().updateOne(op)
if ops.length > 0
bulk.execute (err, result) ->
if err?
logger.error err:err, "error bulking ReadlineStream"
else
logger.log count:ops.length, result:result, size: size, "bulked ReadlineStream"
cb(err)
else
cb()