Use AWS SDK for getFileStream()

The AWS SDK has a retry strategy to deal with rate limiting or transient
unavailability of S3. We hope it will reduce our error rates.
This commit is contained in:
Eric Mc Sween 2019-06-18 08:25:14 -04:00
parent 42d15fd8a9
commit f2521a29b9
2 changed files with 155 additions and 135 deletions

View file

@ -26,12 +26,22 @@ buildDefaultOptions = (bucketName, method, key)->
uri:"https://#{bucketName}.s3.amazonaws.com/#{key}"
}
s3 = new awsS3({
defaultS3Client = new awsS3({
credentials:
accessKeyId: settings.filestore.s3.key,
secretAccessKey: settings.filestore.s3.secret
})
getS3Client = (credentials) ->
if credentials?
return new awsS3({
credentials:
accessKeyId: credentials.auth_key
secretAccessKey: credentials.auth_secret
})
else
return defaultS3Client
module.exports =
sendFile: (bucketName, key, fsPath, callback)->
@ -71,34 +81,39 @@ module.exports =
# opts may be {start: Number, end: Number}
getFileStream: (bucketName, key, opts, callback = (err, res)->)->
opts = opts || {}
headers = {}
if opts.start? and opts.end?
headers['Range'] = "bytes=#{opts.start}-#{opts.end}"
callback = _.once callback
callback = _.once(callback)
logger.log bucketName:bucketName, key:key, "getting file from s3"
s3Client = knox.createClient
key: opts.credentials?.auth_key || settings.filestore.s3.key
secret: opts.credentials?.auth_secret || settings.filestore.s3.secret
bucket: bucketName
s3Stream = s3Client.get(key, headers)
s3Stream.end()
s3Stream.on 'response', (res) ->
if res.statusCode in [403, 404]
s3 = getS3Client(opts.credentials)
s3Params = {
Bucket: bucketName
Key: key
}
if opts.start? and opts.end?
s3Params['Range'] = "bytes=#{opts.start}-#{opts.end}"
request = s3.getObject(s3Params)
request.on 'httpHeaders', (statusCode, headers, response, statusMessage) =>
if statusCode in [403, 404]
# S3 returns a 403 instead of a 404 when the user doesn't have
# permission to list the bucket contents.
logger.log bucketName:bucketName, key:key, "file not found in s3"
return callback new Errors.NotFoundError("File not found in S3: #{bucketName}:#{key}"), null
else if res.statusCode not in [200, 206]
logger.log bucketName:bucketName, key:key, "error getting file from s3: #{res.statusCode}"
return callback new Error("Got non-200 response from S3: #{res.statusCode}"), null
else
return callback null, res
s3Stream.on 'error', (err) ->
logger.err err:err, bucketName:bucketName, key:key, "error getting file stream from s3"
callback err
logger.log({ bucketName: bucketName, key: key }, "file not found in s3")
return callback(new Errors.NotFoundError("File not found in S3: #{bucketName}:#{key}"), null)
if statusCode not in [200, 206]
logger.log({bucketName: bucketName, key: key }, "error getting file from s3: #{statusCode}")
return callback(new Error("Got non-200 response from S3: #{statusCode} #{statusMessage}"), null)
stream = response.httpResponse.getUnbufferedStream()
callback(null, stream)
request.on 'error', (err) =>
logger.err({ err: err, bucketName: bucketName, key: key }, "error getting file stream from s3")
callback(err)
request.send()
getFileSize: (bucketName, key, callback) ->
logger.log({ bucketName: bucketName, key: key }, "getting file size from S3")
s3 = getS3Client()
s3.headObject { Bucket: bucketName, Key: key }, (err, data) ->
if err?
if err.statusCode in [403, 404]
@ -125,6 +140,7 @@ module.exports =
logger.log bucketName:bucketName, sourceKey:sourceKey, destKey: destKey, "copying file in s3"
source = bucketName + '/' + sourceKey
# use the AWS SDK instead of knox due to problems with error handling (https://github.com/Automattic/knox/issues/114)
s3 = getS3Client()
s3.copyObject {Bucket: bucketName, Key: destKey, CopySource: source}, (err) ->
if err?
if err.code is 'NoSuchKey'

View file

@ -17,18 +17,27 @@ describe "S3PersistorManagerTests", ->
key: "this_key"
stores:
user_files:"sl_user_files"
@stubbedKnoxClient =
@knoxClient =
putFile:sinon.stub()
copyFile:sinon.stub()
list: sinon.stub()
deleteMultiple: sinon.stub()
get: sinon.stub()
@knox =
createClient: sinon.stub().returns(@stubbedKnoxClient)
@stubbedS3Client =
createClient: sinon.stub().returns(@knoxClient)
@s3EventHandlers = {}
@s3Request =
on: sinon.stub().callsFake (event, callback) =>
@s3EventHandlers[event] = callback
send: sinon.stub()
@s3Response =
httpResponse:
getUnbufferedStream: sinon.stub()
@s3Client =
copyObject: sinon.stub()
headObject: sinon.stub()
@awsS3 = sinon.stub().returns @stubbedS3Client
getObject: sinon.stub().returns(@s3Request)
@awsS3 = sinon.stub().returns(@s3Client)
@LocalFileWriter =
writeStream: sinon.stub()
deleteFile: sinon.stub()
@ -50,121 +59,116 @@ describe "S3PersistorManagerTests", ->
@S3PersistorManager = SandboxedModule.require modulePath, requires: @requires
describe "getFileStream", ->
beforeEach ->
@opts = {}
describe "success", ->
beforeEach () ->
@expectedStream = { expectedStream: true }
@s3Request.send.callsFake () =>
@s3EventHandlers.httpHeaders(200, {}, @s3Response, "OK")
@s3Response.httpResponse.getUnbufferedStream.returns(@expectedStream)
it "should use correct key", (done)->
@stubbedKnoxClient.get.returns(
on:->
end:->
)
@S3PersistorManager.getFileStream @bucketName, @key, @opts, (err)=> # empty callback
@stubbedKnoxClient.get.calledWith(@key).should.equal true
done()
it "returns a stream", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, {}, (err, stream) =>
if err?
return done(err)
expect(stream).to.equal(@expectedStream)
done()
it "should use default auth", (done)->
@stubbedKnoxClient.get.returns(
on:->
end:->
)
@S3PersistorManager.getFileStream @bucketName, @key, @opts, (err)=> # empty callback
clientParams =
key: @settings.filestore.s3.key
secret: @settings.filestore.s3.secret
bucket: @bucketName
@knox.createClient.calledWith(clientParams).should.equal true
done()
it "sets the AWS client up with credentials from settings", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, {}, (err, stream) =>
if err?
return done(err)
expect(@awsS3.lastCall.args).to.deep.equal([{
credentials:
accessKeyId: @settings.filestore.s3.key
secretAccessKey: @settings.filestore.s3.secret
}])
done()
describe "with supplied auth", ->
beforeEach ->
@credentials =
auth_key: "that_key"
auth_secret: "that_secret"
@opts =
credentials: @credentials
it "fetches the right key from the right bucket", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, {}, (err, stream) =>
if err?
return done(err)
expect(@s3Client.getObject.lastCall.args).to.deep.equal([{
Bucket: @bucketName,
Key: @key
}])
done()
it "should use supplied auth", (done)->
@stubbedKnoxClient.get.returns(
on:->
end:->
)
@S3PersistorManager.getFileStream @bucketName, @key, @opts, (err)=> # empty callback
clientParams =
key: @credentials.auth_key
secret: @credentials.auth_secret
bucket: @bucketName
@knox.createClient.calledWith(clientParams).should.equal true
done()
it "accepts alternative credentials", (done) ->
accessKeyId = "that_key"
secret = "that_secret"
opts = {
credentials:
auth_key: accessKeyId
auth_secret: secret
}
@S3PersistorManager.getFileStream @bucketName, @key, opts, (err, stream) =>
if err?
return done(err)
expect(@awsS3.lastCall.args).to.deep.equal([{
credentials:
accessKeyId: accessKeyId
secretAccessKey: secret
}])
expect(stream).to.equal(@expectedStream)
done()
describe "with start and end options", ->
beforeEach ->
@opts =
start: 0
end: 8
it "should pass headers to the knox.Client.get()", (done) ->
@stubbedKnoxClient.get.returns(
on:->
end:->
)
@S3PersistorManager.getFileStream @bucketName, @key, @opts, (err)=> # empty callback
@stubbedKnoxClient.get.calledWith(@key, {'Range': 'bytes=0-8'}).should.equal true
done()
describe "error conditions", ->
it "accepts byte range", (done) ->
start = 0
end = 8
opts = { start: start, end: end }
@S3PersistorManager.getFileStream @bucketName, @key, opts, (err, stream) =>
if err?
return done(err)
expect(@s3Client.getObject.lastCall.args).to.deep.equal([{
Bucket: @bucketName
Key: @key
Range: "bytes=#{start}-#{end}"
}])
expect(stream).to.equal(@expectedStream)
done()
describe "errors", ->
describe "when the file doesn't exist", ->
beforeEach ->
@fakeResponse =
statusCode: 404
@stubbedKnoxClient.get.returns(
on: (key, callback) =>
if key == 'response'
callback(@fakeResponse)
end: ->
)
@s3Request.send.callsFake () =>
@s3EventHandlers.httpHeaders(404, {}, @s3Response, "Not found")
it "should produce a NotFoundError", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, @opts, (err, stream)=> # empty callback
expect(stream).to.equal null
expect(err).to.not.equal null
expect(err instanceof @Errors.NotFoundError).to.equal true
it "returns a NotFoundError that indicates the bucket and key", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, {}, (err, stream) =>
expect(err).to.be.instanceof(@Errors.NotFoundError)
errMsg = @Errors.NotFoundError.lastCall.args[0]
expect(errMsg).to.match(new RegExp(".*#{@bucketName}.*"))
expect(errMsg).to.match(new RegExp(".*#{@key}.*"))
done()
it "should have bucket and key in the Error message", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, @opts, (err, stream)=> # empty callback
error_message = @Errors.NotFoundError.lastCall.args[0]
expect(error_message).to.not.equal null
error_message.should.match(new RegExp(".*#{@bucketName}.*"))
error_message.should.match(new RegExp(".*#{@key}.*"))
describe "when S3 encounters an unkown error", ->
beforeEach ->
@s3Request.send.callsFake () =>
@s3EventHandlers.httpHeaders(500, {}, @s3Response, "Internal server error")
it "returns an error", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, {}, (err, stream) =>
expect(err).to.be.instanceof(Error)
done()
describe "when the S3 service produces an error", ->
describe "when the S3 request errors out before receiving HTTP headers", ->
beforeEach ->
@fakeResponse =
statusCode: 500
@stubbedKnoxClient.get.returns(
on: (key, callback) =>
if key == 'response'
callback(@fakeResponse)
end: ->
)
@s3Request.send.callsFake () =>
@s3EventHandlers.error(new Error("connection failed"))
it "should produce an error", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, @opts, (err, stream)=> # empty callback
expect(stream).to.equal null
expect(err).to.not.equal null
expect(err instanceof Error).to.equal true
@Errors.NotFoundError.called.should.equal false
it "returns an error", (done) ->
@S3PersistorManager.getFileStream @bucketName, @key, {}, (err, stream) =>
expect(err).to.be.instanceof(Error)
done()
describe "getFileSize", ->
it "should obtain the file size from S3", (done) ->
expectedFileSize = 123
@stubbedS3Client.headObject.yields(new Error(
@s3Client.headObject.yields(new Error(
"s3Client.headObject got unexpected arguments"
))
@stubbedS3Client.headObject.withArgs({
@s3Client.headObject.withArgs({
Bucket: @bucketName
Key: @key
}).yields(null, { ContentLength: expectedFileSize })
@ -179,7 +183,7 @@ describe "S3PersistorManagerTests", ->
it "should throw NotFoundError when S3 responds with #{statusCode}", (done) ->
error = new Error()
error.statusCode = statusCode
@stubbedS3Client.headObject.yields(error)
@s3Client.headObject.yields(error)
@S3PersistorManager.getFileSize @bucketName, @key, (err, fileSize) =>
expect(err).to.be.an.instanceof(@Errors.NotFoundError)
@ -187,8 +191,8 @@ describe "S3PersistorManagerTests", ->
it "should rethrow any other error", (done) ->
error = new Error()
@stubbedS3Client.headObject.yields(error)
@stubbedS3Client.headObject.yields(error)
@s3Client.headObject.yields(error)
@s3Client.headObject.yields(error)
@S3PersistorManager.getFileSize @bucketName, @key, (err, fileSize) =>
expect(err).to.equal(error)
@ -197,21 +201,21 @@ describe "S3PersistorManagerTests", ->
describe "sendFile", ->
beforeEach ->
@stubbedKnoxClient.putFile.returns on:->
@knoxClient.putFile.returns on:->
it "should put file with knox", (done)->
@LocalFileWriter.deleteFile.callsArgWith(1)
@stubbedKnoxClient.putFile.callsArgWith(2, @error)
@knoxClient.putFile.callsArgWith(2, @error)
@S3PersistorManager.sendFile @bucketName, @key, @fsPath, (err)=>
@stubbedKnoxClient.putFile.calledWith(@fsPath, @key).should.equal true
@knoxClient.putFile.calledWith(@fsPath, @key).should.equal true
err.should.equal @error
done()
it "should delete the file and pass the error with it", (done)->
@LocalFileWriter.deleteFile.callsArgWith(1)
@stubbedKnoxClient.putFile.callsArgWith(2, @error)
@knoxClient.putFile.callsArgWith(2, @error)
@S3PersistorManager.sendFile @bucketName, @key, @fsPath, (err)=>
@stubbedKnoxClient.putFile.calledWith(@fsPath, @key).should.equal true
@knoxClient.putFile.calledWith(@fsPath, @key).should.equal true
err.should.equal @error
done()
@ -249,15 +253,15 @@ describe "S3PersistorManagerTests", ->
@destKey = "my/dest/key"
it "should use AWS SDK to copy file", (done)->
@stubbedS3Client.copyObject.callsArgWith(1, @error)
@s3Client.copyObject.callsArgWith(1, @error)
@S3PersistorManager.copyFile @bucketName, @sourceKey, @destKey, (err)=>
err.should.equal @error
@stubbedS3Client.copyObject.calledWith({Bucket: @bucketName, Key: @destKey, CopySource: @bucketName + '/' + @key}).should.equal true
@s3Client.copyObject.calledWith({Bucket: @bucketName, Key: @destKey, CopySource: @bucketName + '/' + @key}).should.equal true
done()
it "should return a NotFoundError object if the original file does not exist", (done)->
NoSuchKeyError = {code: "NoSuchKey"}
@stubbedS3Client.copyObject.callsArgWith(1, NoSuchKeyError)
@s3Client.copyObject.callsArgWith(1, NoSuchKeyError)
@S3PersistorManager.copyFile @bucketName, @sourceKey, @destKey, (err)=>
expect(err instanceof @Errors.NotFoundError).to.equal true
done()
@ -267,10 +271,10 @@ describe "S3PersistorManagerTests", ->
it "should list the contents passing them onto multi delete", (done)->
data =
Contents: [{Key:"1234"}, {Key: "456"}]
@stubbedKnoxClient.list.callsArgWith(1, null, data)
@stubbedKnoxClient.deleteMultiple.callsArgWith(1)
@knoxClient.list.callsArgWith(1, null, data)
@knoxClient.deleteMultiple.callsArgWith(1)
@S3PersistorManager.deleteDirectory @bucketName, @key, (err)=>
@stubbedKnoxClient.deleteMultiple.calledWith(["1234","456"]).should.equal true
@knoxClient.deleteMultiple.calledWith(["1234","456"]).should.equal true
done()
describe "deleteFile", ->
@ -332,7 +336,7 @@ describe "S3PersistorManagerTests", ->
it "should sum directory files size", (done) ->
data =
Contents: [ {Size: 1024}, {Size: 2048} ]
@stubbedKnoxClient.list.callsArgWith(1, null, data)
@knoxClient.list.callsArgWith(1, null, data)
@S3PersistorManager.directorySize @bucketName, @key, (err, totalSize)=>
totalSize.should.equal 3072
done()