diff --git a/services/document-updater/app.coffee b/services/document-updater/app.coffee index 6c1d8d1136..2962860027 100644 --- a/services/document-updater/app.coffee +++ b/services/document-updater/app.coffee @@ -55,6 +55,7 @@ app.post '/project/:project_id/doc/:doc_id', HttpCont app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLoaded app.delete '/project/:project_id/doc/:doc_id', HttpController.flushAndDeleteDoc app.delete '/project/:project_id', HttpController.deleteProject +app.delete '/project', HttpController.deleteMultipleProjects app.post '/project/:project_id', HttpController.updateProject app.post '/project/:project_id/history/resync', HttpController.resyncProjectHistory app.post '/project/:project_id/flush', HttpController.flushProject @@ -63,6 +64,7 @@ app.post '/project/:project_id/doc/:doc_id/change/accept', HttpCont app.del '/project/:project_id/doc/:doc_id/comment/:comment_id', HttpController.deleteComment app.get '/flush_all_projects', HttpController.flushAllProjects +app.get '/flush_queued_projects', HttpController.flushQueuedProjects app.get '/total', (req, res)-> timer = new Metrics.Timer("http.allDocList") diff --git a/services/document-updater/app/coffee/DeleteQueueManager.coffee b/services/document-updater/app/coffee/DeleteQueueManager.coffee new file mode 100644 index 0000000000..f1af0ba244 --- /dev/null +++ b/services/document-updater/app/coffee/DeleteQueueManager.coffee @@ -0,0 +1,63 @@ +RedisManager = require "./RedisManager" +ProjectManager = require "./ProjectManager" +logger = require "logger-sharelatex" +metrics = require "./Metrics" +async = require "async" + +# Maintain a sorted set of project flushAndDelete requests, ordered by timestamp +# (ZADD), and process them from oldest to newest. A flushAndDelete request comes +# from real-time and is triggered when a user leaves a project. +# +# The aim is to remove the project from redis 5 minutes after the last request +# if there has been no activity (document updates) in that time. If there is +# activity we can expect a further flushAndDelete request when the editing user +# leaves the project. +# +# If a new flushAndDelete request comes in while an existing request is already +# in the queue we update the timestamp as we can postpone flushing further. +# +# Documents are processed by checking the queue, seeing if the first entry is +# older than 5 minutes, and popping it from the queue in that case. + +module.exports = DeleteQueueManager = + flushAndDeleteOldProjects: (options, callback) -> + startTime = Date.now() + cutoffTime = startTime - options.min_delete_age + count = 0 + + flushProjectIfNotModified = (project_id, flushTimestamp, cb) -> + ProjectManager.getProjectDocsTimestamps project_id, (err, timestamps) -> + return callback(err) if err? + if timestamps.length == 0 + logger.log {project_id}, "skipping flush of queued project - no timestamps" + return cb() + # are any of the timestamps newer than the time the project was flushed? + for timestamp in timestamps when timestamp > flushTimestamp + metrics.inc "queued-delete-skipped" + logger.debug {project_id, timestamps, flushTimestamp}, "found newer timestamp, will skip delete" + return cb() + logger.log {project_id, flushTimestamp}, "flushing queued project" + ProjectManager.flushAndDeleteProjectWithLocks project_id, {skip_history_flush: true}, (err) -> + if err? + logger.err {project_id, err}, "error flushing queued project" + metrics.inc "queued-delete-completed" + return cb(null, true) + + flushNextProject = () -> + now = Date.now() + if now - startTime > options.timeout + logger.log "hit time limit on flushing old projects" + return callback(null, count) + if count > options.limit + logger.log "hit count limit on flushing old projects" + return callback(null, count) + RedisManager.getNextProjectToFlushAndDelete cutoffTime, (err, project_id, flushTimestamp, queueLength) -> + return callback(err) if err? + return callback(null, count) if !project_id? + logger.log {project_id, queueLength: queueLength}, "flushing queued project" + metrics.globalGauge "queued-flush-backlog", queueLength + flushProjectIfNotModified project_id, flushTimestamp, (err, flushed) -> + count++ if flushed + flushNextProject() + + flushNextProject() \ No newline at end of file diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index 7cfafa9ba0..6b68b4b676 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -29,7 +29,7 @@ module.exports = HistoryManager = flushProjectChanges: (project_id, options, callback = (error) ->) -> return callback() if !Settings.apis?.project_history?.enabled if options.skip_history_flush - logger.log {project_id}, "skipping flush of project history from realtime shutdown" + logger.log {project_id}, "skipping flush of project history" return callback() url = "#{Settings.apis.project_history.url}/project/#{project_id}/flush" qs = {} diff --git a/services/document-updater/app/coffee/HttpController.coffee b/services/document-updater/app/coffee/HttpController.coffee index d2ef5043d0..e2e2e712bc 100644 --- a/services/document-updater/app/coffee/HttpController.coffee +++ b/services/document-updater/app/coffee/HttpController.coffee @@ -5,7 +5,8 @@ Errors = require "./Errors" logger = require "logger-sharelatex" Metrics = require "./Metrics" ProjectFlusher = require("./ProjectFlusher") - +DeleteQueueManager = require("./DeleteQueueManager") +async = require "async" TWO_MEGABYTES = 2 * 1024 * 1024 @@ -130,14 +131,30 @@ module.exports = HttpController = deleteProject: (req, res, next = (error) ->) -> project_id = req.params.project_id logger.log project_id: project_id, "deleting project via http" - timer = new Metrics.Timer("http.deleteProject") options = {} options.background = true if req.query?.background # allow non-urgent flushes to be queued options.skip_history_flush = true if req.query?.shutdown # don't flush history when realtime shuts down - ProjectManager.flushAndDeleteProjectWithLocks project_id, options, (error) -> - timer.done() + if req.query?.background + ProjectManager.queueFlushAndDeleteProject project_id, (error) -> + return next(error) if error? + logger.log project_id: project_id, "queue delete of project via http" + res.send 204 # No Content + else + timer = new Metrics.Timer("http.deleteProject") + ProjectManager.flushAndDeleteProjectWithLocks project_id, options, (error) -> + timer.done() + return next(error) if error? + logger.log project_id: project_id, "deleted project via http" + res.send 204 # No Content + + deleteMultipleProjects: (req, res, next = (error) ->) -> + project_ids = req.body?.project_ids || [] + logger.log project_ids: project_ids, "deleting multiple projects via http" + async.eachSeries project_ids, (project_id, cb) -> + logger.log project_id: project_id, "queue delete of project via http" + ProjectManager.queueFlushAndDeleteProject project_id, cb + , (error) -> return next(error) if error? - logger.log project_id: project_id, "deleted project via http" res.send 204 # No Content acceptChanges: (req, res, next = (error) ->) -> @@ -198,4 +215,16 @@ module.exports = HttpController = else res.send project_ids - + flushQueuedProjects: (req, res, next = (error) ->) -> + res.setTimeout(10 * 60 * 1000) + options = + limit : req.query.limit || 1000 + timeout: 5 * 60 * 1000 + min_delete_age: req.query.min_delete_age || 5 * 60 * 1000 + DeleteQueueManager.flushAndDeleteOldProjects options, (err, flushed)-> + if err? + logger.err err:err, "error flushing old projects" + res.send 500 + else + logger.log {flushed: flushed}, "flush of queued projects completed" + res.send {flushed: flushed} diff --git a/services/document-updater/app/coffee/ProjectManager.coffee b/services/document-updater/app/coffee/ProjectManager.coffee index 4271186b7a..0d57687668 100644 --- a/services/document-updater/app/coffee/ProjectManager.coffee +++ b/services/document-updater/app/coffee/ProjectManager.coffee @@ -72,6 +72,22 @@ module.exports = ProjectManager = else callback(null) + queueFlushAndDeleteProject: (project_id, callback = (error) ->) -> + RedisManager.queueFlushAndDeleteProject project_id, (error) -> + if error? + logger.error {project_id: project_id, error:error}, "error adding project to flush and delete queue" + return callback(error) + Metrics.inc "queued-delete" + callback() + + getProjectDocsTimestamps: (project_id, callback = (error) ->) -> + RedisManager.getDocIdsInProject project_id, (error, doc_ids) -> + return callback(error) if error? + return callback(null, []) if !doc_ids?.length + RedisManager.getDocTimestamps doc_ids, (error, timestamps) -> + return callback(error) if error? + callback(null, timestamps) + getProjectDocsAndFlushIfOld: (project_id, projectStateHash, excludeVersions = {}, _callback = (error, docs) ->) -> timer = new Metrics.Timer("projectManager.getProjectDocsAndFlushIfOld") callback = (args...) -> diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 82b6caccd7..1490ac87f4 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -287,6 +287,35 @@ module.exports = RedisManager = getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback + getDocTimestamps: (doc_ids, callback = (error, result) ->) -> + # get lastupdatedat timestamps for an array of doc_ids + multi = rclient.multi() + for doc_id in doc_ids + multi.get keys.lastUpdatedAt(doc_id: doc_id) + multi.exec callback + + queueFlushAndDeleteProject: (project_id, callback) -> + # store the project id in a sorted set ordered by time + rclient.zadd keys.flushAndDeleteQueue(), Date.now(), project_id, callback + + getNextProjectToFlushAndDelete: (cutoffTime, callback = (error, key, timestamp)->) -> + # find the oldest queued flush that is before the cutoff time + rclient.zrangebyscore keys.flushAndDeleteQueue(), 0, cutoffTime, "WITHSCORES", "LIMIT", 0, 1, (err, reply) -> + return callback(err) if err? + return callback() if !reply?.length # return if no projects ready to be processed + # pop the oldest entry (get and remove in a multi) + multi = rclient.multi() + # Poor man's version of ZPOPMIN, which is only available in Redis 5. + multi.zrange keys.flushAndDeleteQueue(), 0, 0, "WITHSCORES" + multi.zremrangebyrank keys.flushAndDeleteQueue(), 0, 0 + multi.zcard keys.flushAndDeleteQueue() # the total length of the queue (for metrics) + multi.exec (err, reply) -> + return callback(err) if err? + return callback() if !reply?.length + [key, timestamp] = reply[0] + queueLength = reply[2] + callback(null, key, timestamp, queueLength) + _serializeRanges: (ranges, callback = (error, serializedRanges) ->) -> jsonRanges = JSON.stringify(ranges) if jsonRanges? and jsonRanges.length > MAX_RANGES_SIZE diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee index a775dac5d3..bc83433484 100755 --- a/services/document-updater/config/settings.defaults.coffee +++ b/services/document-updater/config/settings.defaults.coffee @@ -80,6 +80,7 @@ module.exports = lastUpdatedBy: ({doc_id}) -> "lastUpdatedBy:{#{doc_id}}" lastUpdatedAt: ({doc_id}) -> "lastUpdatedAt:{#{doc_id}}" pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}" + flushAndDeleteQueue: () -> "DocUpdaterFlushAndDeleteQueue" redisOptions: keepAlive: 100 diff --git a/services/document-updater/test/acceptance/coffee/DeletingAProjectTests.coffee b/services/document-updater/test/acceptance/coffee/DeletingAProjectTests.coffee index cb1d3495d8..91e4378dc2 100644 --- a/services/document-updater/test/acceptance/coffee/DeletingAProjectTests.coffee +++ b/services/document-updater/test/acceptance/coffee/DeletingAProjectTests.coffee @@ -97,7 +97,7 @@ describe "Deleting a project", -> it "should flush each doc in project history", -> MockProjectHistoryApi.flushProject.calledWith(@project_id).should.equal true - describe "with the shutdown=true parameter from realtime", -> + describe "with the background=true parameter from realtime and no request to flush the queue", -> before (done) -> sinon.spy MockWebApi, "setDocument" sinon.spy MockTrackChangesApi, "flushDoc" @@ -122,6 +122,44 @@ describe "Deleting a project", -> it "should return a 204 status code", -> @statusCode.should.equal 204 + it "should not send any documents to the web api", -> + MockWebApi.setDocument.called.should.equal false + + it "should not flush any docs in track changes", -> + MockTrackChangesApi.flushDoc.called.should.equal false + + it "should not flush to project history", -> + MockProjectHistoryApi.flushProject.called.should.equal false + + describe "with the background=true parameter from realtime and a request to flush the queue", -> + before (done) -> + sinon.spy MockWebApi, "setDocument" + sinon.spy MockTrackChangesApi, "flushDoc" + sinon.spy MockProjectHistoryApi, "flushProject" + + async.series @docs.map((doc) => + (callback) => + DocUpdaterClient.preloadDoc @project_id, doc.id, callback + ), (error) => + throw error if error? + setTimeout () => + DocUpdaterClient.deleteProjectOnShutdown @project_id, (error, res, body) => + @statusCode = res.statusCode + # after deleting the project and putting it in the queue, flush the queue + setTimeout () -> + DocUpdaterClient.flushOldProjects (error, res, body) => + setTimeout done, 1000 # allow time for the flush to complete + , 100 + , 200 + + after -> + MockWebApi.setDocument.restore() + MockTrackChangesApi.flushDoc.restore() + MockProjectHistoryApi.flushProject.restore() + + it "should return a 204 status code", -> + @statusCode.should.equal 204 + it "should send each document to the web api", -> for doc in @docs MockWebApi.setDocument diff --git a/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee b/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee index 9525cc27e9..17067b5bf4 100644 --- a/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee +++ b/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee @@ -78,6 +78,9 @@ module.exports = DocUpdaterClient = deleteProjectOnShutdown: (project_id, callback = () ->) -> request.del "http://localhost:3003/project/#{project_id}?background=true&shutdown=true", callback + flushOldProjects: (callback = () ->) -> + request.get "http://localhost:3003/flush_queued_projects?min_delete_age=1", callback + acceptChange: (project_id, doc_id, change_id, callback = () ->) -> request.post "http://localhost:3003/project/#{project_id}/doc/#{doc_id}/change/#{change_id}/accept", callback diff --git a/services/document-updater/test/unit/coffee/HttpController/HttpControllerTests.coffee b/services/document-updater/test/unit/coffee/HttpController/HttpControllerTests.coffee index c1f5c5eca8..b8ace494f5 100644 --- a/services/document-updater/test/unit/coffee/HttpController/HttpControllerTests.coffee +++ b/services/document-updater/test/unit/coffee/HttpController/HttpControllerTests.coffee @@ -14,6 +14,7 @@ describe "HttpController", -> "./ProjectManager": @ProjectManager = {} "logger-sharelatex" : @logger = { log: sinon.stub() } "./ProjectFlusher": {flushAllProjects:->} + "./DeleteQueueManager": @DeleteQueueManager = {} "./Metrics": @Metrics = {} "./Errors" : Errors @Metrics.Timer = class Timer @@ -343,15 +344,15 @@ describe "HttpController", -> it "should time the request", -> @Metrics.Timer::done.called.should.equal true - describe "with the shutdown=true option from realtime", -> + describe "with the background=true option from realtime", -> beforeEach -> - @ProjectManager.flushAndDeleteProjectWithLocks = sinon.stub().callsArgWith(2) + @ProjectManager.queueFlushAndDeleteProject = sinon.stub().callsArgWith(1) @req.query = {background:true, shutdown:true} @HttpController.deleteProject(@req, @res, @next) - it "should pass the skip_history_flush option when flushing the project", -> - @ProjectManager.flushAndDeleteProjectWithLocks - .calledWith(@project_id, {background:true, skip_history_flush:true}) + it "should queue the flush and delete", -> + @ProjectManager.queueFlushAndDeleteProject + .calledWith(@project_id) .should.equal true describe "when an errors occurs", ->