[misc] add a new endpoint for exporting all the project history

This commit is contained in:
Jakob Ackermann 2021-02-23 13:57:27 +00:00
parent 2ca170d31c
commit f411049b82
6 changed files with 183 additions and 1 deletions

View file

@ -64,6 +64,7 @@ app.get('/project/:project_id/doc/:doc_id/diff', HttpController.getDiff)
app.get('/project/:project_id/doc/:doc_id/check', HttpController.checkDoc)
app.get('/project/:project_id/updates', HttpController.getUpdates)
// Export of the whole project history; handled by
// HttpController.exportProject.
app.get('/project/:project_id/export', HttpController.exportProject)
app.post('/project/:project_id/flush', HttpController.flushProject)

View file

@ -200,6 +200,67 @@ module.exports = HttpController = {
)
},
exportProject(req, res, next) {
// The project history can be huge:
// - updates can weight MBs for insert/delete of full doc
// - multiple updates form a pack
// Flush updates per pack onto the wire.
const { project_id } = req.params
logger.log({ project_id }, 'exporting project history')
UpdatesManager.exportProject(project_id, function (
err,
updates,
confirmWrite
) {
const abortStreaming = req.aborted || res.finished || res.destroyed
if (abortStreaming) {
// Tell the producer to stop emitting data
if (confirmWrite) confirmWrite(new Error('stop'))
return
}
const hasStartedStreamingResponse = res.headersSent
if (err) {
logger.error({ project_id, err }, 'export failed')
if (!hasStartedStreamingResponse) {
// Generate a nice 500
return next(err)
} else {
// Stop streaming
return res.destroy()
}
}
// Compose the response incrementally
const isFirstWrite = !hasStartedStreamingResponse
const isLastWrite = updates.length === 0
if (isFirstWrite) {
// The first write will emit the 200 status, headers and start of the
// response payload (open array)
res.setHeader('Content-Type', 'application/json')
res.writeHead(200)
res.write('[')
}
if (!isFirstWrite && !isLastWrite) {
// Starting from the 2nd non-empty write, emit a continuing comma.
// write 1: [updates1
// write 2: ,updates2
// write 3: ,updates3
// write N: ]
res.write(',')
}
// Every write will emit a blob onto the response stream:
// '[update1,update2,...]'
// ^^^^^^^^^^^^^^^^^^^
res.write(JSON.stringify(updates).slice(1, -1), confirmWrite)
if (isLastWrite) {
// The last write will have no updates and will finish the response
// payload (close array).
res.end(']')
}
})
},
restore(req, res, next) {
if (next == null) {
next = function (error) {}

View file

@ -631,6 +631,42 @@ module.exports = UpdatesManager = {
)
},
exportProject(projectId, consumer) {
// Flush anything before collecting updates.
UpdatesManager.processUncompressedUpdatesForProject(projectId, (err) => {
if (err) return consumer(err)
// Fetch all the packs.
const before = undefined
PackManager.makeProjectIterator(projectId, before, (err, iterator) => {
if (err) return consumer(err)
async.whilst(
() => !iterator.done(),
(cb) =>
iterator.next((err, updatesFromASinglePack) => {
if (err) return cb(err)
if (updatesFromASinglePack.length === 0) {
// This should not happen when `iterator.done() == false`.
// Emitting an empty array would signal the consumer the final
// call.
return cb()
}
// Emit updates and wait for the consumer.
consumer(null, updatesFromASinglePack, cb)
}),
(err) => {
if (err) return consumer(err)
consumer(null, [])
}
)
})
})
},
fetchUserInfo(users, callback) {
if (callback == null) {
callback = function (error, fetchedUserInfo) {}

View file

@ -141,6 +141,48 @@ describe('Archiving updates', function () {
)
})
// Registers a suite that exports the project and compares the result with
// the updates stored on the test context (`this.updates`).
function testExportFeature() {
  describe('exporting the project', function () {
    before('fetch export', function (done) {
      const storeExport = (error, updates) => {
        if (error) {
          done(error)
          return
        }
        this.exportedUpdates = updates
        done()
      }
      TrackChangesClient.exportProject(this.project_id, storeExport)
    })

    it('should include all the imported updates, with ids, sorted by timestamp', function () {
      // Build the shape an exported update is expected to have. The input
      // is copied, as the updates are created once in a before handler.
      const toExportedUpdate = (update) => {
        const meta = Object.assign({}, update.meta)
        // `start_ts`/`end_ts` differ for merged updates only, which does
        // not apply here: both take the value of `ts`.
        meta.end_ts = meta.ts
        meta.start_ts = meta.ts
        delete meta.ts
        return Object.assign({}, update, {
          meta,
          doc_id: this.doc_id,
          project_id: this.project_id
        })
      }

      // Safeguard against an empty array matching an empty export.
      expect(this.updates).to.have.length(1024 + 22)

      const reversedUpdates = this.updates.slice()
      reversedUpdates.reverse()
      const expectedExportedUpdates = reversedUpdates.map(toExportedUpdate)

      expect(this.exportedUpdates).to.deep.equal(expectedExportedUpdates)
    })
  })
}
// Run the export checks before the doc's updates get archived.
describe("before archiving a doc's updates", function () {
testExportFeature()
})
describe("archiving a doc's updates", function () {
before(function (done) {
TrackChangesClient.pushDocHistory(
@ -219,7 +261,7 @@ describe('Archiving updates', function () {
)
})
return it('should store 1024 doc changes in S3 in one pack', function (done) {
it('should store 1024 doc changes in S3 in one pack', function (done) {
return db.docHistoryIndex.findOne(
{ _id: ObjectId(this.doc_id) },
(error, index) => {
@ -240,6 +282,8 @@ describe('Archiving updates', function () {
}
)
})
testExportFeature()
})
return describe("unarchiving a doc's updates", function () {

View file

@ -0,0 +1,29 @@
const { expect } = require('chai')
const { ObjectId } = require('../../../app/js/mongodb')
const TrackChangesApp = require('./helpers/TrackChangesApp')
const TrackChangesClient = require('./helpers/TrackChangesClient')
// Acceptance test for exporting a project that has no updates.
describe('ExportProject', function () {
  before('start app', function (done) {
    TrackChangesApp.ensureRunning(done)
  })

  describe('when there are no updates', function () {
    before('fetch export', function (done) {
      // A newly generated ObjectId serves as the project id.
      const projectId = ObjectId()
      const storeExport = (error, updates) => {
        if (error) {
          done(error)
          return
        }
        this.exportedUpdates = updates
        done()
      }
      TrackChangesClient.exportProject(projectId, storeExport)
    })

    it('should export an empty array', function () {
      expect(this.exportedUpdates).to.deep.equal([])
    })
  })

  // see ArchivingUpdatesTests for tests with data in mongo/s3
})

View file

@ -165,6 +165,17 @@ module.exports = TrackChangesClient = {
)
},
exportProject(project_id, callback) {
request.get(
{ url: `http://localhost:3015/project/${project_id}/export`, json: true },
(error, response, updates) => {
if (error) return callback(error)
response.statusCode.should.equal(200)
callback(null, updates)
}
)
},
restoreDoc(project_id, doc_id, version, user_id, callback) {
if (callback == null) {
callback = function (error) {}