From d9ed026d9174916c58e298cb863cbc1ace731d1b Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 12 Apr 2017 16:34:28 +0100 Subject: [PATCH] simple flush for all projects does not work with redis cluster, only single redis --- services/track-changes/app.coffee | 2 ++ .../app/coffee/HttpController.coffee | 12 ++++++++++++ .../app/coffee/RedisManager.coffee | 9 +++++++++ .../app/coffee/UpdatesManager.coffee | 17 +++++++++++++++++ 4 files changed, 40 insertions(+) diff --git a/services/track-changes/app.coffee b/services/track-changes/app.coffee index 0a43cd1503..9cd9edee75 100644 --- a/services/track-changes/app.coffee +++ b/services/track-changes/app.coffee @@ -50,6 +50,8 @@ app.post "/project/:project_id/doc/:doc_id/version/:version/restore", HttpContro app.post '/project/:project_id/doc/:doc_id/push', HttpController.pushDocHistory app.post '/project/:project_id/doc/:doc_id/pull', HttpController.pullDocHistory +app.post '/flush/all', HttpController.flushAll + packWorker = null # use a single packing worker app.post "/pack", (req, res, next) -> diff --git a/services/track-changes/app/coffee/HttpController.coffee b/services/track-changes/app/coffee/HttpController.coffee index eecc618330..44ba8d48ba 100644 --- a/services/track-changes/app/coffee/HttpController.coffee +++ b/services/track-changes/app/coffee/HttpController.coffee @@ -22,6 +22,18 @@ module.exports = HttpController = return next(error) if error? res.send 204 + flushAll: (req, res, next = (error) ->) -> + logger.log "flushing all projects" + UpdatesManager.flushAll (error, result) -> + return next(error) if error? + {failed, succeeded} = result + status = "#{succeeded.length} succeeded, #{failed.length} failed" + if failed.length > 0 + logger.log {failed: failed, succeeded: succeeded}, "error flushing projects" + res.status(500).send "#{status}\nfailed to flush:\n#{failed.join('\n')}\n" + else + res.status(200).send "#{status}\nflushed all #{succeeded.length} projects\n" + checkDoc: (req, res, next = (error) ->) -> doc_id = req.params.doc_id project_id = req.params.project_id diff --git a/services/track-changes/app/coffee/RedisManager.coffee b/services/track-changes/app/coffee/RedisManager.coffee index a634bbfed9..25719e753f 100644 --- a/services/track-changes/app/coffee/RedisManager.coffee +++ b/services/track-changes/app/coffee/RedisManager.coffee @@ -32,3 +32,12 @@ module.exports = RedisManager = getDocIdsWithHistoryOps: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers docsWithHistoryOpsKey(project_id), callback + + # this will only work on single node redis, not redis cluster + getProjectIdsWithHistoryOps: (callback = (error, project_ids) ->) -> + rclient.keys docsWithHistoryOpsKey("*"), (error, project_keys) -> + return callback(error) if error? + project_ids = for key in project_keys + [prefix, project_id] = key.split(":") + project_id + callback(error, project_ids) diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index f01681e5f0..e1a7b086d8 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -144,6 +144,23 @@ module.exports = UpdatesManager = UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, cb async.parallelLimit jobs, 5, callback + # flush all outstanding changes + flushAll: (callback = (error, result) ->) -> + RedisManager.getProjectIdsWithHistoryOps (error, project_ids) -> + return callback(error) if error? + logger.log {count: project_ids?.length, project_ids: project_ids}, "found projects" + jobs = [] + for project_id in project_ids + do (project_id) -> + jobs.push (cb) -> + UpdatesManager.processUncompressedUpdatesForProject project_id, (err) -> + return cb(null, {failed: err?, project_id: project_id}) + async.series jobs, (error, result) -> + return callback(error) if error? + failedProjects = (x.project_id for x in result when x.failed) + succeededProjects = (x.project_id for x in result when not x.failed) + callback(null, {failed: failedProjects, succeeded: succeededProjects}) + getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) -> return callback(error) if error?