mirror of
https://github.com/overleaf/overleaf.git
synced 2025-04-05 02:46:55 +00:00
queue deletes for deferred processing
This commit is contained in:
parent
f05e048203
commit
f6b2ac7360
9 changed files with 138 additions and 13 deletions
|
@ -55,6 +55,7 @@ app.post '/project/:project_id/doc/:doc_id', HttpCont
|
|||
app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLoaded
|
||||
app.delete '/project/:project_id/doc/:doc_id', HttpController.flushAndDeleteDoc
|
||||
app.delete '/project/:project_id', HttpController.deleteProject
|
||||
app.delete '/project', HttpController.deleteMultipleProjects
|
||||
app.post '/project/:project_id', HttpController.updateProject
|
||||
app.post '/project/:project_id/history/resync', HttpController.resyncProjectHistory
|
||||
app.post '/project/:project_id/flush', HttpController.flushProject
|
||||
|
@ -63,6 +64,7 @@ app.post '/project/:project_id/doc/:doc_id/change/accept', HttpCont
|
|||
app.del '/project/:project_id/doc/:doc_id/comment/:comment_id', HttpController.deleteComment
|
||||
|
||||
app.get '/flush_all_projects', HttpController.flushAllProjects
|
||||
app.get '/flush_queued_projects', HttpController.flushQueuedProjects
|
||||
|
||||
app.get '/total', (req, res)->
|
||||
timer = new Metrics.Timer("http.allDocList")
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
RedisManager = require "./RedisManager"
|
||||
ProjectManager = require "./ProjectManager"
|
||||
logger = require "logger-sharelatex"
|
||||
metrics = require "./Metrics"
|
||||
async = require "async"
|
||||
|
||||
module.exports = DeleteQueueManager =
  # Drain the queue of projects waiting to be flushed and deleted.
  #
  # options:
  #   timeout        - max time (ms) to keep pulling projects off the queue
  #   limit          - stop once more than this many projects have been flushed
  #   min_delete_age - only take projects queued at least this many ms ago
  #
  # callback(err) is called once: when the queue has no eligible project
  # left, when a limit is hit, or when reading from redis fails.
  flushAndDeleteOldProjects: (options, callback) ->
    startTime = Date.now()
    count = 0

    # Flush and delete one queued project, unless one of its docs was
    # modified after the project was queued. cb(null, true) signals that a
    # flush was attempted; cb() signals that the project was skipped.
    flushProjectIfNotModified = (project_id, flushTimestamp, cb) ->
      ProjectManager.getProjectDocsTimestamps project_id, (err, timestamps) ->
        # a failure to read the timestamps aborts the whole run via the
        # outer callback rather than moving on to the next project
        return callback(err) if err?
        if !timestamps?
          logger.log {project_id}, "skipping flush of queued project - no timestamps"
          return cb()
        # are any of the timestamps newer than the time the project was flushed?
        for timestamp in timestamps or [] when timestamp > flushTimestamp
          metrics.inc "queued-delete-skipped"
          logger.debug {project_id, timestamps, flushTimestamp}, "found newer timestamp, will skip delete"
          return cb()
        logger.log {project_id, flushTimestamp}, "flushing queued project"
        ProjectManager.flushAndDeleteProjectWithLocks project_id, {skip_history_flush: true}, (err) ->
          # only report an error when the flush actually failed; previously
          # this line ran unconditionally and logged an error on every success
          if err?
            logger.err {project_id, err}, "error flushing queued project"
          metrics.inc "queued-delete-completed"
          return cb(null, true)

    # Take the next eligible project off the queue and process it, looping
    # until the time limit, the count limit or the end of the queue.
    flushNextProject = () ->
      now = Date.now()
      if now - startTime > options.timeout
        logger.log "hit time limit on flushing old projects"
        return callback()
      if count > options.limit
        logger.log "hit count limit on flushing old projects"
        return callback()
      cutoffTime = now - options.min_delete_age
      RedisManager.getNextProjectToFlushAndDelete cutoffTime, (err, project_id, flushTimestamp) ->
        return callback(err) if err?
        return callback() if !project_id?
        flushProjectIfNotModified project_id, flushTimestamp, (err, flushed) ->
          count++ if flushed
          flushNextProject()

    flushNextProject()
