mirror of
https://github.com/overleaf/overleaf.git
synced 2024-11-21 20:47:08 -05:00
Merge pull request #89 from overleaf/bg-queue-deletes
queue deletes for deferred processing
This commit is contained in:
commit
89a90399fe
10 changed files with 195 additions and 13 deletions
|
@ -55,6 +55,7 @@ app.post '/project/:project_id/doc/:doc_id', HttpCont
|
|||
app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLoaded
|
||||
app.delete '/project/:project_id/doc/:doc_id', HttpController.flushAndDeleteDoc
|
||||
app.delete '/project/:project_id', HttpController.deleteProject
|
||||
app.delete '/project', HttpController.deleteMultipleProjects
|
||||
app.post '/project/:project_id', HttpController.updateProject
|
||||
app.post '/project/:project_id/history/resync', HttpController.resyncProjectHistory
|
||||
app.post '/project/:project_id/flush', HttpController.flushProject
|
||||
|
@ -63,6 +64,7 @@ app.post '/project/:project_id/doc/:doc_id/change/accept', HttpCont
|
|||
app.del '/project/:project_id/doc/:doc_id/comment/:comment_id', HttpController.deleteComment
|
||||
|
||||
app.get '/flush_all_projects', HttpController.flushAllProjects
|
||||
app.get '/flush_queued_projects', HttpController.flushQueuedProjects
|
||||
|
||||
app.get '/total', (req, res)->
|
||||
timer = new Metrics.Timer("http.allDocList")
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
RedisManager = require "./RedisManager"
|
||||
ProjectManager = require "./ProjectManager"
|
||||
logger = require "logger-sharelatex"
|
||||
metrics = require "./Metrics"
|
||||
async = require "async"
|
||||
|
||||
# Maintain a sorted set of project flushAndDelete requests, ordered by timestamp
|
||||
# (ZADD), and process them from oldest to newest. A flushAndDelete request comes
|
||||
# from real-time and is triggered when a user leaves a project.
|
||||
#
|
||||
# The aim is to remove the project from redis 5 minutes after the last request
|
||||
# if there has been no activity (document updates) in that time. If there is
|
||||
# activity we can expect a further flushAndDelete request when the editing user
|
||||
# leaves the project.
|
||||
#
|
||||
# If a new flushAndDelete request comes in while an existing request is already
|
||||
# in the queue we update the timestamp as we can postpone flushing further.
|
||||
#
|
||||
# Documents are processed by checking the queue, seeing if the first entry is
|
||||
# older than 5 minutes, and popping it from the queue in that case.
|
||||
|
||||
module.exports = DeleteQueueManager =
|
||||
flushAndDeleteOldProjects: (options, callback) ->
|
||||
startTime = Date.now()
|
||||
cutoffTime = startTime - options.min_delete_age
|
||||
count = 0
|
||||
|
||||
flushProjectIfNotModified = (project_id, flushTimestamp, cb) ->
|
||||
ProjectManager.getProjectDocsTimestamps project_id, (err, timestamps) ->
|
||||
return callback(err) if err?
|
||||
if timestamps.length == 0
|
||||
logger.log {project_id}, "skipping flush of queued project - no timestamps"
|
||||
return cb()
|
||||
# are any of the timestamps newer than the time the project was flushed?
|
||||
for timestamp in timestamps when timestamp > flushTimestamp
|
||||
metrics.inc "queued-delete-skipped"
|
||||
logger.debug {project_id, timestamps, flushTimestamp}, "found newer timestamp, will skip delete"
|
||||
return cb()
|
||||
logger.log {project_id, flushTimestamp}, "flushing queued project"
|
||||
ProjectManager.flushAndDeleteProjectWithLocks project_id, {skip_history_flush: true}, (err) ->
|
||||
if err?
|
||||
logger.err {project_id, err}, "error flushing queued project"
|
||||
metrics.inc "queued-delete-completed"
|
||||
return cb(null, true)
|
||||
|
||||
flushNextProject = () ->
|
||||
now = Date.now()
|
||||
if now - startTime > options.timeout
|
||||
logger.log "hit time limit on flushing old projects"
|
||||
return callback(null, count)
|
||||
if count > options.limit
|
||||
logger.log "hit count limit on flushing old projects"
|
||||
return callback(null, count)
|
||||
RedisManager.getNextProjectToFlushAndDelete cutoffTime, (err, project_id, flushTimestamp, queueLength) ->
|
||||
return callback(err) if err?
|
||||
return callback(null, count) if !project_id?
|
||||
logger.log {project_id, queueLength: queueLength}, "flushing queued project"
|
||||
metrics.globalGauge "queued-flush-backlog", queueLength
|
||||
flushProjectIfNotModified project_id, flushTimestamp, (err, flushed) ->
|
||||
count++ if flushed
|
||||
flushNextProject()
|
||||
|
||||
flushNextProject()
|
|
@ -29,7 +29,7 @@ module.exports = HistoryManager =
|
|||
flushProjectChanges: (project_id, options, callback = (error) ->) ->
|
||||
return callback() if !Settings.apis?.project_history?.enabled
|
||||
if options.skip_history_flush
|
||||
logger.log {project_id}, "skipping flush of project history from realtime shutdown"
|
||||
logger.log {project_id}, "skipping flush of project history"
|
||||
return callback()
|
||||
url = "#{Settings.apis.project_history.url}/project/#{project_id}/flush"
|
||||
qs = {}
|
||||
|
|
|
@ -5,7 +5,8 @@ Errors = require "./Errors"
|
|||
logger = require "logger-sharelatex"
|
||||
Metrics = require "./Metrics"
|
||||
ProjectFlusher = require("./ProjectFlusher")
|
||||
|
||||
DeleteQueueManager = require("./DeleteQueueManager")
|
||||
async = require "async"
|
||||
|
||||
TWO_MEGABYTES = 2 * 1024 * 1024
|
||||
|
||||
|
@ -130,16 +131,32 @@ module.exports = HttpController =
|
|||
deleteProject: (req, res, next = (error) ->) ->
|
||||
project_id = req.params.project_id
|
||||
logger.log project_id: project_id, "deleting project via http"
|
||||
timer = new Metrics.Timer("http.deleteProject")
|
||||
options = {}
|
||||
options.background = true if req.query?.background # allow non-urgent flushes to be queued
|
||||
options.skip_history_flush = true if req.query?.shutdown # don't flush history when realtime shuts down
|
||||
if req.query?.background
|
||||
ProjectManager.queueFlushAndDeleteProject project_id, (error) ->
|
||||
return next(error) if error?
|
||||
logger.log project_id: project_id, "queue delete of project via http"
|
||||
res.send 204 # No Content
|
||||
else
|
||||
timer = new Metrics.Timer("http.deleteProject")
|
||||
ProjectManager.flushAndDeleteProjectWithLocks project_id, options, (error) ->
|
||||
timer.done()
|
||||
return next(error) if error?
|
||||
logger.log project_id: project_id, "deleted project via http"
|
||||
res.send 204 # No Content
|
||||
|
||||
deleteMultipleProjects: (req, res, next = (error) ->) ->
|
||||
project_ids = req.body?.project_ids || []
|
||||
logger.log project_ids: project_ids, "deleting multiple projects via http"
|
||||
async.eachSeries project_ids, (project_id, cb) ->
|
||||
logger.log project_id: project_id, "queue delete of project via http"
|
||||
ProjectManager.queueFlushAndDeleteProject project_id, cb
|
||||
, (error) ->
|
||||
return next(error) if error?
|
||||
res.send 204 # No Content
|
||||
|
||||
acceptChanges: (req, res, next = (error) ->) ->
|
||||
{project_id, doc_id} = req.params
|
||||
change_ids = req.body?.change_ids
|
||||
|
@ -198,4 +215,16 @@ module.exports = HttpController =
|
|||
else
|
||||
res.send project_ids
|
||||
|
||||
|
||||
flushQueuedProjects: (req, res, next = (error) ->) ->
|
||||
res.setTimeout(10 * 60 * 1000)
|
||||
options =
|
||||
limit : req.query.limit || 1000
|
||||
timeout: 5 * 60 * 1000
|
||||
min_delete_age: req.query.min_delete_age || 5 * 60 * 1000
|
||||
DeleteQueueManager.flushAndDeleteOldProjects options, (err, flushed)->
|
||||
if err?
|
||||
logger.err err:err, "error flushing old projects"
|
||||
res.send 500
|
||||
else
|
||||
logger.log {flushed: flushed}, "flush of queued projects completed"
|
||||
res.send {flushed: flushed}
|
||||
|
|
|
@ -72,6 +72,22 @@ module.exports = ProjectManager =
|
|||
else
|
||||
callback(null)
|
||||
|
||||
queueFlushAndDeleteProject: (project_id, callback = (error) ->) ->
|
||||
RedisManager.queueFlushAndDeleteProject project_id, (error) ->
|
||||
if error?
|
||||
logger.error {project_id: project_id, error:error}, "error adding project to flush and delete queue"
|
||||
return callback(error)
|
||||
Metrics.inc "queued-delete"
|
||||
callback()
|
||||
|
||||
getProjectDocsTimestamps: (project_id, callback = (error) ->) ->
|
||||
RedisManager.getDocIdsInProject project_id, (error, doc_ids) ->
|
||||
return callback(error) if error?
|
||||
return callback(null, []) if !doc_ids?.length
|
||||
RedisManager.getDocTimestamps doc_ids, (error, timestamps) ->
|
||||
return callback(error) if error?
|
||||
callback(null, timestamps)
|
||||
|
||||
getProjectDocsAndFlushIfOld: (project_id, projectStateHash, excludeVersions = {}, _callback = (error, docs) ->) ->
|
||||
timer = new Metrics.Timer("projectManager.getProjectDocsAndFlushIfOld")
|
||||
callback = (args...) ->
|
||||
|
|
|
@ -287,6 +287,35 @@ module.exports = RedisManager =
|
|||
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
|
||||
rclient.smembers keys.docsInProject(project_id: project_id), callback
|
||||
|
||||
getDocTimestamps: (doc_ids, callback = (error, result) ->) ->
|
||||
# get lastupdatedat timestamps for an array of doc_ids
|
||||
multi = rclient.multi()
|
||||
for doc_id in doc_ids
|
||||
multi.get keys.lastUpdatedAt(doc_id: doc_id)
|
||||
multi.exec callback
|
||||
|
||||
queueFlushAndDeleteProject: (project_id, callback) ->
|
||||
# store the project id in a sorted set ordered by time
|
||||
rclient.zadd keys.flushAndDeleteQueue(), Date.now(), project_id, callback
|
||||
|
||||
getNextProjectToFlushAndDelete: (cutoffTime, callback = (error, key, timestamp)->) ->
|
||||
# find the oldest queued flush that is before the cutoff time
|
||||
rclient.zrangebyscore keys.flushAndDeleteQueue(), 0, cutoffTime, "WITHSCORES", "LIMIT", 0, 1, (err, reply) ->
|
||||
return callback(err) if err?
|
||||
return callback() if !reply?.length # return if no projects ready to be processed
|
||||
# pop the oldest entry (get and remove in a multi)
|
||||
multi = rclient.multi()
|
||||
# Poor man's version of ZPOPMIN, which is only available in Redis 5.
|
||||
multi.zrange keys.flushAndDeleteQueue(), 0, 0, "WITHSCORES"
|
||||
multi.zremrangebyrank keys.flushAndDeleteQueue(), 0, 0
|
||||
multi.zcard keys.flushAndDeleteQueue() # the total length of the queue (for metrics)
|
||||
multi.exec (err, reply) ->
|
||||
return callback(err) if err?
|
||||
return callback() if !reply?.length
|
||||
[key, timestamp] = reply[0]
|
||||
queueLength = reply[2]
|
||||
callback(null, key, timestamp, queueLength)
|
||||
|
||||
_serializeRanges: (ranges, callback = (error, serializedRanges) ->) ->
|
||||
jsonRanges = JSON.stringify(ranges)
|
||||
if jsonRanges? and jsonRanges.length > MAX_RANGES_SIZE
|
||||
|
|
|
@ -80,6 +80,7 @@ module.exports =
|
|||
lastUpdatedBy: ({doc_id}) -> "lastUpdatedBy:{#{doc_id}}"
|
||||
lastUpdatedAt: ({doc_id}) -> "lastUpdatedAt:{#{doc_id}}"
|
||||
pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
|
||||
flushAndDeleteQueue: () -> "DocUpdaterFlushAndDeleteQueue"
|
||||
redisOptions:
|
||||
keepAlive: 100
|
||||
|
||||
|
|
|
@ -97,7 +97,7 @@ describe "Deleting a project", ->
|
|||
it "should flush each doc in project history", ->
|
||||
MockProjectHistoryApi.flushProject.calledWith(@project_id).should.equal true
|
||||
|
||||
describe "with the shutdown=true parameter from realtime", ->
|
||||
describe "with the background=true parameter from realtime and no request to flush the queue", ->
|
||||
before (done) ->
|
||||
sinon.spy MockWebApi, "setDocument"
|
||||
sinon.spy MockTrackChangesApi, "flushDoc"
|
||||
|
@ -122,6 +122,44 @@ describe "Deleting a project", ->
|
|||
it "should return a 204 status code", ->
|
||||
@statusCode.should.equal 204
|
||||
|
||||
it "should not send any documents to the web api", ->
|
||||
MockWebApi.setDocument.called.should.equal false
|
||||
|
||||
it "should not flush any docs in track changes", ->
|
||||
MockTrackChangesApi.flushDoc.called.should.equal false
|
||||
|
||||
it "should not flush to project history", ->
|
||||
MockProjectHistoryApi.flushProject.called.should.equal false
|
||||
|
||||
describe "with the background=true parameter from realtime and a request to flush the queue", ->
|
||||
before (done) ->
|
||||
sinon.spy MockWebApi, "setDocument"
|
||||
sinon.spy MockTrackChangesApi, "flushDoc"
|
||||
sinon.spy MockProjectHistoryApi, "flushProject"
|
||||
|
||||
async.series @docs.map((doc) =>
|
||||
(callback) =>
|
||||
DocUpdaterClient.preloadDoc @project_id, doc.id, callback
|
||||
), (error) =>
|
||||
throw error if error?
|
||||
setTimeout () =>
|
||||
DocUpdaterClient.deleteProjectOnShutdown @project_id, (error, res, body) =>
|
||||
@statusCode = res.statusCode
|
||||
# after deleting the project and putting it in the queue, flush the queue
|
||||
setTimeout () ->
|
||||
DocUpdaterClient.flushOldProjects (error, res, body) =>
|
||||
setTimeout done, 1000 # allow time for the flush to complete
|
||||
, 100
|
||||
, 200
|
||||
|
||||
after ->
|
||||
MockWebApi.setDocument.restore()
|
||||
MockTrackChangesApi.flushDoc.restore()
|
||||
MockProjectHistoryApi.flushProject.restore()
|
||||
|
||||
it "should return a 204 status code", ->
|
||||
@statusCode.should.equal 204
|
||||
|
||||
it "should send each document to the web api", ->
|
||||
for doc in @docs
|
||||
MockWebApi.setDocument
|
||||
|
|
|
@ -78,6 +78,9 @@ module.exports = DocUpdaterClient =
|
|||
deleteProjectOnShutdown: (project_id, callback = () ->) ->
|
||||
request.del "http://localhost:3003/project/#{project_id}?background=true&shutdown=true", callback
|
||||
|
||||
flushOldProjects: (callback = () ->) ->
|
||||
request.get "http://localhost:3003/flush_queued_projects?min_delete_age=1", callback
|
||||
|
||||
acceptChange: (project_id, doc_id, change_id, callback = () ->) ->
|
||||
request.post "http://localhost:3003/project/#{project_id}/doc/#{doc_id}/change/#{change_id}/accept", callback
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ describe "HttpController", ->
|
|||
"./ProjectManager": @ProjectManager = {}
|
||||
"logger-sharelatex" : @logger = { log: sinon.stub() }
|
||||
"./ProjectFlusher": {flushAllProjects:->}
|
||||
"./DeleteQueueManager": @DeleteQueueManager = {}
|
||||
"./Metrics": @Metrics = {}
|
||||
"./Errors" : Errors
|
||||
@Metrics.Timer = class Timer
|
||||
|
@ -343,15 +344,15 @@ describe "HttpController", ->
|
|||
it "should time the request", ->
|
||||
@Metrics.Timer::done.called.should.equal true
|
||||
|
||||
describe "with the shutdown=true option from realtime", ->
|
||||
describe "with the background=true option from realtime", ->
|
||||
beforeEach ->
|
||||
@ProjectManager.flushAndDeleteProjectWithLocks = sinon.stub().callsArgWith(2)
|
||||
@ProjectManager.queueFlushAndDeleteProject = sinon.stub().callsArgWith(1)
|
||||
@req.query = {background:true, shutdown:true}
|
||||
@HttpController.deleteProject(@req, @res, @next)
|
||||
|
||||
it "should pass the skip_history_flush option when flushing the project", ->
|
||||
@ProjectManager.flushAndDeleteProjectWithLocks
|
||||
.calledWith(@project_id, {background:true, skip_history_flush:true})
|
||||
it "should queue the flush and delete", ->
|
||||
@ProjectManager.queueFlushAndDeleteProject
|
||||
.calledWith(@project_id)
|
||||
.should.equal true
|
||||
|
||||
describe "when an errors occurs", ->
|
||||
|
|
Loading…
Reference in a new issue