Merge pull request #16589 from overleaf/jpa-script-redis-mongo-mismatch

[document-updater] add script for checking for redis vs mongo de-syncs

GitOrigin-RevId: b51b61efd750f37180c09af5ccf5681712719ee2
This commit is contained in:
Jakob Ackermann 2024-01-23 10:45:06 +00:00 committed by Copybot
parent 784ebcbd57
commit a7c7f2e69b
9 changed files with 618 additions and 0 deletions

2
package-lock.json generated
View file

@ -43695,6 +43695,7 @@
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/o-error": "*",
"@overleaf/promise-utils": "*",
"@overleaf/ranges-tracker": "*",
"@overleaf/redis-wrapper": "*",
"@overleaf/settings": "*",
@ -54189,6 +54190,7 @@
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/o-error": "*",
"@overleaf/promise-utils": "*",
"@overleaf/ranges-tracker": "*",
"@overleaf/redis-wrapper": "*",
"@overleaf/settings": "*",

View file

@ -3,6 +3,7 @@ const redis = require('@overleaf/redis-wrapper')
const rclient = redis.createClient(Settings.redis.lock)
const keys = Settings.redis.lock.key_schema
const RedisLocker = require('@overleaf/redis-wrapper/RedisLocker')
const { promisify } = require('@overleaf/promise-utils')
module.exports = new RedisLocker({
rclient,
@ -16,3 +17,10 @@ module.exports = new RedisLocker({
metricsPrefix: 'doc',
lockTTLSeconds: Settings.redisLockTTLSeconds,
})
// Promise-based wrappers around the callback-style RedisLocker methods.
// Each method is bound so `this` still refers to the RedisLocker instance.
module.exports.promises = {
  checkLock: promisify(module.exports.checkLock.bind(module.exports)),
  getLock: promisify(module.exports.getLock.bind(module.exports)),
  releaseLock: promisify(module.exports.releaseLock.bind(module.exports)),
  tryLock: promisify(module.exports.tryLock.bind(module.exports)),
}

View file

@ -2,6 +2,7 @@ const Settings = require('@overleaf/settings')
const Errors = require('./Errors')
const Metrics = require('./Metrics')
const logger = require('@overleaf/logger')
const { promisifyAll } = require('@overleaf/promise-utils')
const request = require('requestretry').defaults({
maxAttempts: 2,
retryDelay: 10,
@ -175,3 +176,9 @@ function setDoc(
}
module.exports = { getDoc, setDoc }
// Promise-based API. `multiResult` names the multiple values getDoc passes to
// its callback so the promisified variant resolves with one keyed object.
module.exports.promises = promisifyAll(module.exports, {
  multiResult: {
    getDoc: ['lines', 'version', 'ranges', 'pathname', 'projectHistoryId'],
  },
})

View file

@ -20,6 +20,7 @@ const async = require('async')
const ProjectManager = require('./ProjectManager')
const _ = require('lodash')
const logger = require('@overleaf/logger')
const { promisifyAll } = require('@overleaf/promise-utils')
const ProjectFlusher = {
// iterate over keys asynchronously using redis scan (non-blocking)
@ -135,3 +136,4 @@ const ProjectFlusher = {
}
module.exports = ProjectFlusher
module.exports.promises = promisifyAll(ProjectFlusher)

View file

@ -6,6 +6,7 @@ const async = require('async')
const logger = require('@overleaf/logger')
const Metrics = require('./Metrics')
const Errors = require('./Errors')
const { promisifyAll } = require('@overleaf/promise-utils')
module.exports = {
flushProjectWithLocks,
@ -17,6 +18,8 @@ module.exports = {
updateProjectWithLocks,
}
module.exports.promises = promisifyAll(module.exports)
function flushProjectWithLocks(projectId, _callback) {
const timer = new Metrics.Timer('projectManager.flushProjectWithLocks')
const callback = function (...args) {

View file

@ -10,6 +10,7 @@ const crypto = require('crypto')
const async = require('async')
const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager')
const { docIsTooLarge } = require('./Limits')
const { promisifyAll } = require('@overleaf/promise-utils')
// Sometimes Redis calls take an unexpectedly long time. We have to be
// quick with Redis calls because we're holding a lock that expires
@ -617,3 +618,23 @@ module.exports = RedisManager = {
return crypto.createHash('sha1').update(docLines, 'utf8').digest('hex')
},
}
// Promise-based API. `multiResult` names the multiple values these methods
// pass to their callbacks so the promisified variants resolve with one keyed
// object each.
module.exports.promises = promisifyAll(module.exports, {
  multiResult: {
    getDoc: [
      'lines',
      'version',
      'ranges',
      'pathname',
      'projectHistoryId',
      'unflushedTime',
      'lastUpdatedAt',
      'lastUpdatedBy',
    ],
    getNextProjectToFlushAndDelete: [
      'projectId',
      'flushTimestamp',
      'queueLength',
    ],
  },
})

View file

@ -21,6 +21,7 @@
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/o-error": "*",
"@overleaf/promise-utils": "*",
"@overleaf/ranges-tracker": "*",
"@overleaf/redis-wrapper": "*",
"@overleaf/settings": "*",

View file

@ -0,0 +1,309 @@
const fs = require('fs')
const Path = require('path')
const _ = require('lodash')
const logger = require('@overleaf/logger')
const OError = require('@overleaf/o-error')
const LockManager = require('../app/js/LockManager')
const PersistenceManager = require('../app/js/PersistenceManager')
const ProjectFlusher = require('../app/js/ProjectFlusher')
const ProjectManager = require('../app/js/ProjectManager')
const RedisManager = require('../app/js/RedisManager')
const Settings = require('@overleaf/settings')
// When 'true', overwrite the redis doc version with the mongo version for
// docs whose content matches but whose versions differ (see processDoc).
const AUTO_FIX_VERSION_MISMATCH =
  process.env.AUTO_FIX_VERSION_MISMATCH === 'true'
// Log level for the script's own logger output.
const SCRIPT_LOG_LEVEL = process.env.SCRIPT_LOG_LEVEL || 'warn'
// When 'true', flush-and-delete projects whose docs are all in sync, so later
// SCAN iterations can make progress past them.
const FLUSH_IN_SYNC_PROJECTS = process.env.FLUSH_IN_SYNC_PROJECTS === 'true'
// Folder for dumping the content of out of sync docs (WRITE_CONTENT=true).
const FOLDER =
  process.env.FOLDER || '/tmp/overleaf-check-redis-mongo-sync-state'
// Page size for each redis SCAN iteration.
const LIMIT = parseInt(process.env.LIMIT || '1000', 10)
// Extra attempts per doc before giving up (total attempts = RETRIES + 1).
const RETRIES = parseInt(process.env.RETRIES || '5', 10)
// When 'true', write mongo/redis snapshots and ranges of mismatched docs.
const WRITE_CONTENT = process.env.WRITE_CONTENT === 'true'

process.env.LOG_LEVEL = SCRIPT_LOG_LEVEL
logger.initialize('check-redis-mongo-sync-state')

// Lua compare-and-set: set KEYS[1] to ARGV[2] only if it still holds ARGV[1];
// returns 0 when the stored value changed in the meantime.
const COMPARE_AND_SET =
  'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("set", KEYS[1], ARGV[2]) else return 0 end'

/**
 * @typedef {Object} Doc
 * @property {number} version
 * @property {Array<string>} lines
 * @property {string} pathname
 * @property {Object} ranges
 */
// Thrown when a doc changed while we were inspecting it; the caller retries.
class TryAgainError extends Error {}
/**
 * Overwrite the doc version in redis with the version from mongo.
 *
 * Holds the doc lock and uses a compare-and-set on the previously observed
 * redis version, so a concurrent update aborts the overwrite instead of
 * being clobbered.
 *
 * @param {string} docId
 * @param {Doc} redisDoc - the doc as read from redis (supplies the old version)
 * @param {Doc} mongoDoc - the doc as read from mongo (supplies the new version)
 * @return {Promise<void>}
 * @throws {TryAgainError} when the redis version changed since it was read
 */
async function updateDocVersionInRedis(docId, redisDoc, mongoDoc) {
  const lockValue = await LockManager.promises.getLock(docId)
  try {
    const versionKey = Settings.redis.documentupdater.key_schema.docVersion({
      doc_id: docId,
    })
    const updated = await RedisManager.rclient.eval(
      COMPARE_AND_SET,
      1, // number of KEYS passed to the Lua script
      versionKey,
      redisDoc.version,
      mongoDoc.version
    )
    if (!updated) {
      throw new TryAgainError(
        'document has been updated, aborting overwrite. Try again.'
      )
    }
  } finally {
    await LockManager.promises.releaseLock(docId, lockValue)
  }
}
/**
 * Compare a single doc's redis state against mongo, reporting mismatches and
 * optionally dumping the differing content to disk.
 *
 * @param {string} projectId
 * @param {string} docId
 * @return {Promise<boolean>} true when the doc is out of sync
 */
async function processDoc(projectId, docId) {
  const redisDoc = /** @type Doc */ await RedisManager.promises.getDoc(
    projectId,
    docId
  )
  const mongoDoc = /** @type Doc */ await PersistenceManager.promises.getDoc(
    projectId,
    docId
  )

  if (mongoDoc.version < redisDoc.version) {
    // mongo is behind, we can flush to mongo when all docs are processed.
    return false
  }

  // Normalize both sides for comparison.
  mongoDoc.snapshot = mongoDoc.lines.join('\n')
  redisDoc.snapshot = redisDoc.lines.join('\n')
  if (!mongoDoc.ranges) mongoDoc.ranges = {}
  if (!redisDoc.ranges) redisDoc.ranges = {}

  const sameLines = mongoDoc.snapshot === redisDoc.snapshot
  const sameRanges = _.isEqual(mongoDoc.ranges, redisDoc.ranges)
  if (sameLines && sameRanges) {
    if (mongoDoc.version > redisDoc.version) {
      // mongo is ahead, technically out of sync, but practically the content is identical
      if (AUTO_FIX_VERSION_MISMATCH) {
        console.log(
          `Fixing out of sync doc version for doc ${docId} in project ${projectId}: mongo=${mongoDoc.version} > redis=${redisDoc.version}`
        )
        await updateDocVersionInRedis(docId, redisDoc, mongoDoc)
        return false
      } else {
        console.error(
          `Detected out of sync redis and mongo version for doc ${docId} in project ${projectId}, auto-fixable via AUTO_FIX_VERSION_MISMATCH=true`
        )
        return true
      }
    } else {
      // same lines, same ranges, same version
      return false
    }
  }

  // Content differs: report it, and optionally dump both sides to disk.
  const dir = Path.join(FOLDER, projectId, docId)
  console.error(
    `Detected out of sync redis and mongo content for doc ${docId} in project ${projectId}`
  )
  if (!WRITE_CONTENT) return true
  console.log(`pathname: ${mongoDoc.pathname}`)
  console.log(`mongo version: ${mongoDoc.version}`)
  console.log(`redis version: ${redisDoc.version}`)
  await fs.promises.mkdir(dir, { recursive: true })
  if (sameLines) {
    console.log('mongo lines match redis lines')
  } else {
    console.log(
      `mongo lines and redis lines out of sync, writing content into ${dir}`
    )
    await fs.promises.writeFile(
      Path.join(dir, 'mongo-snapshot.txt'),
      mongoDoc.snapshot
    )
    await fs.promises.writeFile(
      Path.join(dir, 'redis-snapshot.txt'),
      redisDoc.snapshot
    )
  }
  if (sameRanges) {
    console.log('mongo ranges match redis ranges')
  } else {
    console.log(
      `mongo ranges and redis ranges out of sync, writing content into ${dir}`
    )
    await fs.promises.writeFile(
      Path.join(dir, 'mongo-ranges.json'),
      JSON.stringify(mongoDoc.ranges)
    )
    await fs.promises.writeFile(
      Path.join(dir, 'redis-ranges.json'),
      JSON.stringify(redisDoc.ranges)
    )
  }
  console.log('---')
  return true
}
/**
 * Check every doc of one project, retrying each doc a few times (e.g. when a
 * concurrent edit triggers a TryAgainError from updateDocVersionInRedis).
 *
 * @param {string} projectId
 * @return {Promise<number>} the number of out of sync docs in this project
 */
async function processProject(projectId) {
  const docIds = await RedisManager.promises.getDocIdsInProject(projectId)
  let outOfSync = 0
  for (const docId of docIds) {
    let lastErr
    for (let i = 0; i <= RETRIES; i++) {
      try {
        if (await processDoc(projectId, docId)) {
          outOfSync++
        }
        // BUGFIX: clear the error from any earlier failed attempt; without
        // this a doc that fails once and then succeeds on retry would still
        // make the loop below throw and abort the whole scan.
        lastErr = undefined
        break
      } catch (err) {
        lastErr = err
      }
    }
    if (lastErr) {
      throw OError.tag(lastErr, 'process doc', { docId })
    }
  }
  if (outOfSync === 0 && FLUSH_IN_SYNC_PROJECTS) {
    // Flushing fully in-sync projects lets the next SCAN page reach projects
    // beyond the LIMIT.
    try {
      await ProjectManager.promises.flushAndDeleteProjectWithLocks(
        projectId,
        {}
      )
    } catch (err) {
      throw OError.tag(err, 'flush project with only in-sync docs')
    }
  }
  return outOfSync
}
/**
 * Run one SCAN pass over the projects currently in redis and check every
 * project not seen in a previous pass.
 *
 * @param {Set<string>} processed - project ids handled in earlier passes; new ids are added
 * @param {Set<string>} outOfSync - accumulator for ids of projects with out of sync docs
 * @return {Promise<{perIterationOutOfSync: number, done: boolean}>}
 */
async function scanOnce(processed, outOfSync) {
  const candidates = await ProjectFlusher.promises.flushAllProjects({
    limit: LIMIT,
    dryRun: true,
  })

  let perIterationOutOfSync = 0
  for (const projectId of candidates) {
    // Skip projects already checked in an earlier pass.
    if (processed.has(projectId)) continue
    processed.add(projectId)

    let badDocs = 0
    try {
      badDocs = await processProject(projectId)
    } catch (err) {
      throw OError.tag(err, 'process project', { projectId })
    }
    perIterationOutOfSync += badDocs
    if (badDocs > 0) {
      outOfSync.add(projectId)
    }
  }

  // A page smaller than LIMIT means there is nothing left to SCAN.
  return { perIterationOutOfSync, done: candidates.length < LIMIT }
}
/**
 * Iterate all projects held in redis and compare each doc against mongo.
 *
 * @return {Promise<number>} exit code: 0 all in sync, 1 mismatches found,
 *   2 no forward progress (too many un-flushed projects for LIMIT),
 *   101 FOLDER is not empty
 */
async function main() {
  if (WRITE_CONTENT) {
    console.log(
      `Writing content for projects with out of sync docs into FOLDER=${FOLDER}`
    )
    await fs.promises.mkdir(FOLDER, { recursive: true })
    const existing = await fs.promises.readdir(FOLDER)
    // Refuse to mix dumps from different runs.
    if (existing.length > 0) {
      console.warn()
      console.warn(
        ` Found existing entries in FOLDER=${FOLDER}. Please delete or move these before running the script again.`
      )
      console.warn()
      return 101
    }
  } else {
    console.warn()
    console.warn(
      ` Use WRITE_CONTENT=true to write the content of out of sync docs to FOLDER=${FOLDER}`
    )
    console.warn()
  }
  if (LIMIT < 100) {
    console.warn()
    console.warn(
      ` Using small LIMIT=${LIMIT}, this can take a while to SCAN in a large redis database.`
    )
    console.warn()
  }

  const processed = new Set()
  const outOfSyncProjects = new Set()
  let totalOutOfSyncDocs = 0
  for (;;) {
    const processedBefore = processed.size
    const { perIterationOutOfSync, done } = await scanOnce(
      processed,
      outOfSyncProjects
    )
    totalOutOfSyncDocs += perIterationOutOfSync
    console.log(`Processed ${processed.size} projects`)
    console.log(
      `Found ${
        outOfSyncProjects.size
      } projects with ${totalOutOfSyncDocs} out of sync docs: ${JSON.stringify(
        Array.from(outOfSyncProjects)
      )}`
    )
    if (done) {
      console.log('Finished iterating all projects in redis')
      break
    }
    // No forward progress: every project in the SCAN page was seen before.
    if (processed.size === processedBefore) {
      console.error(
        `Found too many un-flushed projects (LIMIT=${LIMIT}). Please fix the reported projects first, then try again.`
      )
      if (!FLUSH_IN_SYNC_PROJECTS) {
        console.error(
          'Use FLUSH_IN_SYNC_PROJECTS=true to flush projects that have been checked.'
        )
      }
      return 2
    }
  }
  return totalOutOfSyncDocs > 0 ? 1 : 0
}
// Script entry point: exit with the code computed by main(), or 1 on error.
main()
  .then(code => process.exit(code))
  .catch(error => {
    console.error(OError.getFullStack(error))
    console.error(OError.getFullInfo(error))
    process.exit(1)
  })

View file

@ -0,0 +1,265 @@
const MockWebApi = require('./helpers/MockWebApi')
const DocUpdaterClient = require('./helpers/DocUpdaterClient')
const DocUpdaterApp = require('./helpers/DocUpdaterApp')
const { promisify } = require('util')
const { exec } = require('child_process')
const { expect } = require('chai')
const Settings = require('@overleaf/settings')
const fs = require('fs')
const Path = require('path')
const rclient = require('@overleaf/redis-wrapper').createClient(
Settings.redis.documentupdater
)
// Acceptance tests for scripts/check_redis_mongo_sync_state.js. The script is
// executed as a child process so the tests can assert on its exit code,
// stdout and stderr.
describe('CheckRedisMongoSyncState', function () {
  beforeEach(function (done) {
    DocUpdaterApp.ensureRunning(done)
  })
  beforeEach(async function () {
    // Start each test from an empty redis.
    await rclient.flushall()
  })

  // Run the script with the given env var overrides. Resolves with an object
  // carrying .code, .stdout and .stderr; exec() rejects on non-zero exit and
  // the rejection error carries those details, so it is returned as-is then.
  async function runScript(options) {
    let result
    try {
      result = await promisify(exec)(
        Object.entries(options)
          .map(([key, value]) => `${key}=${value}`)
          .concat(['node', 'scripts/check_redis_mongo_sync_state.js'])
          .join(' ')
      )
    } catch (error) {
      // includes details like exit code, stdErr and stdOut
      return error
    }
    result.code = 0
    return result
  }

  describe('without projects', function () {
    it('should work when in sync', async function () {
      const result = await runScript({})
      expect(result.code).to.equal(0)
      expect(result.stdout).to.include('Processed 0 projects')
      expect(result.stdout).to.include(
        'Found 0 projects with 0 out of sync docs'
      )
    })
  })

  describe('with a project', function () {
    let projectId, docId
    beforeEach(function (done) {
      projectId = DocUpdaterClient.randomId()
      docId = DocUpdaterClient.randomId()
      MockWebApi.insertDoc(projectId, docId, {
        lines: ['mongo', 'lines'],
        version: 1,
      })
      // Fetching the doc loads it from MockWebApi ("mongo") into redis.
      DocUpdaterClient.getDoc(projectId, docId, done)
    })

    it('should work when in sync', async function () {
      const result = await runScript({})
      expect(result.code).to.equal(0)
      expect(result.stdout).to.include('Processed 1 projects')
      expect(result.stdout).to.include(
        'Found 0 projects with 0 out of sync docs'
      )
    })

    describe('with out of sync lines', function () {
      beforeEach(function () {
        // Overwrite the mongo copy after redis was populated.
        MockWebApi.insertDoc(projectId, docId, {
          lines: ['updated', 'mongo', 'lines'],
          version: 1,
        })
      })

      it('should detect the out of sync state', async function () {
        const result = await runScript({})
        expect(result.code).to.equal(1)
        expect(result.stdout).to.include('Processed 1 projects')
        expect(result.stdout).to.include(
          'Found 1 projects with 1 out of sync docs'
        )
      })
    })

    describe('with out of sync ranges', function () {
      beforeEach(function () {
        MockWebApi.insertDoc(projectId, docId, {
          lines: ['mongo', 'lines'],
          version: 1,
          ranges: { changes: ['FAKE CHANGE'] },
        })
      })

      it('should detect the out of sync state', async function () {
        const result = await runScript({})
        expect(result.code).to.equal(1)
        expect(result.stdout).to.include('Processed 1 projects')
        expect(result.stdout).to.include(
          'Found 1 projects with 1 out of sync docs'
        )
      })
    })

    describe('with out of sync version', function () {
      beforeEach(function () {
        // mongo ahead of redis with identical content: auto-fixable case.
        MockWebApi.insertDoc(projectId, docId, {
          lines: ['mongo', 'lines'],
          version: 2,
        })
      })

      it('should detect the out of sync state', async function () {
        const result = await runScript({})
        expect(result.code).to.equal(1)
        expect(result.stdout).to.include('Processed 1 projects')
        expect(result.stdout).to.include(
          'Found 1 projects with 1 out of sync docs'
        )
      })

      it('should auto-fix the out of sync state', async function () {
        const result = await runScript({
          AUTO_FIX_VERSION_MISMATCH: 'true',
        })
        expect(result.code).to.equal(0)
        expect(result.stdout).to.include('Processed 1 projects')
        expect(result.stdout).to.include(
          'Found 0 projects with 0 out of sync docs'
        )
      })
    })

    // A second project alongside the one from the enclosing describe.
    describe('with a project', function () {
      let projectId2, docId2
      beforeEach(function (done) {
        projectId2 = DocUpdaterClient.randomId()
        docId2 = DocUpdaterClient.randomId()
        MockWebApi.insertDoc(projectId2, docId2, {
          lines: ['mongo', 'lines'],
          version: 1,
        })
        DocUpdaterClient.getDoc(projectId2, docId2, done)
      })

      it('should work when in sync', async function () {
        const result = await runScript({})
        expect(result.code).to.equal(0)
        expect(result.stdout).to.include('Processed 2 projects')
        expect(result.stdout).to.include(
          'Found 0 projects with 0 out of sync docs'
        )
      })

      describe('with one out of sync', function () {
        beforeEach(function () {
          MockWebApi.insertDoc(projectId, docId, {
            lines: ['updated', 'mongo', 'lines'],
            version: 1,
          })
        })

        it('should detect one project out of sync', async function () {
          const result = await runScript({})
          expect(result.code).to.equal(1)
          expect(result.stdout).to.include('Processed 2 projects')
          expect(result.stdout).to.include(
            'Found 1 projects with 1 out of sync docs'
          )
        })

        it('should write differences to disk', async function () {
          const FOLDER = '/tmp/folder'
          await fs.promises.rm(FOLDER, { recursive: true, force: true })
          const result = await runScript({
            WRITE_CONTENT: 'true',
            FOLDER,
          })
          expect(result.code).to.equal(1)
          expect(result.stdout).to.include('Processed 2 projects')
          expect(result.stdout).to.include(
            'Found 1 projects with 1 out of sync docs'
          )
          // Only the out of sync project/doc should have been dumped.
          const dir = Path.join(FOLDER, projectId, docId)
          expect(await fs.promises.readdir(FOLDER)).to.deep.equal([projectId])
          expect(await fs.promises.readdir(dir)).to.deep.equal([
            'mongo-snapshot.txt',
            'redis-snapshot.txt',
          ])
          expect(
            await fs.promises.readFile(
              Path.join(dir, 'mongo-snapshot.txt'),
              'utf-8'
            )
          ).to.equal('updated\nmongo\nlines')
          expect(
            await fs.promises.readFile(
              Path.join(dir, 'redis-snapshot.txt'),
              'utf-8'
            )
          ).to.equal('mongo\nlines')
        })
      })

      describe('with both out of sync', function () {
        beforeEach(function () {
          MockWebApi.insertDoc(projectId, docId, {
            lines: ['updated', 'mongo', 'lines'],
            version: 1,
          })
          MockWebApi.insertDoc(projectId2, docId2, {
            lines: ['updated2', 'mongo', 'lines'],
            version: 1,
          })
        })

        it('should detect both projects out of sync', async function () {
          const result = await runScript({})
          expect(result.code).to.equal(1)
          expect(result.stdout).to.include('Processed 2 projects')
          expect(result.stdout).to.include(
            'Found 2 projects with 2 out of sync docs'
          )
        })
      })
    })
  })

  describe('with more projects than the LIMIT', function () {
    // Register 20 beforeEach hooks, each seeding one in-sync project.
    for (let i = 0; i < 20; i++) {
      beforeEach(function (done) {
        const projectId = DocUpdaterClient.randomId()
        const docId = DocUpdaterClient.randomId()
        MockWebApi.insertDoc(projectId, docId, {
          lines: ['mongo', 'lines'],
          version: 1,
        })
        DocUpdaterClient.getDoc(projectId, docId, done)
      })
    }

    it('should flag limit', async function () {
      const result = await runScript({ LIMIT: '2' })
      expect(result.code).to.equal(2)
      expect(result.stdout).to.include('Processed 2 projects')
      expect(result.stderr).to.include(
        'Found too many un-flushed projects (LIMIT=2). Please fix the reported projects first, then try again.'
      )
    })

    it('should continue with auto-flush', async function () {
      const result = await runScript({
        LIMIT: '2',
        FLUSH_IN_SYNC_PROJECTS: 'true',
      })
      expect(result.code).to.equal(0)
      expect(result.stdout).to.include('Processed 20 projects')
    })
  })
})