|
|
@ -5,7 +5,8 @@ Errors = require "./Errors"
|
|||
logger = require "logger-sharelatex"
|
||||
Metrics = require "./Metrics"
|
||||
ProjectFlusher = require("./ProjectFlusher")
|
||||
|
||||
DeleteQueueManager = require("./DeleteQueueManager")
|
||||
async = require "async"
|
||||
|
||||
TWO_MEGABYTES = 2 * 1024 * 1024
|
||||
|
||||
|
@ -130,14 +131,30 @@ module.exports = HttpController =
|
|||
deleteProject: (req, res, next = (error) ->) ->
|
||||
project_id = req.params.project_id
|
||||
logger.log project_id: project_id, "deleting project via http"
|
||||
timer = new Metrics.Timer("http.deleteProject")
|
||||
options = {}
|
||||
options.background = true if req.query?.background # allow non-urgent flushes to be queued
|
||||
options.skip_history_flush = true if req.query?.shutdown # don't flush history when realtime shuts down
|
||||
ProjectManager.flushAndDeleteProjectWithLocks project_id, options, (error) ->
|
||||
timer.done()
|
||||
if req.query?.background
|
||||
ProjectManager.queueFlushAndDeleteProject project_id, (error) ->
|
||||
return next(error) if error?
|
||||
logger.log project_id: project_id, "queue delete of project via http"
|
||||
res.send 204 # No Content
|
||||
else
|
||||
timer = new Metrics.Timer("http.deleteProject")
|
||||
ProjectManager.flushAndDeleteProjectWithLocks project_id, options, (error) ->
|
||||
timer.done()
|
||||
return next(error) if error?
|
||||
logger.log project_id: project_id, "deleted project via http"
|
||||
res.send 204 # No Content
|
||||
|
||||
# Queue a flush-and-delete for every project id listed in the request body.
# Responds 204 once all ids have been queued; the first queueing error is
# passed to next() and stops the series.
deleteMultipleProjects: (req, res, next = (error) ->) ->
  project_ids = req.body?.project_ids || []
  logger.log project_ids: project_ids, "deleting multiple projects via http"
  async.eachSeries project_ids, (project_id, cb) ->
    logger.log project_id: project_id, "queue delete of project via http"
    ProjectManager.queueFlushAndDeleteProject project_id, cb
  , (error) ->
    return next(error) if error?
    # project_id only exists as the iterator's parameter above, so referring
    # to it here would throw a ReferenceError; log the whole list instead.
    # The projects have only been queued at this point, not deleted.
    logger.log project_ids: project_ids, "queued delete of multiple projects via http"
    res.send 204 # No Content
|
||||
|
||||
acceptChanges: (req, res, next = (error) ->) ->
|
||||
|
@ -198,4 +215,16 @@ module.exports = HttpController =
|
|||
else
|
||||
res.send project_ids
|
||||
|
||||
|
||||
# HTTP handler: builds an options object from the query string and hands it to
# DeleteQueueManager.flushAndDeleteOldProjects. Responds 500 on error,
# otherwise sends whatever the callback's second argument is.
flushQueuedProjects: (req, res, next = (error) ->) ->
|
||||
# response timeout uses the same five-minute value as options.timeout below
res.setTimeout(5 * 60 * 1000)
|
||||
options =
|
||||
# NOTE(review): values taken from req.query are presumably strings and are
# not converted to numbers/booleans here - confirm the consumer copes
limit : req.query.limit || 1000
|
||||
timeout: 5 * 60 * 1000
|
||||
dryRun : req.query.dryRun || false
|
||||
min_delete_age: req.query.min_delete_age || 5 * 60 * 1000
|
||||
DeleteQueueManager.flushAndDeleteOldProjects options, (err, project_ids)->
|
||||
if err?
|
||||
logger.err err:err, "error flushing old projects"
|
||||
res.send 500
|
||||
else
|
||||
# NOTE(review): the DeleteQueueManager shown on this page never passes a
# second argument to its callback, so project_ids looks to be undefined here
res.send project_ids
|
||||
|
|
|
@ -72,6 +72,22 @@ module.exports = ProjectManager =
|
|||
else
|
||||
callback(null)
|
||||
|
||||
# Add the project to the redis flush-and-delete queue. On success the
# "queued-delete" metric is incremented and callback() is called with no
# arguments; on failure the error is logged and passed to the callback.
queueFlushAndDeleteProject: (project_id, callback = (error) ->) ->
  onQueued = (queueError) ->
    unless queueError?
      Metrics.inc "queued-delete"
      return callback()
    logger.error {project_id: project_id, error: queueError}, "error adding project to flush and delete queue"
    callback(queueError)
  RedisManager.queueFlushAndDeleteProject project_id, onQueued
