Merge pull request #2995 from overleaf/jpa-project-download-fix-leaking-upstream-response

[misc] project download: fix leaking upstream stream from error in pipe

GitOrigin-RevId: cb7db90890853352dc69ea2e46cec7e7ea66e6e9
This commit is contained in:
Jakob Ackermann 2020-08-03 13:34:04 +02:00 committed by Copybot
parent 33365e56bc
commit a437eb3d59
3 changed files with 189 additions and 8 deletions

View file

@ -26,6 +26,7 @@ const HistoryManager = require('./HistoryManager')
const ProjectDetailsHandler = require('../Project/ProjectDetailsHandler')
const ProjectEntityUpdateHandler = require('../Project/ProjectEntityUpdateHandler')
const RestoreManager = require('./RestoreManager')
const { pipeline } = require('stream')
module.exports = HistoryController = {
selectHistoryApi(req, res, next) {
@ -289,13 +290,18 @@ module.exports = HistoryController = {
v1_id,
version,
`${project.name} (Version ${version})`,
req,
res,
next
)
})
},
_pipeHistoryZipToResponse(v1_project_id, version, name, res, next) {
_pipeHistoryZipToResponse(v1_project_id, version, name, req, res, next) {
if (req.aborted) {
// client has disconnected -- skip project history api call and download
return
}
// increase timeout to 6 minutes
res.setTimeout(6 * 60 * 1000)
const url = `${
@ -315,6 +321,10 @@ module.exports = HistoryController = {
logger.warn({ err, v1_project_id, version }, 'history API error')
return next(err)
}
if (req.aborted) {
// client has disconnected -- skip delayed s3 download
return
}
let retryAttempt = 0
let retryDelay = 2000
// retry for about 6 minutes starting with short delay
@ -322,6 +332,11 @@ module.exports = HistoryController = {
40,
callback =>
setTimeout(function() {
if (req.aborted) {
// client has disconnected -- skip s3 download
return callback() // stop async.retry loop
}
// increase delay by 1 second up to 10
if (retryDelay < 10000) {
retryDelay += 1000
@ -331,8 +346,16 @@ module.exports = HistoryController = {
url: body.zipUrl,
sendImmediately: true
})
const abortS3Request = () => getReq.abort()
req.on('aborted', abortS3Request)
res.on('timeout', abortS3Request)
// Detach `abortS3Request` from the client request/response again, so that a
// later 'aborted' or 'timeout' event no longer aborts this download attempt.
function cleanupAbortTrigger() {
  const abortHooks = [[req, 'aborted'], [res, 'timeout']]
  for (const [emitter, eventName] of abortHooks) {
    emitter.off(eventName, abortS3Request)
  }
}
getReq.on('response', function(response) {
if (response.statusCode !== 200) {
cleanupAbortTrigger()
return callback(new Error('invalid response'))
}
// pipe also proxies the headers, but we want to customize these ones
@ -343,14 +366,22 @@ module.exports = HistoryController = {
filename: `${name}.zip`
})
res.contentType('application/zip')
getReq.pipe(res)
return callback()
pipeline(response, res, err => {
if (err) {
logger.warn(
{ err, v1_project_id, version, retryAttempt },
'history s3 proxying error'
)
}
})
callback()
})
return getReq.on('error', function(err) {
logger.warn(
{ err, v1_project_id, version, retryAttempt },
'history s3 download error'
)
cleanupAbortTrigger()
return callback(err)
})
}, retryDelay),

View file

@ -71,6 +71,122 @@ describe('History', function() {
)
})
describe('request abort', function() {
  // Verifies that a client disconnect stops the project-history zip download
  // at every stage: before the v1-history call, during the polling delay and
  // while the zip is being streamed from the fake upstream.
  //
  // Optional manual verification: add unique logging statements into
  // HistoryController._pipeHistoryZipToResponse
  // in each of the `req.aborted` branches and confirm that each branch
  // was covered.

  // Listeners registered on the shared MockV1HistoryApi emitter by the tests
  // below. They are tracked so they can be removed after each test instead
  // of leaking into (and aborting stale requests during) later tests.
  const zipPackListeners = []
  function onZipPackRequested(listener) {
    zipPackListeners.push(listener)
    MockV1HistoryApi.events.on('v1-history-pack-zip', listener)
  }

  beforeEach(function setupNewProject(done) {
    this.owner.createProject('example-project', (error, project_id) => {
      this.project_id = project_id
      if (error) {
        return done(error)
      }
      this.v1_history_id = 42
      db.projects.update(
        { _id: ObjectId(this.project_id) },
        {
          $set: {
            'overleaf.history.id': this.v1_history_id
          }
        },
        done
      )
    })
  })

  beforeEach(function resetCounter() {
    MockV1HistoryApi.resetCounter()
  })

  afterEach(function removeZipPackListeners() {
    while (zipPackListeners.length > 0) {
      MockV1HistoryApi.events.removeListener(
        'v1-history-pack-zip',
        zipPackListeners.pop()
      )
    }
  })

  it('should abort the upstream request', function(done) {
    const request = this.owner.request(
      `/project/${this.project_id}/version/100/zip`
    )
    request.on('error', done)
    request.on('response', response => {
      expect(response.statusCode).to.equal(200)
      let receivedChunks = 0
      response.on('data', () => {
        receivedChunks++
      })
      response.resume()

      setTimeout(() => {
        request.abort()
        const receivedSoFar = receivedChunks
        const sentSoFar = MockV1HistoryApi.sentChunks
        // The next assertions should verify that chunks are emitted
        // and received -- the exact number is not important.
        // In theory we are now emitting the 3rd chunk, so this should be
        // exactly 3. To not make this test flaky, we accept 1 to 4 chunks.
        expect(sentSoFar).to.be.within(1, 4)
        expect(receivedSoFar).to.be.within(1, 4)

        setTimeout(() => {
          // The fake-s3 service should have stopped emitting chunks.
          // If not, that would be +5 in an ideal world (1 every 100ms).
          // On the happy-path (it stopped) it emitted +1 which was
          // in-flight and another +1 before it received the abort.
          expect(MockV1HistoryApi.sentChunks).to.be.below(sentSoFar + 5)
          expect(MockV1HistoryApi.sentChunks).to.be.within(
            sentSoFar,
            sentSoFar + 2
          )
          done()
        }, 500)
      }, 200)
    })
  })

  it('should skip the v1-history request', function(done) {
    const request = this.owner.request(
      `/project/${this.project_id}/version/100/zip`
    )
    setTimeout(() => {
      // This is a race-condition to abort the request after the
      // processing of all the express middleware completed.
      // In case we abort before they complete, we do not hit our
      // abort logic, but express internal logic, which is OK.
      request.abort()
    }, 2)
    request.on('error', done)
    setTimeout(() => {
      expect(MockV1HistoryApi.requestedZipPacks).to.equal(0)
      done()
    }, 500)
  })

  it('should skip the async-polling', function(done) {
    const request = this.owner.request(
      `/project/${this.project_id}/version/100/zip`
    )
    // abort as soon as the zip pack has been requested from v1-history
    onZipPackRequested(() => {
      request.abort()
    })
    request.on('error', done)
    setTimeout(() => {
      expect(MockV1HistoryApi.fakeZipCall).to.equal(0)
      done()
    }, 3000) // initial polling delay is 2s
  })

  it('should skip the upstream request', function(done) {
    const request = this.owner.request(
      `/project/${this.project_id}/version/100/zip`
    )
    // abort in the middle of the initial polling delay
    onZipPackRequested(() => {
      setTimeout(() => {
        request.abort()
      }, 1000)
    })
    request.on('error', done)
    setTimeout(() => {
      expect(MockV1HistoryApi.fakeZipCall).to.equal(0)
      done()
    }, 3000) // initial polling delay is 2s
  })
})
it('should return 402 for non-v2-history project', function(done) {
return this.owner.createProject('non-v2-project', (error, project_id) => {
this.project_id = project_id

View file

@ -11,6 +11,7 @@
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let MockV1HistoryApi
const { EventEmitter } = require('events')
const _ = require('lodash')
const express = require('express')
const bodyParser = require('body-parser')
@ -19,6 +20,14 @@ const { ObjectId } = require('mongojs')
module.exports = MockV1HistoryApi = {
fakeZipCall: 0,
requestedZipPacks: 0,
sentChunks: 0,
resetCounter() {
MockV1HistoryApi.fakeZipCall = 0
MockV1HistoryApi.sentChunks = 0
MockV1HistoryApi.requestedZipPacks = 0
},
events: new EventEmitter(),
run() {
app.get(
'/api/projects/:project_id/version/:version/zip',
@ -41,17 +50,42 @@ module.exports = MockV1HistoryApi = {
}
res.header('content-disposition', 'attachment; name=project.zip')
res.header('content-type', 'application/octet-stream')
return res.send(
`Mock zip for ${req.params.project_id} at version ${
req.params.version
}`
)
if (req.params.version === '42') {
return res.send(
`Mock zip for ${req.params.project_id} at version ${
req.params.version
}`
)
}
// Write one numbered chunk ("chunk0", "chunk1", ...) to the response and
// bump the shared `sentChunks` counter that the tests inspect.
function writeChunk() {
  const chunkIndex = MockV1HistoryApi.sentChunks
  MockV1HistoryApi.sentChunks = chunkIndex + 1
  res.write(`chunk${chunkIndex}`)
}
// Stream chunks to the client: one immediately, then one every `interval`
// milliseconds, until the client aborts, the response finishes/closes, or
// the 10 second deadline ends the response.
function writeEvery(interval) {
  if (req.aborted) return

  // setInterval delays the first run
  writeChunk()
  const periodicWrite = setInterval(writeChunk, interval)

  const deadLine = setTimeout(() => {
    clearInterval(periodicWrite)
    res.end()
  }, 10 * 1000)

  function stopTimers() {
    clearInterval(periodicWrite)
    clearTimeout(deadLine)
  }
  // The client went away: stop writing and do not wait for the deadline.
  req.on('aborted', stopTimers)
  // A server response emits 'finish' and 'close', never 'end'. Listen to
  // both so the deadline timer is always released once the response is done.
  res.on('finish', stopTimers)
  res.on('close', stopTimers)
}
if (req.params.version === '100') {
return writeEvery(100)
}
res.sendStatus(400)
}
)
app.post(
'/api/projects/:project_id/version/:version/zip',
(req, res, next) => {
MockV1HistoryApi.requestedZipPacks++
MockV1HistoryApi.events.emit('v1-history-pack-zip')
return res.json({
zipUrl: `http://localhost:3100/fake-zip-download/${
req.params.project_id