mirror of
https://github.com/overleaf/overleaf.git
synced 2024-12-01 17:51:39 -05:00
22257cf037
Ensure that MULTI commands always go to a single node GitOrigin-RevId: 223d29986766577df89c82983126dabca5d16eed
234 lines
6.1 KiB
JavaScript
234 lines
6.1 KiB
JavaScript
let DocUpdaterClient
|
|
const Settings = require('@overleaf/settings')
|
|
const _ = require('lodash')
|
|
const rclient = require('@overleaf/redis-wrapper').createClient(
|
|
Settings.redis.documentupdater
|
|
)
|
|
const keys = Settings.redis.documentupdater.key_schema
|
|
const request = require('request').defaults({ jar: false })
|
|
const async = require('async')
|
|
|
|
const rclientSub = require('@overleaf/redis-wrapper').createClient(
|
|
Settings.redis.pubsub
|
|
)
|
|
rclientSub.subscribe('applied-ops')
|
|
rclientSub.setMaxListeners(0)
|
|
|
|
module.exports = DocUpdaterClient = {
|
|
randomId() {
|
|
let str = ''
|
|
for (let i = 0; i < 24; i++) {
|
|
str += Math.floor(Math.random() * 16).toString(16)
|
|
}
|
|
return str
|
|
},
|
|
|
|
subscribeToAppliedOps(callback) {
|
|
rclientSub.on('message', callback)
|
|
},
|
|
|
|
_getPendingUpdateListKey() {
|
|
const shard = _.random(0, Settings.dispatcherCount - 1)
|
|
if (shard === 0) {
|
|
return 'pending-updates-list'
|
|
} else {
|
|
return `pending-updates-list-${shard}`
|
|
}
|
|
},
|
|
|
|
sendUpdate(projectId, docId, update, callback) {
|
|
rclient.rpush(
|
|
keys.pendingUpdates({ doc_id: docId }),
|
|
JSON.stringify(update),
|
|
error => {
|
|
if (error) {
|
|
return callback(error)
|
|
}
|
|
const docKey = `${projectId}:${docId}`
|
|
rclient.sadd('DocsWithPendingUpdates', docKey, error => {
|
|
if (error) {
|
|
return callback(error)
|
|
}
|
|
|
|
rclient.rpush(
|
|
DocUpdaterClient._getPendingUpdateListKey(),
|
|
docKey,
|
|
callback
|
|
)
|
|
})
|
|
}
|
|
)
|
|
},
|
|
|
|
sendUpdates(projectId, docId, updates, callback) {
|
|
DocUpdaterClient.preloadDoc(projectId, docId, error => {
|
|
if (error) {
|
|
return callback(error)
|
|
}
|
|
const jobs = updates.map(update => callback => {
|
|
DocUpdaterClient.sendUpdate(projectId, docId, update, callback)
|
|
})
|
|
async.series(jobs, err => {
|
|
if (err) {
|
|
return callback(err)
|
|
}
|
|
DocUpdaterClient.waitForPendingUpdates(projectId, docId, callback)
|
|
})
|
|
})
|
|
},
|
|
|
|
waitForPendingUpdates(projectId, docId, callback) {
|
|
async.retry(
|
|
{ times: 30, interval: 100 },
|
|
cb =>
|
|
rclient.llen(keys.pendingUpdates({ doc_id: docId }), (err, length) => {
|
|
if (err) {
|
|
return cb(err)
|
|
}
|
|
if (length > 0) {
|
|
cb(new Error('updates still pending'))
|
|
} else {
|
|
cb()
|
|
}
|
|
}),
|
|
callback
|
|
)
|
|
},
|
|
|
|
getDoc(projectId, docId, callback) {
|
|
request.get(
|
|
`http://127.0.0.1:3003/project/${projectId}/doc/${docId}`,
|
|
(error, res, body) => {
|
|
if (body != null && res.statusCode >= 200 && res.statusCode < 300) {
|
|
body = JSON.parse(body)
|
|
}
|
|
callback(error, res, body)
|
|
}
|
|
)
|
|
},
|
|
|
|
getDocAndRecentOps(projectId, docId, fromVersion, callback) {
|
|
request.get(
|
|
`http://127.0.0.1:3003/project/${projectId}/doc/${docId}?fromVersion=${fromVersion}`,
|
|
(error, res, body) => {
|
|
if (body != null && res.statusCode >= 200 && res.statusCode < 300) {
|
|
body = JSON.parse(body)
|
|
}
|
|
callback(error, res, body)
|
|
}
|
|
)
|
|
},
|
|
|
|
preloadDoc(projectId, docId, callback) {
|
|
DocUpdaterClient.getDoc(projectId, docId, callback)
|
|
},
|
|
|
|
peekDoc(projectId, docId, callback) {
|
|
request.get(
|
|
`http://127.0.0.1:3003/project/${projectId}/doc/${docId}/peek`,
|
|
(error, res, body) => {
|
|
if (body != null && res.statusCode >= 200 && res.statusCode < 300) {
|
|
body = JSON.parse(body)
|
|
}
|
|
callback(error, res, body)
|
|
}
|
|
)
|
|
},
|
|
|
|
flushDoc(projectId, docId, callback) {
|
|
request.post(
|
|
`http://127.0.0.1:3003/project/${projectId}/doc/${docId}/flush`,
|
|
(error, res, body) => callback(error, res, body)
|
|
)
|
|
},
|
|
|
|
setDocLines(projectId, docId, lines, source, userId, undoing, callback) {
|
|
request.post(
|
|
{
|
|
url: `http://127.0.0.1:3003/project/${projectId}/doc/${docId}`,
|
|
json: {
|
|
lines,
|
|
source,
|
|
user_id: userId,
|
|
undoing,
|
|
},
|
|
},
|
|
(error, res, body) => callback(error, res, body)
|
|
)
|
|
},
|
|
|
|
deleteDoc(projectId, docId, callback) {
|
|
request.del(
|
|
`http://127.0.0.1:3003/project/${projectId}/doc/${docId}`,
|
|
(error, res, body) => callback(error, res, body)
|
|
)
|
|
},
|
|
|
|
flushProject(projectId, callback) {
|
|
request.post(`http://127.0.0.1:3003/project/${projectId}/flush`, callback)
|
|
},
|
|
|
|
deleteProject(projectId, callback) {
|
|
request.del(`http://127.0.0.1:3003/project/${projectId}`, callback)
|
|
},
|
|
|
|
deleteProjectOnShutdown(projectId, callback) {
|
|
request.del(
|
|
`http://127.0.0.1:3003/project/${projectId}?background=true&shutdown=true`,
|
|
callback
|
|
)
|
|
},
|
|
|
|
flushOldProjects(callback) {
|
|
request.get(
|
|
'http://127.0.0.1:3003/flush_queued_projects?min_delete_age=1',
|
|
callback
|
|
)
|
|
},
|
|
|
|
acceptChange(projectId, docId, changeId, callback) {
|
|
request.post(
|
|
`http://127.0.0.1:3003/project/${projectId}/doc/${docId}/change/${changeId}/accept`,
|
|
callback
|
|
)
|
|
},
|
|
|
|
acceptChanges(projectId, docId, changeIds, callback) {
|
|
request.post(
|
|
{
|
|
url: `http://127.0.0.1:3003/project/${projectId}/doc/${docId}/change/accept`,
|
|
json: { change_ids: changeIds },
|
|
},
|
|
callback
|
|
)
|
|
},
|
|
|
|
removeComment(projectId, docId, comment, callback) {
|
|
request.del(
|
|
`http://127.0.0.1:3003/project/${projectId}/doc/${docId}/comment/${comment}`,
|
|
callback
|
|
)
|
|
},
|
|
|
|
getProjectDocs(projectId, projectStateHash, callback) {
|
|
request.get(
|
|
`http://127.0.0.1:3003/project/${projectId}/doc?state=${projectStateHash}`,
|
|
(error, res, body) => {
|
|
if (body != null && res.statusCode >= 200 && res.statusCode < 300) {
|
|
body = JSON.parse(body)
|
|
}
|
|
callback(error, res, body)
|
|
}
|
|
)
|
|
},
|
|
|
|
sendProjectUpdate(projectId, userId, updates, version, callback) {
|
|
request.post(
|
|
{
|
|
url: `http://127.0.0.1:3003/project/${projectId}`,
|
|
json: { userId, updates, version },
|
|
},
|
|
(error, res, body) => callback(error, res, body)
|
|
)
|
|
},
|
|
}
|