|
||||
|
||||
# Look up the doc ids held in redis for a project and fetch their timestamps
# via RedisManager.getDocTimestamps. Calls back with no arguments when the
# project has no doc ids, otherwise with (null, timestamps).
getProjectDocsTimestamps: (project_id, callback = (error) ->) ->
  RedisManager.getDocIdsInProject project_id, (idsError, docIds) ->
    if idsError?
      return callback(idsError)
    unless docIds?.length
      return callback()
    RedisManager.getDocTimestamps docIds, (timestampsError, docTimestamps) ->
      if timestampsError?
        return callback(timestampsError)
      callback(null, docTimestamps)
|
||||
|
||||
getProjectDocsAndFlushIfOld: (project_id, projectStateHash, excludeVersions = {}, _callback = (error, docs) ->) ->
|
||||
timer = new Metrics.Timer("projectManager.getProjectDocsAndFlushIfOld")
|
||||
callback = (args...) ->
|
||||
|
|
|
@ -287,6 +287,30 @@ module.exports = RedisManager =
|
|||
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
|
||||
rclient.smembers keys.docsInProject(project_id: project_id), callback
|
||||
|
||||
# Fetch the lastUpdatedAt value of every doc in doc_ids with a single MULTI;
# the exec reply is handed straight to the callback.
getDocTimestamps: (doc_ids, callback = (error, result) ->) ->
  batch = rclient.multi()
  batch.get keys.lastUpdatedAt(doc_id: id) for id in doc_ids
  batch.exec callback
|
||||
|
||||
# Add the project to the flush-and-delete sorted set, scored by the current
# time in milliseconds.
queueFlushAndDeleteProject: (project_id, callback) ->
  queueKey = keys.flushAndDeleteQueue()
  queuedAt = Date.now()
  rclient.zadd queueKey, queuedAt, project_id, callback
