let RedisManager
const Settings = require('@overleaf/settings')
const rclient = require('@overleaf/redis-wrapper').createClient(
  Settings.redis.documentupdater
)
const logger = require('@overleaf/logger')
const metrics = require('./Metrics')
const Errors = require('./Errors')
const crypto = require('crypto')
const async = require('async')
const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager')
const { docIsTooLarge } = require('./Limits')
// Sometimes Redis calls take an unexpectedly long time. We have to be
// quick with Redis calls because we're holding a lock that expires
// after 30 seconds. We can't let any errors in the rest of the stack
// hold us up, and need to bail out quickly if there is a problem.
const MAX_REDIS_REQUEST_LENGTH = 5000 // 5 seconds

// Make times easy to read
const minutes = 60 // seconds for Redis expire

const logHashReadErrors = Settings.documentupdater?.logHashErrors?.read

const MEGABYTES = 1024 * 1024
const MAX_RANGES_SIZE = 3 * MEGABYTES

const keys = Settings.redis.documentupdater.key_schema
const historyKeys = Settings.redis.history.key_schema // note: this is track changes, not project-history
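// The key schemas map ids to Redis key names. Each entry is a function, e.g.
// keys.docLines({ doc_id: docId }) or keys.docsInProject({ project_id: projectId }).
// The exact key prefixes (for example 'doclines:{<docId>}') live in the service
// settings, not here, so treat any literal key names as illustrative only.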
module.exports = RedisManager = {
  rclient,
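  // Write a doc (lines, version, ranges and history metadata) into Redis and
  // add it to the project's docsInProject set. Rejects docs containing null
  // bytes or exceeding Settings.max_doc_length before anything is written.
  // Illustrative call (argument values are made up):
  //   RedisManager.putDocInMemory(projectId, docId, ['line 1'], 1, {}, 'main.tex',
  //     projectHistoryId, err => { ... })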
  putDocInMemory(
    projectId,
    docId,
    docLines,
    version,
    ranges,
    pathname,
    projectHistoryId,
    _callback
  ) {
    const timer = new metrics.Timer('redis.put-doc')
    const callback = error => {
      timer.done()
      _callback(error)
    }
    const docLinesArray = docLines
    docLines = JSON.stringify(docLines)
    if (docLines.indexOf('\u0000') !== -1) {
      const error = new Error('null bytes found in doc lines')
      // this check was added to catch memory corruption in JSON.stringify.
      // It sometimes returned null bytes at the end of the string.
      logger.error({ err: error, docId, docLines }, error.message)
      return callback(error)
    }
    // Do an optimised size check on the docLines using the serialised
    // length as an upper bound
    const sizeBound = docLines.length
    if (docIsTooLarge(sizeBound, docLinesArray, Settings.max_doc_length)) {
      const docSize = docLines.length
      const err = new Error('blocking doc insert into redis: doc is too large')
      logger.error({ projectId, docId, err, docSize }, err.message)
      return callback(err)
    }
    const docHash = RedisManager._computeHash(docLines)
    // record bytes sent to redis
    metrics.summary('redis.docLines', docLines.length, { status: 'set' })
    logger.debug(
      { projectId, docId, version, docHash, pathname, projectHistoryId },
      'putting doc in redis'
    )
    RedisManager._serializeRanges(ranges, (error, ranges) => {
      if (error) {
        logger.error({ err: error, docId, projectId }, error.message)
        return callback(error)
      }
      // update docsInProject set before writing doc contents
      rclient.sadd(
        keys.docsInProject({ project_id: projectId }),
        docId,
        error => {
          if (error) return callback(error)

          if (!pathname) {
            metrics.inc('pathname', 1, {
              path: 'RedisManager.setDoc',
              status: pathname === '' ? 'zero-length' : 'undefined',
            })
          }

          rclient.mset(
            {
              [keys.docLines({ doc_id: docId })]: docLines,
              [keys.projectKey({ doc_id: docId })]: projectId,
              [keys.docVersion({ doc_id: docId })]: version,
              [keys.docHash({ doc_id: docId })]: docHash,
              [keys.ranges({ doc_id: docId })]: ranges,
              [keys.pathname({ doc_id: docId })]: pathname,
              [keys.projectHistoryId({ doc_id: docId })]: projectHistoryId,
            },
            callback
          )
        }
      )
    })
  },
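  // Delete all per-doc keys for a doc and drop it from the project's
  // docsInProject set; also clears the cached project state so the next
  // request re-checks it.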
  removeDocFromMemory(projectId, docId, _callback) {
    logger.debug({ projectId, docId }, 'removing doc from redis')
    const callback = err => {
      if (err) {
        logger.err({ projectId, docId, err }, 'error removing doc from redis')
        _callback(err)
      } else {
        logger.debug({ projectId, docId }, 'removed doc from redis')
        _callback()
      }
    }

    let multi = rclient.multi()
    multi.strlen(keys.docLines({ doc_id: docId }))
    multi.del(
      keys.docLines({ doc_id: docId }),
      keys.projectKey({ doc_id: docId }),
      keys.docVersion({ doc_id: docId }),
      keys.docHash({ doc_id: docId }),
      keys.ranges({ doc_id: docId }),
      keys.pathname({ doc_id: docId }),
      keys.projectHistoryId({ doc_id: docId }),
      keys.projectHistoryType({ doc_id: docId }),
      keys.unflushedTime({ doc_id: docId }),
      keys.lastUpdatedAt({ doc_id: docId }),
      keys.lastUpdatedBy({ doc_id: docId })
    )
    multi.exec((error, response) => {
      if (error) {
        return callback(error)
      }
      const length = response?.[0]
      if (length > 0) {
        // record bytes freed in redis
        metrics.summary('redis.docLines', length, { status: 'del' })
      }
      multi = rclient.multi()
      multi.srem(keys.docsInProject({ project_id: projectId }), docId)
      multi.del(keys.projectState({ project_id: projectId }))
      multi.exec(callback)
    })
  },
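  // Atomically swap in the new project state (GETSET) and refresh its expiry.
  // The callback receives `true` when the stored state differed from
  // `newState`, i.e. the project has changed since it was last checked.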
  checkOrSetProjectState(projectId, newState, callback) {
    const multi = rclient.multi()
    multi.getset(keys.projectState({ project_id: projectId }), newState)
    multi.expire(keys.projectState({ project_id: projectId }), 30 * minutes)
    multi.exec((error, response) => {
      if (error) {
        return callback(error)
      }
      logger.debug(
        { projectId, newState, oldState: response[0] },
        'checking project state'
      )
      callback(null, response[0] !== newState)
    })
  },

  clearProjectState(projectId, callback) {
    rclient.del(keys.projectState({ project_id: projectId }), callback)
  },
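  // Read a doc and its metadata from Redis in a single MGET. The callback
  // receives (error, lines, version, ranges, pathname, projectHistoryId,
  // unflushedTime, lastUpdatedAt, lastUpdatedBy). The stored SHA-1 hash is
  // compared against the retrieved lines to detect corruption.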
  getDoc(projectId, docId, callback) {
    const timer = new metrics.Timer('redis.get-doc')
    const collectKeys = [
      keys.docLines({ doc_id: docId }),
      keys.docVersion({ doc_id: docId }),
      keys.docHash({ doc_id: docId }),
      keys.projectKey({ doc_id: docId }),
      keys.ranges({ doc_id: docId }),
      keys.pathname({ doc_id: docId }),
      keys.projectHistoryId({ doc_id: docId }),
      keys.unflushedTime({ doc_id: docId }),
      keys.lastUpdatedAt({ doc_id: docId }),
      keys.lastUpdatedBy({ doc_id: docId }),
    ]
    rclient.mget(...collectKeys, (error, result) => {
      if (error) {
        return callback(error)
      }
      let [
        docLines,
        version,
        storedHash,
        docProjectId,
        ranges,
        pathname,
        projectHistoryId,
        unflushedTime,
        lastUpdatedAt,
        lastUpdatedBy,
      ] = result
      const timeSpan = timer.done()
      // check if request took too long and bail out. only do this for
      // get, because it is the first call in each update, so if this
      // passes we'll assume others have a reasonable chance to succeed.
      if (timeSpan > MAX_REDIS_REQUEST_LENGTH) {
        error = new Error('redis getDoc exceeded timeout')
        return callback(error)
      }
      // record bytes loaded from redis
      if (docLines != null) {
        metrics.summary('redis.docLines', docLines.length, { status: 'get' })
      }
      // check sha1 hash value if present
      if (docLines != null && storedHash != null) {
        const computedHash = RedisManager._computeHash(docLines)
        if (logHashReadErrors && computedHash !== storedHash) {
          logger.error(
            {
              projectId,
              docId,
              docProjectId,
              computedHash,
              storedHash,
              docLines,
            },
            'hash mismatch on retrieved document'
          )
        }
      }

      try {
        docLines = JSON.parse(docLines)
        ranges = RedisManager._deserializeRanges(ranges)
      } catch (e) {
        return callback(e)
      }

      version = parseInt(version || 0, 10)
      // check doc is in requested project
      if (docProjectId != null && docProjectId !== projectId) {
        logger.error({ projectId, docId, docProjectId }, 'doc not in project')
        return callback(new Errors.NotFoundError('document not found'))
      }

      if (projectHistoryId != null) {
        projectHistoryId = parseInt(projectHistoryId)
      }

      if (docLines && version && !pathname) {
        metrics.inc('pathname', 1, {
          path: 'RedisManager.getDoc',
          status: pathname === '' ? 'zero-length' : 'undefined',
        })
      }

      callback(
        null,
        docLines,
        version,
        ranges,
        pathname,
        projectHistoryId,
        unflushedTime,
        lastUpdatedAt,
        lastUpdatedBy
      )
    })
  },
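  // Fetch the doc version together with the projectHistoryType flag, which
  // updateDocument uses to decide whether to queue ops for track-changes.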
  getDocVersion(docId, callback) {
    rclient.mget(
      keys.docVersion({ doc_id: docId }),
      keys.projectHistoryType({ doc_id: docId }),
      (error, result) => {
        if (error) {
          return callback(error)
        }
        let [version, projectHistoryType] = result || []
        version = parseInt(version, 10)
        callback(null, version, projectHistoryType)
      }
    )
  },

  getDocLines(docId, callback) {
    rclient.get(keys.docLines({ doc_id: docId }), (error, docLines) => {
      if (error) {
        return callback(error)
      }
      callback(null, docLines)
    })
  },
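  // Return the ops with versions in [start, end] from the docOps list. Only
  // the most recent ops are kept in Redis, so the requested version range is
  // translated into list offsets. For example (illustrative numbers): with
  // version 100 and 20 ops stored, firstVersionInRedis is 80, so a request
  // for versions 85..90 becomes LRANGE offsets 5..10. Ranges outside what is
  // stored fail with OpRangeNotAvailableError.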
  getPreviousDocOps(docId, start, end, callback) {
    const timer = new metrics.Timer('redis.get-prev-docops')
    rclient.llen(keys.docOps({ doc_id: docId }), (error, length) => {
      if (error) {
        return callback(error)
      }
      rclient.get(keys.docVersion({ doc_id: docId }), (error, version) => {
        if (error) {
          return callback(error)
        }
        version = parseInt(version, 10)
        const firstVersionInRedis = version - length

        if (start < firstVersionInRedis || end > version) {
          error = new Errors.OpRangeNotAvailableError(
            'doc ops range is not loaded in redis'
          )
          logger.debug(
            { err: error, docId, length, version, start, end },
            'doc ops range is not loaded in redis'
          )
          return callback(error)
        }

        start = start - firstVersionInRedis
        if (end > -1) {
          end = end - firstVersionInRedis
        }

        if (isNaN(start) || isNaN(end)) {
          error = new Error('inconsistent version or lengths')
          logger.error(
            { err: error, docId, length, version, start, end },
            'inconsistent version or length'
          )
          return callback(error)
        }

        rclient.lrange(
          keys.docOps({ doc_id: docId }),
          start,
          end,
          (error, jsonOps) => {
            let ops
            if (error) {
              return callback(error)
            }
            try {
              ops = jsonOps.map(jsonOp => JSON.parse(jsonOp))
            } catch (e) {
              return callback(e)
            }
            const timeSpan = timer.done()
            if (timeSpan > MAX_REDIS_REQUEST_LENGTH) {
              error = new Error('redis getPreviousDocOps exceeded timeout')
              return callback(error)
            }
            callback(null, ops)
          }
        )
      })
    })
  },
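  // The projectHistoryType key records which history system a doc uses:
  // 'project-history' skips the legacy track-changes queue in updateDocument,
  // anything else falls back to queueing uncompressed ops for track-changes.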
  getHistoryType(docId, callback) {
    rclient.get(
      keys.projectHistoryType({ doc_id: docId }),
      (error, projectHistoryType) => {
        if (error) {
          return callback(error)
        }
        callback(null, projectHistoryType)
      }
    )
  },

  setHistoryType(docId, projectHistoryType, callback) {
    rclient.set(
      keys.projectHistoryType({ doc_id: docId }),
      projectHistoryType,
      callback
    )
  },

  DOC_OPS_TTL: 60 * minutes,
  DOC_OPS_MAX_LENGTH: 100,
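  // Apply an update: `newVersion` must equal the stored version plus
  // appliedOps.length, otherwise the doc is treated as corrupted. Writes the
  // new lines, version, hash and ranges in one MULTI, trims/queues the ops,
  // and (when enabled) forwards the ops to project history. Illustrative call
  // (argument values are made up):
  //   RedisManager.updateDocument(projectId, docId, newLines, version + ops.length,
  //     ops, ranges, { user_id: userId }, err => { ... })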
  updateDocument(
    projectId,
    docId,
    docLines,
    newVersion,
    appliedOps,
    ranges,
    updateMeta,
    callback
  ) {
    if (appliedOps == null) {
      appliedOps = []
    }
    RedisManager.getDocVersion(
      docId,
      (error, currentVersion, projectHistoryType) => {
        if (error) {
          return callback(error)
        }
        if (currentVersion + appliedOps.length !== newVersion) {
          error = new Error(`Version mismatch. '${docId}' is corrupted.`)
          logger.error(
            {
              err: error,
              docId,
              currentVersion,
              newVersion,
              opsLength: appliedOps.length,
            },
            'version mismatch'
          )
          return callback(error)
        }

        const jsonOps = appliedOps.map(op => JSON.stringify(op))
        for (const op of jsonOps) {
          if (op.indexOf('\u0000') !== -1) {
            error = new Error('null bytes found in jsonOps')
            // this check was added to catch memory corruption in JSON.stringify
            logger.error({ err: error, docId, jsonOps }, error.message)
            return callback(error)
          }
        }

        const newDocLines = JSON.stringify(docLines)
        if (newDocLines.indexOf('\u0000') !== -1) {
          error = new Error('null bytes found in doc lines')
          // this check was added to catch memory corruption in JSON.stringify
          logger.error({ err: error, docId, newDocLines }, error.message)
          return callback(error)
        }
        // Do an optimised size check on the docLines using the serialised
        // length as an upper bound
        const sizeBound = newDocLines.length
        if (docIsTooLarge(sizeBound, docLines, Settings.max_doc_length)) {
          const err = new Error('blocking doc update: doc is too large')
          const docSize = newDocLines.length
          logger.error({ projectId, docId, err, docSize }, err.message)
          return callback(err)
        }
        const newHash = RedisManager._computeHash(newDocLines)

        const opVersions = appliedOps.map(op => op?.v)
        logger.debug(
          {
            docId,
            version: newVersion,
            hash: newHash,
            op_versions: opVersions,
          },
          'updating doc in redis'
        )
        // record bytes sent to redis in update
        metrics.summary('redis.docLines', newDocLines.length, {
          status: 'update',
        })
        RedisManager._serializeRanges(ranges, (error, ranges) => {
          if (error) {
            logger.error({ err: error, docId }, error.message)
            return callback(error)
          }
          if (ranges && ranges.indexOf('\u0000') !== -1) {
            error = new Error('null bytes found in ranges')
            // this check was added to catch memory corruption in JSON.stringify
            logger.error({ err: error, docId, ranges }, error.message)
            return callback(error)
          }
          const multi = rclient.multi()
          multi.mset({
            [keys.docLines({ doc_id: docId })]: newDocLines,
            [keys.docVersion({ doc_id: docId })]: newVersion,
            [keys.docHash({ doc_id: docId })]: newHash,
            [keys.ranges({ doc_id: docId })]: ranges,
            [keys.lastUpdatedAt({ doc_id: docId })]: Date.now(),
            [keys.lastUpdatedBy({ doc_id: docId })]:
              updateMeta && updateMeta.user_id,
          })
          multi.ltrim(
            keys.docOps({ doc_id: docId }),
            -RedisManager.DOC_OPS_MAX_LENGTH,
            -1
          ) // index 1
          // push the ops last so we can read the queue length back at a fixed
          // position in the exec result (index 4, used below)
          if (jsonOps.length > 0) {
            multi.rpush(keys.docOps({ doc_id: docId }), ...jsonOps) // index 2
            // expire must come after rpush since before it will be a no-op if the list is empty
            multi.expire(
              keys.docOps({ doc_id: docId }),
              RedisManager.DOC_OPS_TTL
            ) // index 3
            if (projectHistoryType === 'project-history') {
              metrics.inc('history-queue', 1, { status: 'skip-track-changes' })
              logger.debug(
                { docId },
                'skipping push of uncompressed ops for project using project-history'
              )
            } else {
              // project is using old track-changes history service
              metrics.inc('history-queue', 1, { status: 'track-changes' })
              multi.rpush(
                historyKeys.uncompressedHistoryOps({ doc_id: docId }),
                ...jsonOps
              ) // index 4
            }
            // Set the unflushed timestamp to the current time if the doc
            // hasn't been modified before (the content in mongo has been
            // valid up to this point). Otherwise leave it alone ("NX" flag).
            multi.set(keys.unflushedTime({ doc_id: docId }), Date.now(), 'NX')
          }
          multi.exec((error, result) => {
            if (error) {
              return callback(error)
            }

            let docUpdateCount
            if (projectHistoryType === 'project-history') {
              docUpdateCount = undefined // only using project history, don't bother with track-changes
            } else {
              // project is using old track-changes history service
              docUpdateCount = result[4]
            }

            if (jsonOps.length > 0 && Settings.apis?.project_history?.enabled) {
              metrics.inc('history-queue', 1, { status: 'project-history' })
              ProjectHistoryRedisManager.queueOps(
                projectId,
                ...jsonOps,
                (error, projectUpdateCount) => {
                  if (error) {
                    // The full project history can re-sync a project in case
                    // updates went missing.
                    // Just record the error here and acknowledge the write-op.
                    metrics.inc('history-queue-error')
                  }
                  callback(null, docUpdateCount, projectUpdateCount)
                }
              )
            } else {
              callback(null, docUpdateCount)
            }
          })
        })
      }
    )
  },
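  // Update the stored pathname after a file move. Only touches Redis when the
  // doc is actually loaded (i.e. lines and version are present); otherwise it
  // is a no-op.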
  renameDoc(projectId, docId, userId, update, projectHistoryId, callback) {
    RedisManager.getDoc(projectId, docId, (error, lines, version) => {
      if (error) {
        return callback(error)
      }
      if (lines != null && version != null) {
        if (!update.newPathname) {
          logger.warn(
            { projectId, docId, update },
            'missing pathname in RedisManager.renameDoc'
          )
          metrics.inc('pathname', 1, {
            path: 'RedisManager.renameDoc',
            status: update.newPathname === '' ? 'zero-length' : 'undefined',
          })
        }
        rclient.set(
          keys.pathname({ doc_id: docId }),
          update.newPathname,
          callback
        )
      } else {
        callback()
      }
    })
  },

  clearUnflushedTime(docId, callback) {
    rclient.del(keys.unflushedTime({ doc_id: docId }), callback)
  },

  getDocIdsInProject(projectId, callback) {
    rclient.smembers(keys.docsInProject({ project_id: projectId }), callback)
  },
  /**
   * Get the lastUpdatedAt timestamps for an array of docIds
   */
  getDocTimestamps(docIds, callback) {
    async.mapSeries(
      docIds,
      (docId, cb) => rclient.get(keys.lastUpdatedAt({ doc_id: docId }), cb),
      callback
    )
  },
  /**
   * Store the project id in a sorted set ordered by time with a random offset
   * to smooth out spikes
   */
  queueFlushAndDeleteProject(projectId, callback) {
    const SMOOTHING_OFFSET =
      Settings.smoothingOffset > 0
        ? Math.round(Settings.smoothingOffset * Math.random())
        : 0
    rclient.zadd(
      keys.flushAndDeleteQueue(),
      Date.now() + SMOOTHING_OFFSET,
      projectId,
      callback
    )
  },
  /**
   * Find the oldest queued flush that is before the cutoff time
   */
  getNextProjectToFlushAndDelete(cutoffTime, callback) {
    rclient.zrangebyscore(
      keys.flushAndDeleteQueue(),
      0,
      cutoffTime,
      'WITHSCORES',
      'LIMIT',
      0,
      1,
      (err, reply) => {
        if (err) {
          return callback(err)
        }
        // return if no projects ready to be processed
        if (!reply || reply.length === 0) {
          return callback()
        }
        // pop the oldest entry (get and remove in a multi)
        const multi = rclient.multi()
        // Poor man's version of ZPOPMIN, which is only available in Redis 5.
        multi.zrange(keys.flushAndDeleteQueue(), 0, 0, 'WITHSCORES')
        multi.zremrangebyrank(keys.flushAndDeleteQueue(), 0, 0)
        multi.zcard(keys.flushAndDeleteQueue()) // the total length of the queue (for metrics)
        multi.exec((err, reply) => {
          if (err) {
            return callback(err)
          }
          if (!reply || reply.length === 0) {
            return callback()
          }
          const [key, timestamp] = reply[0]
          const queueLength = reply[2]
          callback(null, key, timestamp, queueLength)
        })
      }
    )
  },
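  // Serialisation helpers: ranges above MAX_RANGES_SIZE are rejected, and
  // empty ranges are stored as null rather than filling Redis with '{}' keys.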
  _serializeRanges(ranges, callback) {
    let jsonRanges = JSON.stringify(ranges)
    if (jsonRanges && jsonRanges.length > MAX_RANGES_SIZE) {
      return callback(new Error('ranges are too large'))
    }
    if (jsonRanges === '{}') {
      // Most docs will have empty ranges so don't fill redis with lots of '{}' keys
      jsonRanges = null
    }
    callback(null, jsonRanges)
  },

  _deserializeRanges(ranges) {
    if (ranges == null || ranges === '') {
      return {}
    } else {
      return JSON.parse(ranges)
    }
  },

  _computeHash(docLines) {
    // use sha1 checksum of doclines to detect data corruption.
    //
    // note: must specify 'utf8' encoding explicitly, as the default is
    // binary in node < v5
    return crypto.createHash('sha1').update(docLines, 'utf8').digest('hex')
  },
}