Merge branch 'master' into ja-return-latest-doc-on-no-version

James Allen 2017-08-04 09:31:19 +02:00
commit b8d0eb2823
14 changed files with 122 additions and 25 deletions

View file

@@ -61,7 +61,7 @@ app.post "/pack", (req, res, next) ->
 	else
 		logger.log "running pack"
 		packWorker = child_process.fork(__dirname + '/app/js/PackWorker.js',
-			[req.query.limit, req.query.delay, req.query.timeout])
+			[req.query.limit || 1000, req.query.delay || 1000, req.query.timeout || 30*60*1000])
 		packWorker.on 'exit', (code, signal) ->
 			logger.log {code, signal}, "history auto pack exited"
 			packWorker = null

View file

@@ -1,6 +1,6 @@
 Settings = require "settings-sharelatex"
 redis = require("redis-sharelatex")
-rclient = redis.createClient(Settings.redis.web)
+rclient = redis.createClient(Settings.redis.lock)
 os = require "os"
 crypto = require "crypto"
 logger = require "logger-sharelatex"

View file

@@ -46,6 +46,10 @@ module.exports = MongoAWS =
 		return callback new Error("cannot find pack to send to s3") if not result?
 		return callback new Error("refusing to send pack with TTL to s3") if result.expiresAt?
 		uncompressedData = JSON.stringify(result)
+		if uncompressedData.indexOf("\u0000") != -1
+			error = new Error("null bytes found in upload")
+			logger.error err: error, project_id: project_id, doc_id: doc_id, pack_id: pack_id, error.message
+			return callback(error)
 		zlib.gzip uncompressedData, (err, buf) ->
 			logger.log {project_id, doc_id, pack_id, origSize: uncompressedData.length, newSize: buf.length}, "compressed pack"
 			return callback(err) if err?
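The new guard above refuses to upload any pack whose JSON serialisation contains a literal NUL character, logging the pack details instead of sending it to S3. A standalone sketch of just the check (made-up strings, not real pack data):

# assumption: illustrative strings to show what indexOf("\u0000") catches
clean   = JSON.stringify({op: "hello"})
corrupt = "header\u0000rest"                  # contains an embedded NUL byte
console.log clean.indexOf("\u0000") != -1     # false -> pack is gzipped and uploaded
console.log corrupt.indexOf("\u0000") != -1   # true  -> error logged, callback(error)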

View file

@@ -6,6 +6,8 @@ LockManager = require "./LockManager"
 MongoAWS = require "./MongoAWS"
 Metrics = require "metrics-sharelatex"
 ProjectIterator = require "./ProjectIterator"
+Settings = require "settings-sharelatex"
+keys = Settings.redis.lock.key_schema
 
 # Sharejs operations are stored in a 'pack' object
 #
@@ -319,7 +321,7 @@ module.exports = PackManager =
 	insertPacksIntoIndexWithLock: (project_id, doc_id, newPacks, callback) ->
 		LockManager.runWithLock(
-			"HistoryIndexLock:#{doc_id}",
+			keys.historyIndexLock({doc_id}),
 			(releaseLock) ->
 				PackManager._insertPacksIntoIndex project_id, doc_id, newPacks, releaseLock
 			callback
 		)
@@ -438,7 +440,7 @@ module.exports = PackManager =
 	markPackAsFinalisedWithLock: (project_id, doc_id, pack_id, callback) ->
 		LockManager.runWithLock(
-			"HistoryLock:#{doc_id}",
+			keys.historyLock({doc_id}),
 			(releaseLock) ->
 				PackManager._markPackAsFinalised project_id, doc_id, pack_id, releaseLock
 			callback
 		)

View file

@@ -21,8 +21,10 @@ PackManager = require "./PackManager"
 source = process.argv[2]
 DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000
 TIMEOUT = Number(process.argv[4]) || 30*60*1000
+COUNT = 0 # number processed
+TOTAL = 0 # total number to process
 
-if source.match(/[^0-9]/)
+if !source.match(/^[0-9]+$/)
 	file = fs.readFileSync source
 	result = for line in file.toString().split('\n')
 		[project_id, doc_id] = line.split(' ')
@@ -37,10 +39,11 @@ shutDownTimer = setTimeout () ->
 	# start the shutdown on the next pack
 	shutDownRequested = true
 	# do a hard shutdown after a further 5 minutes
-	setTimeout () ->
+	hardTimeout = setTimeout () ->
 		logger.error "HARD TIMEOUT in pack archive worker"
 		process.exit()
 	, 5*60*1000
+	hardTimeout.unref()
 , TIMEOUT
 
 logger.log "checking for updates, limit=#{LIMIT}, delay=#{DOCUMENT_PACK_DELAY}, timeout=#{TIMEOUT}"
@@ -61,7 +64,7 @@ finish = () ->
 	db.close () ->
 		logger.log 'closing LockManager Redis Connection'
 		LockManager.close () ->
-			logger.log 'ready to exit from pack archive worker'
+			logger.log {processedCount: COUNT, allCount: TOTAL}, 'ready to exit from pack archive worker'
 			hardTimeout = setTimeout () ->
 				logger.error 'hard exit from pack archive worker'
 				process.exit(1)
@@ -74,7 +77,8 @@ process.on 'exit', (code) ->
 processUpdates = (pending) ->
 	async.eachSeries pending, (result, callback) ->
 		{_id, project_id, doc_id} = result
-		logger.log {project_id, doc_id}, "processing"
+		COUNT++
+		logger.log {project_id, doc_id}, "processing #{COUNT}/#{TOTAL}"
 		if not project_id? or not doc_id?
 			logger.log {project_id, doc_id}, "skipping pack, missing project/doc id"
 			return callback()
@@ -87,7 +91,7 @@ processUpdates = (pending) ->
 				logger.error {err, result}, "error in pack archive worker"
 				return callback(err)
 			if shutDownRequested
-				logger.error "shutting down pack archive worker"
+				logger.warn "shutting down pack archive worker"
 				return callback(new Error("shutdown"))
 			setTimeout () ->
 				callback(err, result)
@@ -115,11 +119,13 @@ if pending?
 	logger.log "got #{pending.length} entries from #{source}"
 	processUpdates pending
 else
+	oneWeekAgo = new Date(Date.now() - 7 * DAYS)
 	db.docHistory.find({
 		expiresAt: {$exists: false}
 		project_id: {$exists: true}
 		v_end: {$exists: true}
-		_id: {$lt: ObjectIdFromDate(new Date(Date.now() - 7 * DAYS))}
+		_id: {$lt: ObjectIdFromDate(oneWeekAgo)}
+		last_checked: {$lt: oneWeekAgo}
 	}, {_id:1, doc_id:1, project_id:1}).sort({
 		last_checked:1
 	}).limit LIMIT, (err, results) ->
@@ -128,5 +134,6 @@ else
 			finish()
 			return
 		pending = _.uniq results, false, (result) -> result.doc_id.toString()
-		logger.log "found #{pending.length} documents to archive"
+		TOTAL = pending.length
+		logger.log "found #{TOTAL} documents to archive"
 		processUpdates pending

View file

@@ -2,6 +2,7 @@ Settings = require "settings-sharelatex"
 redis = require("redis-sharelatex")
 rclient = redis.createClient(Settings.redis.history)
 Keys = Settings.redis.history.key_schema
+async = require "async"
 
 module.exports = RedisManager =
@@ -33,12 +34,19 @@ module.exports = RedisManager =
 		rclient.smembers Keys.docsWithHistoryOps({project_id}), callback
 
 	# iterate over keys asynchronously using redis scan (non-blocking)
+	# handle all the cluster nodes or single redis server
 	_getKeys: (pattern, callback) ->
+		nodes = rclient.nodes?('master') || [ rclient ];
+		doKeyLookupForNode = (node, cb) ->
+			RedisManager._getKeysFromNode node, pattern, cb
+		async.concatSeries nodes, doKeyLookupForNode, callback
+
+	_getKeysFromNode: (node, pattern, callback) ->
 		cursor = 0 # redis iterator
 		keySet = {} # use hash to avoid duplicate results
 		# scan over all keys looking for pattern
 		doIteration = (cb) ->
-			rclient.scan cursor, "MATCH", pattern, "COUNT", 1000, (error, reply) ->
+			node.scan cursor, "MATCH", pattern, "COUNT", 1000, (error, reply) ->
 				return callback(error) if error?
 				[cursor, keys] = reply
 				for key in keys
@@ -50,21 +58,20 @@ module.exports = RedisManager =
 		doIteration()
 
 	# extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
+	# or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
 	_extractIds: (keyList) ->
-		ids = (key.split(":")[1] for key in keyList)
+		ids = for key in keyList
+			m = key.match(/:\{?([0-9a-f]{24})\}?/) # extract object id
+			m[1]
 		return ids
 
-	# this will only work on single node redis, not redis cluster
 	getProjectIdsWithHistoryOps: (callback = (error, project_ids) ->) ->
-		return callback(new Error("not supported")) if rclient.nodes?
 		RedisManager._getKeys Keys.docsWithHistoryOps({project_id:"*"}), (error, project_keys) ->
 			return callback(error) if error?
 			project_ids = RedisManager._extractIds project_keys
 			callback(error, project_ids)
 
-	# this will only work on single node redis, not redis cluster
 	getAllDocIdsWithHistoryOps: (callback = (error, doc_ids) ->) ->
-		return callback(new Error("not supported")) if rclient.nodes?
 		# return all the docids, to find dangling history entries after
 		# everything is flushed.
 		RedisManager._getKeys Keys.uncompressedHistoryOps({doc_id:"*"}), (error, doc_keys) ->
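The rewritten _extractIds above copes with both key formats named in the comments: plain keys on a single redis server and hash-tagged keys on a redis cluster, which is why the "not supported" guards could be dropped. A standalone check of the new regex (sample keys only, not project code):

# assumption: example keys, one per format
keyList = [
  "DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b"     # single node
  "DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b}"   # cluster hash tag
]
ids = for key in keyList
  m = key.match(/:\{?([0-9a-f]{24})\}?/)
  m[1]
console.log ids   # both entries yield '57fd0b1f53a8396d22b2c24b'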

View file

@@ -9,6 +9,7 @@ logger = require "logger-sharelatex"
 async = require "async"
 _ = require "underscore"
 Settings = require "settings-sharelatex"
+keys = Settings.redis.lock.key_schema
 
 module.exports = UpdatesManager =
 	compressAndSaveRawUpdates: (project_id, doc_id, rawUpdates, temporary, callback = (error) ->) ->
@@ -96,7 +97,9 @@ module.exports = UpdatesManager =
 			length = docUpdates.length
 			# parse the redis strings into ShareJs updates
 			RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) ->
-				return callback(error) if error?
+				if error?
+					logger.err project_id: project_id, doc_id: doc_id, docUpdates: docUpdates, "failed to parse docUpdates"
+					return callback(error)
 				logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis"
 				UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) ->
 					return callback(error) if error?
@@ -126,7 +129,7 @@ module.exports = UpdatesManager =
 		UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) ->
 			return callback(error) if error?
 			LockManager.runWithLock(
-				"HistoryLock:#{doc_id}",
+				keys.historyLock({doc_id}),
 				(releaseLock) ->
 					UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, releaseLock
 				callback

View file

@@ -18,10 +18,13 @@ module.exports =
 		user: "sharelatex"
 		pass: "password"
 	redis:
-		web:
+		lock:
 			host: "localhost"
 			port: 6379
 			pass: ""
+			key_schema:
+				historyLock: ({doc_id}) -> "HistoryLock:#{doc_id}"
+				historyIndexLock: ({project_id}) -> "HistoryIndexLock:#{project_id}"
 		history:
 			port:"6379"
 			host:"localhost"

View file

@@ -0,0 +1,54 @@
+Settings = require "settings-sharelatex"
+logger = require "logger-sharelatex"
+TrackChangesLogger = logger.initialize("track-changes").logger
+
+async = require "async"
+fs = require "fs"
+request = require "request"
+cli = require "cli"
+mongojs = require "mongojs"
+bson = require "bson"
+db = mongojs(Settings.mongo.url, ["docs"])
+ObjectId = mongojs.ObjectId
+
+options = cli.parse({
+	port: ['p', 'port number for track changes', 'number'],
+	force: ['f', 'actually make the fix']
+});
+
+if cli.args.length < 1
+	console.log "fixdangling -p PORT file_of_doc_ids"
+	process.exit()
+
+file = cli.args.pop()
+doc_ids = fs.readFileSync(file).toString().trim().split("\n")
+
+missing = 0
+errored = 0
+success = 0
+
+fixDangling = (doc_id, callback) ->
+	# look up project id from doc id
+	db.docs.find {_id:ObjectId(doc_id)}, {project_id:1}, (err, result) ->
+		#console.log "doc_id", doc_id, "err", err, "result", result
+		if err?
+			errored++
+			return callback()
+		if !result? or result.length == 0
+			missing++
+			return callback()
+		project_id = result[0].project_id
+		console.log "found project_id", project_id, "for doc_id", doc_id
+		url = "http://localhost:#{options.port}/project/#{project_id}/doc/#{doc_id}/flush"
+		if options.force
+			request.post url, (err, response, body) ->
+				if err? then errored++ else success++
+				callback()
+		else
+			console.log "URL:", url
+			success++
+			callback()
+
+async.eachSeries doc_ids, fixDangling, (err) ->
+	console.log "final result", err, "missing", missing, "errored", errored, "success", success
+	db.close()

View file

@@ -1025,10 +1025,22 @@
       }
     },
     "redis-sharelatex": {
-      "version": "1.0.0",
-      "from": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.0",
-      "resolved": "git+https://github.com/sharelatex/redis-sharelatex.git#0cd669a9ed73c0330e3b912fc9d9a42dae3c4fbd",
+      "version": "1.0.2",
+      "from": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.2",
+      "resolved": "git+https://github.com/sharelatex/redis-sharelatex.git#143b7eb192675f36d835080e534a4ac4899f918a",
       "dependencies": {
+        "async": {
+          "version": "2.4.0",
+          "from": "async@>=2.4.0 <3.0.0",
+          "resolved": "https://registry.npmjs.org/async/-/async-2.4.0.tgz",
+          "dependencies": {
+            "lodash": {
+              "version": "4.17.4",
+              "from": "lodash@>=4.14.0 <5.0.0",
+              "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.4.tgz"
+            }
+          }
+        },
         "chai": {
           "version": "1.9.1",
           "from": "chai@1.9.1",

View file

@@ -19,7 +19,7 @@
     "metrics-sharelatex": "git+https://github.com/sharelatex/metrics-sharelatex.git#v1.7.1",
     "request": "~2.33.0",
     "requestretry": "^1.12.0",
-    "redis-sharelatex": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.0",
+    "redis-sharelatex": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.2",
     "redis": "~0.10.1",
     "underscore": "~1.7.0",
     "mongo-uri": "^0.1.2",

View file

@@ -9,7 +9,7 @@ describe "LockManager", ->
 	beforeEach ->
 		@Settings =
 			redis:
-				web:{}
+				lock:{}
 		@LockManager = SandboxedModule.require modulePath, requires:
 			"redis-sharelatex":
 				createClient: () => @rclient =

View file

@@ -22,6 +22,8 @@ describe "PackManager", ->
 			"logger-sharelatex": { log: sinon.stub(), error: sinon.stub() }
 			'metrics-sharelatex': {inc: ()->}
 			"./ProjectIterator": require("../../../../app/js/ProjectIterator.js") # Cache for speed
+			"settings-sharelatex":
+				redis: lock: key_schema: {}
 		@callback = sinon.stub()
 		@doc_id = ObjectId().toString()
 		@project_id = ObjectId().toString()

View file

@@ -17,6 +17,9 @@ describe "UpdatesManager", ->
 			"./UpdateTrimmer": @UpdateTrimmer = {}
 			"./DocArchiveManager": @DocArchiveManager = {}
 			"logger-sharelatex": { log: sinon.stub(), error: sinon.stub() }
+			"settings-sharelatex":
+				redis: lock: key_schema:
+					historyLock: ({doc_id}) -> "HistoryLock:#{doc_id}"
 		@doc_id = "doc-id-123"
 		@project_id = "project-id-123"
 		@callback = sinon.stub()