|
||||
|
||||
# Take the next project off the flush-and-delete queue, provided at least one
# entry has a score no greater than cutoffTime.
# Calls back with (null, key, timestamp) for the removed entry, with no
# arguments when nothing qualifies, or with the redis error.
getNextProjectToFlushAndDelete: (cutoffTime, callback = (error, key, timestamp)->) ->
|
||||
# find the oldest queued flush with a score between 0 and cutoffTime
|
||||
rclient.zrangebyscore keys.flushAndDeleteQueue(), 0, cutoffTime, "WITHSCORES", "LIMIT", 0, 1, (err, reply) ->
|
||||
return callback(err) if err?
|
||||
return callback() if !reply?.length
|
||||
# read the rank-0 entry and remove it within the same MULTI
|
||||
multi = rclient.multi()
|
||||
multi.zrange keys.flushAndDeleteQueue(), 0, 0, "WITHSCORES"
|
||||
multi.zremrangebyrank keys.flushAndDeleteQueue(), 0, 0
|
||||
multi.exec (err, reply) ->
|
||||
return callback(err) if err?
|
||||
return callback() if !reply?.length
|
||||
# reply[0] is the result of the zrange above: the member and its score
|
||||
[key, timestamp] = reply[0]
|
||||
callback(null, key, timestamp)
|
||||
|
||||
_serializeRanges: (ranges, callback = (error, serializedRanges) ->) ->
|
||||
jsonRanges = JSON.stringify(ranges)
|
||||
if jsonRanges? and jsonRanges.length > MAX_RANGES_SIZE
|
||||
|
|
|
@ -80,6 +80,7 @@ module.exports =
|
|||
lastUpdatedBy: ({doc_id}) -> "lastUpdatedBy:{#{doc_id}}"
|
||||
lastUpdatedAt: ({doc_id}) -> "lastUpdatedAt:{#{doc_id}}"
|
||||
pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}"
|
||||
flushAndDeleteQueue: () -> "DocUpdaterFlushAndDeleteQueue"
|
||||
redisOptions:
|
||||
keepAlive: 100
|
||||
|
||||
|
|
|
@ -97,7 +97,7 @@ describe "Deleting a project", ->
|
|||
it "should flush each doc in project history", ->
|
||||
MockProjectHistoryApi.flushProject.calledWith(@project_id).should.equal true
|
||||
|
||||
describe "with the shutdown=true parameter from realtime", ->
|
||||
describe "with the background=true parameter from realtime", ->
|
||||
before (done) ->
|
||||
sinon.spy MockWebApi, "setDocument"
|
||||
sinon.spy MockTrackChangesApi, "flushDoc"
|
||||
|
@ -111,7 +111,11 @@ describe "Deleting a project", ->
|
|||
setTimeout () =>
|
||||
DocUpdaterClient.deleteProjectOnShutdown @project_id, (error, res, body) =>
|
||||
@statusCode = res.statusCode
|
||||
done()
|
||||
# after deleting the project and putting it in the queue, flush the queue
|
||||
setTimeout () ->
|
||||
DocUpdaterClient.flushOldProjects (error, res, body) =>
|
||||
done()
|
||||
, 100
|
||||
, 200
|
||||
|
||||
after ->
|
||||
|
|
|
@ -78,6 +78,9 @@ module.exports = DocUpdaterClient =
|
|||
deleteProjectOnShutdown: (project_id, callback = () ->) ->
|
||||
request.del "http://localhost:3003/project/#{project_id}?background=true&shutdown=true", callback
|
||||
|
||||
flushOldProjects: (callback = () ->) ->
|
||||
request.get "http://localhost:3003/flush_queued_projects?min_delete_age=1", callback
|
||||
|
||||
acceptChange: (project_id, doc_id, change_id, callback = () ->) ->
|
||||
request.post "http://localhost:3003/project/#{project_id}/doc/#{doc_id}/change/#{change_id}/accept", callback
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ describe "HttpController", ->
|
|||
"./ProjectManager": @ProjectManager = {}
|
||||
"logger-sharelatex" : @logger = { log: sinon.stub() }
|
||||
"./ProjectFlusher": {flushAllProjects:->}
|
||||
"./DeleteQueueManager": @DeleteQueueManager = {}
|
||||
"./Metrics": @Metrics = {}
|
||||
"./Errors" : Errors
|
||||
@Metrics.Timer = class Timer
|
||||
|
@ -343,15 +344,15 @@ describe "HttpController", ->
|
|||
it "should time the request", ->
|
||||
@Metrics.Timer::done.called.should.equal true
|
||||
|
||||
describe "with the shutdown=true option from realtime", ->
|
||||
describe "with the background=true option from realtime", ->
|
||||
beforeEach ->
|
||||
@ProjectManager.flushAndDeleteProjectWithLocks = sinon.stub().callsArgWith(2)
|
||||
@ProjectManager.queueFlushAndDeleteProject = sinon.stub().callsArgWith(1)
|
||||
@req.query = {background:true, shutdown:true}
|
||||
@HttpController.deleteProject(@req, @res, @next)
|
||||
|
||||
it "should pass the skip_history_flush option when flushing the project", ->
|
||||
@ProjectManager.flushAndDeleteProjectWithLocks
|
||||
.calledWith(@project_id, {background:true, skip_history_flush:true})
|
||||
it "should queue the flush and delete", ->
|
||||
@ProjectManager.queueFlushAndDeleteProject
|
||||
.calledWith(@project_id)
|
||||
.should.equal true
|
||||
|
||||
describe "when an errors occurs", ->
|
||||
|
|
Loading…
Reference in a new issue