mirror of
https://github.com/overleaf/overleaf.git
synced 2025-01-22 13:33:48 +00:00
[misc] reland decaff cleanup (#166)
* [misc] decaff cleanup: RoomManager * [misc] decaff cleanup: RedisClientManager * [misc] decaff cleanup: SafeJsonParse * [misc] decaff cleanup: WebApiManager * [misc] decaff cleanup: WebsocketController * [misc] decaff cleanup: WebsocketLoadBalancer * [misc] decaff cleanup: SessionSockets * [misc] decaff cleanup: HttpController * [misc] decaff cleanup: HttpApiController * [misc] decaff cleanup: HealthCheckManager * [misc] decaff cleanup: EventLogger * [misc] decaff cleanup: Errors o-error will eliminate most of it -- when we migrate over. * [misc] decaff cleanup: DrainManager * [misc] decaff cleanup: DocumentUpdaterManager * [misc] decaff cleanup: DocumentUpdaterController: no-unused-vars * [misc] decaff cleanup: DocumentUpdaterController: Array.from * [misc] decaff cleanup: DocumentUpdaterController: implicit return * [misc] decaff cleanup: DocumentUpdaterController: IIFE * [misc] decaff cleanup: DocumentUpdaterController: null checks * [misc] decaff cleanup: DocumentUpdaterController: simpler loops * [misc] decaff cleanup: DocumentUpdaterController: move module name def * [misc] decaff cleanup: ConnectedUsersManager: handle-callback-err * [misc] decaff cleanup: ConnectedUsersManager: implicit returns * [misc] decaff cleanup: ConnectedUsersManager: null checks * [misc] decaff cleanup: ChannelManager: no-unused-vars * [misc] decaff cleanup: ChannelManager: implicit returns * [misc] decaff cleanup: ChannelManager: other cleanup - var -> const - drop variable assignment before return * [misc] decaff cleanup: AuthorizationManager: handle-callback-err Note: This requires a change in WebsocketController to provide a dummy callback. * [misc] decaff cleanup: AuthorizationManager: Array.from * [misc] decaff cleanup: AuthorizationManager: implicit returns * [misc] decaff cleanup: AuthorizationManager: null checks * [misc] decaff cleanup: Router: handle-callback-err * [misc] decaff cleanup: Router: standard/no-callback-literal * [misc] decaff cleanup: Router: Array.from * [misc] decaff cleanup: Router: implicit returns * [misc] decaff cleanup: Router: refactor __guard__ wrapper * [misc] decaff cleanup: Router: null checks And a minor bug fix: user.id -> user._id * [misc] decaff cleanup: Router: move variable declarations to assignments * [misc] decaff cleanup: app: implicit returns * [misc] decaff cleanup: app: __guard__ * [misc] decaff cleanup: app: null checks * [misc] decaff cleanup: app: function definitions * [misc] decaff cleanup: app: drop unused next argument * [misc] decaff cleanup: app: var -> const
This commit is contained in:
parent
e913c57aab
commit
aa9d6c8dc9
20 changed files with 483 additions and 983 deletions
|
@ -1,10 +1,3 @@
|
|||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS103: Rewrite code to no longer use __guard__
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
const Metrics = require('metrics-sharelatex')
|
||||
const Settings = require('settings-sharelatex')
|
||||
Metrics.initialize(Settings.appName || 'real-time')
|
||||
|
@ -17,7 +10,7 @@ Metrics.event_loop.monitor(logger)
|
|||
const express = require('express')
|
||||
const session = require('express-session')
|
||||
const redis = require('redis-sharelatex')
|
||||
if ((Settings.sentry != null ? Settings.sentry.dsn : undefined) != null) {
|
||||
if (Settings.sentry && Settings.sentry.dsn) {
|
||||
logger.initializeErrorReporting(Settings.sentry.dsn)
|
||||
}
|
||||
|
||||
|
@ -70,44 +63,43 @@ io.configure(function () {
|
|||
'xhr-polling',
|
||||
'jsonp-polling'
|
||||
])
|
||||
return io.set('log level', 1)
|
||||
io.set('log level', 1)
|
||||
})
|
||||
|
||||
app.get('/', (req, res, next) => res.send('real-time-sharelatex is alive'))
|
||||
app.get('/', (req, res) => res.send('real-time-sharelatex is alive'))
|
||||
|
||||
app.get('/status', function (req, res, next) {
|
||||
app.get('/status', function (req, res) {
|
||||
if (Settings.shutDownInProgress) {
|
||||
return res.send(503) // Service unavailable
|
||||
res.send(503) // Service unavailable
|
||||
} else {
|
||||
return res.send('real-time-sharelatex is alive')
|
||||
res.send('real-time-sharelatex is alive')
|
||||
}
|
||||
})
|
||||
|
||||
app.get('/debug/events', function (req, res, next) {
|
||||
Settings.debugEvents =
|
||||
parseInt(req.query != null ? req.query.count : undefined, 10) || 20
|
||||
app.get('/debug/events', function (req, res) {
|
||||
Settings.debugEvents = parseInt(req.query.count, 10) || 20
|
||||
logger.log({ count: Settings.debugEvents }, 'starting debug mode')
|
||||
return res.send(`debug mode will log next ${Settings.debugEvents} events`)
|
||||
res.send(`debug mode will log next ${Settings.debugEvents} events`)
|
||||
})
|
||||
|
||||
const rclient = require('redis-sharelatex').createClient(
|
||||
Settings.redis.realtime
|
||||
)
|
||||
|
||||
const healthCheck = (req, res, next) =>
|
||||
function healthCheck(req, res) {
|
||||
rclient.healthCheck(function (error) {
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
logger.err({ err: error }, 'failed redis health check')
|
||||
return res.sendStatus(500)
|
||||
res.sendStatus(500)
|
||||
} else if (HealthCheckManager.isFailing()) {
|
||||
const status = HealthCheckManager.status()
|
||||
logger.err({ pubSubErrors: status }, 'failed pubsub health check')
|
||||
return res.sendStatus(500)
|
||||
res.sendStatus(500)
|
||||
} else {
|
||||
return res.sendStatus(200)
|
||||
res.sendStatus(200)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
app.get('/health_check', healthCheck)
|
||||
|
||||
app.get('/health_check/redis', healthCheck)
|
||||
|
@ -125,30 +117,30 @@ const { port } = Settings.internal.realTime
|
|||
const { host } = Settings.internal.realTime
|
||||
|
||||
server.listen(port, host, function (error) {
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
throw error
|
||||
}
|
||||
return logger.info(`realtime starting up, listening on ${host}:${port}`)
|
||||
logger.info(`realtime starting up, listening on ${host}:${port}`)
|
||||
})
|
||||
|
||||
// Stop huge stack traces in logs from all the socket.io parsing steps.
|
||||
Error.stackTraceLimit = 10
|
||||
|
||||
var shutdownCleanly = function (signal) {
|
||||
const connectedClients = __guard__(io.sockets.clients(), (x) => x.length)
|
||||
function shutdownCleanly(signal) {
|
||||
const connectedClients = io.sockets.clients().length
|
||||
if (connectedClients === 0) {
|
||||
logger.warn('no clients connected, exiting')
|
||||
return process.exit()
|
||||
process.exit()
|
||||
} else {
|
||||
logger.warn(
|
||||
{ connectedClients },
|
||||
'clients still connected, not shutting down yet'
|
||||
)
|
||||
return setTimeout(() => shutdownCleanly(signal), 30 * 1000)
|
||||
setTimeout(() => shutdownCleanly(signal), 30 * 1000)
|
||||
}
|
||||
}
|
||||
|
||||
const drainAndShutdown = function (signal) {
|
||||
function drainAndShutdown(signal) {
|
||||
if (Settings.shutDownInProgress) {
|
||||
logger.warn({ signal }, 'shutdown already in progress, ignoring signal')
|
||||
} else {
|
||||
|
@ -160,20 +152,20 @@ const drainAndShutdown = function (signal) {
|
|||
`received interrupt, delay drain by ${statusCheckInterval}ms`
|
||||
)
|
||||
}
|
||||
return setTimeout(function () {
|
||||
setTimeout(function () {
|
||||
logger.warn(
|
||||
{ signal },
|
||||
`received interrupt, starting drain over ${shutdownDrainTimeWindow} mins`
|
||||
)
|
||||
DrainManager.startDrainTimeWindow(io, shutdownDrainTimeWindow)
|
||||
return shutdownCleanly(signal)
|
||||
shutdownCleanly(signal)
|
||||
}, statusCheckInterval)
|
||||
}
|
||||
}
|
||||
|
||||
Settings.shutDownInProgress = false
|
||||
if (Settings.shutdownDrainTimeWindow != null) {
|
||||
var shutdownDrainTimeWindow = parseInt(Settings.shutdownDrainTimeWindow, 10)
|
||||
const shutdownDrainTimeWindow = parseInt(Settings.shutdownDrainTimeWindow, 10)
|
||||
if (Settings.shutdownDrainTimeWindow) {
|
||||
logger.log({ shutdownDrainTimeWindow }, 'shutdownDrainTimeWindow enabled')
|
||||
for (const signal of [
|
||||
'SIGINT',
|
||||
|
@ -188,9 +180,7 @@ if (Settings.shutdownDrainTimeWindow != null) {
|
|||
} // signal is passed as argument to event handler
|
||||
|
||||
// global exception handler
|
||||
if (
|
||||
Settings.errors != null ? Settings.errors.catchUncaughtErrors : undefined
|
||||
) {
|
||||
if (Settings.errors && Settings.errors.catchUncaughtErrors) {
|
||||
process.removeAllListeners('uncaughtException')
|
||||
process.on('uncaughtException', function (error) {
|
||||
if (['EPIPE', 'ECONNRESET'].includes(error.code)) {
|
||||
|
@ -201,12 +191,8 @@ if (Settings.shutdownDrainTimeWindow != null) {
|
|||
)
|
||||
}
|
||||
logger.error({ err: error }, 'uncaught exception')
|
||||
if (
|
||||
Settings.errors != null
|
||||
? Settings.errors.shutdownOnUncaughtError
|
||||
: undefined
|
||||
) {
|
||||
return drainAndShutdown('SIGABRT')
|
||||
if (Settings.errors && Settings.errors.shutdownOnUncaughtError) {
|
||||
drainAndShutdown('SIGABRT')
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -227,26 +213,20 @@ if (Settings.continualPubsubTraffic) {
|
|||
date: new Date().toString()
|
||||
})
|
||||
Metrics.summary(`redis.publish.${channel}`, json.length)
|
||||
return pubsubClient.publish(channel, json, function (err) {
|
||||
if (err != null) {
|
||||
pubsubClient.publish(channel, json, function (err) {
|
||||
if (err) {
|
||||
logger.err({ err, channel }, 'error publishing pubsub traffic to redis')
|
||||
}
|
||||
const blob = JSON.stringify({ keep: 'alive' })
|
||||
Metrics.summary('redis.publish.cluster-continual-traffic', blob.length)
|
||||
return clusterClient.publish('cluster-continual-traffic', blob, callback)
|
||||
clusterClient.publish('cluster-continual-traffic', blob, callback)
|
||||
})
|
||||
}
|
||||
|
||||
var runPubSubTraffic = () =>
|
||||
const runPubSubTraffic = () =>
|
||||
async.map(['applied-ops', 'editor-events'], publishJob, () =>
|
||||
setTimeout(runPubSubTraffic, 1000 * 20)
|
||||
)
|
||||
|
||||
runPubSubTraffic()
|
||||
}
|
||||
|
||||
function __guard__(value, transform) {
|
||||
return typeof value !== 'undefined' && value !== null
|
||||
? transform(value)
|
||||
: undefined
|
||||
}
|
||||
|
|
|
@ -1,23 +1,10 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
handle-callback-err,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let AuthorizationManager
|
||||
module.exports = AuthorizationManager = {
|
||||
assertClientCanViewProject(client, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
return AuthorizationManager._assertClientHasPrivilegeLevel(
|
||||
AuthorizationManager._assertClientHasPrivilegeLevel(
|
||||
client,
|
||||
['readOnly', 'readAndWrite', 'owner'],
|
||||
callback
|
||||
|
@ -25,10 +12,7 @@ module.exports = AuthorizationManager = {
|
|||
},
|
||||
|
||||
assertClientCanEditProject(client, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
return AuthorizationManager._assertClientHasPrivilegeLevel(
|
||||
AuthorizationManager._assertClientHasPrivilegeLevel(
|
||||
client,
|
||||
['readAndWrite', 'owner'],
|
||||
callback
|
||||
|
@ -36,76 +20,46 @@ module.exports = AuthorizationManager = {
|
|||
},
|
||||
|
||||
_assertClientHasPrivilegeLevel(client, allowedLevels, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
if (Array.from(allowedLevels).includes(client.ol_context.privilege_level)) {
|
||||
return callback(null)
|
||||
if (allowedLevels.includes(client.ol_context.privilege_level)) {
|
||||
callback(null)
|
||||
} else {
|
||||
return callback(new Error('not authorized'))
|
||||
callback(new Error('not authorized'))
|
||||
}
|
||||
},
|
||||
|
||||
assertClientCanViewProjectAndDoc(client, doc_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
return AuthorizationManager.assertClientCanViewProject(client, function (
|
||||
error
|
||||
) {
|
||||
if (error != null) {
|
||||
AuthorizationManager.assertClientCanViewProject(client, function (error) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
return AuthorizationManager._assertClientCanAccessDoc(
|
||||
client,
|
||||
doc_id,
|
||||
callback
|
||||
)
|
||||
AuthorizationManager._assertClientCanAccessDoc(client, doc_id, callback)
|
||||
})
|
||||
},
|
||||
|
||||
assertClientCanEditProjectAndDoc(client, doc_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
return AuthorizationManager.assertClientCanEditProject(client, function (
|
||||
error
|
||||
) {
|
||||
if (error != null) {
|
||||
AuthorizationManager.assertClientCanEditProject(client, function (error) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
return AuthorizationManager._assertClientCanAccessDoc(
|
||||
client,
|
||||
doc_id,
|
||||
callback
|
||||
)
|
||||
AuthorizationManager._assertClientCanAccessDoc(client, doc_id, callback)
|
||||
})
|
||||
},
|
||||
|
||||
_assertClientCanAccessDoc(client, doc_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
if (client.ol_context[`doc:${doc_id}`] === 'allowed') {
|
||||
return callback(null)
|
||||
callback(null)
|
||||
} else {
|
||||
return callback(new Error('not authorized'))
|
||||
callback(new Error('not authorized'))
|
||||
}
|
||||
},
|
||||
|
||||
addAccessToDoc(client, doc_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
client.ol_context[`doc:${doc_id}`] = 'allowed'
|
||||
return callback(null)
|
||||
callback(null)
|
||||
},
|
||||
|
||||
removeAccessToDoc(client, doc_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
delete client.ol_context[`doc:${doc_id}`]
|
||||
return callback(null)
|
||||
callback(null)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,3 @@
|
|||
/* eslint-disable
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let ChannelManager
|
||||
const logger = require('logger-sharelatex')
|
||||
const metrics = require('metrics-sharelatex')
|
||||
const settings = require('settings-sharelatex')
|
||||
|
@ -19,7 +8,7 @@ const ClientMap = new Map() // for each redis client, store a Map of subscribed
|
|||
// that we never subscribe to a channel multiple times. The socket.io side is
|
||||
// handled by RoomManager.
|
||||
|
||||
module.exports = ChannelManager = {
|
||||
module.exports = {
|
||||
getClientMapEntry(rclient) {
|
||||
// return the per-client channel map if it exists, otherwise create and
|
||||
// return an empty map for the client.
|
||||
|
@ -36,22 +25,25 @@ module.exports = ChannelManager = {
|
|||
const p = rclient.subscribe(channel)
|
||||
p.finally(function () {
|
||||
if (clientChannelMap.get(channel) === subscribePromise) {
|
||||
return clientChannelMap.delete(channel)
|
||||
clientChannelMap.delete(channel)
|
||||
}
|
||||
})
|
||||
.then(function () {
|
||||
logger.log({ channel }, 'subscribed to channel')
|
||||
return metrics.inc(`subscribe.${baseChannel}`)
|
||||
metrics.inc(`subscribe.${baseChannel}`)
|
||||
})
|
||||
.catch(function (err) {
|
||||
logger.error({ channel, err }, 'failed to subscribe to channel')
|
||||
return metrics.inc(`subscribe.failed.${baseChannel}`)
|
||||
metrics.inc(`subscribe.failed.${baseChannel}`)
|
||||
})
|
||||
return p
|
||||
}
|
||||
|
||||
const pendingActions = clientChannelMap.get(channel) || Promise.resolve()
|
||||
var subscribePromise = pendingActions.then(actualSubscribe, actualSubscribe)
|
||||
const subscribePromise = pendingActions.then(
|
||||
actualSubscribe,
|
||||
actualSubscribe
|
||||
)
|
||||
clientChannelMap.set(channel, subscribePromise)
|
||||
logger.log({ channel }, 'planned to subscribe to channel')
|
||||
return subscribePromise
|
||||
|
@ -62,26 +54,25 @@ module.exports = ChannelManager = {
|
|||
const channel = `${baseChannel}:${id}`
|
||||
const actualUnsubscribe = function () {
|
||||
// unsubscribe is happening in the background, it should not reject
|
||||
const p = rclient
|
||||
return rclient
|
||||
.unsubscribe(channel)
|
||||
.finally(function () {
|
||||
if (clientChannelMap.get(channel) === unsubscribePromise) {
|
||||
return clientChannelMap.delete(channel)
|
||||
clientChannelMap.delete(channel)
|
||||
}
|
||||
})
|
||||
.then(function () {
|
||||
logger.log({ channel }, 'unsubscribed from channel')
|
||||
return metrics.inc(`unsubscribe.${baseChannel}`)
|
||||
metrics.inc(`unsubscribe.${baseChannel}`)
|
||||
})
|
||||
.catch(function (err) {
|
||||
logger.error({ channel, err }, 'unsubscribed from channel')
|
||||
return metrics.inc(`unsubscribe.failed.${baseChannel}`)
|
||||
metrics.inc(`unsubscribe.failed.${baseChannel}`)
|
||||
})
|
||||
return p
|
||||
}
|
||||
|
||||
const pendingActions = clientChannelMap.get(channel) || Promise.resolve()
|
||||
var unsubscribePromise = pendingActions.then(
|
||||
const unsubscribePromise = pendingActions.then(
|
||||
actualUnsubscribe,
|
||||
actualUnsubscribe
|
||||
)
|
||||
|
@ -100,6 +91,6 @@ module.exports = ChannelManager = {
|
|||
}
|
||||
// we publish on a different client to the subscribe, so we can't
|
||||
// check for the channel existing here
|
||||
return rclient.publish(channel, data)
|
||||
rclient.publish(channel, data)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,15 +1,6 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
handle-callback-err,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
const async = require('async')
|
||||
const Settings = require('settings-sharelatex')
|
||||
const logger = require('logger-sharelatex')
|
||||
|
@ -29,9 +20,6 @@ module.exports = {
|
|||
// update. This way we don't care if the connected_user key has expired when
|
||||
// we receive a cursor update.
|
||||
updateUserPosition(project_id, client_id, user, cursorData, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (err) {}
|
||||
}
|
||||
logger.log({ project_id, client_id }, 'marking user as joined or connected')
|
||||
|
||||
const multi = rclient.multi()
|
||||
|
@ -65,7 +53,7 @@ module.exports = {
|
|||
user.email || ''
|
||||
)
|
||||
|
||||
if (cursorData != null) {
|
||||
if (cursorData) {
|
||||
multi.hset(
|
||||
Keys.connectedUser({ project_id, client_id }),
|
||||
'cursorData',
|
||||
|
@ -77,21 +65,18 @@ module.exports = {
|
|||
USER_TIMEOUT_IN_S
|
||||
)
|
||||
|
||||
return multi.exec(function (err) {
|
||||
if (err != null) {
|
||||
multi.exec(function (err) {
|
||||
if (err) {
|
||||
logger.err(
|
||||
{ err, project_id, client_id },
|
||||
'problem marking user as connected'
|
||||
)
|
||||
}
|
||||
return callback(err)
|
||||
callback(err)
|
||||
})
|
||||
},
|
||||
|
||||
refreshClient(project_id, client_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (err) {}
|
||||
}
|
||||
refreshClient(project_id, client_id) {
|
||||
logger.log({ project_id, client_id }, 'refreshing connected client')
|
||||
const multi = rclient.multi()
|
||||
multi.hset(
|
||||
|
@ -103,14 +88,13 @@ module.exports = {
|
|||
Keys.connectedUser({ project_id, client_id }),
|
||||
USER_TIMEOUT_IN_S
|
||||
)
|
||||
return multi.exec(function (err) {
|
||||
if (err != null) {
|
||||
multi.exec(function (err) {
|
||||
if (err) {
|
||||
logger.err(
|
||||
{ err, project_id, client_id },
|
||||
'problem refreshing connected client'
|
||||
)
|
||||
}
|
||||
return callback(err)
|
||||
})
|
||||
},
|
||||
|
||||
|
@ -120,74 +104,66 @@ module.exports = {
|
|||
multi.srem(Keys.clientsInProject({ project_id }), client_id)
|
||||
multi.expire(Keys.clientsInProject({ project_id }), FOUR_DAYS_IN_S)
|
||||
multi.del(Keys.connectedUser({ project_id, client_id }))
|
||||
return multi.exec(callback)
|
||||
multi.exec(callback)
|
||||
},
|
||||
|
||||
_getConnectedUser(project_id, client_id, callback) {
|
||||
return rclient.hgetall(
|
||||
Keys.connectedUser({ project_id, client_id }),
|
||||
function (err, result) {
|
||||
if (
|
||||
result == null ||
|
||||
Object.keys(result).length === 0 ||
|
||||
!result.user_id
|
||||
) {
|
||||
result = {
|
||||
connected: false,
|
||||
client_id
|
||||
}
|
||||
} else {
|
||||
result.connected = true
|
||||
result.client_id = client_id
|
||||
result.client_age =
|
||||
(Date.now() - parseInt(result.last_updated_at, 10)) / 1000
|
||||
if (result.cursorData != null) {
|
||||
try {
|
||||
result.cursorData = JSON.parse(result.cursorData)
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
{
|
||||
err: e,
|
||||
project_id,
|
||||
client_id,
|
||||
cursorData: result.cursorData
|
||||
},
|
||||
'error parsing cursorData JSON'
|
||||
)
|
||||
return callback(e)
|
||||
}
|
||||
rclient.hgetall(Keys.connectedUser({ project_id, client_id }), function (
|
||||
err,
|
||||
result
|
||||
) {
|
||||
if (!(result && result.user_id)) {
|
||||
result = {
|
||||
connected: false,
|
||||
client_id
|
||||
}
|
||||
} else {
|
||||
result.connected = true
|
||||
result.client_id = client_id
|
||||
result.client_age =
|
||||
(Date.now() - parseInt(result.last_updated_at, 10)) / 1000
|
||||
if (result.cursorData) {
|
||||
try {
|
||||
result.cursorData = JSON.parse(result.cursorData)
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
{
|
||||
err: e,
|
||||
project_id,
|
||||
client_id,
|
||||
cursorData: result.cursorData
|
||||
},
|
||||
'error parsing cursorData JSON'
|
||||
)
|
||||
return callback(e)
|
||||
}
|
||||
}
|
||||
return callback(err, result)
|
||||
}
|
||||
)
|
||||
callback(err, result)
|
||||
})
|
||||
},
|
||||
|
||||
getConnectedUsers(project_id, callback) {
|
||||
const self = this
|
||||
return rclient.smembers(Keys.clientsInProject({ project_id }), function (
|
||||
rclient.smembers(Keys.clientsInProject({ project_id }), function (
|
||||
err,
|
||||
results
|
||||
) {
|
||||
if (err != null) {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
const jobs = results.map((client_id) => (cb) =>
|
||||
self._getConnectedUser(project_id, client_id, cb)
|
||||
)
|
||||
return async.series(jobs, function (err, users) {
|
||||
if (users == null) {
|
||||
users = []
|
||||
}
|
||||
if (err != null) {
|
||||
async.series(jobs, function (err, users) {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
users = users.filter(
|
||||
(user) =>
|
||||
(user != null ? user.connected : undefined) &&
|
||||
(user != null ? user.client_age : undefined) < REFRESH_TIMEOUT_IN_S
|
||||
user && user.connected && user.client_age < REFRESH_TIMEOUT_IN_S
|
||||
)
|
||||
return callback(null, users)
|
||||
callback(null, users)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,18 +1,6 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS205: Consider reworking code to avoid use of IIFEs
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let DocumentUpdaterController
|
||||
const logger = require('logger-sharelatex')
|
||||
const settings = require('settings-sharelatex')
|
||||
const RedisClientManager = require('./RedisClientManager')
|
||||
|
@ -23,28 +11,25 @@ const RoomManager = require('./RoomManager')
|
|||
const ChannelManager = require('./ChannelManager')
|
||||
const metrics = require('metrics-sharelatex')
|
||||
|
||||
const MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 // 1Mb
|
||||
|
||||
let DocumentUpdaterController
|
||||
module.exports = DocumentUpdaterController = {
|
||||
// DocumentUpdaterController is responsible for updates that come via Redis
|
||||
// Pub/Sub from the document updater.
|
||||
rclientList: RedisClientManager.createClientList(settings.redis.pubsub),
|
||||
|
||||
listenForUpdatesFromDocumentUpdater(io) {
|
||||
let i, rclient
|
||||
logger.log(
|
||||
{ rclients: this.rclientList.length },
|
||||
'listening for applied-ops events'
|
||||
)
|
||||
for (i = 0; i < this.rclientList.length; i++) {
|
||||
rclient = this.rclientList[i]
|
||||
for (const rclient of this.rclientList) {
|
||||
rclient.subscribe('applied-ops')
|
||||
rclient.on('message', function (channel, message) {
|
||||
metrics.inc('rclient', 0.001) // global event rate metric
|
||||
if (settings.debugEvents > 0) {
|
||||
EventLogger.debugEvent(channel, message)
|
||||
}
|
||||
return DocumentUpdaterController._processMessageFromDocumentUpdater(
|
||||
DocumentUpdaterController._processMessageFromDocumentUpdater(
|
||||
io,
|
||||
channel,
|
||||
message
|
||||
|
@ -53,42 +38,41 @@ module.exports = DocumentUpdaterController = {
|
|||
}
|
||||
// create metrics for each redis instance only when we have multiple redis clients
|
||||
if (this.rclientList.length > 1) {
|
||||
for (i = 0; i < this.rclientList.length; i++) {
|
||||
rclient = this.rclientList[i]
|
||||
;((
|
||||
i // per client event rate metric
|
||||
) => rclient.on('message', () => metrics.inc(`rclient-${i}`, 0.001)))(i)
|
||||
}
|
||||
this.rclientList.forEach((rclient, i) => {
|
||||
// per client event rate metric
|
||||
const metricName = `rclient-${i}`
|
||||
rclient.on('message', () => metrics.inc(metricName, 0.001))
|
||||
})
|
||||
}
|
||||
return this.handleRoomUpdates(this.rclientList)
|
||||
this.handleRoomUpdates(this.rclientList)
|
||||
},
|
||||
|
||||
handleRoomUpdates(rclientSubList) {
|
||||
const roomEvents = RoomManager.eventSource()
|
||||
roomEvents.on('doc-active', function (doc_id) {
|
||||
const subscribePromises = Array.from(rclientSubList).map((rclient) =>
|
||||
const subscribePromises = rclientSubList.map((rclient) =>
|
||||
ChannelManager.subscribe(rclient, 'applied-ops', doc_id)
|
||||
)
|
||||
return RoomManager.emitOnCompletion(
|
||||
RoomManager.emitOnCompletion(
|
||||
subscribePromises,
|
||||
`doc-subscribed-${doc_id}`
|
||||
)
|
||||
})
|
||||
return roomEvents.on('doc-empty', (doc_id) =>
|
||||
Array.from(rclientSubList).map((rclient) =>
|
||||
roomEvents.on('doc-empty', (doc_id) =>
|
||||
rclientSubList.map((rclient) =>
|
||||
ChannelManager.unsubscribe(rclient, 'applied-ops', doc_id)
|
||||
)
|
||||
)
|
||||
},
|
||||
|
||||
_processMessageFromDocumentUpdater(io, channel, message) {
|
||||
return SafeJsonParse.parse(message, function (error, message) {
|
||||
if (error != null) {
|
||||
SafeJsonParse.parse(message, function (error, message) {
|
||||
if (error) {
|
||||
logger.error({ err: error, channel }, 'error parsing JSON')
|
||||
return
|
||||
}
|
||||
if (message.op != null) {
|
||||
if (message._id != null && settings.checkEventOrder) {
|
||||
if (message.op) {
|
||||
if (message._id && settings.checkEventOrder) {
|
||||
const status = EventLogger.checkEventOrder(
|
||||
'applied-ops',
|
||||
message._id,
|
||||
|
@ -98,24 +82,24 @@ module.exports = DocumentUpdaterController = {
|
|||
return // skip duplicate events
|
||||
}
|
||||
}
|
||||
return DocumentUpdaterController._applyUpdateFromDocumentUpdater(
|
||||
DocumentUpdaterController._applyUpdateFromDocumentUpdater(
|
||||
io,
|
||||
message.doc_id,
|
||||
message.op
|
||||
)
|
||||
} else if (message.error != null) {
|
||||
return DocumentUpdaterController._processErrorFromDocumentUpdater(
|
||||
} else if (message.error) {
|
||||
DocumentUpdaterController._processErrorFromDocumentUpdater(
|
||||
io,
|
||||
message.doc_id,
|
||||
message.error,
|
||||
message
|
||||
)
|
||||
} else if (message.health_check != null) {
|
||||
} else if (message.health_check) {
|
||||
logger.debug(
|
||||
{ message },
|
||||
'got health check message in applied ops channel'
|
||||
)
|
||||
return HealthCheckManager.check(channel, message.key)
|
||||
HealthCheckManager.check(channel, message.key)
|
||||
}
|
||||
})
|
||||
},
|
||||
|
@ -132,20 +116,14 @@ module.exports = DocumentUpdaterController = {
|
|||
{
|
||||
doc_id,
|
||||
version: update.v,
|
||||
source: update.meta != null ? update.meta.source : undefined,
|
||||
socketIoClients: (() => {
|
||||
const result = []
|
||||
for (client of Array.from(clientList)) {
|
||||
result.push(client.id)
|
||||
}
|
||||
return result
|
||||
})()
|
||||
source: update.meta && update.meta.source,
|
||||
socketIoClients: clientList.map((client) => client.id)
|
||||
},
|
||||
'distributing updates to clients'
|
||||
)
|
||||
const seen = {}
|
||||
// send messages only to unique clients (due to duplicate entries in io.sockets.clients)
|
||||
for (client of Array.from(clientList)) {
|
||||
for (client of clientList) {
|
||||
if (!seen[client.id]) {
|
||||
seen[client.id] = true
|
||||
if (client.publicId === update.meta.source) {
|
||||
|
@ -153,7 +131,7 @@ module.exports = DocumentUpdaterController = {
|
|||
{
|
||||
doc_id,
|
||||
version: update.v,
|
||||
source: update.meta != null ? update.meta.source : undefined
|
||||
source: update.meta.source
|
||||
},
|
||||
'distributing update to sender'
|
||||
)
|
||||
|
@ -164,7 +142,7 @@ module.exports = DocumentUpdaterController = {
|
|||
{
|
||||
doc_id,
|
||||
version: update.v,
|
||||
source: update.meta != null ? update.meta.source : undefined,
|
||||
source: update.meta.source,
|
||||
client_id: client.id
|
||||
},
|
||||
'distributing update to collaborator'
|
||||
|
@ -175,16 +153,10 @@ module.exports = DocumentUpdaterController = {
|
|||
}
|
||||
if (Object.keys(seen).length < clientList.length) {
|
||||
metrics.inc('socket-io.duplicate-clients', 0.1)
|
||||
return logger.log(
|
||||
logger.log(
|
||||
{
|
||||
doc_id,
|
||||
socketIoClients: (() => {
|
||||
const result1 = []
|
||||
for (client of Array.from(clientList)) {
|
||||
result1.push(client.id)
|
||||
}
|
||||
return result1
|
||||
})()
|
||||
socketIoClients: clientList.map((client) => client.id)
|
||||
},
|
||||
'discarded duplicate clients'
|
||||
)
|
||||
|
@ -192,17 +164,13 @@ module.exports = DocumentUpdaterController = {
|
|||
},
|
||||
|
||||
_processErrorFromDocumentUpdater(io, doc_id, error, message) {
|
||||
return (() => {
|
||||
const result = []
|
||||
for (const client of Array.from(io.sockets.clients(doc_id))) {
|
||||
logger.warn(
|
||||
{ err: error, doc_id, client_id: client.id },
|
||||
'error from document updater, disconnecting client'
|
||||
)
|
||||
client.emit('otUpdateError', error, message)
|
||||
result.push(client.disconnect())
|
||||
}
|
||||
return result
|
||||
})()
|
||||
for (const client of io.sockets.clients(doc_id)) {
|
||||
logger.warn(
|
||||
{ err: error, doc_id, client_id: client.id },
|
||||
'error from document updater, disconnecting client'
|
||||
)
|
||||
client.emit('otUpdateError', error, message)
|
||||
client.disconnect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,17 +1,6 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
handle-callback-err,
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let DocumentUpdaterManager
|
||||
const request = require('request')
|
||||
const _ = require('underscore')
|
||||
const logger = require('logger-sharelatex')
|
||||
|
@ -23,20 +12,17 @@ const rclient = require('redis-sharelatex').createClient(
|
|||
)
|
||||
const Keys = settings.redis.documentupdater.key_schema
|
||||
|
||||
module.exports = DocumentUpdaterManager = {
|
||||
module.exports = {
|
||||
getDocument(project_id, doc_id, fromVersion, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error, exists, doclines, version) {}
|
||||
}
|
||||
const timer = new metrics.Timer('get-document')
|
||||
const url = `${settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}?fromVersion=${fromVersion}`
|
||||
logger.log(
|
||||
{ project_id, doc_id, fromVersion },
|
||||
'getting doc from document updater'
|
||||
)
|
||||
return request.get(url, function (err, res, body) {
|
||||
request.get(url, function (err, res, body) {
|
||||
timer.done()
|
||||
if (err != null) {
|
||||
if (err) {
|
||||
logger.error(
|
||||
{ err, url, project_id, doc_id },
|
||||
'error getting doc from doc updater'
|
||||
|
@ -53,13 +39,8 @@ module.exports = DocumentUpdaterManager = {
|
|||
} catch (error) {
|
||||
return callback(error)
|
||||
}
|
||||
return callback(
|
||||
null,
|
||||
body != null ? body.lines : undefined,
|
||||
body != null ? body.version : undefined,
|
||||
body != null ? body.ranges : undefined,
|
||||
body != null ? body.ops : undefined
|
||||
)
|
||||
body = body || {}
|
||||
callback(null, body.lines, body.version, body.ranges, body.ops)
|
||||
} else if ([404, 422].includes(res.statusCode)) {
|
||||
err = new Error('doc updater could not load requested ops')
|
||||
err.statusCode = res.statusCode
|
||||
|
@ -67,7 +48,7 @@ module.exports = DocumentUpdaterManager = {
|
|||
{ err, project_id, doc_id, url, fromVersion },
|
||||
'doc updater could not load requested ops'
|
||||
)
|
||||
return callback(err)
|
||||
callback(err)
|
||||
} else {
|
||||
err = new Error(
|
||||
`doc updater returned a non-success status code: ${res.statusCode}`
|
||||
|
@ -77,33 +58,30 @@ module.exports = DocumentUpdaterManager = {
|
|||
{ err, project_id, doc_id, url },
|
||||
`doc updater returned a non-success status code: ${res.statusCode}`
|
||||
)
|
||||
return callback(err)
|
||||
callback(err)
|
||||
}
|
||||
})
|
||||
},
|
||||
|
||||
flushProjectToMongoAndDelete(project_id, callback) {
|
||||
// this method is called when the last connected user leaves the project
|
||||
if (callback == null) {
|
||||
callback = function () {}
|
||||
}
|
||||
logger.log({ project_id }, 'deleting project from document updater')
|
||||
const timer = new metrics.Timer('delete.mongo.project')
|
||||
// flush the project in the background when all users have left
|
||||
const url =
|
||||
`${settings.apis.documentupdater.url}/project/${project_id}?background=true` +
|
||||
(settings.shutDownInProgress ? '&shutdown=true' : '')
|
||||
return request.del(url, function (err, res, body) {
|
||||
request.del(url, function (err, res) {
|
||||
timer.done()
|
||||
if (err != null) {
|
||||
if (err) {
|
||||
logger.error(
|
||||
{ err, project_id },
|
||||
'error deleting project from document updater'
|
||||
)
|
||||
return callback(err)
|
||||
callback(err)
|
||||
} else if (res.statusCode >= 200 && res.statusCode < 300) {
|
||||
logger.log({ project_id }, 'deleted project from document updater')
|
||||
return callback(null)
|
||||
callback(null)
|
||||
} else {
|
||||
err = new Error(
|
||||
`document updater returned a failure status code: ${res.statusCode}`
|
||||
|
@ -113,16 +91,12 @@ module.exports = DocumentUpdaterManager = {
|
|||
{ err, project_id },
|
||||
`document updater returned failure status code: ${res.statusCode}`
|
||||
)
|
||||
return callback(err)
|
||||
callback(err)
|
||||
}
|
||||
})
|
||||
},
|
||||
|
||||
queueChange(project_id, doc_id, change, callback) {
|
||||
let error
|
||||
if (callback == null) {
|
||||
callback = function () {}
|
||||
}
|
||||
const allowedKeys = [
|
||||
'doc',
|
||||
'op',
|
||||
|
@ -136,7 +110,7 @@ module.exports = DocumentUpdaterManager = {
|
|||
const jsonChange = JSON.stringify(change)
|
||||
if (jsonChange.indexOf('\u0000') !== -1) {
|
||||
// memory corruption check
|
||||
error = new Error('null bytes found in op')
|
||||
const error = new Error('null bytes found in op')
|
||||
logger.error(
|
||||
{ err: error, project_id, doc_id, jsonChange },
|
||||
error.message
|
||||
|
@ -146,7 +120,7 @@ module.exports = DocumentUpdaterManager = {
|
|||
|
||||
const updateSize = jsonChange.length
|
||||
if (updateSize > settings.maxUpdateSize) {
|
||||
error = new Error('update is too large')
|
||||
const error = new Error('update is too large')
|
||||
error.updateSize = updateSize
|
||||
return callback(error)
|
||||
}
|
||||
|
@ -157,13 +131,13 @@ module.exports = DocumentUpdaterManager = {
|
|||
const doc_key = `${project_id}:${doc_id}`
|
||||
// Push onto pendingUpdates for doc_id first, because once the doc updater
|
||||
// gets an entry on pending-updates-list, it starts processing.
|
||||
return rclient.rpush(Keys.pendingUpdates({ doc_id }), jsonChange, function (
|
||||
rclient.rpush(Keys.pendingUpdates({ doc_id }), jsonChange, function (
|
||||
error
|
||||
) {
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
return rclient.rpush('pending-updates-list', doc_key, callback)
|
||||
rclient.rpush('pending-updates-list', doc_key, callback)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,31 +1,21 @@
|
|||
/* eslint-disable
|
||||
no-return-assign,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let DrainManager
|
||||
const logger = require('logger-sharelatex')
|
||||
|
||||
module.exports = DrainManager = {
|
||||
module.exports = {
|
||||
startDrainTimeWindow(io, minsToDrain) {
|
||||
const drainPerMin = io.sockets.clients().length / minsToDrain
|
||||
return DrainManager.startDrain(io, Math.max(drainPerMin / 60, 4))
|
||||
}, // enforce minimum drain rate
|
||||
// enforce minimum drain rate
|
||||
this.startDrain(io, Math.max(drainPerMin / 60, 4))
|
||||
},
|
||||
|
||||
startDrain(io, rate) {
|
||||
// Clear out any old interval
|
||||
let pollingInterval
|
||||
clearInterval(this.interval)
|
||||
logger.log({ rate }, 'starting drain')
|
||||
if (rate === 0) {
|
||||
return
|
||||
} else if (rate < 1) {
|
||||
}
|
||||
let pollingInterval
|
||||
if (rate < 1) {
|
||||
// allow lower drain rates
|
||||
// e.g. rate=0.1 will drain one client every 10 seconds
|
||||
pollingInterval = 1000 / rate
|
||||
|
@ -33,15 +23,15 @@ module.exports = DrainManager = {
|
|||
} else {
|
||||
pollingInterval = 1000
|
||||
}
|
||||
return (this.interval = setInterval(() => {
|
||||
return this.reconnectNClients(io, rate)
|
||||
}, pollingInterval))
|
||||
this.interval = setInterval(() => {
|
||||
this.reconnectNClients(io, rate)
|
||||
}, pollingInterval)
|
||||
},
|
||||
|
||||
RECONNECTED_CLIENTS: {},
|
||||
reconnectNClients(io, N) {
|
||||
let drainedCount = 0
|
||||
for (const client of Array.from(io.sockets.clients())) {
|
||||
for (const client of io.sockets.clients()) {
|
||||
if (!this.RECONNECTED_CLIENTS[client.id]) {
|
||||
this.RECONNECTED_CLIENTS[client.id] = true
|
||||
logger.log(
|
||||
|
@ -57,7 +47,7 @@ module.exports = DrainManager = {
|
|||
}
|
||||
}
|
||||
if (drainedCount < N) {
|
||||
return logger.log('All clients have been told to reconnectGracefully')
|
||||
logger.log('All clients have been told to reconnectGracefully')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,17 +1,9 @@
|
|||
/* eslint-disable
|
||||
no-proto,
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
let Errors
|
||||
var CodedError = function (message, code) {
|
||||
const error = new Error(message)
|
||||
error.name = 'CodedError'
|
||||
error.code = code
|
||||
error.__proto__ = CodedError.prototype
|
||||
return error
|
||||
class CodedError extends Error {
|
||||
constructor(message, code) {
|
||||
super(message)
|
||||
this.name = this.constructor.name
|
||||
this.code = code
|
||||
}
|
||||
}
|
||||
CodedError.prototype.__proto__ = Error.prototype
|
||||
|
||||
module.exports = Errors = { CodedError }
|
||||
module.exports = { CodedError }
|
||||
|
|
|
@ -1,15 +1,6 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS205: Consider reworking code to avoid use of IIFEs
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let EventLogger
|
||||
const logger = require('logger-sharelatex')
|
||||
const metrics = require('metrics-sharelatex')
|
||||
|
@ -31,15 +22,15 @@ module.exports = EventLogger = {
|
|||
debugEvent(channel, message) {
|
||||
if (settings.debugEvents > 0) {
|
||||
logger.log({ channel, message, counter: COUNTER++ }, 'logging event')
|
||||
return settings.debugEvents--
|
||||
settings.debugEvents--
|
||||
}
|
||||
},
|
||||
|
||||
checkEventOrder(channel, message_id, message) {
|
||||
let result
|
||||
checkEventOrder(channel, message_id) {
|
||||
if (typeof message_id !== 'string') {
|
||||
return
|
||||
}
|
||||
let result
|
||||
if (!(result = message_id.match(/^(.*)-(\d+)$/))) {
|
||||
return
|
||||
}
|
||||
|
@ -51,7 +42,7 @@ module.exports = EventLogger = {
|
|||
}
|
||||
// store the last count in a hash for each host
|
||||
const previous = EventLogger._storeEventCount(key, count)
|
||||
if (previous == null || count === previous + 1) {
|
||||
if (!previous || count === previous + 1) {
|
||||
metrics.inc(`event.${channel}.valid`, 0.001) // downsample high rate docupdater events
|
||||
return // order is ok
|
||||
}
|
||||
|
@ -83,18 +74,11 @@ module.exports = EventLogger = {
|
|||
},
|
||||
|
||||
_cleanEventStream(now) {
|
||||
return (() => {
|
||||
const result = []
|
||||
for (const key in EVENT_LOG_TIMESTAMP) {
|
||||
const timestamp = EVENT_LOG_TIMESTAMP[key]
|
||||
if (now - timestamp > EventLogger.MAX_STALE_TIME_IN_MS) {
|
||||
delete EVENT_LOG_COUNTER[key]
|
||||
result.push(delete EVENT_LOG_TIMESTAMP[key])
|
||||
} else {
|
||||
result.push(undefined)
|
||||
}
|
||||
Object.entries(EVENT_LOG_TIMESTAMP).forEach(([key, timestamp]) => {
|
||||
if (now - timestamp > EventLogger.MAX_STALE_TIME_IN_MS) {
|
||||
delete EVENT_LOG_COUNTER[key]
|
||||
delete EVENT_LOG_TIMESTAMP[key]
|
||||
}
|
||||
return result
|
||||
})()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,3 @@
|
|||
/* eslint-disable
|
||||
no-return-assign,
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let HealthCheckManager
|
||||
const metrics = require('metrics-sharelatex')
|
||||
const logger = require('logger-sharelatex')
|
||||
|
||||
|
@ -22,22 +9,19 @@ let COUNT = 0
|
|||
const CHANNEL_MANAGER = {} // hash of event checkers by channel name
|
||||
const CHANNEL_ERROR = {} // error status by channel name
|
||||
|
||||
module.exports = HealthCheckManager = class HealthCheckManager {
|
||||
module.exports = class HealthCheckManager {
|
||||
// create an instance of this class which checks that an event with a unique
|
||||
// id is received only once within a timeout
|
||||
constructor(channel, timeout) {
|
||||
// unique event string
|
||||
this.channel = channel
|
||||
if (timeout == null) {
|
||||
timeout = 1000
|
||||
}
|
||||
this.id = `host=${HOST}:pid=${PID}:count=${COUNT++}`
|
||||
// count of number of times the event is received
|
||||
this.count = 0
|
||||
// after a timeout check the status of the count
|
||||
this.handler = setTimeout(() => {
|
||||
return this.setStatus()
|
||||
}, timeout)
|
||||
this.setStatus()
|
||||
}, timeout || 1000)
|
||||
// use a timer to record the latency of the channel
|
||||
this.timer = new metrics.Timer(`event.${this.channel}.latency`)
|
||||
// keep a record of these objects to dispatch on
|
||||
|
@ -48,31 +32,31 @@ module.exports = HealthCheckManager = class HealthCheckManager {
|
|||
// if this is our event record it
|
||||
if (id === this.id) {
|
||||
this.count++
|
||||
if (this.timer != null) {
|
||||
if (this.timer) {
|
||||
this.timer.done()
|
||||
}
|
||||
return (this.timer = null) // only time the latency of the first event
|
||||
this.timer = undefined // only time the latency of the first event
|
||||
}
|
||||
}
|
||||
|
||||
setStatus() {
|
||||
// if we saw the event anything other than a single time that is an error
|
||||
if (this.count !== 1) {
|
||||
const isFailing = this.count !== 1
|
||||
if (isFailing) {
|
||||
logger.err(
|
||||
{ channel: this.channel, count: this.count, id: this.id },
|
||||
'redis channel health check error'
|
||||
)
|
||||
}
|
||||
const error = this.count !== 1
|
||||
return (CHANNEL_ERROR[this.channel] = error)
|
||||
CHANNEL_ERROR[this.channel] = isFailing
|
||||
}
|
||||
|
||||
// class methods
|
||||
static check(channel, id) {
|
||||
// dispatch event to manager for channel
|
||||
return CHANNEL_MANAGER[channel] != null
|
||||
? CHANNEL_MANAGER[channel].processEvent(id)
|
||||
: undefined
|
||||
if (CHANNEL_MANAGER[channel]) {
|
||||
CHANNEL_MANAGER[channel].processEvent(id)
|
||||
}
|
||||
}
|
||||
|
||||
static status() {
|
||||
|
|
|
@ -1,25 +1,15 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let HttpApiController
|
||||
const WebsocketLoadBalancer = require('./WebsocketLoadBalancer')
|
||||
const DrainManager = require('./DrainManager')
|
||||
const logger = require('logger-sharelatex')
|
||||
|
||||
module.exports = HttpApiController = {
|
||||
sendMessage(req, res, next) {
|
||||
module.exports = {
|
||||
sendMessage(req, res) {
|
||||
logger.log({ message: req.params.message }, 'sending message')
|
||||
if (Array.isArray(req.body)) {
|
||||
for (const payload of Array.from(req.body)) {
|
||||
for (const payload of req.body) {
|
||||
WebsocketLoadBalancer.emitToRoom(
|
||||
req.params.project_id,
|
||||
req.params.message,
|
||||
|
@ -33,16 +23,16 @@ module.exports = HttpApiController = {
|
|||
req.body
|
||||
)
|
||||
}
|
||||
return res.send(204)
|
||||
}, // No content
|
||||
res.send(204)
|
||||
},
|
||||
|
||||
startDrain(req, res, next) {
|
||||
startDrain(req, res) {
|
||||
const io = req.app.get('io')
|
||||
let rate = req.query.rate || '4'
|
||||
rate = parseFloat(rate) || 0
|
||||
logger.log({ rate }, 'setting client drain rate')
|
||||
DrainManager.startDrain(io, rate)
|
||||
return res.send(204)
|
||||
res.send(204)
|
||||
},
|
||||
|
||||
disconnectClient(req, res, next) {
|
||||
|
@ -57,6 +47,6 @@ module.exports = HttpApiController = {
|
|||
}
|
||||
logger.warn({ client_id }, 'api: requesting client disconnect')
|
||||
client.on('disconnect', () => res.sendStatus(204))
|
||||
return client.disconnect()
|
||||
client.disconnect()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,28 +1,15 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
handle-callback-err,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let HttpController
|
||||
const async = require('async')
|
||||
|
||||
let HttpController
|
||||
module.exports = HttpController = {
|
||||
// The code in this controller is hard to unit test because of a lot of
|
||||
// dependencies on internal socket.io methods. It is not critical to the running
|
||||
// of ShareLaTeX, and is only used for getting stats about connected clients,
|
||||
// and for checking internal state in acceptance tests. The acceptances tests
|
||||
// should provide appropriate coverage.
|
||||
_getConnectedClientView(ioClient, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error, client) {}
|
||||
}
|
||||
_getConnectedClientView(ioClient) {
|
||||
const client_id = ioClient.id
|
||||
const {
|
||||
project_id,
|
||||
|
@ -41,32 +28,23 @@ module.exports = HttpController = {
|
|||
email,
|
||||
connected_time
|
||||
}
|
||||
client.rooms = []
|
||||
for (const name in ioClient.manager.roomClients[client_id]) {
|
||||
const joined = ioClient.manager.roomClients[client_id][name]
|
||||
if (joined && name !== '') {
|
||||
client.rooms.push(name.replace(/^\//, '')) // Remove leading /
|
||||
}
|
||||
}
|
||||
return callback(null, client)
|
||||
client.rooms = Object.keys(ioClient.manager.roomClients[client_id] || {})
|
||||
// drop the namespace
|
||||
.filter((room) => room !== '')
|
||||
// room names are composed as '<NAMESPACE>/<ROOM>' and the default
|
||||
// namespace is empty (see comments in RoomManager), just drop the '/'
|
||||
.map((fullRoomPath) => fullRoomPath.slice(1))
|
||||
return client
|
||||
},
|
||||
|
||||
getConnectedClients(req, res, next) {
|
||||
getConnectedClients(req, res) {
|
||||
const io = req.app.get('io')
|
||||
const ioClients = io.sockets.clients()
|
||||
return async.map(
|
||||
ioClients,
|
||||
HttpController._getConnectedClientView,
|
||||
function (error, clients) {
|
||||
if (error != null) {
|
||||
return next(error)
|
||||
}
|
||||
return res.json(clients)
|
||||
}
|
||||
)
|
||||
|
||||
res.json(ioClients.map(HttpController._getConnectedClientView))
|
||||
},
|
||||
|
||||
getConnectedClient(req, res, next) {
|
||||
getConnectedClient(req, res) {
|
||||
const { client_id } = req.params
|
||||
const io = req.app.get('io')
|
||||
const ioClient = io.sockets.sockets[client_id]
|
||||
|
@ -74,14 +52,6 @@ module.exports = HttpController = {
|
|||
res.sendStatus(404)
|
||||
return
|
||||
}
|
||||
return HttpController._getConnectedClientView(ioClient, function (
|
||||
error,
|
||||
client
|
||||
) {
|
||||
if (error != null) {
|
||||
return next(error)
|
||||
}
|
||||
return res.json(client)
|
||||
})
|
||||
res.json(HttpController._getConnectedClientView(ioClient))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,40 +1,19 @@
|
|||
/* eslint-disable
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS205: Consider reworking code to avoid use of IIFEs
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let RedisClientManager
|
||||
const redis = require('redis-sharelatex')
|
||||
const logger = require('logger-sharelatex')
|
||||
|
||||
module.exports = RedisClientManager = {
|
||||
module.exports = {
|
||||
createClientList(...configs) {
|
||||
// create a dynamic list of redis clients, excluding any configurations which are not defined
|
||||
const clientList = (() => {
|
||||
const result = []
|
||||
for (const x of Array.from(configs)) {
|
||||
if (x != null) {
|
||||
const redisType =
|
||||
x.cluster != null
|
||||
? 'cluster'
|
||||
: x.sentinels != null
|
||||
? 'sentinel'
|
||||
: x.host != null
|
||||
? 'single'
|
||||
: 'unknown'
|
||||
logger.log({ redis: redisType }, 'creating redis client')
|
||||
result.push(redis.createClient(x))
|
||||
}
|
||||
}
|
||||
return result
|
||||
})()
|
||||
return clientList
|
||||
return configs.filter(Boolean).map((x) => {
|
||||
const redisType = x.cluster
|
||||
? 'cluster'
|
||||
: x.sentinels
|
||||
? 'sentinel'
|
||||
: x.host
|
||||
? 'single'
|
||||
: 'unknown'
|
||||
logger.log({ redis: redisType }, 'creating redis client')
|
||||
return redis.createClient(x)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,19 +1,6 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS103: Rewrite code to no longer use __guard__
|
||||
* DS205: Consider reworking code to avoid use of IIFEs
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let RoomManager
|
||||
const logger = require('logger-sharelatex')
|
||||
const metrics = require('metrics-sharelatex')
|
||||
const { EventEmitter } = require('events')
|
||||
|
@ -31,23 +18,17 @@ const RoomEvents = new EventEmitter() // emits {project,doc}-active and {project
|
|||
//
|
||||
// The pubsub side is handled by ChannelManager
|
||||
|
||||
module.exports = RoomManager = {
|
||||
module.exports = {
|
||||
joinProject(client, project_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function () {}
|
||||
}
|
||||
return this.joinEntity(client, 'project', project_id, callback)
|
||||
this.joinEntity(client, 'project', project_id, callback)
|
||||
},
|
||||
|
||||
joinDoc(client, doc_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function () {}
|
||||
}
|
||||
return this.joinEntity(client, 'doc', doc_id, callback)
|
||||
this.joinEntity(client, 'doc', doc_id, callback)
|
||||
},
|
||||
|
||||
leaveDoc(client, doc_id) {
|
||||
return this.leaveEntity(client, 'doc', doc_id)
|
||||
this.leaveEntity(client, 'doc', doc_id)
|
||||
},
|
||||
|
||||
leaveProjectAndDocs(client) {
|
||||
|
@ -58,18 +39,14 @@ module.exports = RoomManager = {
|
|||
// has not joined any rooms and do a final disconnection.
|
||||
const roomsToLeave = this._roomsClientIsIn(client)
|
||||
logger.log({ client: client.id, roomsToLeave }, 'client leaving project')
|
||||
return (() => {
|
||||
const result = []
|
||||
for (const id of Array.from(roomsToLeave)) {
|
||||
const entity = IdMap.get(id)
|
||||
result.push(this.leaveEntity(client, entity, id))
|
||||
}
|
||||
return result
|
||||
})()
|
||||
for (const id of roomsToLeave) {
|
||||
const entity = IdMap.get(id)
|
||||
this.leaveEntity(client, entity, id)
|
||||
}
|
||||
},
|
||||
|
||||
emitOnCompletion(promiseList, eventName) {
|
||||
return Promise.all(promiseList)
|
||||
Promise.all(promiseList)
|
||||
.then(() => RoomEvents.emit(eventName))
|
||||
.catch((err) => RoomEvents.emit(eventName, err))
|
||||
},
|
||||
|
@ -92,19 +69,19 @@ module.exports = RoomManager = {
|
|||
{ client: client.id, entity, id, beforeCount },
|
||||
'client joined new room and subscribed to channel'
|
||||
)
|
||||
return callback(err)
|
||||
callback(err)
|
||||
})
|
||||
RoomEvents.emit(`${entity}-active`, id)
|
||||
IdMap.set(id, entity)
|
||||
// keep track of the number of listeners
|
||||
return metrics.gauge('room-listeners', RoomEvents.eventNames().length)
|
||||
metrics.gauge('room-listeners', RoomEvents.eventNames().length)
|
||||
} else {
|
||||
logger.log(
|
||||
{ client: client.id, entity, id, beforeCount },
|
||||
'client joined existing room'
|
||||
)
|
||||
client.join(id)
|
||||
return callback()
|
||||
callback()
|
||||
}
|
||||
},
|
||||
|
||||
|
@ -128,7 +105,7 @@ module.exports = RoomManager = {
|
|||
'client left room'
|
||||
)
|
||||
// is the room now empty? if so, unsubscribe
|
||||
if (entity == null) {
|
||||
if (!entity) {
|
||||
logger.error({ entity: id }, 'unknown entity when leaving with id')
|
||||
return
|
||||
}
|
||||
|
@ -136,54 +113,48 @@ module.exports = RoomManager = {
|
|||
logger.log({ entity, id }, 'room is now empty')
|
||||
RoomEvents.emit(`${entity}-empty`, id)
|
||||
IdMap.delete(id)
|
||||
return metrics.gauge('room-listeners', RoomEvents.eventNames().length)
|
||||
metrics.gauge('room-listeners', RoomEvents.eventNames().length)
|
||||
}
|
||||
},
|
||||
|
||||
// internal functions below, these access socket.io rooms data directly and
|
||||
// will need updating for socket.io v2
|
||||
|
||||
// The below code makes some assumptions that are always true for v0
|
||||
// - we are using the base namespace '', so room names are '/<ENTITY>'
|
||||
// https://github.com/socketio/socket.io/blob/e4d61b1be65ac3313a85da111a46777aa8d4aae3/lib/manager.js#L62
|
||||
// https://github.com/socketio/socket.io/blob/e4d61b1be65ac3313a85da111a46777aa8d4aae3/lib/manager.js#L1018
|
||||
// - client.namespace is a Namespace
|
||||
// https://github.com/socketio/socket.io/blob/e4d61b1be65ac3313a85da111a46777aa8d4aae3/lib/namespace.js#L204
|
||||
// https://github.com/socketio/socket.io/blob/e4d61b1be65ac3313a85da111a46777aa8d4aae3/lib/socket.js#L40
|
||||
// - client.manager is a Manager
|
||||
// https://github.com/socketio/socket.io/blob/e4d61b1be65ac3313a85da111a46777aa8d4aae3/lib/namespace.js#L204
|
||||
// https://github.com/socketio/socket.io/blob/e4d61b1be65ac3313a85da111a46777aa8d4aae3/lib/socket.js#L41
|
||||
// - a Manager has
|
||||
// - `.rooms={'NAMESPACE/ENTITY': []}` and
|
||||
// - `.roomClients={'CLIENT_ID': {'...': true}}`
|
||||
// https://github.com/socketio/socket.io/blob/e4d61b1be65ac3313a85da111a46777aa8d4aae3/lib/manager.js#L287-L288
|
||||
// https://github.com/socketio/socket.io/blob/e4d61b1be65ac3313a85da111a46777aa8d4aae3/lib/manager.js#L444-L455
|
||||
|
||||
_clientsInRoom(client, room) {
|
||||
const nsp = client.namespace.name
|
||||
const name = nsp + '/' + room
|
||||
return (
|
||||
__guard__(
|
||||
client.manager != null ? client.manager.rooms : undefined,
|
||||
(x) => x[name]
|
||||
) || []
|
||||
).length
|
||||
const clients = client.manager.rooms['/' + room] || []
|
||||
return clients.length
|
||||
},
|
||||
|
||||
_roomsClientIsIn(client) {
|
||||
const roomList = (() => {
|
||||
const result = []
|
||||
for (const fullRoomPath in client.manager.roomClients != null
|
||||
? client.manager.roomClients[client.id]
|
||||
: undefined) {
|
||||
// strip socket.io prefix from room to get original id
|
||||
if (fullRoomPath !== '') {
|
||||
const [prefix, room] = Array.from(fullRoomPath.split('/', 2))
|
||||
result.push(room)
|
||||
}
|
||||
}
|
||||
return result
|
||||
})()
|
||||
return roomList
|
||||
const rooms = client.manager.roomClients[client.id] || {}
|
||||
return (
|
||||
Object.keys(rooms)
|
||||
// drop the namespace
|
||||
.filter((room) => room !== '')
|
||||
// room names are composed as '<NAMESPACE>/<ROOM>' and the default
|
||||
// namespace is empty (see comments above), just drop the '/'
|
||||
.map((fullRoomPath) => fullRoomPath.slice(1))
|
||||
)
|
||||
},
|
||||
|
||||
_clientAlreadyInRoom(client, room) {
|
||||
const nsp = client.namespace.name
|
||||
const name = nsp + '/' + room
|
||||
return __guard__(
|
||||
client.manager.roomClients != null
|
||||
? client.manager.roomClients[client.id]
|
||||
: undefined,
|
||||
(x) => x[name]
|
||||
)
|
||||
const rooms = client.manager.roomClients[client.id] || {}
|
||||
return !!rooms['/' + room]
|
||||
}
|
||||
}
|
||||
function __guard__(value, transform) {
|
||||
return typeof value !== 'undefined' && value !== null
|
||||
? transform(value)
|
||||
: undefined
|
||||
}
|
||||
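
As a rough sketch of the flow the RoomManager above supports (the socket.io `client` object and the ids are placeholders, so this is illustrative rather than runnable as-is): join the project room, then the doc room, and leave everything on disconnect. The `-active`/`-empty` events mentioned in the comments drive channel subscriptions elsewhere.

// Hypothetical flow -- client, project_id and doc_id are placeholders.
const RoomManager = require('./RoomManager')

RoomManager.joinProject(client, project_id, function (err) {
  if (err) { return }
  // the per-doc applied-ops channel is subscribed before the doc is sent to the client
  RoomManager.joinDoc(client, doc_id, function () {})
})

// on disconnect: leave every room the client was tracked in
RoomManager.leaveProjectAndDocs(client)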
|
|
|
@ -1,19 +1,6 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
handle-callback-err,
|
||||
standard/no-callback-literal,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS103: Rewrite code to no longer use __guard__
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let Router
|
||||
const metrics = require('metrics-sharelatex')
|
||||
const logger = require('logger-sharelatex')
|
||||
const settings = require('settings-sharelatex')
|
||||
|
@ -34,14 +21,10 @@ const httpAuth = basicAuth(function (user, pass) {
|
|||
return isValid
|
||||
})
|
||||
|
||||
let Router
|
||||
module.exports = Router = {
|
||||
_handleError(callback, error, client, method, attrs) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
if (attrs == null) {
|
||||
attrs = {}
|
||||
}
|
||||
attrs = attrs || {}
|
||||
for (const key of ['project_id', 'doc_id', 'user_id']) {
|
||||
attrs[key] = client.ol_context[key]
|
||||
}
|
||||
|
@ -49,15 +32,15 @@ module.exports = Router = {
|
|||
attrs.err = error
|
||||
if (error.name === 'CodedError') {
|
||||
logger.warn(attrs, error.message, { code: error.code })
|
||||
return callback({ message: error.message, code: error.code })
|
||||
}
|
||||
if (error.message === 'unexpected arguments') {
|
||||
const serializedError = { message: error.message, code: error.code }
|
||||
callback(serializedError)
|
||||
} else if (error.message === 'unexpected arguments') {
|
||||
// the payload might be very large, put it on level info
|
||||
logger.log(attrs, 'unexpected arguments')
|
||||
metrics.inc('unexpected-arguments', 1, { status: method })
|
||||
return callback({ message: error.message })
|
||||
}
|
||||
if (
|
||||
const serializedError = { message: error.message }
|
||||
callback(serializedError)
|
||||
} else if (
|
||||
[
|
||||
'not authorized',
|
||||
'doc updater could not load requested ops',
|
||||
|
@ -65,11 +48,15 @@ module.exports = Router = {
|
|||
].includes(error.message)
|
||||
) {
|
||||
logger.warn(attrs, error.message)
|
||||
return callback({ message: error.message })
|
||||
const serializedError = { message: error.message }
|
||||
callback(serializedError)
|
||||
} else {
|
||||
logger.error(attrs, `server side error in ${method}`)
|
||||
// Don't return raw error to prevent leaking server side info
|
||||
return callback({ message: 'Something went wrong in real-time service' })
|
||||
const serializedError = {
|
||||
message: 'Something went wrong in real-time service'
|
||||
}
|
||||
callback(serializedError)
|
||||
}
|
||||
},
|
||||
|
||||
|
@ -80,7 +67,7 @@ module.exports = Router = {
|
|||
callback = function () {}
|
||||
}
|
||||
const attrs = { arguments: args }
|
||||
return Router._handleError(callback, error, client, method, attrs)
|
||||
Router._handleError(callback, error, client, method, attrs)
|
||||
},
|
||||
|
||||
configure(app, io, session) {
|
||||
|
@ -102,18 +89,17 @@ module.exports = Router = {
|
|||
HttpApiController.disconnectClient
|
||||
)
|
||||
|
||||
return session.on('connection', function (error, client, session) {
|
||||
session.on('connection', function (error, client, session) {
|
||||
// init client context, we may access it in Router._handleError before
|
||||
// setting any values
|
||||
let user
|
||||
client.ol_context = {}
|
||||
|
||||
if (client != null) {
|
||||
if (client) {
|
||||
client.on('error', function (err) {
|
||||
logger.err({ clientErr: err }, 'socket.io client error')
|
||||
if (client.connected) {
|
||||
client.emit('reconnectGracefully')
|
||||
return client.disconnect()
|
||||
client.disconnect()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -125,13 +111,12 @@ module.exports = Router = {
|
|||
}
|
||||
|
||||
if (
|
||||
client != null &&
|
||||
__guard__(error != null ? error.message : undefined, (x) =>
|
||||
x.match(/could not look up session by key/)
|
||||
)
|
||||
client &&
|
||||
error &&
|
||||
error.message.match(/could not look up session by key/)
|
||||
) {
|
||||
logger.warn(
|
||||
{ err: error, client: client != null, session: session != null },
|
||||
{ err: error, client: !!client, session: !!session },
|
||||
'invalid session'
|
||||
)
|
||||
// tell the client to reauthenticate if it has an invalid session key
|
||||
|
@ -140,15 +125,15 @@ module.exports = Router = {
|
|||
return
|
||||
}
|
||||
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
logger.err(
|
||||
{ err: error, client: client != null, session: session != null },
|
||||
{ err: error, client: !!client, session: !!session },
|
||||
'error when client connected'
|
||||
)
|
||||
if (client != null) {
|
||||
if (client) {
|
||||
client.emit('connectionRejected', { message: 'error' })
|
||||
}
|
||||
if (client != null) {
|
||||
if (client) {
|
||||
client.disconnect()
|
||||
}
|
||||
return
|
||||
|
@ -159,30 +144,21 @@ module.exports = Router = {
|
|||
client.emit('connectionAccepted', null, client.publicId)
|
||||
|
||||
metrics.inc('socket-io.connection')
|
||||
metrics.gauge(
|
||||
'socket-io.clients',
|
||||
__guard__(io.sockets.clients(), (x1) => x1.length)
|
||||
)
|
||||
metrics.gauge('socket-io.clients', io.sockets.clients().length)
|
||||
|
||||
logger.log({ session, client_id: client.id }, 'client connected')
|
||||
|
||||
if (
|
||||
__guard__(
|
||||
session != null ? session.passport : undefined,
|
||||
(x2) => x2.user
|
||||
) != null
|
||||
) {
|
||||
let user
|
||||
if (session && session.passport && session.passport.user) {
|
||||
;({ user } = session.passport)
|
||||
} else if ((session != null ? session.user : undefined) != null) {
|
||||
} else if (session && session.user) {
|
||||
;({ user } = session)
|
||||
} else {
|
||||
user = { _id: 'anonymous-user' }
|
||||
}
|
||||
|
||||
client.on('joinProject', function (data, callback) {
|
||||
if (data == null) {
|
||||
data = {}
|
||||
}
|
||||
data = data || {}
|
||||
if (typeof callback !== 'function') {
|
||||
return Router._handleInvalidArguments(
|
||||
client,
|
||||
|
@ -194,18 +170,18 @@ module.exports = Router = {
|
|||
if (data.anonymousAccessToken) {
|
||||
user.anonymousAccessToken = data.anonymousAccessToken
|
||||
}
|
||||
return WebsocketController.joinProject(
|
||||
WebsocketController.joinProject(
|
||||
client,
|
||||
user,
|
||||
data.project_id,
|
||||
function (err, ...args) {
|
||||
if (err != null) {
|
||||
return Router._handleError(callback, err, client, 'joinProject', {
|
||||
if (err) {
|
||||
Router._handleError(callback, err, client, 'joinProject', {
|
||||
project_id: data.project_id,
|
||||
user_id: user != null ? user.id : undefined
|
||||
user_id: user._id
|
||||
})
|
||||
} else {
|
||||
return callback(null, ...Array.from(args))
|
||||
callback(null, ...args)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
@ -213,19 +189,11 @@ module.exports = Router = {
|
|||
|
||||
client.on('disconnect', function () {
|
||||
metrics.inc('socket-io.disconnect')
|
||||
metrics.gauge(
|
||||
'socket-io.clients',
|
||||
__guard__(io.sockets.clients(), (x3) => x3.length) - 1
|
||||
)
|
||||
metrics.gauge('socket-io.clients', io.sockets.clients().length)
|
||||
|
||||
return WebsocketController.leaveProject(io, client, function (err) {
|
||||
if (err != null) {
|
||||
return Router._handleError(
|
||||
function () {},
|
||||
err,
|
||||
client,
|
||||
'leaveProject'
|
||||
)
|
||||
WebsocketController.leaveProject(io, client, function (err) {
|
||||
if (err) {
|
||||
Router._handleError(function () {}, err, client, 'leaveProject')
|
||||
}
|
||||
})
|
||||
})
|
||||
|
@ -263,19 +231,19 @@ module.exports = Router = {
|
|||
return Router._handleInvalidArguments(client, 'joinDoc', arguments)
|
||||
}
|
||||
|
||||
return WebsocketController.joinDoc(
|
||||
WebsocketController.joinDoc(
|
||||
client,
|
||||
doc_id,
|
||||
fromVersion,
|
||||
options,
|
||||
function (err, ...args) {
|
||||
if (err != null) {
|
||||
return Router._handleError(callback, err, client, 'joinDoc', {
|
||||
if (err) {
|
||||
Router._handleError(callback, err, client, 'joinDoc', {
|
||||
doc_id,
|
||||
fromVersion
|
||||
})
|
||||
} else {
|
||||
return callback(null, ...Array.from(args))
|
||||
callback(null, ...args)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
@ -286,22 +254,16 @@ module.exports = Router = {
|
|||
return Router._handleInvalidArguments(client, 'leaveDoc', arguments)
|
||||
}
|
||||
|
||||
return WebsocketController.leaveDoc(client, doc_id, function (
|
||||
err,
|
||||
...args
|
||||
) {
|
||||
if (err != null) {
|
||||
return Router._handleError(callback, err, client, 'leaveDoc')
|
||||
WebsocketController.leaveDoc(client, doc_id, function (err, ...args) {
|
||||
if (err) {
|
||||
Router._handleError(callback, err, client, 'leaveDoc')
|
||||
} else {
|
||||
return callback(null, ...Array.from(args))
|
||||
callback(null, ...args)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
client.on('clientTracking.getConnectedUsers', function (callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error, users) {}
|
||||
}
|
||||
if (typeof callback !== 'function') {
|
||||
return Router._handleInvalidArguments(
|
||||
client,
|
||||
|
@ -310,19 +272,16 @@ module.exports = Router = {
|
|||
)
|
||||
}
|
||||
|
||||
return WebsocketController.getConnectedUsers(client, function (
|
||||
err,
|
||||
users
|
||||
) {
|
||||
if (err != null) {
|
||||
return Router._handleError(
|
||||
WebsocketController.getConnectedUsers(client, function (err, users) {
|
||||
if (err) {
|
||||
Router._handleError(
|
||||
callback,
|
||||
err,
|
||||
client,
|
||||
'clientTracking.getConnectedUsers'
|
||||
)
|
||||
} else {
|
||||
return callback(null, users)
|
||||
callback(null, users)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
@ -331,8 +290,8 @@ module.exports = Router = {
|
|||
cursorData,
|
||||
callback
|
||||
) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
if (!callback) {
|
||||
callback = function () {}
|
||||
}
|
||||
if (typeof callback !== 'function') {
|
||||
return Router._handleInvalidArguments(
|
||||
|
@ -342,28 +301,23 @@ module.exports = Router = {
|
|||
)
|
||||
}
|
||||
|
||||
return WebsocketController.updateClientPosition(
|
||||
client,
|
||||
cursorData,
|
||||
function (err) {
|
||||
if (err != null) {
|
||||
return Router._handleError(
|
||||
callback,
|
||||
err,
|
||||
client,
|
||||
'clientTracking.updatePosition'
|
||||
)
|
||||
} else {
|
||||
return callback()
|
||||
}
|
||||
WebsocketController.updateClientPosition(client, cursorData, function (
|
||||
err
|
||||
) {
|
||||
if (err) {
|
||||
Router._handleError(
|
||||
callback,
|
||||
err,
|
||||
client,
|
||||
'clientTracking.updatePosition'
|
||||
)
|
||||
} else {
|
||||
callback()
|
||||
}
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
return client.on('applyOtUpdate', function (doc_id, update, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
client.on('applyOtUpdate', function (doc_id, update, callback) {
|
||||
if (typeof callback !== 'function') {
|
||||
return Router._handleInvalidArguments(
|
||||
client,
|
||||
|
@ -372,31 +326,19 @@ module.exports = Router = {
|
|||
)
|
||||
}
|
||||
|
||||
return WebsocketController.applyOtUpdate(
|
||||
client,
|
||||
doc_id,
|
||||
update,
|
||||
function (err) {
|
||||
if (err != null) {
|
||||
return Router._handleError(
|
||||
callback,
|
||||
err,
|
||||
client,
|
||||
'applyOtUpdate',
|
||||
{ doc_id, update }
|
||||
)
|
||||
} else {
|
||||
return callback()
|
||||
}
|
||||
WebsocketController.applyOtUpdate(client, doc_id, update, function (
|
||||
err
|
||||
) {
|
||||
if (err) {
|
||||
Router._handleError(callback, err, client, 'applyOtUpdate', {
|
||||
doc_id,
|
||||
update
|
||||
})
|
||||
} else {
|
||||
callback()
|
||||
}
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
function __guard__(value, transform) {
|
||||
return typeof value !== 'undefined' && value !== null
|
||||
? transform(value)
|
||||
: undefined
|
||||
}
|
||||
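
The reworked `_handleError` above always hands the socket callback a plain serializable object rather than an Error instance. Roughly, the mapping is as follows; the list of known messages is abbreviated here, and this helper is illustrative, not an exported API.

// Illustrative only -- mirrors the branches above.
function serializeForClient(error) {
  if (error.name === 'CodedError') {
    return { message: error.message, code: error.code }
  }
  if (error.message === 'unexpected arguments') {
    return { message: error.message }
  }
  if (['not authorized', 'doc updater could not load requested ops'].includes(error.message)) {
    return { message: error.message }
  }
  // anything else is masked so server-side details never reach the client
  return { message: 'Something went wrong in real-time service' }
}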
|
|
|
@ -1,23 +1,8 @@
/* eslint-disable
handle-callback-err,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
const Settings = require('settings-sharelatex')
const logger = require('logger-sharelatex')

module.exports = {
parse(data, callback) {
let parsed
if (callback == null) {
callback = function (error, parsed) {}
}
if (data.length > Settings.maxUpdateSize) {
logger.error(
{ head: data.slice(0, 1024), length: data.length },

@ -25,11 +10,12 @@ module.exports = {
)
return callback(new Error('data too large to parse'))
}
let parsed
try {
parsed = JSON.parse(data)
} catch (e) {
return callback(e)
}
return callback(null, parsed)
callback(null, parsed)
}
}
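
A quick usage sketch of the parser above; the oversize check uses `Settings.maxUpdateSize`, so the possible errors are the ones seen in the diff, while the payload here is illustrative.

// Hypothetical caller -- parse() yields either a size/parse error or the parsed object.
const SafeJsonParse = require('./SafeJsonParse')

SafeJsonParse.parse('{"doc_id":"example","op":[]}', function (error, parsed) {
  if (error) {
    // either 'data too large to parse' or the SyntaxError thrown by JSON.parse
    return console.error(error)
  }
  console.log(parsed.doc_id)
})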
@ -1,34 +1,28 @@
// TODO: This file was created by bulk-decaffeinate.
// Sanity-check the conversion and remove this comment.
/*
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
const { EventEmitter } = require('events')

module.exports = function (io, sessionStore, cookieParser, cookieName) {
const missingSessionError = new Error('could not look up session by key')

const sessionSockets = new EventEmitter()
const next = (error, socket, session) =>
function next(error, socket, session) {
sessionSockets.emit('connection', error, socket, session)
}

io.on('connection', function (socket) {
const req = socket.handshake
return cookieParser(req, {}, function () {
cookieParser(req, {}, function () {
const sessionId = req.signedCookies && req.signedCookies[cookieName]
if (!sessionId) {
return next(missingSessionError, socket)
}
return sessionStore.get(sessionId, function (error, session) {
sessionStore.get(sessionId, function (error, session) {
if (error) {
return next(error, socket)
}
if (!session) {
return next(missingSessionError, socket)
}
return next(null, socket, session)
next(null, socket, session)
})
})
})
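
For orientation, a sketch of how the module above is typically wired up; `io`, `sessionStore`, `cookieParser` and the cookie name all come from app setup, and the sketch assumes the factory returns the internal EventEmitter.

// Hypothetical wiring -- pairs each socket.io connection with its express session.
const SessionSockets = require('./SessionSockets')

const sessionSockets = SessionSockets(io, sessionStore, cookieParser, cookieName)
sessionSockets.on('connection', function (error, socket, session) {
  if (error) {
    // e.g. 'could not look up session by key'
    return socket.disconnect()
  }
  // session is the express session matching this socket's signed cookie
})
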
@ -1,35 +1,21 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
handle-callback-err,
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let WebApiManager
|
||||
const request = require('request')
|
||||
const settings = require('settings-sharelatex')
|
||||
const logger = require('logger-sharelatex')
|
||||
const { CodedError } = require('./Errors')
|
||||
|
||||
module.exports = WebApiManager = {
|
||||
module.exports = {
|
||||
joinProject(project_id, user, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error, project, privilegeLevel, isRestrictedUser) {}
|
||||
}
|
||||
const user_id = user._id
|
||||
logger.log({ project_id, user_id }, 'sending join project request to web')
|
||||
const url = `${settings.apis.web.url}/project/${project_id}/join`
|
||||
const headers = {}
|
||||
if (user.anonymousAccessToken != null) {
|
||||
if (user.anonymousAccessToken) {
|
||||
headers['x-sl-anonymous-access-token'] = user.anonymousAccessToken
|
||||
}
|
||||
return request.post(
|
||||
request.post(
|
||||
{
|
||||
url,
|
||||
qs: { user_id },
|
||||
|
@ -43,15 +29,12 @@ module.exports = WebApiManager = {
|
|||
headers
|
||||
},
|
||||
function (error, response, data) {
|
||||
let err
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
let err
|
||||
if (response.statusCode >= 200 && response.statusCode < 300) {
|
||||
if (
|
||||
data == null ||
|
||||
(data != null ? data.project : undefined) == null
|
||||
) {
|
||||
if (!(data && data.project)) {
|
||||
err = new Error('no data returned from joinProject request')
|
||||
logger.error(
|
||||
{ err, project_id, user_id },
|
||||
|
@ -59,7 +42,7 @@ module.exports = WebApiManager = {
|
|||
)
|
||||
return callback(err)
|
||||
}
|
||||
return callback(
|
||||
callback(
|
||||
null,
|
||||
data.project,
|
||||
data.privilegeLevel,
|
||||
|
@ -67,7 +50,7 @@ module.exports = WebApiManager = {
|
|||
)
|
||||
} else if (response.statusCode === 429) {
|
||||
logger.log(project_id, user_id, 'rate-limit hit when joining project')
|
||||
return callback(
|
||||
callback(
|
||||
new CodedError(
|
||||
'rate-limit hit when joining project',
|
||||
'TooManyRequests'
|
||||
|
@ -78,7 +61,7 @@ module.exports = WebApiManager = {
|
|||
`non-success status code from web: ${response.statusCode}`
|
||||
)
|
||||
logger.error({ err, project_id, user_id }, 'error accessing web api')
|
||||
return callback(err)
|
||||
callback(err)
|
||||
}
|
||||
}
|
||||
)
|
||||
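
A sketch of the join call from the websocket side, mirroring how WebsocketController uses it later in this diff; the ids are placeholders.

// Hypothetical call -- web replies with the project, the caller's privilege level
// and whether the user is restricted.
const WebApiManager = require('./WebApiManager')

const user = { _id: 'user-id' }
WebApiManager.joinProject('project-id', user, function (error, project, privilegeLevel, isRestrictedUser) {
  if (error) {
    // e.g. a CodedError with code 'TooManyRequests' when web rate-limits the join
    return
  }
  // an empty privilegeLevel is later rejected as 'not authorized'
})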
|
|
|
@ -1,22 +1,8 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
handle-callback-err,
|
||||
no-unused-vars,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS103: Rewrite code to no longer use __guard__
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let WebsocketController
|
||||
const logger = require('logger-sharelatex')
|
||||
const metrics = require('metrics-sharelatex')
|
||||
const settings = require('settings-sharelatex')
|
||||
const WebApiManager = require('./WebApiManager')
|
||||
const AuthorizationManager = require('./AuthorizationManager')
|
||||
const DocumentUpdaterManager = require('./DocumentUpdaterManager')
|
||||
|
@ -24,6 +10,7 @@ const ConnectedUsersManager = require('./ConnectedUsersManager')
|
|||
const WebsocketLoadBalancer = require('./WebsocketLoadBalancer')
|
||||
const RoomManager = require('./RoomManager')
|
||||
|
||||
let WebsocketController
|
||||
module.exports = WebsocketController = {
|
||||
// If the protocol version changes when the client reconnects,
|
||||
// it will force a full refresh of the page. Useful for non-backwards
|
||||
|
@ -31,9 +18,6 @@ module.exports = WebsocketController = {
|
|||
PROTOCOL_VERSION: 2,
|
||||
|
||||
joinProject(client, user, project_id, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error, project, privilegeLevel, protocolVersion) {}
|
||||
}
|
||||
if (client.disconnected) {
|
||||
metrics.inc('editor.join-project.disconnected', 1, {
|
||||
status: 'immediately'
|
||||
|
@ -41,19 +25,19 @@ module.exports = WebsocketController = {
|
|||
return callback()
|
||||
}
|
||||
|
||||
const user_id = user != null ? user._id : undefined
|
||||
const user_id = user._id
|
||||
logger.log(
|
||||
{ user_id, project_id, client_id: client.id },
|
||||
'user joining project'
|
||||
)
|
||||
metrics.inc('editor.join-project')
|
||||
return WebApiManager.joinProject(project_id, user, function (
|
||||
WebApiManager.joinProject(project_id, user, function (
|
||||
error,
|
||||
project,
|
||||
privilegeLevel,
|
||||
isRestrictedUser
|
||||
) {
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
if (client.disconnected) {
|
||||
|
@ -63,7 +47,7 @@ module.exports = WebsocketController = {
|
|||
return callback()
|
||||
}
|
||||
|
||||
if (!privilegeLevel || privilegeLevel === '') {
|
||||
if (!privilegeLevel) {
|
||||
const err = new Error('not authorized')
|
||||
logger.warn(
|
||||
{ err, project_id, user_id, client_id: client.id },
|
||||
|
@ -76,16 +60,13 @@ module.exports = WebsocketController = {
|
|||
client.ol_context.privilege_level = privilegeLevel
|
||||
client.ol_context.user_id = user_id
|
||||
client.ol_context.project_id = project_id
|
||||
client.ol_context.owner_id = __guard__(
|
||||
project != null ? project.owner : undefined,
|
||||
(x) => x._id
|
||||
)
|
||||
client.ol_context.first_name = user != null ? user.first_name : undefined
|
||||
client.ol_context.last_name = user != null ? user.last_name : undefined
|
||||
client.ol_context.email = user != null ? user.email : undefined
|
||||
client.ol_context.owner_id = project.owner && project.owner._id
|
||||
client.ol_context.first_name = user.first_name
|
||||
client.ol_context.last_name = user.last_name
|
||||
client.ol_context.email = user.email
|
||||
client.ol_context.connected_time = new Date()
|
||||
client.ol_context.signup_date = user != null ? user.signUpDate : undefined
|
||||
client.ol_context.login_count = user != null ? user.loginCount : undefined
|
||||
client.ol_context.signup_date = user.signUpDate
|
||||
client.ol_context.login_count = user.loginCount
|
||||
client.ol_context.is_restricted_user = !!isRestrictedUser
|
||||
|
||||
RoomManager.joinProject(client, project_id, function (err) {
|
||||
|
@ -96,7 +77,7 @@ module.exports = WebsocketController = {
|
|||
{ user_id, project_id, client_id: client.id },
|
||||
'user joined project'
|
||||
)
|
||||
return callback(
|
||||
callback(
|
||||
null,
|
||||
project,
|
||||
privilegeLevel,
|
||||
|
@ -105,7 +86,7 @@ module.exports = WebsocketController = {
|
|||
})
|
||||
|
||||
// No need to block for setting the user as connected in the cursor tracking
|
||||
return ConnectedUsersManager.updateUserPosition(
|
||||
ConnectedUsersManager.updateUserPosition(
|
||||
project_id,
|
||||
client.publicId,
|
||||
user,
|
||||
|
@ -120,9 +101,6 @@ module.exports = WebsocketController = {
|
|||
// is determined by FLUSH_IF_EMPTY_DELAY.
|
||||
FLUSH_IF_EMPTY_DELAY: 500, // ms
|
||||
leaveProject(io, client, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
const { project_id, user_id } = client.ol_context
|
||||
if (!project_id) {
|
||||
return callback()
|
||||
|
@ -144,8 +122,8 @@ module.exports = WebsocketController = {
|
|||
project_id,
|
||||
client.publicId,
|
||||
function (err) {
|
||||
if (err != null) {
|
||||
return logger.error(
|
||||
if (err) {
|
||||
logger.error(
|
||||
{ err, project_id, user_id, client_id: client.id },
|
||||
'error marking client as disconnected'
|
||||
)
|
||||
|
@ -154,15 +132,15 @@ module.exports = WebsocketController = {
|
|||
)
|
||||
|
||||
RoomManager.leaveProjectAndDocs(client)
|
||||
return setTimeout(function () {
|
||||
setTimeout(function () {
|
||||
const remainingClients = io.sockets.clients(project_id)
|
||||
if (remainingClients.length === 0) {
|
||||
// Flush project in the background
|
||||
DocumentUpdaterManager.flushProjectToMongoAndDelete(
|
||||
project_id,
|
||||
function (err) {
|
||||
if (err != null) {
|
||||
return logger.error(
|
||||
if (err) {
|
||||
logger.error(
|
||||
{ err, project_id, user_id, client_id: client.id },
|
||||
'error flushing to doc updater after leaving project'
|
||||
)
|
||||
|
@ -170,17 +148,11 @@ module.exports = WebsocketController = {
|
|||
}
|
||||
)
|
||||
}
|
||||
return callback()
|
||||
callback()
|
||||
}, WebsocketController.FLUSH_IF_EMPTY_DELAY)
|
||||
},
|
||||
|
||||
joinDoc(client, doc_id, fromVersion, options, callback) {
|
||||
if (fromVersion == null) {
|
||||
fromVersion = -1
|
||||
}
|
||||
if (callback == null) {
|
||||
callback = function (error, doclines, version, ops, ranges) {}
|
||||
}
|
||||
if (client.disconnected) {
|
||||
metrics.inc('editor.join-doc.disconnected', 1, { status: 'immediately' })
|
||||
return callback()
|
||||
|
@ -188,7 +160,7 @@ module.exports = WebsocketController = {
|
|||
|
||||
metrics.inc('editor.join-doc')
|
||||
const { project_id, user_id, is_restricted_user } = client.ol_context
|
||||
if (project_id == null) {
|
||||
if (!project_id) {
|
||||
return callback(new Error('no project_id found on client'))
|
||||
}
|
||||
logger.log(
|
||||
|
@ -196,16 +168,14 @@ module.exports = WebsocketController = {
|
|||
'client joining doc'
|
||||
)
|
||||
|
||||
return AuthorizationManager.assertClientCanViewProject(client, function (
|
||||
error
|
||||
) {
|
||||
if (error != null) {
|
||||
AuthorizationManager.assertClientCanViewProject(client, function (error) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
// ensure the per-doc applied-ops channel is subscribed before sending the
|
||||
// doc to the client, so that no events are missed.
|
||||
return RoomManager.joinDoc(client, doc_id, function (error) {
|
||||
if (error != null) {
|
||||
RoomManager.joinDoc(client, doc_id, function (error) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
if (client.disconnected) {
|
||||
|
@ -216,13 +186,12 @@ module.exports = WebsocketController = {
|
|||
return callback()
|
||||
}
|
||||
|
||||
return DocumentUpdaterManager.getDocument(
|
||||
DocumentUpdaterManager.getDocument(
|
||||
project_id,
|
||||
doc_id,
|
||||
fromVersion,
|
||||
function (error, lines, version, ranges, ops) {
|
||||
let err
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
if (client.disconnected) {
|
||||
|
@ -233,10 +202,7 @@ module.exports = WebsocketController = {
|
|||
return callback()
|
||||
}
|
||||
|
||||
if (
|
||||
is_restricted_user &&
|
||||
(ranges != null ? ranges.comments : undefined) != null
|
||||
) {
|
||||
if (is_restricted_user && ranges && ranges.comments) {
|
||||
ranges.comments = []
|
||||
}
|
||||
|
||||
|
@ -245,11 +211,10 @@ module.exports = WebsocketController = {
|
|||
const encodeForWebsockets = (text) =>
|
||||
unescape(encodeURIComponent(text))
|
||||
const escapedLines = []
|
||||
for (let line of Array.from(lines)) {
|
||||
for (let line of lines) {
|
||||
try {
|
||||
line = encodeForWebsockets(line)
|
||||
} catch (error1) {
|
||||
err = error1
|
||||
} catch (err) {
|
||||
logger.err(
|
||||
{
|
||||
err,
|
||||
|
@ -267,25 +232,20 @@ module.exports = WebsocketController = {
|
|||
}
|
||||
if (options.encodeRanges) {
|
||||
try {
|
||||
for (const comment of Array.from(
|
||||
(ranges != null ? ranges.comments : undefined) || []
|
||||
)) {
|
||||
if (comment.op.c != null) {
|
||||
for (const comment of (ranges && ranges.comments) || []) {
|
||||
if (comment.op.c) {
|
||||
comment.op.c = encodeForWebsockets(comment.op.c)
|
||||
}
|
||||
}
|
||||
for (const change of Array.from(
|
||||
(ranges != null ? ranges.changes : undefined) || []
|
||||
)) {
|
||||
if (change.op.i != null) {
|
||||
for (const change of (ranges && ranges.changes) || []) {
|
||||
if (change.op.i) {
|
||||
change.op.i = encodeForWebsockets(change.op.i)
|
||||
}
|
||||
if (change.op.d != null) {
|
||||
if (change.op.d) {
|
||||
change.op.d = encodeForWebsockets(change.op.d)
|
||||
}
|
||||
}
|
||||
} catch (error2) {
|
||||
err = error2
|
||||
} catch (err) {
|
||||
logger.err(
|
||||
{
|
||||
err,
|
||||
|
@ -301,7 +261,7 @@ module.exports = WebsocketController = {
|
|||
}
|
||||
}
|
||||
|
||||
AuthorizationManager.addAccessToDoc(client, doc_id)
|
||||
AuthorizationManager.addAccessToDoc(client, doc_id, () => {})
|
||||
logger.log(
|
||||
{
|
||||
user_id,
|
||||
|
@ -312,7 +272,7 @@ module.exports = WebsocketController = {
|
|||
},
|
||||
'client joined doc'
|
||||
)
|
||||
return callback(null, escapedLines, version, ops, ranges)
|
||||
callback(null, escapedLines, version, ops, ranges)
|
||||
}
|
||||
)
|
||||
})
|
||||
|
@ -321,9 +281,6 @@ module.exports = WebsocketController = {
|
|||
|
||||
leaveDoc(client, doc_id, callback) {
|
||||
// client may have disconnected, but we have to cleanup internal state.
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
metrics.inc('editor.leave-doc')
|
||||
const { project_id, user_id } = client.ol_context
|
||||
logger.log(
|
||||
|
@ -335,12 +292,9 @@ module.exports = WebsocketController = {
|
|||
// the connection is per-project, we continue to allow access
|
||||
// after the initial joinDoc since we know they are already authorised.
|
||||
// # AuthorizationManager.removeAccessToDoc client, doc_id
|
||||
return callback()
|
||||
callback()
|
||||
},
|
||||
updateClientPosition(client, cursorData, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
if (client.disconnected) {
|
||||
// do not create a ghost entry in redis
|
||||
return callback()
|
||||
|
@ -359,11 +313,11 @@ module.exports = WebsocketController = {
|
|||
'updating client position'
|
||||
)
|
||||
|
||||
return AuthorizationManager.assertClientCanViewProjectAndDoc(
|
||||
AuthorizationManager.assertClientCanViewProjectAndDoc(
|
||||
client,
|
||||
cursorData.doc_id,
|
||||
function (error) {
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
logger.warn(
|
||||
{ err: error, client_id: client.id, project_id, user_id },
|
||||
"silently ignoring unauthorized updateClientPosition. Client likely hasn't called joinProject yet."
|
||||
|
@ -371,10 +325,10 @@ module.exports = WebsocketController = {
|
|||
return callback()
|
||||
}
|
||||
cursorData.id = client.publicId
|
||||
if (user_id != null) {
|
||||
if (user_id) {
|
||||
cursorData.user_id = user_id
|
||||
}
|
||||
if (email != null) {
|
||||
if (email) {
|
||||
cursorData.email = email
|
||||
}
|
||||
// Don't store anonymous users in redis to avoid influx
|
||||
|
@ -404,7 +358,7 @@ module.exports = WebsocketController = {
|
|||
callback
|
||||
)
|
||||
}
|
||||
return WebsocketLoadBalancer.emitToRoom(
|
||||
WebsocketLoadBalancer.emitToRoom(
|
||||
project_id,
|
||||
'clientTracking.clientUpdated',
|
||||
cursorData
|
||||
|
@ -415,9 +369,6 @@ module.exports = WebsocketController = {
|
|||
|
||||
CLIENT_REFRESH_DELAY: 1000,
|
||||
getConnectedUsers(client, callback) {
|
||||
if (callback == null) {
|
||||
callback = function (error, users) {}
|
||||
}
|
||||
if (client.disconnected) {
|
||||
// they are not interested anymore, skip the redis lookups
|
||||
return callback()
|
||||
|
@ -428,34 +379,32 @@ module.exports = WebsocketController = {
|
|||
if (is_restricted_user) {
|
||||
return callback(null, [])
|
||||
}
|
||||
if (project_id == null) {
|
||||
if (!project_id) {
|
||||
return callback(new Error('no project_id found on client'))
|
||||
}
|
||||
logger.log(
|
||||
{ user_id, project_id, client_id: client.id },
|
||||
'getting connected users'
|
||||
)
|
||||
return AuthorizationManager.assertClientCanViewProject(client, function (
|
||||
error
|
||||
) {
|
||||
if (error != null) {
|
||||
AuthorizationManager.assertClientCanViewProject(client, function (error) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
WebsocketLoadBalancer.emitToRoom(project_id, 'clientTracking.refresh')
|
||||
return setTimeout(
|
||||
setTimeout(
|
||||
() =>
|
||||
ConnectedUsersManager.getConnectedUsers(project_id, function (
|
||||
error,
|
||||
users
|
||||
) {
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
return callback(error)
|
||||
}
|
||||
callback(null, users)
|
||||
return logger.log(
|
||||
logger.log(
|
||||
{ user_id, project_id, client_id: client.id },
|
||||
'got connected users'
|
||||
)
|
||||
callback(null, users)
|
||||
}),
|
||||
WebsocketController.CLIENT_REFRESH_DELAY
|
||||
)
|
||||
|
@ -464,20 +413,17 @@ module.exports = WebsocketController = {
|
|||
|
||||
applyOtUpdate(client, doc_id, update, callback) {
|
||||
// client may have disconnected, but we can submit their update to doc-updater anyways.
|
||||
if (callback == null) {
|
||||
callback = function (error) {}
|
||||
}
|
||||
const { user_id, project_id } = client.ol_context
|
||||
if (project_id == null) {
|
||||
if (!project_id) {
|
||||
return callback(new Error('no project_id found on client'))
|
||||
}
|
||||
|
||||
return WebsocketController._assertClientCanApplyUpdate(
|
||||
WebsocketController._assertClientCanApplyUpdate(
|
||||
client,
|
||||
doc_id,
|
||||
update,
|
||||
function (error) {
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
logger.warn(
|
||||
{ err: error, doc_id, client_id: client.id, version: update.v },
|
||||
'client is not authorized to make update'
|
||||
|
@ -508,15 +454,12 @@ module.exports = WebsocketController = {
|
|||
'sending update to doc updater'
|
||||
)
|
||||
|
||||
return DocumentUpdaterManager.queueChange(
|
||||
DocumentUpdaterManager.queueChange(
|
||||
project_id,
|
||||
doc_id,
|
||||
update,
|
||||
function (error) {
|
||||
if (
|
||||
(error != null ? error.message : undefined) ===
|
||||
'update is too large'
|
||||
) {
|
||||
if ((error && error.message) === 'update is too large') {
|
||||
metrics.inc('update_too_large')
|
||||
const { updateSize } = error
|
||||
logger.warn(
|
||||
|
@ -541,12 +484,12 @@ module.exports = WebsocketController = {
|
|||
})
|
||||
}
|
||||
client.emit('otUpdateError', message.error, message)
|
||||
return client.disconnect()
|
||||
client.disconnect()
|
||||
}, 100)
|
||||
return
|
||||
}
|
||||
|
||||
if (error != null) {
|
||||
if (error) {
|
||||
logger.error(
|
||||
{
|
||||
err: error,
|
||||
|
@ -559,7 +502,7 @@ module.exports = WebsocketController = {
|
|||
)
|
||||
client.disconnect()
|
||||
}
|
||||
return callback(error)
|
||||
callback(error)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
@ -567,43 +510,37 @@ module.exports = WebsocketController = {
|
|||
},
|
||||
|
||||
_assertClientCanApplyUpdate(client, doc_id, update, callback) {
|
||||
return AuthorizationManager.assertClientCanEditProjectAndDoc(
|
||||
AuthorizationManager.assertClientCanEditProjectAndDoc(
|
||||
client,
|
||||
doc_id,
|
||||
function (error) {
|
||||
if (error != null) {
|
||||
if (
|
||||
error.message === 'not authorized' &&
|
||||
WebsocketController._isCommentUpdate(update)
|
||||
) {
|
||||
// This might be a comment op, which we only need read-only privileges for
|
||||
return AuthorizationManager.assertClientCanViewProjectAndDoc(
|
||||
client,
|
||||
doc_id,
|
||||
callback
|
||||
)
|
||||
} else {
|
||||
return callback(error)
|
||||
}
|
||||
} else {
|
||||
return callback(null)
|
||||
if (
|
||||
error &&
|
||||
error.message === 'not authorized' &&
|
||||
WebsocketController._isCommentUpdate(update)
|
||||
) {
|
||||
// This might be a comment op, which we only need read-only privileges for
|
||||
AuthorizationManager.assertClientCanViewProjectAndDoc(
|
||||
client,
|
||||
doc_id,
|
||||
callback
|
||||
)
|
||||
return
|
||||
}
|
||||
callback(error)
|
||||
}
|
||||
)
|
||||
},
|
||||
|
||||
_isCommentUpdate(update) {
|
||||
for (const op of Array.from(update.op)) {
|
||||
if (op.c == null) {
|
||||
if (!(update && update.op instanceof Array)) {
|
||||
return false
|
||||
}
|
||||
for (const op of update.op) {
|
||||
if (!op.c) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
function __guard__(value, transform) {
|
||||
return typeof value !== 'undefined' && value !== null
|
||||
? transform(value)
|
||||
: undefined
|
||||
}
|
||||
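
One detail worth calling out from the joinDoc path above: `encodeForWebsockets` uses the classic `unescape(encodeURIComponent(text))` trick so that each UTF-8 byte travels as a single character. A small round-trip illustration; the decode helper is what a client would apply and is shown only for completeness.

// Round trip of the encoding applied to doclines, comments and tracked changes above.
const encodeForWebsockets = (text) => unescape(encodeURIComponent(text))
const decodeFromWebsockets = (text) => decodeURIComponent(escape(text))

const line = 'héllo wörld'
const wireSafe = encodeForWebsockets(line) // one character per UTF-8 byte
console.log(decodeFromWebsockets(wireSafe) === line) // true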
|
|
|
@ -1,17 +1,6 @@
|
|||
/* eslint-disable
|
||||
camelcase,
|
||||
*/
|
||||
// TODO: This file was created by bulk-decaffeinate.
|
||||
// Fix any style issues and re-enable lint.
|
||||
/*
|
||||
* decaffeinate suggestions:
|
||||
* DS101: Remove unnecessary use of Array.from
|
||||
* DS102: Remove unnecessary code created because of implicit returns
|
||||
* DS205: Consider reworking code to avoid use of IIFEs
|
||||
* DS207: Consider shorter variations of null checks
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
let WebsocketLoadBalancer
|
||||
const Settings = require('settings-sharelatex')
|
||||
const logger = require('logger-sharelatex')
|
||||
const RedisClientManager = require('./RedisClientManager')
|
||||
|
@ -33,12 +22,13 @@ const RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST = [
|
|||
'removeEntity'
|
||||
]
|
||||
|
||||
let WebsocketLoadBalancer
|
||||
module.exports = WebsocketLoadBalancer = {
|
||||
rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub),
|
||||
rclientSubList: RedisClientManager.createClientList(Settings.redis.pubsub),
|
||||
|
||||
emitToRoom(room_id, message, ...payload) {
|
||||
if (room_id == null) {
|
||||
if (!room_id) {
|
||||
logger.warn(
|
||||
{ message, payload },
|
||||
'no room_id provided, ignoring emitToRoom'
|
||||
|
@ -55,99 +45,78 @@ module.exports = WebsocketLoadBalancer = {
|
|||
'emitting to room'
|
||||
)
|
||||
|
||||
return Array.from(this.rclientPubList).map((rclientPub) =>
|
||||
this.rclientPubList.map((rclientPub) =>
|
||||
ChannelManager.publish(rclientPub, 'editor-events', room_id, data)
|
||||
)
|
||||
},
|
||||
|
||||
emitToAll(message, ...payload) {
|
||||
return this.emitToRoom('all', message, ...Array.from(payload))
|
||||
this.emitToRoom('all', message, ...payload)
|
||||
},
|
||||
|
||||
listenForEditorEvents(io) {
|
||||
logger.log(
|
||||
{ rclients: this.rclientPubList.length },
|
||||
'publishing editor events'
|
||||
)
|
||||
logger.log(
|
||||
{ rclients: this.rclientSubList.length },
|
||||
'listening for editor events'
|
||||
)
|
||||
for (const rclientSub of Array.from(this.rclientSubList)) {
|
||||
for (const rclientSub of this.rclientSubList) {
|
||||
rclientSub.subscribe('editor-events')
|
||||
rclientSub.on('message', function (channel, message) {
|
||||
if (Settings.debugEvents > 0) {
|
||||
EventLogger.debugEvent(channel, message)
|
||||
}
|
||||
return WebsocketLoadBalancer._processEditorEvent(io, channel, message)
|
||||
WebsocketLoadBalancer._processEditorEvent(io, channel, message)
|
||||
})
|
||||
}
|
||||
return this.handleRoomUpdates(this.rclientSubList)
|
||||
this.handleRoomUpdates(this.rclientSubList)
|
||||
},
|
||||
|
||||
handleRoomUpdates(rclientSubList) {
|
||||
const roomEvents = RoomManager.eventSource()
|
||||
roomEvents.on('project-active', function (project_id) {
|
||||
const subscribePromises = Array.from(rclientSubList).map((rclient) =>
|
||||
const subscribePromises = rclientSubList.map((rclient) =>
|
||||
ChannelManager.subscribe(rclient, 'editor-events', project_id)
|
||||
)
|
||||
return RoomManager.emitOnCompletion(
|
||||
RoomManager.emitOnCompletion(
|
||||
subscribePromises,
|
||||
`project-subscribed-${project_id}`
|
||||
)
|
||||
})
|
||||
return roomEvents.on('project-empty', (project_id) =>
|
||||
Array.from(rclientSubList).map((rclient) =>
|
||||
roomEvents.on('project-empty', (project_id) =>
|
||||
rclientSubList.map((rclient) =>
|
||||
ChannelManager.unsubscribe(rclient, 'editor-events', project_id)
|
||||
)
|
||||
)
|
||||
},
|
||||
|
||||
_processEditorEvent(io, channel, message) {
|
||||
return SafeJsonParse.parse(message, function (error, message) {
|
||||
let clientList
|
||||
let client
|
||||
if (error != null) {
|
||||
SafeJsonParse.parse(message, function (error, message) {
|
||||
if (error) {
|
||||
logger.error({ err: error, channel }, 'error parsing JSON')
|
||||
return
|
||||
}
|
||||
if (message.room_id === 'all') {
|
||||
return io.sockets.emit(message.message, ...Array.from(message.payload))
|
||||
io.sockets.emit(message.message, ...message.payload)
|
||||
} else if (
|
||||
message.message === 'clientTracking.refresh' &&
|
||||
message.room_id != null
|
||||
message.room_id
|
||||
) {
|
||||
clientList = io.sockets.clients(message.room_id)
|
||||
const clientList = io.sockets.clients(message.room_id)
|
||||
logger.log(
|
||||
{
|
||||
channel,
|
||||
message: message.message,
|
||||
room_id: message.room_id,
|
||||
message_id: message._id,
|
||||
socketIoClients: (() => {
|
||||
const result = []
|
||||
for (client of Array.from(clientList)) {
|
||||
result.push(client.id)
|
||||
}
|
||||
return result
|
||||
})()
|
||||
socketIoClients: clientList.map((client) => client.id)
|
||||
},
|
||||
'refreshing client list'
|
||||
)
|
||||
return (() => {
|
||||
const result1 = []
|
||||
for (client of Array.from(clientList)) {
|
||||
result1.push(
|
||||
ConnectedUsersManager.refreshClient(
|
||||
message.room_id,
|
||||
client.publicId
|
||||
)
|
||||
)
|
||||
}
|
||||
return result1
|
||||
})()
|
||||
} else if (message.room_id != null) {
|
||||
if (message._id != null && Settings.checkEventOrder) {
|
||||
for (const client of clientList) {
|
||||
ConnectedUsersManager.refreshClient(message.room_id, client.publicId)
|
||||
}
|
||||
} else if (message.room_id) {
|
||||
if (message._id && Settings.checkEventOrder) {
|
||||
const status = EventLogger.checkEventOrder(
|
||||
'editor-events',
|
||||
message._id,
|
||||
|
@ -158,12 +127,12 @@ module.exports = WebsocketLoadBalancer = {
|
|||
}
|
||||
}
|
||||
|
||||
const is_restricted_message = !Array.from(
|
||||
RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST
|
||||
).includes(message.message)
|
||||
const is_restricted_message = !RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST.includes(
|
||||
message.message
|
||||
)
|
||||
|
||||
// send messages only to unique clients (due to duplicate entries in io.sockets.clients)
|
||||
clientList = io.sockets
|
||||
const clientList = io.sockets
|
||||
.clients(message.room_id)
|
||||
.filter(
|
||||
(client) =>
|
||||
|
@ -180,37 +149,23 @@ module.exports = WebsocketLoadBalancer = {
|
|||
message: message.message,
|
||||
room_id: message.room_id,
|
||||
message_id: message._id,
|
||||
socketIoClients: (() => {
|
||||
const result2 = []
|
||||
for (client of Array.from(clientList)) {
|
||||
result2.push(client.id)
|
||||
}
|
||||
return result2
|
||||
})()
|
||||
socketIoClients: clientList.map((client) => client.id)
|
||||
},
|
||||
'distributing event to clients'
|
||||
)
|
||||
const seen = {}
|
||||
return (() => {
|
||||
const result3 = []
|
||||
for (client of Array.from(clientList)) {
|
||||
if (!seen[client.id]) {
|
||||
seen[client.id] = true
|
||||
result3.push(
|
||||
client.emit(message.message, ...Array.from(message.payload))
|
||||
)
|
||||
} else {
|
||||
result3.push(undefined)
|
||||
}
|
||||
const seen = new Map()
|
||||
for (const client of clientList) {
|
||||
if (!seen.has(client.id)) {
|
||||
seen.set(client.id, true)
|
||||
client.emit(message.message, ...message.payload)
|
||||
}
|
||||
return result3
|
||||
})()
|
||||
} else if (message.health_check != null) {
|
||||
}
|
||||
} else if (message.health_check) {
|
||||
logger.debug(
|
||||
{ message },
|
||||
'got health check message in editor events channel'
|
||||
)
|
||||
return HealthCheckManager.check(channel, message.key)
|
||||
HealthCheckManager.check(channel, message.key)
|
||||
}
|
||||
})
|
||||
}
|
||||
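
For reference, a sketch of the message shape that `emitToRoom` publishes on the `editor-events` channel and `_processEditorEvent` reads back; the field values are illustrative, and `_id` is only meaningful when event-order checking is enabled.

// Illustrative payload only -- matches the fields consumed in _processEditorEvent.
const examplePublishedMessage = JSON.stringify({
  room_id: 'project-id',
  message: 'clientTracking.clientUpdated',
  payload: [{ id: 'public-id', doc_id: 'doc-id' }],
  _id: 'event-id'
})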