mirror of
https://github.com/overleaf/overleaf.git
synced 2024-10-31 21:21:03 -04:00
116 lines
3.9 KiB
CoffeeScript
116 lines
3.9 KiB
CoffeeScript
settings = require "settings-sharelatex"
|
|
logger = require "logger-sharelatex"
|
|
AWS = require 'aws-sdk'
|
|
S3S = require 's3-streams'
|
|
{db, ObjectId} = require "./mongojs"
|
|
JSONStream = require "JSONStream"
|
|
ReadlineStream = require "byline"
|
|
zlib = require "zlib"
|
|
Metrics = require "metrics-sharelatex"
|
|
|
|
DAYS = 24 * 3600 * 1000 # one day in milliseconds
|
|
|
|
createStream = (streamConstructor, project_id, doc_id, pack_id) ->
|
|
AWS_CONFIG =
|
|
accessKeyId: settings.trackchanges.s3.key
|
|
secretAccessKey: settings.trackchanges.s3.secret
|
|
|
|
return streamConstructor new AWS.S3(AWS_CONFIG), {
|
|
"Bucket": settings.trackchanges.stores.doc_history,
|
|
"Key": project_id+"/changes-"+doc_id+"/pack-"+pack_id
|
|
}
|
|
|
|
module.exports = MongoAWS =
|
|
|
|
archivePack: (project_id, doc_id, pack_id, _callback = (error) ->) ->
|
|
|
|
callback = (args...) ->
|
|
_callback(args...)
|
|
_callback = () ->
|
|
|
|
query = {
|
|
_id: ObjectId(pack_id)
|
|
doc_id: ObjectId(doc_id)
|
|
}
|
|
|
|
return callback new Error("invalid project id") if not project_id?
|
|
return callback new Error("invalid doc id") if not doc_id?
|
|
return callback new Error("invalid pack id") if not pack_id?
|
|
|
|
logger.log {project_id, doc_id, pack_id}, "uploading data to s3"
|
|
|
|
upload = createStream S3S.WriteStream, project_id, doc_id, pack_id
|
|
|
|
db.docHistory.findOne query, (err, result) ->
|
|
return callback(err) if err?
|
|
return callback new Error("cannot find pack to send to s3") if not result?
|
|
return callback new Error("refusing to send pack with TTL to s3") if result.expiresAt?
|
|
uncompressedData = JSON.stringify(result)
|
|
if uncompressedData.indexOf("\u0000") != -1
|
|
error = new Error("null bytes found in upload")
|
|
logger.error err: error, project_id: project_id, doc_id: doc_id, pack_id: pack_id, error.message
|
|
return callback(error)
|
|
zlib.gzip uncompressedData, (err, buf) ->
|
|
logger.log {project_id, doc_id, pack_id, origSize: uncompressedData.length, newSize: buf.length}, "compressed pack"
|
|
return callback(err) if err?
|
|
upload.on 'error', (err) ->
|
|
callback(err)
|
|
upload.on 'finish', () ->
|
|
Metrics.inc("archive-pack")
|
|
logger.log {project_id, doc_id, pack_id}, "upload to s3 completed"
|
|
callback(null)
|
|
upload.write buf
|
|
upload.end()
|
|
|
|
readArchivedPack: (project_id, doc_id, pack_id, _callback = (error, result) ->) ->
|
|
callback = (args...) ->
|
|
_callback(args...)
|
|
_callback = () ->
|
|
|
|
return callback new Error("invalid project id") if not project_id?
|
|
return callback new Error("invalid doc id") if not doc_id?
|
|
return callback new Error("invalid pack id") if not pack_id?
|
|
|
|
logger.log {project_id, doc_id, pack_id}, "downloading data from s3"
|
|
|
|
download = createStream S3S.ReadStream, project_id, doc_id, pack_id
|
|
|
|
inputStream = download
|
|
.on 'open', (obj) ->
|
|
return 1
|
|
.on 'error', (err) ->
|
|
callback(err)
|
|
|
|
gunzip = zlib.createGunzip()
|
|
gunzip.setEncoding('utf8')
|
|
gunzip.on 'error', (err) ->
|
|
logger.log {project_id, doc_id, pack_id, err}, "error uncompressing gzip stream"
|
|
callback(err)
|
|
|
|
outputStream = inputStream.pipe gunzip
|
|
parts = []
|
|
outputStream.on 'error', (err) ->
|
|
return callback(err)
|
|
outputStream.on 'end', () ->
|
|
logger.log {project_id, doc_id, pack_id}, "download from s3 completed"
|
|
try
|
|
object = JSON.parse parts.join('')
|
|
catch e
|
|
return callback(e)
|
|
object._id = ObjectId(object._id)
|
|
object.doc_id = ObjectId(object.doc_id)
|
|
object.project_id = ObjectId(object.project_id)
|
|
for op in object.pack
|
|
op._id = ObjectId(op._id) if op._id?
|
|
callback null, object
|
|
outputStream.on 'data', (data) ->
|
|
parts.push data
|
|
|
|
unArchivePack: (project_id, doc_id, pack_id, callback = (error) ->) ->
|
|
MongoAWS.readArchivedPack project_id, doc_id, pack_id, (err, object) ->
|
|
return callback(err) if err?
|
|
Metrics.inc("unarchive-pack")
|
|
# allow the object to expire, we can always retrieve it again
|
|
object.expiresAt = new Date(Date.now() + 7 * DAYS)
|
|
logger.log {project_id, doc_id, pack_id}, "inserting object from s3"
|
|
db.docHistory.insert object, callback
|