mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
change mongo stream method (still have a bug in bulk insert limit)
This commit is contained in:
parent
fd4afb3574
commit
26c8048729
2 changed files with 81 additions and 2 deletions
|
@ -5,15 +5,92 @@ logger = require "logger-sharelatex"
|
|||
AWS = require 'aws-sdk'
|
||||
fs = require 'fs'
|
||||
S3S = require 's3-streams'
|
||||
{db, ObjectId} = require "./mongojs"
|
||||
JSONStream = require "JSONStream"
|
||||
ReadlineStream = require "readline-stream"
|
||||
|
||||
module.exports = MongoAWS =
|
||||
|
||||
bulkLimit: 10
|
||||
|
||||
archiveDocHistory: (project_id, doc_id, callback = (error) ->) ->
|
||||
query = {
|
||||
doc_id: ObjectId(doc_id)
|
||||
expiresAt: {$exists : false}
|
||||
}
|
||||
|
||||
AWS.config.update {
|
||||
accessKeyId: settings.filestore.s3.key
|
||||
secretAccessKey: settings.filestore.s3.secret
|
||||
}
|
||||
|
||||
upload = S3S.WriteStream new AWS.S3(), {
|
||||
"Bucket": settings.filestore.stores.user_files,
|
||||
"Key": project_id+"/changes-"+doc_id
|
||||
}
|
||||
|
||||
db.docHistory.find(query)
|
||||
.pipe JSONStream.stringify()
|
||||
.pipe upload
|
||||
.on 'finish', () ->
|
||||
return callback(null)
|
||||
|
||||
unArchiveDocHistory: (project_id, doc_id, callback = (error) ->) ->
|
||||
|
||||
AWS.config.update {
|
||||
accessKeyId: settings.filestore.s3.key
|
||||
secretAccessKey: settings.filestore.s3.secret
|
||||
}
|
||||
|
||||
download = S3S.ReadStream new AWS.S3(), {
|
||||
"Bucket": settings.filestore.stores.user_files,
|
||||
"Key": project_id+"/changes-"+doc_id
|
||||
}, {
|
||||
encoding: "utf8"
|
||||
}
|
||||
|
||||
lineStream = new ReadlineStream();
|
||||
ops = []
|
||||
|
||||
download
|
||||
.on 'open', (obj) ->
|
||||
return 1
|
||||
.pipe lineStream
|
||||
.on 'data', (line) ->
|
||||
if line.length > 2
|
||||
ops.push(JSON.parse(line))
|
||||
if ops.length > MongoAWS.bulkLimit
|
||||
MongoAWS.handleBulk ops, () ->
|
||||
ops.splice(0,ops.length)
|
||||
.on 'end', () ->
|
||||
MongoAWS.handleBulk ops, callback
|
||||
.on 'error', (err) ->
|
||||
return callback(err)
|
||||
|
||||
handleBulk: (ops, cb) ->
|
||||
bulk = db.docHistory.initializeUnorderedBulkOp();
|
||||
|
||||
for op in ops
|
||||
op._id = ObjectId(op._id)
|
||||
op.doc_id = ObjectId(op.doc_id)
|
||||
op.project_id = ObjectId(op.project_id)
|
||||
bulk.find({_id:op._id}).upsert().updateOne(op)
|
||||
|
||||
bulk.execute (err, result) ->
|
||||
if err?
|
||||
logger.error err:err, "error bulking ReadlineStream"
|
||||
else
|
||||
logger.log result:result, "bulked ReadlineStream"
|
||||
cb(err)
|
||||
|
||||
|
||||
archiveDocHistoryExternal: (project_id, doc_id, callback = (error) ->) ->
|
||||
MongoAWS.mongoExportDocHistory doc_id, (error, filepath) ->
|
||||
MongoAWS.s3upStream project_id, doc_id, filepath, callback
|
||||
#delete temp file?
|
||||
|
||||
unArchiveDocHistory: (project_id, doc_id, callback = (error) ->) ->
|
||||
|
||||
unArchiveDocHistoryExternal: (project_id, doc_id, callback = (error) ->) ->
|
||||
MongoAWS.s3downStream project_id, doc_id, (error, filepath) ->
|
||||
if error == null
|
||||
MongoAWS.mongoImportDocHistory filepath, callback
|
||||
|
|
|
@ -20,7 +20,9 @@
|
|||
"redis": "~0.10.1",
|
||||
"underscore": "~1.7.0",
|
||||
"mongo-uri": "^0.1.2",
|
||||
"s3-streams": "^0.3.0"
|
||||
"s3-streams": "^0.3.0",
|
||||
"JSONStream": "^1.0.4",
|
||||
"readline-stream": "^1.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"chai": "~1.9.0",
|
||||
|
|
Loading…
Reference in a new issue