Merge pull request #105 from overleaf/jpa-project-export

[misc] add a new endpoint for exporting all the project history
Jakob Ackermann 2021-03-05 09:46:42 +00:00 committed by GitHub
commit ccb8a022a9
6 changed files with 215 additions and 2 deletions

@@ -64,6 +64,7 @@ app.get('/project/:project_id/doc/:doc_id/diff', HttpController.getDiff)
app.get('/project/:project_id/doc/:doc_id/check', HttpController.checkDoc)
app.get('/project/:project_id/updates', HttpController.getUpdates)
app.get('/project/:project_id/export', HttpController.exportProject)
app.post('/project/:project_id/flush', HttpController.flushProject)

@@ -200,6 +200,69 @@ module.exports = HttpController = {
)
},
exportProject(req, res, next) {
// The project history can be huge:
// - updates can weigh MBs for an insert/delete of a full doc
// - multiple updates form a pack
// Flush updates per pack onto the wire.
const { project_id } = req.params
logger.log({ project_id }, 'exporting project history')
UpdatesManager.exportProject(project_id, function (
err,
{ updates, userIds } = {}, // default guards the error path, which passes no payload
confirmWrite
) {
const abortStreaming = req.aborted || res.finished || res.destroyed
if (abortStreaming) {
// Tell the producer to stop emitting data
if (confirmWrite) confirmWrite(new Error('stop'))
return
}
const hasStartedStreamingResponse = res.headersSent
if (err) {
logger.error({ project_id, err }, 'export failed')
if (!hasStartedStreamingResponse) {
// Generate a nice 500
return next(err)
} else {
// Stop streaming
return res.destroy()
}
}
// Compose the response incrementally
const isFirstWrite = !hasStartedStreamingResponse
const isLastWrite = updates.length === 0
if (isFirstWrite) {
// The first write will emit the 200 status, headers and start of the
// response payload (open array)
res.setHeader('Content-Type', 'application/json')
res.setHeader('Trailer', 'X-User-Ids')
res.writeHead(200)
res.write('[')
}
if (!isFirstWrite && !isLastWrite) {
// Starting from the 2nd non-empty write, emit a continuing comma.
// write 1: [updates1
// write 2: ,updates2
// write 3: ,updates3
// write N: ]
res.write(',')
}
// Every write will emit a blob onto the response stream:
// '[update1,update2,...]'
// ^^^^^^^^^^^^^^^^^^^
res.write(JSON.stringify(updates).slice(1, -1), confirmWrite)
if (isLastWrite) {
// The last write will have no updates and will finish the response
// payload (close array) and emit the userIds as trailer.
res.addTrailers({ 'X-User-Ids': JSON.stringify(userIds) })
res.end(']')
}
})
},
restore(req, res, next) {
if (next == null) {
next = function (error) {}
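
For context, a minimal consumer sketch of the wire format produced above (not part of this PR): the body arrives as a single streamed JSON array, and the user ids arrive in the X-User-Ids trailer after the final chunk. It assumes the service listens on localhost:3015 (as in the acceptance-test helper further down), buffers the whole array for simplicity instead of streaming, and fetchProjectExport is an illustrative name.

const http = require('http')

function fetchProjectExport(projectId, callback) {
  http
    .get(`http://localhost:3015/project/${projectId}/export`, (res) => {
      if (res.statusCode !== 200) {
        return callback(new Error(`export failed with status ${res.statusCode}`))
      }
      let body = ''
      res.on('data', (chunk) => {
        body += chunk
      })
      res.on('end', () => {
        // Trailers are only populated once the response has ended.
        const userIds = JSON.parse(res.trailers['x-user-ids'])
        callback(null, JSON.parse(body), userIds)
      })
      res.on('error', callback)
    })
    .on('error', callback)
}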

@@ -631,6 +631,57 @@ module.exports = UpdatesManager = {
)
},
exportProject(projectId, consumer) {
// Flush any pending uncompressed updates into packs before collecting them.
UpdatesManager.processUncompressedUpdatesForProject(projectId, (err) => {
if (err) return consumer(err)
// Fetch all the packs.
const before = undefined
PackManager.makeProjectIterator(projectId, before, (err, iterator) => {
if (err) return consumer(err)
const accumulatedUserIds = new Set()
async.whilst(
() => !iterator.done(),
(cb) =>
iterator.next((err, updatesFromASinglePack) => {
if (err) return cb(err)
if (updatesFromASinglePack.length === 0) {
// This should not happen while `iterator.done() === false`.
// Emitting an empty array would signal the final call to the
// consumer, so skip it.
return cb()
}
updatesFromASinglePack.forEach((update) => {
accumulatedUserIds.add(
// Super defensive access on update details.
String(update && update.meta && update.meta.user_id)
)
})
// Emit updates and wait for the consumer.
consumer(null, { updates: updatesFromASinglePack }, cb)
}),
(err) => {
if (err) return consumer(err)
// Broken updates may lack meta.user_id, which adds the string 'undefined'.
accumulatedUserIds.delete('undefined')
consumer(null, {
updates: [],
userIds: Array.from(accumulatedUserIds).sort()
})
}
)
})
})
},
fetchUserInfo(users, callback) {
if (callback == null) {
callback = function (error, fetchedUserInfo) {}
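
For reference, a hedged sketch of the consumer contract used above (not part of this PR): UpdatesManager.exportProject calls the consumer once per pack with { updates } plus a confirmWrite callback that must be invoked to fetch the next pack (HttpController passes res.write's completion callback here, so pack fetching is paced by the HTTP socket), and one final time with { updates: [], userIds }. collectProjectExport is an illustrative name that simply accumulates everything in memory.

function collectProjectExport(projectId, callback) {
  const allUpdates = []
  UpdatesManager.exportProject(projectId, (err, chunk, confirmWrite) => {
    if (err) return callback(err)
    const { updates, userIds } = chunk
    if (updates.length === 0) {
      // Final call: no more packs, the accumulated user ids are now known.
      return callback(null, allUpdates, userIds)
    }
    allUpdates.push(...updates)
    // Request the next pack; a real consumer could defer this call to apply
    // backpressure, as the HTTP controller does with res.write.
    confirmWrite()
  })
}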

@@ -50,6 +50,7 @@ describe('Archiving updates', function () {
this.now = Date.now()
this.to = this.now
this.user_id = ObjectId().toString()
this.user_id_2 = ObjectId().toString()
this.doc_id = ObjectId().toString()
this.project_id = ObjectId().toString()
@@ -92,7 +93,7 @@ describe('Archiving updates', function () {
op: [{ i: 'b', p: 0 }],
meta: {
ts: this.now + (i - 2048) * this.hours + 10 * this.minutes,
- user_id: this.user_id
+ user_id: this.user_id_2
},
v: 2 * i + 2
})
@@ -141,6 +142,56 @@ describe('Archiving updates', function () {
)
})
function testExportFeature() {
describe('exporting the project', function () {
before('fetch export', function (done) {
TrackChangesClient.exportProject(
this.project_id,
(error, updates, userIds) => {
if (error) {
return done(error)
}
this.exportedUpdates = updates
this.exportedUserIds = userIds
done()
}
)
})
it('should include all the imported updates, with ids, sorted by timestamp', function () {
// Safeguard against an empty array trivially matching an empty export.
expect(this.updates).to.have.length(1024 + 22)
const expectedExportedUpdates = this.updates
.slice()
.reverse()
.map((update) => {
// clone object, updates are created once in before handler
const exportedUpdate = Object.assign({}, update)
exportedUpdate.meta = Object.assign({}, update.meta)
exportedUpdate.doc_id = this.doc_id
exportedUpdate.project_id = this.project_id
// start_ts/end_ts only differ for merged updates, which do not occur here.
exportedUpdate.meta.start_ts = exportedUpdate.meta.end_ts =
exportedUpdate.meta.ts
delete exportedUpdate.meta.ts
return exportedUpdate
})
expect(this.exportedUpdates).to.deep.equal(expectedExportedUpdates)
expect(this.exportedUserIds).to.deep.equal([
this.user_id,
this.user_id_2
])
})
})
}
describe("before archiving a doc's updates", function () {
testExportFeature()
})
describe("archiving a doc's updates", function () {
before(function (done) {
TrackChangesClient.pushDocHistory(
@@ -219,7 +270,7 @@ describe('Archiving updates', function () {
)
})
- return it('should store 1024 doc changes in S3 in one pack', function (done) {
+ it('should store 1024 doc changes in S3 in one pack', function (done) {
return db.docHistoryIndex.findOne(
{ _id: ObjectId(this.doc_id) },
(error, index) => {
@@ -240,6 +291,8 @@ describe('Archiving updates', function () {
}
)
})
testExportFeature()
})
return describe("unarchiving a doc's updates", function () {

@@ -0,0 +1,34 @@
const { expect } = require('chai')
const { ObjectId } = require('../../../app/js/mongodb')
const TrackChangesApp = require('./helpers/TrackChangesApp')
const TrackChangesClient = require('./helpers/TrackChangesClient')
describe('ExportProject', function () {
before('start app', function (done) {
TrackChangesApp.ensureRunning(done)
})
describe('when there are no updates', function () {
before('fetch export', function (done) {
TrackChangesClient.exportProject(
ObjectId(),
(error, updates, userIds) => {
if (error) {
return done(error)
}
this.exportedUpdates = updates
this.exportedUserIds = userIds
done()
}
)
})
it('should export an empty array', function () {
expect(this.exportedUpdates).to.deep.equal([])
expect(this.exportedUserIds).to.deep.equal([])
})
})
// see ArchivingUpdatesTests for tests with data in mongo/s3
})

@@ -165,6 +165,17 @@ module.exports = TrackChangesClient = {
)
},
exportProject(project_id, callback) {
request.get(
{ url: `http://localhost:3015/project/${project_id}/export`, json: true },
(error, response, updates) => {
if (error) return callback(error)
response.statusCode.should.equal(200)
callback(null, updates, JSON.parse(response.trailers['x-user-ids']))
}
)
},
restoreDoc(project_id, doc_id, version, user_id, callback) {
if (callback == null) {
callback = function (error) {}