prettier: convert app/js decaffeinated files to Prettier format

This commit is contained in:
decaffeinate 2020-06-23 18:29:44 +01:00 committed by Jakob Ackermann
parent 04a85a6716
commit 817844515d
19 changed files with 2425 additions and 1624 deletions

View file

@ -11,61 +11,101 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let AuthorizationManager; let AuthorizationManager
module.exports = (AuthorizationManager = { module.exports = AuthorizationManager = {
assertClientCanViewProject(client, callback) { assertClientCanViewProject(client, callback) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
return AuthorizationManager._assertClientHasPrivilegeLevel(client, ["readOnly", "readAndWrite", "owner"], callback); callback = function (error) {}
}, }
return AuthorizationManager._assertClientHasPrivilegeLevel(
client,
['readOnly', 'readAndWrite', 'owner'],
callback
)
},
assertClientCanEditProject(client, callback) { assertClientCanEditProject(client, callback) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
return AuthorizationManager._assertClientHasPrivilegeLevel(client, ["readAndWrite", "owner"], callback); callback = function (error) {}
}, }
return AuthorizationManager._assertClientHasPrivilegeLevel(
_assertClientHasPrivilegeLevel(client, allowedLevels, callback) { client,
if (callback == null) { callback = function(error) {}; } ['readAndWrite', 'owner'],
if (Array.from(allowedLevels).includes(client.ol_context.privilege_level)) { callback
return callback(null); )
} else { },
return callback(new Error("not authorized"));
}
},
assertClientCanViewProjectAndDoc(client, doc_id, callback) { _assertClientHasPrivilegeLevel(client, allowedLevels, callback) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
return AuthorizationManager.assertClientCanViewProject(client, function(error) { callback = function (error) {}
if (error != null) { return callback(error); } }
return AuthorizationManager._assertClientCanAccessDoc(client, doc_id, callback); if (Array.from(allowedLevels).includes(client.ol_context.privilege_level)) {
}); return callback(null)
}, } else {
return callback(new Error('not authorized'))
}
},
assertClientCanEditProjectAndDoc(client, doc_id, callback) { assertClientCanViewProjectAndDoc(client, doc_id, callback) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
return AuthorizationManager.assertClientCanEditProject(client, function(error) { callback = function (error) {}
if (error != null) { return callback(error); } }
return AuthorizationManager._assertClientCanAccessDoc(client, doc_id, callback); return AuthorizationManager.assertClientCanViewProject(client, function (
}); error
}, ) {
if (error != null) {
return callback(error)
}
return AuthorizationManager._assertClientCanAccessDoc(
client,
doc_id,
callback
)
})
},
_assertClientCanAccessDoc(client, doc_id, callback) { assertClientCanEditProjectAndDoc(client, doc_id, callback) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
if (client.ol_context[`doc:${doc_id}`] === "allowed") { callback = function (error) {}
return callback(null); }
} else { return AuthorizationManager.assertClientCanEditProject(client, function (
return callback(new Error("not authorized")); error
} ) {
}, if (error != null) {
return callback(error)
}
return AuthorizationManager._assertClientCanAccessDoc(
client,
doc_id,
callback
)
})
},
addAccessToDoc(client, doc_id, callback) { _assertClientCanAccessDoc(client, doc_id, callback) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
client.ol_context[`doc:${doc_id}`] = "allowed"; callback = function (error) {}
return callback(null); }
}, if (client.ol_context[`doc:${doc_id}`] === 'allowed') {
return callback(null)
} else {
return callback(new Error('not authorized'))
}
},
removeAccessToDoc(client, doc_id, callback) { addAccessToDoc(client, doc_id, callback) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
delete client.ol_context[`doc:${doc_id}`]; callback = function (error) {}
return callback(null); }
} client.ol_context[`doc:${doc_id}`] = 'allowed'
}); return callback(null)
},
removeAccessToDoc(client, doc_id, callback) {
if (callback == null) {
callback = function (error) {}
}
delete client.ol_context[`doc:${doc_id}`]
return callback(null)
}
}

View file

@ -8,84 +8,98 @@
* DS102: Remove unnecessary code created because of implicit returns * DS102: Remove unnecessary code created because of implicit returns
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let ChannelManager; let ChannelManager
const logger = require('logger-sharelatex'); const logger = require('logger-sharelatex')
const metrics = require("metrics-sharelatex"); const metrics = require('metrics-sharelatex')
const settings = require("settings-sharelatex"); const settings = require('settings-sharelatex')
const ClientMap = new Map(); // for each redis client, store a Map of subscribed channels (channelname -> subscribe promise) const ClientMap = new Map() // for each redis client, store a Map of subscribed channels (channelname -> subscribe promise)
// Manage redis pubsub subscriptions for individual projects and docs, ensuring // Manage redis pubsub subscriptions for individual projects and docs, ensuring
// that we never subscribe to a channel multiple times. The socket.io side is // that we never subscribe to a channel multiple times. The socket.io side is
// handled by RoomManager. // handled by RoomManager.
module.exports = (ChannelManager = { module.exports = ChannelManager = {
getClientMapEntry(rclient) { getClientMapEntry(rclient) {
// return the per-client channel map if it exists, otherwise create and // return the per-client channel map if it exists, otherwise create and
// return an empty map for the client. // return an empty map for the client.
return ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient); return (
}, ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient)
)
},
subscribe(rclient, baseChannel, id) { subscribe(rclient, baseChannel, id) {
const clientChannelMap = this.getClientMapEntry(rclient); const clientChannelMap = this.getClientMapEntry(rclient)
const channel = `${baseChannel}:${id}`; const channel = `${baseChannel}:${id}`
const actualSubscribe = function() { const actualSubscribe = function () {
// subscribe is happening in the foreground and it should reject // subscribe is happening in the foreground and it should reject
const p = rclient.subscribe(channel); const p = rclient.subscribe(channel)
p.finally(function() { p.finally(function () {
if (clientChannelMap.get(channel) === subscribePromise) { if (clientChannelMap.get(channel) === subscribePromise) {
return clientChannelMap.delete(channel); return clientChannelMap.delete(channel)
}}).then(function() {
logger.log({channel}, "subscribed to channel");
return metrics.inc(`subscribe.${baseChannel}`);}).catch(function(err) {
logger.error({channel, err}, "failed to subscribe to channel");
return metrics.inc(`subscribe.failed.${baseChannel}`);
});
return p;
};
const pendingActions = clientChannelMap.get(channel) || Promise.resolve();
var subscribePromise = pendingActions.then(actualSubscribe, actualSubscribe);
clientChannelMap.set(channel, subscribePromise);
logger.log({channel}, "planned to subscribe to channel");
return subscribePromise;
},
unsubscribe(rclient, baseChannel, id) {
const clientChannelMap = this.getClientMapEntry(rclient);
const channel = `${baseChannel}:${id}`;
const actualUnsubscribe = function() {
// unsubscribe is happening in the background, it should not reject
const p = rclient.unsubscribe(channel)
.finally(function() {
if (clientChannelMap.get(channel) === unsubscribePromise) {
return clientChannelMap.delete(channel);
}}).then(function() {
logger.log({channel}, "unsubscribed from channel");
return metrics.inc(`unsubscribe.${baseChannel}`);}).catch(function(err) {
logger.error({channel, err}, "unsubscribed from channel");
return metrics.inc(`unsubscribe.failed.${baseChannel}`);
});
return p;
};
const pendingActions = clientChannelMap.get(channel) || Promise.resolve();
var unsubscribePromise = pendingActions.then(actualUnsubscribe, actualUnsubscribe);
clientChannelMap.set(channel, unsubscribePromise);
logger.log({channel}, "planned to unsubscribe from channel");
return unsubscribePromise;
},
publish(rclient, baseChannel, id, data) {
let channel;
metrics.summary(`redis.publish.${baseChannel}`, data.length);
if ((id === 'all') || !settings.publishOnIndividualChannels) {
channel = baseChannel;
} else {
channel = `${baseChannel}:${id}`;
} }
// we publish on a different client to the subscribe, so we can't })
// check for the channel existing here .then(function () {
return rclient.publish(channel, data); logger.log({ channel }, 'subscribed to channel')
return metrics.inc(`subscribe.${baseChannel}`)
})
.catch(function (err) {
logger.error({ channel, err }, 'failed to subscribe to channel')
return metrics.inc(`subscribe.failed.${baseChannel}`)
})
return p
} }
});
const pendingActions = clientChannelMap.get(channel) || Promise.resolve()
var subscribePromise = pendingActions.then(actualSubscribe, actualSubscribe)
clientChannelMap.set(channel, subscribePromise)
logger.log({ channel }, 'planned to subscribe to channel')
return subscribePromise
},
unsubscribe(rclient, baseChannel, id) {
const clientChannelMap = this.getClientMapEntry(rclient)
const channel = `${baseChannel}:${id}`
const actualUnsubscribe = function () {
// unsubscribe is happening in the background, it should not reject
const p = rclient
.unsubscribe(channel)
.finally(function () {
if (clientChannelMap.get(channel) === unsubscribePromise) {
return clientChannelMap.delete(channel)
}
})
.then(function () {
logger.log({ channel }, 'unsubscribed from channel')
return metrics.inc(`unsubscribe.${baseChannel}`)
})
.catch(function (err) {
logger.error({ channel, err }, 'unsubscribed from channel')
return metrics.inc(`unsubscribe.failed.${baseChannel}`)
})
return p
}
const pendingActions = clientChannelMap.get(channel) || Promise.resolve()
var unsubscribePromise = pendingActions.then(
actualUnsubscribe,
actualUnsubscribe
)
clientChannelMap.set(channel, unsubscribePromise)
logger.log({ channel }, 'planned to unsubscribe from channel')
return unsubscribePromise
},
publish(rclient, baseChannel, id, data) {
let channel
metrics.summary(`redis.publish.${baseChannel}`, data.length)
if (id === 'all' || !settings.publishOnIndividualChannels) {
channel = baseChannel
} else {
channel = `${baseChannel}:${id}`
}
// we publish on a different client to the subscribe, so we can't
// check for the channel existing here
return rclient.publish(channel, data)
}
}

View file

@ -10,112 +10,185 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
const async = require("async"); const async = require('async')
const Settings = require('settings-sharelatex'); const Settings = require('settings-sharelatex')
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
const redis = require("redis-sharelatex"); const redis = require('redis-sharelatex')
const rclient = redis.createClient(Settings.redis.realtime); const rclient = redis.createClient(Settings.redis.realtime)
const Keys = Settings.redis.realtime.key_schema; const Keys = Settings.redis.realtime.key_schema
const ONE_HOUR_IN_S = 60 * 60; const ONE_HOUR_IN_S = 60 * 60
const ONE_DAY_IN_S = ONE_HOUR_IN_S * 24; const ONE_DAY_IN_S = ONE_HOUR_IN_S * 24
const FOUR_DAYS_IN_S = ONE_DAY_IN_S * 4; const FOUR_DAYS_IN_S = ONE_DAY_IN_S * 4
const USER_TIMEOUT_IN_S = ONE_HOUR_IN_S / 4; const USER_TIMEOUT_IN_S = ONE_HOUR_IN_S / 4
const REFRESH_TIMEOUT_IN_S = 10; // only show clients which have responded to a refresh request in the last 10 seconds const REFRESH_TIMEOUT_IN_S = 10 // only show clients which have responded to a refresh request in the last 10 seconds
module.exports = { module.exports = {
// Use the same method for when a user connects, and when a user sends a cursor // Use the same method for when a user connects, and when a user sends a cursor
// update. This way we don't care if the connected_user key has expired when // update. This way we don't care if the connected_user key has expired when
// we receive a cursor update. // we receive a cursor update.
updateUserPosition(project_id, client_id, user, cursorData, callback){ updateUserPosition(project_id, client_id, user, cursorData, callback) {
if (callback == null) { callback = function(err){}; } if (callback == null) {
logger.log({project_id, client_id}, "marking user as joined or connected"); callback = function (err) {}
}
logger.log({ project_id, client_id }, 'marking user as joined or connected')
const multi = rclient.multi(); const multi = rclient.multi()
multi.sadd(Keys.clientsInProject({project_id}), client_id);
multi.expire(Keys.clientsInProject({project_id}), FOUR_DAYS_IN_S);
multi.hset(Keys.connectedUser({project_id, client_id}), "last_updated_at", Date.now());
multi.hset(Keys.connectedUser({project_id, client_id}), "user_id", user._id);
multi.hset(Keys.connectedUser({project_id, client_id}), "first_name", user.first_name || "");
multi.hset(Keys.connectedUser({project_id, client_id}), "last_name", user.last_name || "");
multi.hset(Keys.connectedUser({project_id, client_id}), "email", user.email || "");
if (cursorData != null) {
multi.hset(Keys.connectedUser({project_id, client_id}), "cursorData", JSON.stringify(cursorData));
}
multi.expire(Keys.connectedUser({project_id, client_id}), USER_TIMEOUT_IN_S);
return multi.exec(function(err){
if (err != null) {
logger.err({err, project_id, client_id}, "problem marking user as connected");
}
return callback(err);
});
},
refreshClient(project_id, client_id, callback) { multi.sadd(Keys.clientsInProject({ project_id }), client_id)
if (callback == null) { callback = function(err) {}; } multi.expire(Keys.clientsInProject({ project_id }), FOUR_DAYS_IN_S)
logger.log({project_id, client_id}, "refreshing connected client");
const multi = rclient.multi();
multi.hset(Keys.connectedUser({project_id, client_id}), "last_updated_at", Date.now());
multi.expire(Keys.connectedUser({project_id, client_id}), USER_TIMEOUT_IN_S);
return multi.exec(function(err){
if (err != null) {
logger.err({err, project_id, client_id}, "problem refreshing connected client");
}
return callback(err);
});
},
markUserAsDisconnected(project_id, client_id, callback){ multi.hset(
logger.log({project_id, client_id}, "marking user as disconnected"); Keys.connectedUser({ project_id, client_id }),
const multi = rclient.multi(); 'last_updated_at',
multi.srem(Keys.clientsInProject({project_id}), client_id); Date.now()
multi.expire(Keys.clientsInProject({project_id}), FOUR_DAYS_IN_S); )
multi.del(Keys.connectedUser({project_id, client_id})); multi.hset(
return multi.exec(callback); Keys.connectedUser({ project_id, client_id }),
}, 'user_id',
user._id
)
multi.hset(
Keys.connectedUser({ project_id, client_id }),
'first_name',
user.first_name || ''
)
multi.hset(
Keys.connectedUser({ project_id, client_id }),
'last_name',
user.last_name || ''
)
multi.hset(
Keys.connectedUser({ project_id, client_id }),
'email',
user.email || ''
)
if (cursorData != null) {
multi.hset(
Keys.connectedUser({ project_id, client_id }),
'cursorData',
JSON.stringify(cursorData)
)
}
multi.expire(
Keys.connectedUser({ project_id, client_id }),
USER_TIMEOUT_IN_S
)
_getConnectedUser(project_id, client_id, callback){ return multi.exec(function (err) {
return rclient.hgetall(Keys.connectedUser({project_id, client_id}), function(err, result){ if (err != null) {
if ((result == null) || (Object.keys(result).length === 0) || !result.user_id) { logger.err(
result = { { err, project_id, client_id },
connected : false, 'problem marking user as connected'
client_id )
}; }
} else { return callback(err)
result.connected = true; })
result.client_id = client_id; },
result.client_age = (Date.now() - parseInt(result.last_updated_at,10)) / 1000;
if (result.cursorData != null) {
try {
result.cursorData = JSON.parse(result.cursorData);
} catch (e) {
logger.error({err: e, project_id, client_id, cursorData: result.cursorData}, "error parsing cursorData JSON");
return callback(e);
}
}
}
return callback(err, result);
});
},
getConnectedUsers(project_id, callback){ refreshClient(project_id, client_id, callback) {
const self = this; if (callback == null) {
return rclient.smembers(Keys.clientsInProject({project_id}), function(err, results){ callback = function (err) {}
if (err != null) { return callback(err); } }
const jobs = results.map(client_id => cb => self._getConnectedUser(project_id, client_id, cb)); logger.log({ project_id, client_id }, 'refreshing connected client')
return async.series(jobs, function(err, users){ const multi = rclient.multi()
if (users == null) { users = []; } multi.hset(
if (err != null) { return callback(err); } Keys.connectedUser({ project_id, client_id }),
users = users.filter(user => (user != null ? user.connected : undefined) && ((user != null ? user.client_age : undefined) < REFRESH_TIMEOUT_IN_S)); 'last_updated_at',
return callback(null, users); Date.now()
}); )
}); multi.expire(
} Keys.connectedUser({ project_id, client_id }),
}; USER_TIMEOUT_IN_S
)
return multi.exec(function (err) {
if (err != null) {
logger.err(
{ err, project_id, client_id },
'problem refreshing connected client'
)
}
return callback(err)
})
},
markUserAsDisconnected(project_id, client_id, callback) {
logger.log({ project_id, client_id }, 'marking user as disconnected')
const multi = rclient.multi()
multi.srem(Keys.clientsInProject({ project_id }), client_id)
multi.expire(Keys.clientsInProject({ project_id }), FOUR_DAYS_IN_S)
multi.del(Keys.connectedUser({ project_id, client_id }))
return multi.exec(callback)
},
_getConnectedUser(project_id, client_id, callback) {
return rclient.hgetall(
Keys.connectedUser({ project_id, client_id }),
function (err, result) {
if (
result == null ||
Object.keys(result).length === 0 ||
!result.user_id
) {
result = {
connected: false,
client_id
}
} else {
result.connected = true
result.client_id = client_id
result.client_age =
(Date.now() - parseInt(result.last_updated_at, 10)) / 1000
if (result.cursorData != null) {
try {
result.cursorData = JSON.parse(result.cursorData)
} catch (e) {
logger.error(
{
err: e,
project_id,
client_id,
cursorData: result.cursorData
},
'error parsing cursorData JSON'
)
return callback(e)
}
}
}
return callback(err, result)
}
)
},
getConnectedUsers(project_id, callback) {
const self = this
return rclient.smembers(Keys.clientsInProject({ project_id }), function (
err,
results
) {
if (err != null) {
return callback(err)
}
const jobs = results.map((client_id) => (cb) =>
self._getConnectedUser(project_id, client_id, cb)
)
return async.series(jobs, function (err, users) {
if (users == null) {
users = []
}
if (err != null) {
return callback(err)
}
users = users.filter(
(user) =>
(user != null ? user.connected : undefined) &&
(user != null ? user.client_age : undefined) < REFRESH_TIMEOUT_IN_S
)
return callback(null, users)
})
})
}
}

View file

@ -12,131 +12,197 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let DocumentUpdaterController; let DocumentUpdaterController
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
const settings = require('settings-sharelatex'); const settings = require('settings-sharelatex')
const RedisClientManager = require("./RedisClientManager"); const RedisClientManager = require('./RedisClientManager')
const SafeJsonParse = require("./SafeJsonParse"); const SafeJsonParse = require('./SafeJsonParse')
const EventLogger = require("./EventLogger"); const EventLogger = require('./EventLogger')
const HealthCheckManager = require("./HealthCheckManager"); const HealthCheckManager = require('./HealthCheckManager')
const RoomManager = require("./RoomManager"); const RoomManager = require('./RoomManager')
const ChannelManager = require("./ChannelManager"); const ChannelManager = require('./ChannelManager')
const metrics = require("metrics-sharelatex"); const metrics = require('metrics-sharelatex')
const MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024; // 1Mb const MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 // 1Mb
module.exports = (DocumentUpdaterController = { module.exports = DocumentUpdaterController = {
// DocumentUpdaterController is responsible for updates that come via Redis // DocumentUpdaterController is responsible for updates that come via Redis
// Pub/Sub from the document updater. // Pub/Sub from the document updater.
rclientList: RedisClientManager.createClientList(settings.redis.pubsub), rclientList: RedisClientManager.createClientList(settings.redis.pubsub),
listenForUpdatesFromDocumentUpdater(io) { listenForUpdatesFromDocumentUpdater(io) {
let i, rclient; let i, rclient
logger.log({rclients: this.rclientList.length}, "listening for applied-ops events"); logger.log(
for (i = 0; i < this.rclientList.length; i++) { { rclients: this.rclientList.length },
rclient = this.rclientList[i]; 'listening for applied-ops events'
rclient.subscribe("applied-ops"); )
rclient.on("message", function(channel, message) { for (i = 0; i < this.rclientList.length; i++) {
metrics.inc("rclient", 0.001); // global event rate metric rclient = this.rclientList[i]
if (settings.debugEvents > 0) { EventLogger.debugEvent(channel, message); } rclient.subscribe('applied-ops')
return DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message); rclient.on('message', function (channel, message) {
}); metrics.inc('rclient', 0.001) // global event rate metric
} if (settings.debugEvents > 0) {
// create metrics for each redis instance only when we have multiple redis clients EventLogger.debugEvent(channel, message)
if (this.rclientList.length > 1) { }
for (i = 0; i < this.rclientList.length; i++) { return DocumentUpdaterController._processMessageFromDocumentUpdater(
rclient = this.rclientList[i]; io,
((i => // per client event rate metric channel,
rclient.on("message", () => metrics.inc(`rclient-${i}`, 0.001))))(i); message
} )
} })
return this.handleRoomUpdates(this.rclientList); }
}, // create metrics for each redis instance only when we have multiple redis clients
if (this.rclientList.length > 1) {
for (i = 0; i < this.rclientList.length; i++) {
rclient = this.rclientList[i]
;((
i // per client event rate metric
) => rclient.on('message', () => metrics.inc(`rclient-${i}`, 0.001)))(i)
}
}
return this.handleRoomUpdates(this.rclientList)
},
handleRoomUpdates(rclientSubList) { handleRoomUpdates(rclientSubList) {
const roomEvents = RoomManager.eventSource(); const roomEvents = RoomManager.eventSource()
roomEvents.on('doc-active', function(doc_id) { roomEvents.on('doc-active', function (doc_id) {
const subscribePromises = Array.from(rclientSubList).map((rclient) => const subscribePromises = Array.from(rclientSubList).map((rclient) =>
ChannelManager.subscribe(rclient, "applied-ops", doc_id)); ChannelManager.subscribe(rclient, 'applied-ops', doc_id)
return RoomManager.emitOnCompletion(subscribePromises, `doc-subscribed-${doc_id}`); )
}); return RoomManager.emitOnCompletion(
return roomEvents.on('doc-empty', doc_id => Array.from(rclientSubList).map((rclient) => subscribePromises,
ChannelManager.unsubscribe(rclient, "applied-ops", doc_id))); `doc-subscribed-${doc_id}`
}, )
})
return roomEvents.on('doc-empty', (doc_id) =>
Array.from(rclientSubList).map((rclient) =>
ChannelManager.unsubscribe(rclient, 'applied-ops', doc_id)
)
)
},
_processMessageFromDocumentUpdater(io, channel, message) { _processMessageFromDocumentUpdater(io, channel, message) {
return SafeJsonParse.parse(message, function(error, message) { return SafeJsonParse.parse(message, function (error, message) {
if (error != null) { if (error != null) {
logger.error({err: error, channel}, "error parsing JSON"); logger.error({ err: error, channel }, 'error parsing JSON')
return; return
} }
if (message.op != null) { if (message.op != null) {
if ((message._id != null) && settings.checkEventOrder) { if (message._id != null && settings.checkEventOrder) {
const status = EventLogger.checkEventOrder("applied-ops", message._id, message); const status = EventLogger.checkEventOrder(
if (status === 'duplicate') { 'applied-ops',
return; // skip duplicate events message._id,
} message
} )
return DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op); if (status === 'duplicate') {
} else if (message.error != null) { return // skip duplicate events
return DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message); }
} else if (message.health_check != null) { }
logger.debug({message}, "got health check message in applied ops channel"); return DocumentUpdaterController._applyUpdateFromDocumentUpdater(
return HealthCheckManager.check(channel, message.key); io,
} message.doc_id,
}); message.op
}, )
} else if (message.error != null) {
_applyUpdateFromDocumentUpdater(io, doc_id, update) { return DocumentUpdaterController._processErrorFromDocumentUpdater(
let client; io,
const clientList = io.sockets.clients(doc_id); message.doc_id,
// avoid unnecessary work if no clients are connected message.error,
if (clientList.length === 0) { message
return; )
} } else if (message.health_check != null) {
// send updates to clients logger.debug(
logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined), socketIoClients: (((() => { { message },
const result = []; 'got health check message in applied ops channel'
for (client of Array.from(clientList)) { result.push(client.id); )
} return HealthCheckManager.check(channel, message.key)
return result; }
})()))}, "distributing updates to clients"); })
const seen = {}; },
// send messages only to unique clients (due to duplicate entries in io.sockets.clients)
for (client of Array.from(clientList)) {
if (!seen[client.id]) {
seen[client.id] = true;
if (client.publicId === update.meta.source) {
logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined)}, "distributing update to sender");
client.emit("otUpdateApplied", {v: update.v, doc: update.doc});
} else if (!update.dup) { // Duplicate ops should just be sent back to sending client for acknowledgement
logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined), client_id: client.id}, "distributing update to collaborator");
client.emit("otUpdateApplied", update);
}
}
}
if (Object.keys(seen).length < clientList.length) {
metrics.inc("socket-io.duplicate-clients", 0.1);
return logger.log({doc_id, socketIoClients: (((() => {
const result1 = [];
for (client of Array.from(clientList)) { result1.push(client.id);
}
return result1;
})()))}, "discarded duplicate clients");
}
},
_processErrorFromDocumentUpdater(io, doc_id, error, message) {
return (() => {
const result = [];
for (const client of Array.from(io.sockets.clients(doc_id))) {
logger.warn({err: error, doc_id, client_id: client.id}, "error from document updater, disconnecting client");
client.emit("otUpdateError", error, message);
result.push(client.disconnect());
}
return result;
})();
}
});
_applyUpdateFromDocumentUpdater(io, doc_id, update) {
let client
const clientList = io.sockets.clients(doc_id)
// avoid unnecessary work if no clients are connected
if (clientList.length === 0) {
return
}
// send updates to clients
logger.log(
{
doc_id,
version: update.v,
source: update.meta != null ? update.meta.source : undefined,
socketIoClients: (() => {
const result = []
for (client of Array.from(clientList)) {
result.push(client.id)
}
return result
})()
},
'distributing updates to clients'
)
const seen = {}
// send messages only to unique clients (due to duplicate entries in io.sockets.clients)
for (client of Array.from(clientList)) {
if (!seen[client.id]) {
seen[client.id] = true
if (client.publicId === update.meta.source) {
logger.log(
{
doc_id,
version: update.v,
source: update.meta != null ? update.meta.source : undefined
},
'distributing update to sender'
)
client.emit('otUpdateApplied', { v: update.v, doc: update.doc })
} else if (!update.dup) {
// Duplicate ops should just be sent back to sending client for acknowledgement
logger.log(
{
doc_id,
version: update.v,
source: update.meta != null ? update.meta.source : undefined,
client_id: client.id
},
'distributing update to collaborator'
)
client.emit('otUpdateApplied', update)
}
}
}
if (Object.keys(seen).length < clientList.length) {
metrics.inc('socket-io.duplicate-clients', 0.1)
return logger.log(
{
doc_id,
socketIoClients: (() => {
const result1 = []
for (client of Array.from(clientList)) {
result1.push(client.id)
}
return result1
})()
},
'discarded duplicate clients'
)
}
},
_processErrorFromDocumentUpdater(io, doc_id, error, message) {
return (() => {
const result = []
for (const client of Array.from(io.sockets.clients(doc_id))) {
logger.warn(
{ err: error, doc_id, client_id: client.id },
'error from document updater, disconnecting client'
)
client.emit('otUpdateError', error, message)
result.push(client.disconnect())
}
return result
})()
}
}

View file

@ -11,104 +11,159 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let DocumentUpdaterManager; let DocumentUpdaterManager
const request = require("request"); const request = require('request')
const _ = require("underscore"); const _ = require('underscore')
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
const settings = require("settings-sharelatex"); const settings = require('settings-sharelatex')
const metrics = require("metrics-sharelatex"); const metrics = require('metrics-sharelatex')
const rclient = require("redis-sharelatex").createClient(settings.redis.documentupdater); const rclient = require('redis-sharelatex').createClient(
const Keys = settings.redis.documentupdater.key_schema; settings.redis.documentupdater
)
const Keys = settings.redis.documentupdater.key_schema
module.exports = (DocumentUpdaterManager = { module.exports = DocumentUpdaterManager = {
getDocument(project_id, doc_id, fromVersion, callback) { getDocument(project_id, doc_id, fromVersion, callback) {
if (callback == null) { callback = function(error, exists, doclines, version) {}; } if (callback == null) {
const timer = new metrics.Timer("get-document"); callback = function (error, exists, doclines, version) {}
const url = `${settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}?fromVersion=${fromVersion}`; }
logger.log({project_id, doc_id, fromVersion}, "getting doc from document updater"); const timer = new metrics.Timer('get-document')
return request.get(url, function(err, res, body) { const url = `${settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}?fromVersion=${fromVersion}`
timer.done(); logger.log(
if (err != null) { { project_id, doc_id, fromVersion },
logger.error({err, url, project_id, doc_id}, "error getting doc from doc updater"); 'getting doc from document updater'
return callback(err); )
} return request.get(url, function (err, res, body) {
if (res.statusCode >= 200 && res.statusCode < 300) { timer.done()
logger.log({project_id, doc_id}, "got doc from document document updater"); if (err != null) {
try { logger.error(
body = JSON.parse(body); { err, url, project_id, doc_id },
} catch (error) { 'error getting doc from doc updater'
return callback(error); )
} return callback(err)
return callback(null, body != null ? body.lines : undefined, body != null ? body.version : undefined, body != null ? body.ranges : undefined, body != null ? body.ops : undefined); }
} else if ([404, 422].includes(res.statusCode)) { if (res.statusCode >= 200 && res.statusCode < 300) {
err = new Error("doc updater could not load requested ops"); logger.log(
err.statusCode = res.statusCode; { project_id, doc_id },
logger.warn({err, project_id, doc_id, url, fromVersion}, "doc updater could not load requested ops"); 'got doc from document document updater'
return callback(err); )
} else { try {
err = new Error(`doc updater returned a non-success status code: ${res.statusCode}`); body = JSON.parse(body)
err.statusCode = res.statusCode; } catch (error) {
logger.error({err, project_id, doc_id, url}, `doc updater returned a non-success status code: ${res.statusCode}`); return callback(error)
return callback(err); }
} return callback(
}); null,
}, body != null ? body.lines : undefined,
body != null ? body.version : undefined,
body != null ? body.ranges : undefined,
body != null ? body.ops : undefined
)
} else if ([404, 422].includes(res.statusCode)) {
err = new Error('doc updater could not load requested ops')
err.statusCode = res.statusCode
logger.warn(
{ err, project_id, doc_id, url, fromVersion },
'doc updater could not load requested ops'
)
return callback(err)
} else {
err = new Error(
`doc updater returned a non-success status code: ${res.statusCode}`
)
err.statusCode = res.statusCode
logger.error(
{ err, project_id, doc_id, url },
`doc updater returned a non-success status code: ${res.statusCode}`
)
return callback(err)
}
})
},
flushProjectToMongoAndDelete(project_id, callback) { flushProjectToMongoAndDelete(project_id, callback) {
// this method is called when the last connected user leaves the project // this method is called when the last connected user leaves the project
if (callback == null) { callback = function(){}; } if (callback == null) {
logger.log({project_id}, "deleting project from document updater"); callback = function () {}
const timer = new metrics.Timer("delete.mongo.project"); }
// flush the project in the background when all users have left logger.log({ project_id }, 'deleting project from document updater')
const url = `${settings.apis.documentupdater.url}/project/${project_id}?background=true` + const timer = new metrics.Timer('delete.mongo.project')
(settings.shutDownInProgress ? "&shutdown=true" : ""); // flush the project in the background when all users have left
return request.del(url, function(err, res, body){ const url =
timer.done(); `${settings.apis.documentupdater.url}/project/${project_id}?background=true` +
if (err != null) { (settings.shutDownInProgress ? '&shutdown=true' : '')
logger.error({err, project_id}, "error deleting project from document updater"); return request.del(url, function (err, res, body) {
return callback(err); timer.done()
} else if (res.statusCode >= 200 && res.statusCode < 300) { if (err != null) {
logger.log({project_id}, "deleted project from document updater"); logger.error(
return callback(null); { err, project_id },
} else { 'error deleting project from document updater'
err = new Error(`document updater returned a failure status code: ${res.statusCode}`); )
err.statusCode = res.statusCode; return callback(err)
logger.error({err, project_id}, `document updater returned failure status code: ${res.statusCode}`); } else if (res.statusCode >= 200 && res.statusCode < 300) {
return callback(err); logger.log({ project_id }, 'deleted project from document updater')
} return callback(null)
}); } else {
}, err = new Error(
`document updater returned a failure status code: ${res.statusCode}`
)
err.statusCode = res.statusCode
logger.error(
{ err, project_id },
`document updater returned failure status code: ${res.statusCode}`
)
return callback(err)
}
})
},
queueChange(project_id, doc_id, change, callback){ queueChange(project_id, doc_id, change, callback) {
let error; let error
if (callback == null) { callback = function(){}; } if (callback == null) {
const allowedKeys = [ 'doc', 'op', 'v', 'dupIfSource', 'meta', 'lastV', 'hash']; callback = function () {}
change = _.pick(change, allowedKeys); }
const jsonChange = JSON.stringify(change); const allowedKeys = [
if (jsonChange.indexOf("\u0000") !== -1) { 'doc',
// memory corruption check 'op',
error = new Error("null bytes found in op"); 'v',
logger.error({err: error, project_id, doc_id, jsonChange}, error.message); 'dupIfSource',
return callback(error); 'meta',
} 'lastV',
'hash'
]
change = _.pick(change, allowedKeys)
const jsonChange = JSON.stringify(change)
if (jsonChange.indexOf('\u0000') !== -1) {
// memory corruption check
error = new Error('null bytes found in op')
logger.error(
{ err: error, project_id, doc_id, jsonChange },
error.message
)
return callback(error)
}
const updateSize = jsonChange.length; const updateSize = jsonChange.length
if (updateSize > settings.maxUpdateSize) { if (updateSize > settings.maxUpdateSize) {
error = new Error("update is too large"); error = new Error('update is too large')
error.updateSize = updateSize; error.updateSize = updateSize
return callback(error); return callback(error)
} }
// record metric for each update added to queue // record metric for each update added to queue
metrics.summary('redis.pendingUpdates', updateSize, {status: 'push'}); metrics.summary('redis.pendingUpdates', updateSize, { status: 'push' })
const doc_key = `${project_id}:${doc_id}`; const doc_key = `${project_id}:${doc_id}`
// Push onto pendingUpdates for doc_id first, because once the doc updater // Push onto pendingUpdates for doc_id first, because once the doc updater
// gets an entry on pending-updates-list, it starts processing. // gets an entry on pending-updates-list, it starts processing.
return rclient.rpush(Keys.pendingUpdates({doc_id}), jsonChange, function(error) { return rclient.rpush(Keys.pendingUpdates({ doc_id }), jsonChange, function (
if (error != null) { return callback(error); } error
return rclient.rpush("pending-updates-list", doc_key, callback); ) {
}); if (error != null) {
} return callback(error)
}); }
return rclient.rpush('pending-updates-list', doc_key, callback)
})
}
}

View file

@ -9,54 +9,55 @@
* DS102: Remove unnecessary code created because of implicit returns * DS102: Remove unnecessary code created because of implicit returns
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let DrainManager; let DrainManager
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
module.exports = (DrainManager = { module.exports = DrainManager = {
startDrainTimeWindow(io, minsToDrain) {
const drainPerMin = io.sockets.clients().length / minsToDrain
return DrainManager.startDrain(io, Math.max(drainPerMin / 60, 4))
}, // enforce minimum drain rate
startDrainTimeWindow(io, minsToDrain){ startDrain(io, rate) {
const drainPerMin = io.sockets.clients().length / minsToDrain; // Clear out any old interval
return DrainManager.startDrain(io, Math.max(drainPerMin / 60, 4)); let pollingInterval
}, // enforce minimum drain rate clearInterval(this.interval)
logger.log({ rate }, 'starting drain')
if (rate === 0) {
return
} else if (rate < 1) {
// allow lower drain rates
// e.g. rate=0.1 will drain one client every 10 seconds
pollingInterval = 1000 / rate
rate = 1
} else {
pollingInterval = 1000
}
return (this.interval = setInterval(() => {
return this.reconnectNClients(io, rate)
}, pollingInterval))
},
startDrain(io, rate) { RECONNECTED_CLIENTS: {},
// Clear out any old interval reconnectNClients(io, N) {
let pollingInterval; let drainedCount = 0
clearInterval(this.interval); for (const client of Array.from(io.sockets.clients())) {
logger.log({rate}, "starting drain"); if (!this.RECONNECTED_CLIENTS[client.id]) {
if (rate === 0) { this.RECONNECTED_CLIENTS[client.id] = true
return; logger.log(
} else if (rate < 1) { { client_id: client.id },
// allow lower drain rates 'Asking client to reconnect gracefully'
// e.g. rate=0.1 will drain one client every 10 seconds )
pollingInterval = 1000 / rate; client.emit('reconnectGracefully')
rate = 1; drainedCount++
} else { }
pollingInterval = 1000; const haveDrainedNClients = drainedCount === N
} if (haveDrainedNClients) {
return this.interval = setInterval(() => { break
return this.reconnectNClients(io, rate); }
} }
, pollingInterval); if (drainedCount < N) {
}, return logger.log('All clients have been told to reconnectGracefully')
}
RECONNECTED_CLIENTS: {}, }
reconnectNClients(io, N) { }
let drainedCount = 0;
for (const client of Array.from(io.sockets.clients())) {
if (!this.RECONNECTED_CLIENTS[client.id]) {
this.RECONNECTED_CLIENTS[client.id] = true;
logger.log({client_id: client.id}, "Asking client to reconnect gracefully");
client.emit("reconnectGracefully");
drainedCount++;
}
const haveDrainedNClients = (drainedCount === N);
if (haveDrainedNClients) {
break;
}
}
if (drainedCount < N) {
return logger.log("All clients have been told to reconnectGracefully");
}
}
});

View file

@ -4,15 +4,14 @@
*/ */
// TODO: This file was created by bulk-decaffeinate. // TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint. // Fix any style issues and re-enable lint.
let Errors; let Errors
var CodedError = function(message, code) { var CodedError = function (message, code) {
const error = new Error(message); const error = new Error(message)
error.name = "CodedError"; error.name = 'CodedError'
error.code = code; error.code = code
error.__proto__ = CodedError.prototype; error.__proto__ = CodedError.prototype
return error; return error
}; }
CodedError.prototype.__proto__ = Error.prototype; CodedError.prototype.__proto__ = Error.prototype
module.exports = (Errors = module.exports = Errors = { CodedError }
{CodedError});

View file

@ -10,84 +10,91 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let EventLogger; let EventLogger
const logger = require('logger-sharelatex'); const logger = require('logger-sharelatex')
const metrics = require('metrics-sharelatex'); const metrics = require('metrics-sharelatex')
const settings = require('settings-sharelatex'); const settings = require('settings-sharelatex')
// keep track of message counters to detect duplicate and out of order events // keep track of message counters to detect duplicate and out of order events
// messsage ids have the format "UNIQUEHOSTKEY-COUNTER" // messsage ids have the format "UNIQUEHOSTKEY-COUNTER"
const EVENT_LOG_COUNTER = {}; const EVENT_LOG_COUNTER = {}
const EVENT_LOG_TIMESTAMP = {}; const EVENT_LOG_TIMESTAMP = {}
let EVENT_LAST_CLEAN_TIMESTAMP = 0; let EVENT_LAST_CLEAN_TIMESTAMP = 0
// counter for debug logs // counter for debug logs
let COUNTER = 0; let COUNTER = 0
module.exports = (EventLogger = { module.exports = EventLogger = {
MAX_STALE_TIME_IN_MS: 3600 * 1000,
MAX_STALE_TIME_IN_MS: 3600 * 1000, debugEvent(channel, message) {
if (settings.debugEvents > 0) {
logger.log({ channel, message, counter: COUNTER++ }, 'logging event')
return settings.debugEvents--
}
},
debugEvent(channel, message) { checkEventOrder(channel, message_id, message) {
if (settings.debugEvents > 0) { let result
logger.log({channel, message, counter: COUNTER++}, "logging event"); if (typeof message_id !== 'string') {
return settings.debugEvents--; return
} }
}, if (!(result = message_id.match(/^(.*)-(\d+)$/))) {
return
}
const key = result[1]
const count = parseInt(result[2], 0)
if (!(count >= 0)) {
// ignore checks if counter is not present
return
}
// store the last count in a hash for each host
const previous = EventLogger._storeEventCount(key, count)
if (previous == null || count === previous + 1) {
metrics.inc(`event.${channel}.valid`, 0.001) // downsample high rate docupdater events
return // order is ok
}
if (count === previous) {
metrics.inc(`event.${channel}.duplicate`)
logger.warn({ channel, message_id }, 'duplicate event')
return 'duplicate'
} else {
metrics.inc(`event.${channel}.out-of-order`)
logger.warn(
{ channel, message_id, key, previous, count },
'out of order event'
)
return 'out-of-order'
}
},
checkEventOrder(channel, message_id, message) { _storeEventCount(key, count) {
let result; const previous = EVENT_LOG_COUNTER[key]
if (typeof(message_id) !== 'string') { return; } const now = Date.now()
if (!(result = message_id.match(/^(.*)-(\d+)$/))) { return; } EVENT_LOG_COUNTER[key] = count
const key = result[1]; EVENT_LOG_TIMESTAMP[key] = now
const count = parseInt(result[2], 0); // periodically remove old counts
if (!(count >= 0)) {// ignore checks if counter is not present if (now - EVENT_LAST_CLEAN_TIMESTAMP > EventLogger.MAX_STALE_TIME_IN_MS) {
return; EventLogger._cleanEventStream(now)
} EVENT_LAST_CLEAN_TIMESTAMP = now
// store the last count in a hash for each host }
const previous = EventLogger._storeEventCount(key, count); return previous
if ((previous == null) || (count === (previous + 1))) { },
metrics.inc(`event.${channel}.valid`, 0.001); // downsample high rate docupdater events
return; // order is ok
}
if (count === previous) {
metrics.inc(`event.${channel}.duplicate`);
logger.warn({channel, message_id}, "duplicate event");
return "duplicate";
} else {
metrics.inc(`event.${channel}.out-of-order`);
logger.warn({channel, message_id, key, previous, count}, "out of order event");
return "out-of-order";
}
},
_storeEventCount(key, count) { _cleanEventStream(now) {
const previous = EVENT_LOG_COUNTER[key]; return (() => {
const now = Date.now(); const result = []
EVENT_LOG_COUNTER[key] = count; for (const key in EVENT_LOG_TIMESTAMP) {
EVENT_LOG_TIMESTAMP[key] = now; const timestamp = EVENT_LOG_TIMESTAMP[key]
// periodically remove old counts if (now - timestamp > EventLogger.MAX_STALE_TIME_IN_MS) {
if ((now - EVENT_LAST_CLEAN_TIMESTAMP) > EventLogger.MAX_STALE_TIME_IN_MS) { delete EVENT_LOG_COUNTER[key]
EventLogger._cleanEventStream(now); result.push(delete EVENT_LOG_TIMESTAMP[key])
EVENT_LAST_CLEAN_TIMESTAMP = now; } else {
} result.push(undefined)
return previous; }
}, }
return result
_cleanEventStream(now) { })()
return (() => { }
const result = []; }
for (const key in EVENT_LOG_TIMESTAMP) {
const timestamp = EVENT_LOG_TIMESTAMP[key];
if ((now - timestamp) > EventLogger.MAX_STALE_TIME_IN_MS) {
delete EVENT_LOG_COUNTER[key];
result.push(delete EVENT_LOG_TIMESTAMP[key]);
} else {
result.push(undefined);
}
}
return result;
})();
}
});

View file

@ -10,76 +10,84 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let HealthCheckManager; let HealthCheckManager
const metrics = require("metrics-sharelatex"); const metrics = require('metrics-sharelatex')
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
const os = require("os"); const os = require('os')
const HOST = os.hostname(); const HOST = os.hostname()
const PID = process.pid; const PID = process.pid
let COUNT = 0; let COUNT = 0
const CHANNEL_MANAGER = {}; // hash of event checkers by channel name const CHANNEL_MANAGER = {} // hash of event checkers by channel name
const CHANNEL_ERROR = {}; // error status by channel name const CHANNEL_ERROR = {} // error status by channel name
module.exports = (HealthCheckManager = class HealthCheckManager { module.exports = HealthCheckManager = class HealthCheckManager {
// create an instance of this class which checks that an event with a unique // create an instance of this class which checks that an event with a unique
// id is received only once within a timeout // id is received only once within a timeout
constructor(channel, timeout) { constructor(channel, timeout) {
// unique event string // unique event string
this.channel = channel; this.channel = channel
if (timeout == null) { timeout = 1000; } if (timeout == null) {
this.id = `host=${HOST}:pid=${PID}:count=${COUNT++}`; timeout = 1000
// count of number of times the event is received
this.count = 0;
// after a timeout check the status of the count
this.handler = setTimeout(() => {
return this.setStatus();
}
, timeout);
// use a timer to record the latency of the channel
this.timer = new metrics.Timer(`event.${this.channel}.latency`);
// keep a record of these objects to dispatch on
CHANNEL_MANAGER[this.channel] = this;
} }
this.id = `host=${HOST}:pid=${PID}:count=${COUNT++}`
// count of number of times the event is received
this.count = 0
// after a timeout check the status of the count
this.handler = setTimeout(() => {
return this.setStatus()
}, timeout)
// use a timer to record the latency of the channel
this.timer = new metrics.Timer(`event.${this.channel}.latency`)
// keep a record of these objects to dispatch on
CHANNEL_MANAGER[this.channel] = this
}
processEvent(id) { processEvent(id) {
// if this is our event record it // if this is our event record it
if (id === this.id) { if (id === this.id) {
this.count++; this.count++
if (this.timer != null) { if (this.timer != null) {
this.timer.done(); this.timer.done()
} }
return this.timer = null; // only time the latency of the first event return (this.timer = null) // only time the latency of the first event
}
} }
}
setStatus() { setStatus() {
// if we saw the event anything other than a single time that is an error // if we saw the event anything other than a single time that is an error
if (this.count !== 1) { if (this.count !== 1) {
logger.err({channel:this.channel, count:this.count, id:this.id}, "redis channel health check error"); logger.err(
} { channel: this.channel, count: this.count, id: this.id },
const error = (this.count !== 1); 'redis channel health check error'
return CHANNEL_ERROR[this.channel] = error; )
} }
const error = this.count !== 1
return (CHANNEL_ERROR[this.channel] = error)
}
// class methods // class methods
static check(channel, id) { static check(channel, id) {
// dispatch event to manager for channel // dispatch event to manager for channel
return (CHANNEL_MANAGER[channel] != null ? CHANNEL_MANAGER[channel].processEvent(id) : undefined); return CHANNEL_MANAGER[channel] != null
} ? CHANNEL_MANAGER[channel].processEvent(id)
: undefined
}
static status() { static status() {
// return status of all channels for logging // return status of all channels for logging
return CHANNEL_ERROR; return CHANNEL_ERROR
} }
static isFailing() { static isFailing() {
// check if any channel status is bad // check if any channel status is bad
for (const channel in CHANNEL_ERROR) { for (const channel in CHANNEL_ERROR) {
const error = CHANNEL_ERROR[channel]; const error = CHANNEL_ERROR[channel]
if (error === true) { return true; } if (error === true) {
} return true
return false; }
} }
}); return false
}
}

View file

@ -10,47 +10,53 @@
* DS102: Remove unnecessary code created because of implicit returns * DS102: Remove unnecessary code created because of implicit returns
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let HttpApiController; let HttpApiController
const WebsocketLoadBalancer = require("./WebsocketLoadBalancer"); const WebsocketLoadBalancer = require('./WebsocketLoadBalancer')
const DrainManager = require("./DrainManager"); const DrainManager = require('./DrainManager')
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
module.exports = (HttpApiController = { module.exports = HttpApiController = {
sendMessage(req, res, next) { sendMessage(req, res, next) {
logger.log({message: req.params.message}, "sending message"); logger.log({ message: req.params.message }, 'sending message')
if (Array.isArray(req.body)) { if (Array.isArray(req.body)) {
for (const payload of Array.from(req.body)) { for (const payload of Array.from(req.body)) {
WebsocketLoadBalancer.emitToRoom(req.params.project_id, req.params.message, payload); WebsocketLoadBalancer.emitToRoom(
} req.params.project_id,
} else { req.params.message,
WebsocketLoadBalancer.emitToRoom(req.params.project_id, req.params.message, req.body); payload
} )
return res.send(204); }
}, // No content } else {
WebsocketLoadBalancer.emitToRoom(
startDrain(req, res, next) { req.params.project_id,
const io = req.app.get("io"); req.params.message,
let rate = req.query.rate || "4"; req.body
rate = parseFloat(rate) || 0; )
logger.log({rate}, "setting client drain rate"); }
DrainManager.startDrain(io, rate); return res.send(204)
return res.send(204); }, // No content
},
disconnectClient(req, res, next) { startDrain(req, res, next) {
const io = req.app.get("io"); const io = req.app.get('io')
const { let rate = req.query.rate || '4'
client_id rate = parseFloat(rate) || 0
} = req.params; logger.log({ rate }, 'setting client drain rate')
const client = io.sockets.sockets[client_id]; DrainManager.startDrain(io, rate)
return res.send(204)
},
if (!client) { disconnectClient(req, res, next) {
logger.info({client_id}, "api: client already disconnected"); const io = req.app.get('io')
res.sendStatus(404); const { client_id } = req.params
return; const client = io.sockets.sockets[client_id]
}
logger.warn({client_id}, "api: requesting client disconnect"); if (!client) {
client.on("disconnect", () => res.sendStatus(204)); logger.info({ client_id }, 'api: client already disconnected')
return client.disconnect(); res.sendStatus(404)
} return
}); }
logger.warn({ client_id }, 'api: requesting client disconnect')
client.on('disconnect', () => res.sendStatus(204))
return client.disconnect()
}
}

View file

@ -10,50 +10,78 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let HttpController; let HttpController
const async = require("async"); const async = require('async')
module.exports = (HttpController = { module.exports = HttpController = {
// The code in this controller is hard to unit test because of a lot of // The code in this controller is hard to unit test because of a lot of
// dependencies on internal socket.io methods. It is not critical to the running // dependencies on internal socket.io methods. It is not critical to the running
// of ShareLaTeX, and is only used for getting stats about connected clients, // of ShareLaTeX, and is only used for getting stats about connected clients,
// and for checking internal state in acceptance tests. The acceptances tests // and for checking internal state in acceptance tests. The acceptances tests
// should provide appropriate coverage. // should provide appropriate coverage.
_getConnectedClientView(ioClient, callback) { _getConnectedClientView(ioClient, callback) {
if (callback == null) { callback = function(error, client) {}; } if (callback == null) {
const client_id = ioClient.id; callback = function (error, client) {}
const {project_id, user_id, first_name, last_name, email, connected_time} = ioClient.ol_context; }
const client = {client_id, project_id, user_id, first_name, last_name, email, connected_time}; const client_id = ioClient.id
client.rooms = []; const {
for (const name in ioClient.manager.roomClients[client_id]) { project_id,
const joined = ioClient.manager.roomClients[client_id][name]; user_id,
if (joined && (name !== "")) { first_name,
client.rooms.push(name.replace(/^\//, "")); // Remove leading / last_name,
} email,
} connected_time
return callback(null, client); } = ioClient.ol_context
}, const client = {
client_id,
project_id,
user_id,
first_name,
last_name,
email,
connected_time
}
client.rooms = []
for (const name in ioClient.manager.roomClients[client_id]) {
const joined = ioClient.manager.roomClients[client_id][name]
if (joined && name !== '') {
client.rooms.push(name.replace(/^\//, '')) // Remove leading /
}
}
return callback(null, client)
},
getConnectedClients(req, res, next) { getConnectedClients(req, res, next) {
const io = req.app.get("io"); const io = req.app.get('io')
const ioClients = io.sockets.clients(); const ioClients = io.sockets.clients()
return async.map(ioClients, HttpController._getConnectedClientView, function(error, clients) { return async.map(
if (error != null) { return next(error); } ioClients,
return res.json(clients); HttpController._getConnectedClientView,
}); function (error, clients) {
}, if (error != null) {
return next(error)
getConnectedClient(req, res, next) { }
const {client_id} = req.params; return res.json(clients)
const io = req.app.get("io"); }
const ioClient = io.sockets.sockets[client_id]; )
if (!ioClient) { },
res.sendStatus(404);
return; getConnectedClient(req, res, next) {
} const { client_id } = req.params
return HttpController._getConnectedClientView(ioClient, function(error, client) { const io = req.app.get('io')
if (error != null) { return next(error); } const ioClient = io.sockets.sockets[client_id]
return res.json(client); if (!ioClient) {
}); res.sendStatus(404)
} return
}); }
return HttpController._getConnectedClientView(ioClient, function (
error,
client
) {
if (error != null) {
return next(error)
}
return res.json(client)
})
}
}

View file

@ -10,31 +10,31 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let RedisClientManager; let RedisClientManager
const redis = require("redis-sharelatex"); const redis = require('redis-sharelatex')
const logger = require('logger-sharelatex'); const logger = require('logger-sharelatex')
module.exports = (RedisClientManager = { module.exports = RedisClientManager = {
createClientList(...configs) { createClientList(...configs) {
// create a dynamic list of redis clients, excluding any configurations which are not defined // create a dynamic list of redis clients, excluding any configurations which are not defined
const clientList = (() => { const clientList = (() => {
const result = []; const result = []
for (const x of Array.from(configs)) { for (const x of Array.from(configs)) {
if (x != null) { if (x != null) {
const redisType = (x.cluster != null) ? const redisType =
"cluster" x.cluster != null
: (x.sentinels != null) ? ? 'cluster'
"sentinel" : x.sentinels != null
: (x.host != null) ? ? 'sentinel'
"single" : x.host != null
: ? 'single'
"unknown"; : 'unknown'
logger.log({redis: redisType}, "creating redis client"); logger.log({ redis: redisType }, 'creating redis client')
result.push(redis.createClient(x)); result.push(redis.createClient(x))
} }
} }
return result; return result
})(); })()
return clientList; return clientList
} }
}); }

View file

@ -13,13 +13,13 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let RoomManager; let RoomManager
const logger = require('logger-sharelatex'); const logger = require('logger-sharelatex')
const metrics = require("metrics-sharelatex"); const metrics = require('metrics-sharelatex')
const {EventEmitter} = require('events'); const { EventEmitter } = require('events')
const IdMap = new Map(); // keep track of whether ids are from projects or docs const IdMap = new Map() // keep track of whether ids are from projects or docs
const RoomEvents = new EventEmitter(); // emits {project,doc}-active and {project,doc}-empty events const RoomEvents = new EventEmitter() // emits {project,doc}-active and {project,doc}-empty events
// Manage socket.io rooms for individual projects and docs // Manage socket.io rooms for individual projects and docs
// //
@ -31,130 +31,159 @@ const RoomEvents = new EventEmitter(); // emits {project,doc}-active and {projec
// //
// The pubsub side is handled by ChannelManager // The pubsub side is handled by ChannelManager
module.exports = (RoomManager = { module.exports = RoomManager = {
joinProject(client, project_id, callback) {
joinProject(client, project_id, callback) { if (callback == null) {
if (callback == null) { callback = function() {}; } callback = function () {}
return this.joinEntity(client, "project", project_id, callback);
},
joinDoc(client, doc_id, callback) {
if (callback == null) { callback = function() {}; }
return this.joinEntity(client, "doc", doc_id, callback);
},
leaveDoc(client, doc_id) {
return this.leaveEntity(client, "doc", doc_id);
},
leaveProjectAndDocs(client) {
// what rooms is this client in? we need to leave them all. socket.io
// will cause us to leave the rooms, so we only need to manage our
// channel subscriptions... but it will be safer if we leave them
// explicitly, and then socket.io will just regard this as a client that
// has not joined any rooms and do a final disconnection.
const roomsToLeave = this._roomsClientIsIn(client);
logger.log({client: client.id, roomsToLeave}, "client leaving project");
return (() => {
const result = [];
for (const id of Array.from(roomsToLeave)) {
const entity = IdMap.get(id);
result.push(this.leaveEntity(client, entity, id));
}
return result;
})();
},
emitOnCompletion(promiseList, eventName) {
return Promise.all(promiseList)
.then(() => RoomEvents.emit(eventName))
.catch(err => RoomEvents.emit(eventName, err));
},
eventSource() {
return RoomEvents;
},
joinEntity(client, entity, id, callback) {
const beforeCount = this._clientsInRoom(client, id);
// client joins room immediately but joinDoc request does not complete
// until room is subscribed
client.join(id);
// is this a new room? if so, subscribe
if (beforeCount === 0) {
logger.log({entity, id}, "room is now active");
RoomEvents.once(`${entity}-subscribed-${id}`, function(err) {
// only allow the client to join when all the relevant channels have subscribed
logger.log({client: client.id, entity, id, beforeCount}, "client joined new room and subscribed to channel");
return callback(err);
});
RoomEvents.emit(`${entity}-active`, id);
IdMap.set(id, entity);
// keep track of the number of listeners
return metrics.gauge("room-listeners", RoomEvents.eventNames().length);
} else {
logger.log({client: client.id, entity, id, beforeCount}, "client joined existing room");
client.join(id);
return callback();
}
},
leaveEntity(client, entity, id) {
// Ignore any requests to leave when the client is not actually in the
// room. This can happen if the client sends spurious leaveDoc requests
// for old docs after a reconnection.
// This can now happen all the time, as we skip the join for clients that
// disconnect before joinProject/joinDoc completed.
if (!this._clientAlreadyInRoom(client, id)) {
logger.log({client: client.id, entity, id}, "ignoring request from client to leave room it is not in");
return;
}
client.leave(id);
const afterCount = this._clientsInRoom(client, id);
logger.log({client: client.id, entity, id, afterCount}, "client left room");
// is the room now empty? if so, unsubscribe
if ((entity == null)) {
logger.error({entity: id}, "unknown entity when leaving with id");
return;
}
if (afterCount === 0) {
logger.log({entity, id}, "room is now empty");
RoomEvents.emit(`${entity}-empty`, id);
IdMap.delete(id);
return metrics.gauge("room-listeners", RoomEvents.eventNames().length);
}
},
// internal functions below, these access socket.io rooms data directly and
// will need updating for socket.io v2
_clientsInRoom(client, room) {
const nsp = client.namespace.name;
const name = (nsp + '/') + room;
return (__guard__(client.manager != null ? client.manager.rooms : undefined, x => x[name]) || []).length;
},
_roomsClientIsIn(client) {
const roomList = (() => {
const result = [];
for (const fullRoomPath in (client.manager.roomClients != null ? client.manager.roomClients[client.id] : undefined)) {
// strip socket.io prefix from room to get original id
if (fullRoomPath !== '') {
const [prefix, room] = Array.from(fullRoomPath.split('/', 2));
result.push(room);
}
}
return result;
})();
return roomList;
},
_clientAlreadyInRoom(client, room) {
const nsp = client.namespace.name;
const name = (nsp + '/') + room;
return __guard__(client.manager.roomClients != null ? client.manager.roomClients[client.id] : undefined, x => x[name]);
} }
}); return this.joinEntity(client, 'project', project_id, callback)
},
joinDoc(client, doc_id, callback) {
if (callback == null) {
callback = function () {}
}
return this.joinEntity(client, 'doc', doc_id, callback)
},
leaveDoc(client, doc_id) {
return this.leaveEntity(client, 'doc', doc_id)
},
leaveProjectAndDocs(client) {
// what rooms is this client in? we need to leave them all. socket.io
// will cause us to leave the rooms, so we only need to manage our
// channel subscriptions... but it will be safer if we leave them
// explicitly, and then socket.io will just regard this as a client that
// has not joined any rooms and do a final disconnection.
const roomsToLeave = this._roomsClientIsIn(client)
logger.log({ client: client.id, roomsToLeave }, 'client leaving project')
return (() => {
const result = []
for (const id of Array.from(roomsToLeave)) {
const entity = IdMap.get(id)
result.push(this.leaveEntity(client, entity, id))
}
return result
})()
},
emitOnCompletion(promiseList, eventName) {
return Promise.all(promiseList)
.then(() => RoomEvents.emit(eventName))
.catch((err) => RoomEvents.emit(eventName, err))
},
eventSource() {
return RoomEvents
},
joinEntity(client, entity, id, callback) {
const beforeCount = this._clientsInRoom(client, id)
// client joins room immediately but joinDoc request does not complete
// until room is subscribed
client.join(id)
// is this a new room? if so, subscribe
if (beforeCount === 0) {
logger.log({ entity, id }, 'room is now active')
RoomEvents.once(`${entity}-subscribed-${id}`, function (err) {
// only allow the client to join when all the relevant channels have subscribed
logger.log(
{ client: client.id, entity, id, beforeCount },
'client joined new room and subscribed to channel'
)
return callback(err)
})
RoomEvents.emit(`${entity}-active`, id)
IdMap.set(id, entity)
// keep track of the number of listeners
return metrics.gauge('room-listeners', RoomEvents.eventNames().length)
} else {
logger.log(
{ client: client.id, entity, id, beforeCount },
'client joined existing room'
)
client.join(id)
return callback()
}
},
leaveEntity(client, entity, id) {
// Ignore any requests to leave when the client is not actually in the
// room. This can happen if the client sends spurious leaveDoc requests
// for old docs after a reconnection.
// This can now happen all the time, as we skip the join for clients that
// disconnect before joinProject/joinDoc completed.
if (!this._clientAlreadyInRoom(client, id)) {
logger.log(
{ client: client.id, entity, id },
'ignoring request from client to leave room it is not in'
)
return
}
client.leave(id)
const afterCount = this._clientsInRoom(client, id)
logger.log(
{ client: client.id, entity, id, afterCount },
'client left room'
)
// is the room now empty? if so, unsubscribe
if (entity == null) {
logger.error({ entity: id }, 'unknown entity when leaving with id')
return
}
if (afterCount === 0) {
logger.log({ entity, id }, 'room is now empty')
RoomEvents.emit(`${entity}-empty`, id)
IdMap.delete(id)
return metrics.gauge('room-listeners', RoomEvents.eventNames().length)
}
},
// internal functions below, these access socket.io rooms data directly and
// will need updating for socket.io v2
_clientsInRoom(client, room) {
const nsp = client.namespace.name
const name = nsp + '/' + room
return (
__guard__(
client.manager != null ? client.manager.rooms : undefined,
(x) => x[name]
) || []
).length
},
_roomsClientIsIn(client) {
const roomList = (() => {
const result = []
for (const fullRoomPath in client.manager.roomClients != null
? client.manager.roomClients[client.id]
: undefined) {
// strip socket.io prefix from room to get original id
if (fullRoomPath !== '') {
const [prefix, room] = Array.from(fullRoomPath.split('/', 2))
result.push(room)
}
}
return result
})()
return roomList
},
_clientAlreadyInRoom(client, room) {
const nsp = client.namespace.name
const name = nsp + '/' + room
return __guard__(
client.manager.roomClients != null
? client.manager.roomClients[client.id]
: undefined,
(x) => x[name]
)
}
}
function __guard__(value, transform) { function __guard__(value, transform) {
return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined; return typeof value !== 'undefined' && value !== null
} ? transform(value)
: undefined
}

View file

@ -13,259 +13,390 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let Router; let Router
const metrics = require("metrics-sharelatex"); const metrics = require('metrics-sharelatex')
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
const settings = require("settings-sharelatex"); const settings = require('settings-sharelatex')
const WebsocketController = require("./WebsocketController"); const WebsocketController = require('./WebsocketController')
const HttpController = require("./HttpController"); const HttpController = require('./HttpController')
const HttpApiController = require("./HttpApiController"); const HttpApiController = require('./HttpApiController')
const bodyParser = require("body-parser"); const bodyParser = require('body-parser')
const base64id = require("base64id"); const base64id = require('base64id')
const basicAuth = require('basic-auth-connect'); const basicAuth = require('basic-auth-connect')
const httpAuth = basicAuth(function(user, pass){ const httpAuth = basicAuth(function (user, pass) {
const isValid = (user === settings.internal.realTime.user) && (pass === settings.internal.realTime.pass); const isValid =
if (!isValid) { user === settings.internal.realTime.user &&
logger.err({user, pass}, "invalid login details"); pass === settings.internal.realTime.pass
} if (!isValid) {
return isValid; logger.err({ user, pass }, 'invalid login details')
}); }
return isValid
})
module.exports = (Router = { module.exports = Router = {
_handleError(callback, error, client, method, attrs) { _handleError(callback, error, client, method, attrs) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
if (attrs == null) { attrs = {}; } callback = function (error) {}
for (const key of ["project_id", "doc_id", "user_id"]) { }
attrs[key] = client.ol_context[key]; if (attrs == null) {
} attrs = {}
attrs.client_id = client.id; }
attrs.err = error; for (const key of ['project_id', 'doc_id', 'user_id']) {
if (error.name === "CodedError") { attrs[key] = client.ol_context[key]
logger.warn(attrs, error.message, {code: error.code}); }
return callback({message: error.message, code: error.code}); attrs.client_id = client.id
} attrs.err = error
if (error.message === 'unexpected arguments') { if (error.name === 'CodedError') {
// the payload might be very large, put it on level info logger.warn(attrs, error.message, { code: error.code })
logger.log(attrs, 'unexpected arguments'); return callback({ message: error.message, code: error.code })
metrics.inc('unexpected-arguments', 1, { status: method }); }
return callback({ message: error.message }); if (error.message === 'unexpected arguments') {
} // the payload might be very large, put it on level info
if (["not authorized", "doc updater could not load requested ops", "no project_id found on client"].includes(error.message)) { logger.log(attrs, 'unexpected arguments')
logger.warn(attrs, error.message); metrics.inc('unexpected-arguments', 1, { status: method })
return callback({message: error.message}); return callback({ message: error.message })
} else { }
logger.error(attrs, `server side error in ${method}`); if (
// Don't return raw error to prevent leaking server side info [
return callback({message: "Something went wrong in real-time service"}); 'not authorized',
} 'doc updater could not load requested ops',
}, 'no project_id found on client'
].includes(error.message)
) {
logger.warn(attrs, error.message)
return callback({ message: error.message })
} else {
logger.error(attrs, `server side error in ${method}`)
// Don't return raw error to prevent leaking server side info
return callback({ message: 'Something went wrong in real-time service' })
}
},
_handleInvalidArguments(client, method, args) { _handleInvalidArguments(client, method, args) {
const error = new Error("unexpected arguments"); const error = new Error('unexpected arguments')
let callback = args[args.length - 1]; let callback = args[args.length - 1]
if (typeof callback !== 'function') { if (typeof callback !== 'function') {
callback = (function() {}); callback = function () {}
} }
const attrs = {arguments: args}; const attrs = { arguments: args }
return Router._handleError(callback, error, client, method, attrs); return Router._handleError(callback, error, client, method, attrs)
}, },
configure(app, io, session) { configure(app, io, session) {
app.set("io", io); app.set('io', io)
app.get("/clients", HttpController.getConnectedClients); app.get('/clients', HttpController.getConnectedClients)
app.get("/clients/:client_id", HttpController.getConnectedClient); app.get('/clients/:client_id', HttpController.getConnectedClient)
app.post("/project/:project_id/message/:message", httpAuth, bodyParser.json({limit: "5mb"}), HttpApiController.sendMessage); app.post(
'/project/:project_id/message/:message',
app.post("/drain", httpAuth, HttpApiController.startDrain); httpAuth,
app.post("/client/:client_id/disconnect", httpAuth, HttpApiController.disconnectClient); bodyParser.json({ limit: '5mb' }),
HttpApiController.sendMessage
)
return session.on('connection', function(error, client, session) { app.post('/drain', httpAuth, HttpApiController.startDrain)
// init client context, we may access it in Router._handleError before app.post(
// setting any values '/client/:client_id/disconnect',
let user; httpAuth,
client.ol_context = {}; HttpApiController.disconnectClient
)
if (client != null) { return session.on('connection', function (error, client, session) {
client.on("error", function(err) { // init client context, we may access it in Router._handleError before
logger.err({ clientErr: err }, "socket.io client error"); // setting any values
if (client.connected) { let user
client.emit("reconnectGracefully"); client.ol_context = {}
return client.disconnect();
}
});
}
if (settings.shutDownInProgress) { if (client != null) {
client.emit("connectionRejected", {message: "retry"}); client.on('error', function (err) {
client.disconnect(); logger.err({ clientErr: err }, 'socket.io client error')
return; if (client.connected) {
} client.emit('reconnectGracefully')
return client.disconnect()
}
})
}
if ((client != null) && __guard__(error != null ? error.message : undefined, x => x.match(/could not look up session by key/))) { if (settings.shutDownInProgress) {
logger.warn({err: error, client: (client != null), session: (session != null)}, "invalid session"); client.emit('connectionRejected', { message: 'retry' })
// tell the client to reauthenticate if it has an invalid session key client.disconnect()
client.emit("connectionRejected", {message: "invalid session"}); return
client.disconnect(); }
return;
}
if (error != null) { if (
logger.err({err: error, client: (client != null), session: (session != null)}, "error when client connected"); client != null &&
if (client != null) { __guard__(error != null ? error.message : undefined, (x) =>
client.emit("connectionRejected", {message: "error"}); x.match(/could not look up session by key/)
} )
if (client != null) { ) {
client.disconnect(); logger.warn(
} { err: error, client: client != null, session: session != null },
return; 'invalid session'
} )
// tell the client to reauthenticate if it has an invalid session key
client.emit('connectionRejected', { message: 'invalid session' })
client.disconnect()
return
}
// send positive confirmation that the client has a valid connection if (error != null) {
client.publicId = 'P.' + base64id.generateId(); logger.err(
client.emit("connectionAccepted", null, client.publicId); { err: error, client: client != null, session: session != null },
'error when client connected'
)
if (client != null) {
client.emit('connectionRejected', { message: 'error' })
}
if (client != null) {
client.disconnect()
}
return
}
metrics.inc('socket-io.connection'); // send positive confirmation that the client has a valid connection
metrics.gauge('socket-io.clients', __guard__(io.sockets.clients(), x1 => x1.length)); client.publicId = 'P.' + base64id.generateId()
client.emit('connectionAccepted', null, client.publicId)
logger.log({session, client_id: client.id}, "client connected"); metrics.inc('socket-io.connection')
metrics.gauge(
'socket-io.clients',
__guard__(io.sockets.clients(), (x1) => x1.length)
)
if (__guard__(session != null ? session.passport : undefined, x2 => x2.user) != null) { logger.log({ session, client_id: client.id }, 'client connected')
({
user
} = session.passport);
} else if ((session != null ? session.user : undefined) != null) {
({
user
} = session);
} else {
user = {_id: "anonymous-user"};
}
client.on("joinProject", function(data, callback) { if (
if (data == null) { data = {}; } __guard__(
if (typeof callback !== 'function') { session != null ? session.passport : undefined,
return Router._handleInvalidArguments(client, 'joinProject', arguments); (x2) => x2.user
} ) != null
) {
;({ user } = session.passport)
} else if ((session != null ? session.user : undefined) != null) {
;({ user } = session)
} else {
user = { _id: 'anonymous-user' }
}
if (data.anonymousAccessToken) { client.on('joinProject', function (data, callback) {
user.anonymousAccessToken = data.anonymousAccessToken; if (data == null) {
} data = {}
return WebsocketController.joinProject(client, user, data.project_id, function(err, ...args) { }
if (err != null) { if (typeof callback !== 'function') {
return Router._handleError(callback, err, client, "joinProject", {project_id: data.project_id, user_id: (user != null ? user.id : undefined)}); return Router._handleInvalidArguments(
} else { client,
return callback(null, ...Array.from(args)); 'joinProject',
} arguments
}); )
}); }
client.on("disconnect", function() { if (data.anonymousAccessToken) {
metrics.inc('socket-io.disconnect'); user.anonymousAccessToken = data.anonymousAccessToken
metrics.gauge('socket-io.clients', __guard__(io.sockets.clients(), x3 => x3.length) - 1); }
return WebsocketController.joinProject(
client,
user,
data.project_id,
function (err, ...args) {
if (err != null) {
return Router._handleError(callback, err, client, 'joinProject', {
project_id: data.project_id,
user_id: user != null ? user.id : undefined
})
} else {
return callback(null, ...Array.from(args))
}
}
)
})
return WebsocketController.leaveProject(io, client, function(err) { client.on('disconnect', function () {
if (err != null) { metrics.inc('socket-io.disconnect')
return Router._handleError((function() {}), err, client, "leaveProject"); metrics.gauge(
} 'socket-io.clients',
}); __guard__(io.sockets.clients(), (x3) => x3.length) - 1
}); )
// Variadic. The possible arguments: return WebsocketController.leaveProject(io, client, function (err) {
// doc_id, callback if (err != null) {
// doc_id, fromVersion, callback return Router._handleError(
// doc_id, options, callback function () {},
// doc_id, fromVersion, options, callback err,
client.on("joinDoc", function(doc_id, fromVersion, options, callback) { client,
if ((typeof fromVersion === "function") && !options) { 'leaveProject'
callback = fromVersion; )
fromVersion = -1; }
options = {}; })
} else if ((typeof fromVersion === "number") && (typeof options === "function")) { })
callback = options;
options = {};
} else if ((typeof fromVersion === "object") && (typeof options === "function")) {
callback = options;
options = fromVersion;
fromVersion = -1;
} else if ((typeof fromVersion === "number") && (typeof options === "object") && (typeof callback === 'function')) {
// Called with 4 args, things are as expected
} else {
return Router._handleInvalidArguments(client, 'joinDoc', arguments);
}
return WebsocketController.joinDoc(client, doc_id, fromVersion, options, function(err, ...args) { // Variadic. The possible arguments:
if (err != null) { // doc_id, callback
return Router._handleError(callback, err, client, "joinDoc", {doc_id, fromVersion}); // doc_id, fromVersion, callback
} else { // doc_id, options, callback
return callback(null, ...Array.from(args)); // doc_id, fromVersion, options, callback
} client.on('joinDoc', function (doc_id, fromVersion, options, callback) {
}); if (typeof fromVersion === 'function' && !options) {
}); callback = fromVersion
fromVersion = -1
options = {}
} else if (
typeof fromVersion === 'number' &&
typeof options === 'function'
) {
callback = options
options = {}
} else if (
typeof fromVersion === 'object' &&
typeof options === 'function'
) {
callback = options
options = fromVersion
fromVersion = -1
} else if (
typeof fromVersion === 'number' &&
typeof options === 'object' &&
typeof callback === 'function'
) {
// Called with 4 args, things are as expected
} else {
return Router._handleInvalidArguments(client, 'joinDoc', arguments)
}
client.on("leaveDoc", function(doc_id, callback) { return WebsocketController.joinDoc(
if (typeof callback !== 'function') { client,
return Router._handleInvalidArguments(client, 'leaveDoc', arguments); doc_id,
} fromVersion,
options,
function (err, ...args) {
if (err != null) {
return Router._handleError(callback, err, client, 'joinDoc', {
doc_id,
fromVersion
})
} else {
return callback(null, ...Array.from(args))
}
}
)
})
return WebsocketController.leaveDoc(client, doc_id, function(err, ...args) { client.on('leaveDoc', function (doc_id, callback) {
if (err != null) { if (typeof callback !== 'function') {
return Router._handleError(callback, err, client, "leaveDoc"); return Router._handleInvalidArguments(client, 'leaveDoc', arguments)
} else { }
return callback(null, ...Array.from(args));
}
});
});
client.on("clientTracking.getConnectedUsers", function(callback) { return WebsocketController.leaveDoc(client, doc_id, function (
if (callback == null) { callback = function(error, users) {}; } err,
if (typeof callback !== 'function') { ...args
return Router._handleInvalidArguments(client, 'clientTracking.getConnectedUsers', arguments); ) {
} if (err != null) {
return Router._handleError(callback, err, client, 'leaveDoc')
} else {
return callback(null, ...Array.from(args))
}
})
})
return WebsocketController.getConnectedUsers(client, function(err, users) { client.on('clientTracking.getConnectedUsers', function (callback) {
if (err != null) { if (callback == null) {
return Router._handleError(callback, err, client, "clientTracking.getConnectedUsers"); callback = function (error, users) {}
} else { }
return callback(null, users); if (typeof callback !== 'function') {
} return Router._handleInvalidArguments(
}); client,
}); 'clientTracking.getConnectedUsers',
arguments
)
}
client.on("clientTracking.updatePosition", function(cursorData, callback) { return WebsocketController.getConnectedUsers(client, function (
if (callback == null) { callback = function(error) {}; } err,
if (typeof callback !== 'function') { users
return Router._handleInvalidArguments(client, 'clientTracking.updatePosition', arguments); ) {
} if (err != null) {
return Router._handleError(
callback,
err,
client,
'clientTracking.getConnectedUsers'
)
} else {
return callback(null, users)
}
})
})
return WebsocketController.updateClientPosition(client, cursorData, function(err) { client.on('clientTracking.updatePosition', function (
if (err != null) { cursorData,
return Router._handleError(callback, err, client, "clientTracking.updatePosition"); callback
} else { ) {
return callback(); if (callback == null) {
} callback = function (error) {}
}); }
}); if (typeof callback !== 'function') {
return Router._handleInvalidArguments(
client,
'clientTracking.updatePosition',
arguments
)
}
return client.on("applyOtUpdate", function(doc_id, update, callback) { return WebsocketController.updateClientPosition(
if (callback == null) { callback = function(error) {}; } client,
if (typeof callback !== 'function') { cursorData,
return Router._handleInvalidArguments(client, 'applyOtUpdate', arguments); function (err) {
} if (err != null) {
return Router._handleError(
callback,
err,
client,
'clientTracking.updatePosition'
)
} else {
return callback()
}
}
)
})
return WebsocketController.applyOtUpdate(client, doc_id, update, function(err) { return client.on('applyOtUpdate', function (doc_id, update, callback) {
if (err != null) { if (callback == null) {
return Router._handleError(callback, err, client, "applyOtUpdate", {doc_id, update}); callback = function (error) {}
} else { }
return callback(); if (typeof callback !== 'function') {
} return Router._handleInvalidArguments(
}); client,
}); 'applyOtUpdate',
}); arguments
} )
}); }
return WebsocketController.applyOtUpdate(
client,
doc_id,
update,
function (err) {
if (err != null) {
return Router._handleError(
callback,
err,
client,
'applyOtUpdate',
{ doc_id, update }
)
} else {
return callback()
}
}
)
})
})
}
}
function __guard__(value, transform) { function __guard__(value, transform) {
return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined; return typeof value !== 'undefined' && value !== null
} ? transform(value)
: undefined
}

View file

@ -9,22 +9,27 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
const Settings = require("settings-sharelatex"); const Settings = require('settings-sharelatex')
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
module.exports = { module.exports = {
parse(data, callback) { parse(data, callback) {
let parsed; let parsed
if (callback == null) { callback = function(error, parsed) {}; } if (callback == null) {
if (data.length > Settings.maxUpdateSize) { callback = function (error, parsed) {}
logger.error({head: data.slice(0,1024), length: data.length}, "data too large to parse"); }
return callback(new Error("data too large to parse")); if (data.length > Settings.maxUpdateSize) {
} logger.error(
try { { head: data.slice(0, 1024), length: data.length },
parsed = JSON.parse(data); 'data too large to parse'
} catch (e) { )
return callback(e); return callback(new Error('data too large to parse'))
} }
return callback(null, parsed); try {
} parsed = JSON.parse(data)
}; } catch (e) {
return callback(e)
}
return callback(null, parsed)
}
}

View file

@ -5,32 +5,33 @@
* DS102: Remove unnecessary code created because of implicit returns * DS102: Remove unnecessary code created because of implicit returns
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
const {EventEmitter} = require('events'); const { EventEmitter } = require('events')
module.exports = function(io, sessionStore, cookieParser, cookieName) { module.exports = function (io, sessionStore, cookieParser, cookieName) {
const missingSessionError = new Error('could not look up session by key'); const missingSessionError = new Error('could not look up session by key')
const sessionSockets = new EventEmitter(); const sessionSockets = new EventEmitter()
const next = (error, socket, session) => sessionSockets.emit('connection', error, socket, session); const next = (error, socket, session) =>
sessionSockets.emit('connection', error, socket, session)
io.on('connection', function(socket) { io.on('connection', function (socket) {
const req = socket.handshake; const req = socket.handshake
return cookieParser(req, {}, function() { return cookieParser(req, {}, function () {
const sessionId = req.signedCookies && req.signedCookies[cookieName]; const sessionId = req.signedCookies && req.signedCookies[cookieName]
if (!sessionId) { if (!sessionId) {
return next(missingSessionError, socket); return next(missingSessionError, socket)
} }
return sessionStore.get(sessionId, function(error, session) { return sessionStore.get(sessionId, function (error, session) {
if (error) { if (error) {
return next(error, socket); return next(error, socket)
} }
if (!session) { if (!session) {
return next(missingSessionError, socket); return next(missingSessionError, socket)
} }
return next(null, socket, session); return next(null, socket, session)
}); })
}); })
}); })
return sessionSockets; return sessionSockets
}; }

View file

@ -11,51 +11,76 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let WebApiManager; let WebApiManager
const request = require("request"); const request = require('request')
const settings = require("settings-sharelatex"); const settings = require('settings-sharelatex')
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
const { CodedError } = require("./Errors"); const { CodedError } = require('./Errors')
module.exports = (WebApiManager = { module.exports = WebApiManager = {
joinProject(project_id, user, callback) { joinProject(project_id, user, callback) {
if (callback == null) { callback = function(error, project, privilegeLevel, isRestrictedUser) {}; } if (callback == null) {
const user_id = user._id; callback = function (error, project, privilegeLevel, isRestrictedUser) {}
logger.log({project_id, user_id}, "sending join project request to web"); }
const url = `${settings.apis.web.url}/project/${project_id}/join`; const user_id = user._id
const headers = {}; logger.log({ project_id, user_id }, 'sending join project request to web')
if (user.anonymousAccessToken != null) { const url = `${settings.apis.web.url}/project/${project_id}/join`
headers['x-sl-anonymous-access-token'] = user.anonymousAccessToken; const headers = {}
} if (user.anonymousAccessToken != null) {
return request.post({ headers['x-sl-anonymous-access-token'] = user.anonymousAccessToken
url, }
qs: {user_id}, return request.post(
auth: { {
user: settings.apis.web.user, url,
pass: settings.apis.web.pass, qs: { user_id },
sendImmediately: true auth: {
}, user: settings.apis.web.user,
json: true, pass: settings.apis.web.pass,
jar: false, sendImmediately: true
headers },
}, function(error, response, data) { json: true,
let err; jar: false,
if (error != null) { return callback(error); } headers
if (response.statusCode >= 200 && response.statusCode < 300) { },
if ((data == null) || ((data != null ? data.project : undefined) == null)) { function (error, response, data) {
err = new Error('no data returned from joinProject request'); let err
logger.error({err, project_id, user_id}, "error accessing web api"); if (error != null) {
return callback(err); return callback(error)
} }
return callback(null, data.project, data.privilegeLevel, data.isRestrictedUser); if (response.statusCode >= 200 && response.statusCode < 300) {
} else if (response.statusCode === 429) { if (
logger.log(project_id, user_id, "rate-limit hit when joining project"); data == null ||
return callback(new CodedError("rate-limit hit when joining project", "TooManyRequests")); (data != null ? data.project : undefined) == null
} else { ) {
err = new Error(`non-success status code from web: ${response.statusCode}`); err = new Error('no data returned from joinProject request')
logger.error({err, project_id, user_id}, "error accessing web api"); logger.error(
return callback(err); { err, project_id, user_id },
} 'error accessing web api'
}); )
} return callback(err)
}); }
return callback(
null,
data.project,
data.privilegeLevel,
data.isRestrictedUser
)
} else if (response.statusCode === 429) {
logger.log(project_id, user_id, 'rate-limit hit when joining project')
return callback(
new CodedError(
'rate-limit hit when joining project',
'TooManyRequests'
)
)
} else {
err = new Error(
`non-success status code from web: ${response.statusCode}`
)
logger.error({ err, project_id, user_id }, 'error accessing web api')
return callback(err)
}
}
)
}
}

View file

@ -13,344 +13,596 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let WebsocketController; let WebsocketController
const logger = require("logger-sharelatex"); const logger = require('logger-sharelatex')
const metrics = require("metrics-sharelatex"); const metrics = require('metrics-sharelatex')
const settings = require("settings-sharelatex"); const settings = require('settings-sharelatex')
const WebApiManager = require("./WebApiManager"); const WebApiManager = require('./WebApiManager')
const AuthorizationManager = require("./AuthorizationManager"); const AuthorizationManager = require('./AuthorizationManager')
const DocumentUpdaterManager = require("./DocumentUpdaterManager"); const DocumentUpdaterManager = require('./DocumentUpdaterManager')
const ConnectedUsersManager = require("./ConnectedUsersManager"); const ConnectedUsersManager = require('./ConnectedUsersManager')
const WebsocketLoadBalancer = require("./WebsocketLoadBalancer"); const WebsocketLoadBalancer = require('./WebsocketLoadBalancer')
const RoomManager = require("./RoomManager"); const RoomManager = require('./RoomManager')
module.exports = (WebsocketController = { module.exports = WebsocketController = {
// If the protocol version changes when the client reconnects, // If the protocol version changes when the client reconnects,
// it will force a full refresh of the page. Useful for non-backwards // it will force a full refresh of the page. Useful for non-backwards
// compatible protocol changes. Use only in extreme need. // compatible protocol changes. Use only in extreme need.
PROTOCOL_VERSION: 2, PROTOCOL_VERSION: 2,
joinProject(client, user, project_id, callback) { joinProject(client, user, project_id, callback) {
if (callback == null) { callback = function(error, project, privilegeLevel, protocolVersion) {}; } if (callback == null) {
if (client.disconnected) { callback = function (error, project, privilegeLevel, protocolVersion) {}
metrics.inc('editor.join-project.disconnected', 1, {status: 'immediately'}); }
return callback(); if (client.disconnected) {
} metrics.inc('editor.join-project.disconnected', 1, {
status: 'immediately'
})
return callback()
}
const user_id = user != null ? user._id : undefined; const user_id = user != null ? user._id : undefined
logger.log({user_id, project_id, client_id: client.id}, "user joining project"); logger.log(
metrics.inc("editor.join-project"); { user_id, project_id, client_id: client.id },
return WebApiManager.joinProject(project_id, user, function(error, project, privilegeLevel, isRestrictedUser) { 'user joining project'
if (error != null) { return callback(error); } )
if (client.disconnected) { metrics.inc('editor.join-project')
metrics.inc('editor.join-project.disconnected', 1, {status: 'after-web-api-call'}); return WebApiManager.joinProject(project_id, user, function (
return callback(); error,
} project,
privilegeLevel,
isRestrictedUser
) {
if (error != null) {
return callback(error)
}
if (client.disconnected) {
metrics.inc('editor.join-project.disconnected', 1, {
status: 'after-web-api-call'
})
return callback()
}
if (!privilegeLevel || (privilegeLevel === "")) { if (!privilegeLevel || privilegeLevel === '') {
const err = new Error("not authorized"); const err = new Error('not authorized')
logger.warn({err, project_id, user_id, client_id: client.id}, "user is not authorized to join project"); logger.warn(
return callback(err); { err, project_id, user_id, client_id: client.id },
} 'user is not authorized to join project'
)
return callback(err)
}
client.ol_context = {}; client.ol_context = {}
client.ol_context.privilege_level = privilegeLevel; client.ol_context.privilege_level = privilegeLevel
client.ol_context.user_id = user_id; client.ol_context.user_id = user_id
client.ol_context.project_id = project_id; client.ol_context.project_id = project_id
client.ol_context.owner_id = __guard__(project != null ? project.owner : undefined, x => x._id); client.ol_context.owner_id = __guard__(
client.ol_context.first_name = user != null ? user.first_name : undefined; project != null ? project.owner : undefined,
client.ol_context.last_name = user != null ? user.last_name : undefined; (x) => x._id
client.ol_context.email = user != null ? user.email : undefined; )
client.ol_context.connected_time = new Date(); client.ol_context.first_name = user != null ? user.first_name : undefined
client.ol_context.signup_date = user != null ? user.signUpDate : undefined; client.ol_context.last_name = user != null ? user.last_name : undefined
client.ol_context.login_count = user != null ? user.loginCount : undefined; client.ol_context.email = user != null ? user.email : undefined
client.ol_context.is_restricted_user = !!(isRestrictedUser); client.ol_context.connected_time = new Date()
client.ol_context.signup_date = user != null ? user.signUpDate : undefined
client.ol_context.login_count = user != null ? user.loginCount : undefined
client.ol_context.is_restricted_user = !!isRestrictedUser
RoomManager.joinProject(client, project_id, function(err) { RoomManager.joinProject(client, project_id, function (err) {
if (err) { return callback(err); } if (err) {
logger.log({user_id, project_id, client_id: client.id}, "user joined project"); return callback(err)
return callback(null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION); }
}); logger.log(
{ user_id, project_id, client_id: client.id },
'user joined project'
)
return callback(
null,
project,
privilegeLevel,
WebsocketController.PROTOCOL_VERSION
)
})
// No need to block for setting the user as connected in the cursor tracking // No need to block for setting the user as connected in the cursor tracking
return ConnectedUsersManager.updateUserPosition(project_id, client.publicId, user, null, function() {}); return ConnectedUsersManager.updateUserPosition(
}); project_id,
}, client.publicId,
user,
null,
function () {}
)
})
},
// We want to flush a project if there are no more (local) connected clients // We want to flush a project if there are no more (local) connected clients
// but we need to wait for the triggering client to disconnect. How long we wait // but we need to wait for the triggering client to disconnect. How long we wait
// is determined by FLUSH_IF_EMPTY_DELAY. // is determined by FLUSH_IF_EMPTY_DELAY.
FLUSH_IF_EMPTY_DELAY: 500, // ms FLUSH_IF_EMPTY_DELAY: 500, // ms
leaveProject(io, client, callback) { leaveProject(io, client, callback) {
if (callback == null) { callback = function(error) {}; } if (callback == null) {
const {project_id, user_id} = client.ol_context; callback = function (error) {}
if (!project_id) { return callback(); } // client did not join project }
const { project_id, user_id } = client.ol_context
if (!project_id) {
return callback()
} // client did not join project
metrics.inc("editor.leave-project"); metrics.inc('editor.leave-project')
logger.log({project_id, user_id, client_id: client.id}, "client leaving project"); logger.log(
WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientDisconnected", client.publicId); { project_id, user_id, client_id: client.id },
'client leaving project'
)
WebsocketLoadBalancer.emitToRoom(
project_id,
'clientTracking.clientDisconnected',
client.publicId
)
// We can do this in the background // We can do this in the background
ConnectedUsersManager.markUserAsDisconnected(project_id, client.publicId, function(err) { ConnectedUsersManager.markUserAsDisconnected(
if (err != null) { project_id,
return logger.error({err, project_id, user_id, client_id: client.id}, "error marking client as disconnected"); client.publicId,
} function (err) {
}); if (err != null) {
return logger.error(
{ err, project_id, user_id, client_id: client.id },
'error marking client as disconnected'
)
}
}
)
RoomManager.leaveProjectAndDocs(client); RoomManager.leaveProjectAndDocs(client)
return setTimeout(function() { return setTimeout(function () {
const remainingClients = io.sockets.clients(project_id); const remainingClients = io.sockets.clients(project_id)
if (remainingClients.length === 0) { if (remainingClients.length === 0) {
// Flush project in the background // Flush project in the background
DocumentUpdaterManager.flushProjectToMongoAndDelete(project_id, function(err) { DocumentUpdaterManager.flushProjectToMongoAndDelete(
if (err != null) { project_id,
return logger.error({err, project_id, user_id, client_id: client.id}, "error flushing to doc updater after leaving project"); function (err) {
} if (err != null) {
}); return logger.error(
} { err, project_id, user_id, client_id: client.id },
return callback(); 'error flushing to doc updater after leaving project'
} )
, WebsocketController.FLUSH_IF_EMPTY_DELAY); }
}, }
)
}
return callback()
}, WebsocketController.FLUSH_IF_EMPTY_DELAY)
},
joinDoc(client, doc_id, fromVersion, options, callback) { joinDoc(client, doc_id, fromVersion, options, callback) {
if (fromVersion == null) { fromVersion = -1; } if (fromVersion == null) {
if (callback == null) { callback = function(error, doclines, version, ops, ranges) {}; } fromVersion = -1
if (client.disconnected) { }
metrics.inc('editor.join-doc.disconnected', 1, {status: 'immediately'}); if (callback == null) {
return callback(); callback = function (error, doclines, version, ops, ranges) {}
} }
if (client.disconnected) {
metrics.inc('editor.join-doc.disconnected', 1, { status: 'immediately' })
return callback()
}
metrics.inc("editor.join-doc"); metrics.inc('editor.join-doc')
const {project_id, user_id, is_restricted_user} = client.ol_context; const { project_id, user_id, is_restricted_user } = client.ol_context
if ((project_id == null)) { return callback(new Error("no project_id found on client")); } if (project_id == null) {
logger.log({user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joining doc"); return callback(new Error('no project_id found on client'))
}
logger.log(
{ user_id, project_id, doc_id, fromVersion, client_id: client.id },
'client joining doc'
)
return AuthorizationManager.assertClientCanViewProject(client, function(error) { return AuthorizationManager.assertClientCanViewProject(client, function (
if (error != null) { return callback(error); } error
// ensure the per-doc applied-ops channel is subscribed before sending the ) {
// doc to the client, so that no events are missed. if (error != null) {
return RoomManager.joinDoc(client, doc_id, function(error) { return callback(error)
if (error != null) { return callback(error); } }
if (client.disconnected) { // ensure the per-doc applied-ops channel is subscribed before sending the
metrics.inc('editor.join-doc.disconnected', 1, {status: 'after-joining-room'}); // doc to the client, so that no events are missed.
// the client will not read the response anyways return RoomManager.joinDoc(client, doc_id, function (error) {
return callback(); if (error != null) {
} return callback(error)
}
if (client.disconnected) {
metrics.inc('editor.join-doc.disconnected', 1, {
status: 'after-joining-room'
})
// the client will not read the response anyways
return callback()
}
return DocumentUpdaterManager.getDocument(project_id, doc_id, fromVersion, function(error, lines, version, ranges, ops) { return DocumentUpdaterManager.getDocument(
let err; project_id,
if (error != null) { return callback(error); } doc_id,
if (client.disconnected) { fromVersion,
metrics.inc('editor.join-doc.disconnected', 1, {status: 'after-doc-updater-call'}); function (error, lines, version, ranges, ops) {
// the client will not read the response anyways let err
return callback(); if (error != null) {
} return callback(error)
}
if (client.disconnected) {
metrics.inc('editor.join-doc.disconnected', 1, {
status: 'after-doc-updater-call'
})
// the client will not read the response anyways
return callback()
}
if (is_restricted_user && ((ranges != null ? ranges.comments : undefined) != null)) { if (
ranges.comments = []; is_restricted_user &&
} (ranges != null ? ranges.comments : undefined) != null
) {
ranges.comments = []
}
// Encode any binary bits of data so it can go via WebSockets // Encode any binary bits of data so it can go via WebSockets
// See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html // See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html
const encodeForWebsockets = text => unescape(encodeURIComponent(text)); const encodeForWebsockets = (text) =>
const escapedLines = []; unescape(encodeURIComponent(text))
for (let line of Array.from(lines)) { const escapedLines = []
try { for (let line of Array.from(lines)) {
line = encodeForWebsockets(line); try {
} catch (error1) { line = encodeForWebsockets(line)
err = error1; } catch (error1) {
logger.err({err, project_id, doc_id, fromVersion, line, client_id: client.id}, "error encoding line uri component"); err = error1
return callback(err); logger.err(
} {
escapedLines.push(line); err,
} project_id,
if (options.encodeRanges) { doc_id,
try { fromVersion,
for (const comment of Array.from((ranges != null ? ranges.comments : undefined) || [])) { line,
if (comment.op.c != null) { comment.op.c = encodeForWebsockets(comment.op.c); } client_id: client.id
} },
for (const change of Array.from((ranges != null ? ranges.changes : undefined) || [])) { 'error encoding line uri component'
if (change.op.i != null) { change.op.i = encodeForWebsockets(change.op.i); } )
if (change.op.d != null) { change.op.d = encodeForWebsockets(change.op.d); } return callback(err)
} }
} catch (error2) { escapedLines.push(line)
err = error2; }
logger.err({err, project_id, doc_id, fromVersion, ranges, client_id: client.id}, "error encoding range uri component"); if (options.encodeRanges) {
return callback(err); try {
} for (const comment of Array.from(
} (ranges != null ? ranges.comments : undefined) || []
)) {
if (comment.op.c != null) {
comment.op.c = encodeForWebsockets(comment.op.c)
}
}
for (const change of Array.from(
(ranges != null ? ranges.changes : undefined) || []
)) {
if (change.op.i != null) {
change.op.i = encodeForWebsockets(change.op.i)
}
if (change.op.d != null) {
change.op.d = encodeForWebsockets(change.op.d)
}
}
} catch (error2) {
err = error2
logger.err(
{
err,
project_id,
doc_id,
fromVersion,
ranges,
client_id: client.id
},
'error encoding range uri component'
)
return callback(err)
}
}
AuthorizationManager.addAccessToDoc(client, doc_id); AuthorizationManager.addAccessToDoc(client, doc_id)
logger.log({user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc"); logger.log(
return callback(null, escapedLines, version, ops, ranges); {
}); user_id,
}); project_id,
}); doc_id,
}, fromVersion,
client_id: client.id
},
'client joined doc'
)
return callback(null, escapedLines, version, ops, ranges)
}
)
})
})
},
leaveDoc(client, doc_id, callback) { leaveDoc(client, doc_id, callback) {
// client may have disconnected, but we have to cleanup internal state. // client may have disconnected, but we have to cleanup internal state.
if (callback == null) { callback = function(error) {}; } if (callback == null) {
metrics.inc("editor.leave-doc"); callback = function (error) {}
const {project_id, user_id} = client.ol_context; }
logger.log({user_id, project_id, doc_id, client_id: client.id}, "client leaving doc"); metrics.inc('editor.leave-doc')
RoomManager.leaveDoc(client, doc_id); const { project_id, user_id } = client.ol_context
// we could remove permission when user leaves a doc, but because logger.log(
// the connection is per-project, we continue to allow access { user_id, project_id, doc_id, client_id: client.id },
// after the initial joinDoc since we know they are already authorised. 'client leaving doc'
// # AuthorizationManager.removeAccessToDoc client, doc_id )
return callback(); RoomManager.leaveDoc(client, doc_id)
}, // we could remove permission when user leaves a doc, but because
updateClientPosition(client, cursorData, callback) { // the connection is per-project, we continue to allow access
if (callback == null) { callback = function(error) {}; } // after the initial joinDoc since we know they are already authorised.
if (client.disconnected) { // # AuthorizationManager.removeAccessToDoc client, doc_id
// do not create a ghost entry in redis return callback()
return callback(); },
} updateClientPosition(client, cursorData, callback) {
if (callback == null) {
callback = function (error) {}
}
if (client.disconnected) {
// do not create a ghost entry in redis
return callback()
}
metrics.inc("editor.update-client-position", 0.1); metrics.inc('editor.update-client-position', 0.1)
const {project_id, first_name, last_name, email, user_id} = client.ol_context; const {
logger.log({user_id, project_id, client_id: client.id, cursorData}, "updating client position"); project_id,
first_name,
last_name,
email,
user_id
} = client.ol_context
logger.log(
{ user_id, project_id, client_id: client.id, cursorData },
'updating client position'
)
return AuthorizationManager.assertClientCanViewProjectAndDoc(client, cursorData.doc_id, function(error) { return AuthorizationManager.assertClientCanViewProjectAndDoc(
if (error != null) { client,
logger.warn({err: error, client_id: client.id, project_id, user_id}, "silently ignoring unauthorized updateClientPosition. Client likely hasn't called joinProject yet."); cursorData.doc_id,
return callback(); function (error) {
} if (error != null) {
cursorData.id = client.publicId; logger.warn(
if (user_id != null) { cursorData.user_id = user_id; } { err: error, client_id: client.id, project_id, user_id },
if (email != null) { cursorData.email = email; } "silently ignoring unauthorized updateClientPosition. Client likely hasn't called joinProject yet."
// Don't store anonymous users in redis to avoid influx )
if (!user_id || (user_id === 'anonymous-user')) { return callback()
cursorData.name = ""; }
callback(); cursorData.id = client.publicId
} else { if (user_id != null) {
cursorData.name = first_name && last_name ? cursorData.user_id = user_id
`${first_name} ${last_name}` }
: first_name || (last_name || ""); if (email != null) {
ConnectedUsersManager.updateUserPosition(project_id, client.publicId, { cursorData.email = email
first_name, }
last_name, // Don't store anonymous users in redis to avoid influx
email, if (!user_id || user_id === 'anonymous-user') {
_id: user_id cursorData.name = ''
}, { callback()
row: cursorData.row, } else {
column: cursorData.column, cursorData.name =
doc_id: cursorData.doc_id first_name && last_name
}, callback); ? `${first_name} ${last_name}`
} : first_name || last_name || ''
return WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData); ConnectedUsersManager.updateUserPosition(
}); project_id,
}, client.publicId,
{
first_name,
last_name,
email,
_id: user_id
},
{
row: cursorData.row,
column: cursorData.column,
doc_id: cursorData.doc_id
},
callback
)
}
return WebsocketLoadBalancer.emitToRoom(
project_id,
'clientTracking.clientUpdated',
cursorData
)
}
)
},
CLIENT_REFRESH_DELAY: 1000, CLIENT_REFRESH_DELAY: 1000,
getConnectedUsers(client, callback) { getConnectedUsers(client, callback) {
if (callback == null) { callback = function(error, users) {}; } if (callback == null) {
if (client.disconnected) { callback = function (error, users) {}
// they are not interested anymore, skip the redis lookups }
return callback(); if (client.disconnected) {
} // they are not interested anymore, skip the redis lookups
return callback()
}
metrics.inc("editor.get-connected-users"); metrics.inc('editor.get-connected-users')
const {project_id, user_id, is_restricted_user} = client.ol_context; const { project_id, user_id, is_restricted_user } = client.ol_context
if (is_restricted_user) { if (is_restricted_user) {
return callback(null, []); return callback(null, [])
} }
if ((project_id == null)) { return callback(new Error("no project_id found on client")); } if (project_id == null) {
logger.log({user_id, project_id, client_id: client.id}, "getting connected users"); return callback(new Error('no project_id found on client'))
return AuthorizationManager.assertClientCanViewProject(client, function(error) { }
if (error != null) { return callback(error); } logger.log(
WebsocketLoadBalancer.emitToRoom(project_id, 'clientTracking.refresh'); { user_id, project_id, client_id: client.id },
return setTimeout(() => ConnectedUsersManager.getConnectedUsers(project_id, function(error, users) { 'getting connected users'
if (error != null) { return callback(error); } )
callback(null, users); return AuthorizationManager.assertClientCanViewProject(client, function (
return logger.log({user_id, project_id, client_id: client.id}, "got connected users"); error
}) ) {
, WebsocketController.CLIENT_REFRESH_DELAY); if (error != null) {
}); return callback(error)
}, }
WebsocketLoadBalancer.emitToRoom(project_id, 'clientTracking.refresh')
return setTimeout(
() =>
ConnectedUsersManager.getConnectedUsers(project_id, function (
error,
users
) {
if (error != null) {
return callback(error)
}
callback(null, users)
return logger.log(
{ user_id, project_id, client_id: client.id },
'got connected users'
)
}),
WebsocketController.CLIENT_REFRESH_DELAY
)
})
},
applyOtUpdate(client, doc_id, update, callback) { applyOtUpdate(client, doc_id, update, callback) {
// client may have disconnected, but we can submit their update to doc-updater anyways. // client may have disconnected, but we can submit their update to doc-updater anyways.
if (callback == null) { callback = function(error) {}; } if (callback == null) {
const {user_id, project_id} = client.ol_context; callback = function (error) {}
if ((project_id == null)) { return callback(new Error("no project_id found on client")); } }
const { user_id, project_id } = client.ol_context
if (project_id == null) {
return callback(new Error('no project_id found on client'))
}
return WebsocketController._assertClientCanApplyUpdate(client, doc_id, update, function(error) { return WebsocketController._assertClientCanApplyUpdate(
if (error != null) { client,
logger.warn({err: error, doc_id, client_id: client.id, version: update.v}, "client is not authorized to make update"); doc_id,
setTimeout(() => // Disconnect, but give the client the chance to receive the error update,
client.disconnect() function (error) {
, 100); if (error != null) {
return callback(error); logger.warn(
} { err: error, doc_id, client_id: client.id, version: update.v },
if (!update.meta) { update.meta = {}; } 'client is not authorized to make update'
update.meta.source = client.publicId; )
update.meta.user_id = user_id; setTimeout(
metrics.inc("editor.doc-update", 0.3); () =>
// Disconnect, but give the client the chance to receive the error
client.disconnect(),
100
)
return callback(error)
}
if (!update.meta) {
update.meta = {}
}
update.meta.source = client.publicId
update.meta.user_id = user_id
metrics.inc('editor.doc-update', 0.3)
logger.log({user_id, doc_id, project_id, client_id: client.id, version: update.v}, "sending update to doc updater"); logger.log(
{
user_id,
doc_id,
project_id,
client_id: client.id,
version: update.v
},
'sending update to doc updater'
)
return DocumentUpdaterManager.queueChange(project_id, doc_id, update, function(error) { return DocumentUpdaterManager.queueChange(
if ((error != null ? error.message : undefined) === "update is too large") { project_id,
metrics.inc("update_too_large"); doc_id,
const { update,
updateSize function (error) {
} = error; if (
logger.warn({user_id, project_id, doc_id, updateSize}, "update is too large"); (error != null ? error.message : undefined) ===
'update is too large'
) {
metrics.inc('update_too_large')
const { updateSize } = error
logger.warn(
{ user_id, project_id, doc_id, updateSize },
'update is too large'
)
// mark the update as received -- the client should not send it again! // mark the update as received -- the client should not send it again!
callback(); callback()
// trigger an out-of-sync error // trigger an out-of-sync error
const message = {project_id, doc_id, error: "update is too large"}; const message = {
setTimeout(function() { project_id,
if (client.disconnected) { doc_id,
// skip the message broadcast, the client has moved on error: 'update is too large'
return metrics.inc('editor.doc-update.disconnected', 1, {status:'at-otUpdateError'}); }
} setTimeout(function () {
client.emit("otUpdateError", message.error, message); if (client.disconnected) {
return client.disconnect(); // skip the message broadcast, the client has moved on
} return metrics.inc('editor.doc-update.disconnected', 1, {
, 100); status: 'at-otUpdateError'
return; })
} }
client.emit('otUpdateError', message.error, message)
return client.disconnect()
}, 100)
return
}
if (error != null) { if (error != null) {
logger.error({err: error, project_id, doc_id, client_id: client.id, version: update.v}, "document was not available for update"); logger.error(
client.disconnect(); {
} err: error,
return callback(error); project_id,
}); doc_id,
}); client_id: client.id,
}, version: update.v
},
'document was not available for update'
)
client.disconnect()
}
return callback(error)
}
)
}
)
},
_assertClientCanApplyUpdate(client, doc_id, update, callback) { _assertClientCanApplyUpdate(client, doc_id, update, callback) {
return AuthorizationManager.assertClientCanEditProjectAndDoc(client, doc_id, function(error) { return AuthorizationManager.assertClientCanEditProjectAndDoc(
if (error != null) { client,
if ((error.message === "not authorized") && WebsocketController._isCommentUpdate(update)) { doc_id,
// This might be a comment op, which we only need read-only priveleges for function (error) {
return AuthorizationManager.assertClientCanViewProjectAndDoc(client, doc_id, callback); if (error != null) {
} else { if (
return callback(error); error.message === 'not authorized' &&
} WebsocketController._isCommentUpdate(update)
} else { ) {
return callback(null); // This might be a comment op, which we only need read-only priveleges for
} return AuthorizationManager.assertClientCanViewProjectAndDoc(
}); client,
}, doc_id,
callback
)
} else {
return callback(error)
}
} else {
return callback(null)
}
}
)
},
_isCommentUpdate(update) { _isCommentUpdate(update) {
for (const op of Array.from(update.op)) { for (const op of Array.from(update.op)) {
if ((op.c == null)) { if (op.c == null) {
return false; return false
} }
} }
return true; return true
} }
}); }
function __guard__(value, transform) { function __guard__(value, transform) {
return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined; return typeof value !== 'undefined' && value !== null
} ? transform(value)
: undefined
}

View file

@ -11,146 +11,207 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let WebsocketLoadBalancer; let WebsocketLoadBalancer
const Settings = require('settings-sharelatex'); const Settings = require('settings-sharelatex')
const logger = require('logger-sharelatex'); const logger = require('logger-sharelatex')
const RedisClientManager = require("./RedisClientManager"); const RedisClientManager = require('./RedisClientManager')
const SafeJsonParse = require("./SafeJsonParse"); const SafeJsonParse = require('./SafeJsonParse')
const EventLogger = require("./EventLogger"); const EventLogger = require('./EventLogger')
const HealthCheckManager = require("./HealthCheckManager"); const HealthCheckManager = require('./HealthCheckManager')
const RoomManager = require("./RoomManager"); const RoomManager = require('./RoomManager')
const ChannelManager = require("./ChannelManager"); const ChannelManager = require('./ChannelManager')
const ConnectedUsersManager = require("./ConnectedUsersManager"); const ConnectedUsersManager = require('./ConnectedUsersManager')
const RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST = [ const RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST = [
'connectionAccepted', 'connectionAccepted',
'otUpdateApplied', 'otUpdateApplied',
'otUpdateError', 'otUpdateError',
'joinDoc', 'joinDoc',
'reciveNewDoc', 'reciveNewDoc',
'reciveNewFile', 'reciveNewFile',
'reciveNewFolder', 'reciveNewFolder',
'removeEntity' 'removeEntity'
]; ]
module.exports = (WebsocketLoadBalancer = { module.exports = WebsocketLoadBalancer = {
rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub), rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub),
rclientSubList: RedisClientManager.createClientList(Settings.redis.pubsub), rclientSubList: RedisClientManager.createClientList(Settings.redis.pubsub),
emitToRoom(room_id, message, ...payload) { emitToRoom(room_id, message, ...payload) {
if ((room_id == null)) { if (room_id == null) {
logger.warn({message, payload}, "no room_id provided, ignoring emitToRoom"); logger.warn(
return; { message, payload },
} 'no room_id provided, ignoring emitToRoom'
const data = JSON.stringify({ )
room_id, return
message, }
payload const data = JSON.stringify({
}); room_id,
logger.log({room_id, message, payload, length: data.length}, "emitting to room"); message,
payload
})
logger.log(
{ room_id, message, payload, length: data.length },
'emitting to room'
)
return Array.from(this.rclientPubList).map((rclientPub) => return Array.from(this.rclientPubList).map((rclientPub) =>
ChannelManager.publish(rclientPub, "editor-events", room_id, data)); ChannelManager.publish(rclientPub, 'editor-events', room_id, data)
}, )
},
emitToAll(message, ...payload) { emitToAll(message, ...payload) {
return this.emitToRoom("all", message, ...Array.from(payload)); return this.emitToRoom('all', message, ...Array.from(payload))
}, },
listenForEditorEvents(io) { listenForEditorEvents(io) {
logger.log({rclients: this.rclientPubList.length}, "publishing editor events"); logger.log(
logger.log({rclients: this.rclientSubList.length}, "listening for editor events"); { rclients: this.rclientPubList.length },
for (const rclientSub of Array.from(this.rclientSubList)) { 'publishing editor events'
rclientSub.subscribe("editor-events"); )
rclientSub.on("message", function(channel, message) { logger.log(
if (Settings.debugEvents > 0) { EventLogger.debugEvent(channel, message); } { rclients: this.rclientSubList.length },
return WebsocketLoadBalancer._processEditorEvent(io, channel, message); 'listening for editor events'
}); )
} for (const rclientSub of Array.from(this.rclientSubList)) {
return this.handleRoomUpdates(this.rclientSubList); rclientSub.subscribe('editor-events')
}, rclientSub.on('message', function (channel, message) {
if (Settings.debugEvents > 0) {
EventLogger.debugEvent(channel, message)
}
return WebsocketLoadBalancer._processEditorEvent(io, channel, message)
})
}
return this.handleRoomUpdates(this.rclientSubList)
},
handleRoomUpdates(rclientSubList) { handleRoomUpdates(rclientSubList) {
const roomEvents = RoomManager.eventSource(); const roomEvents = RoomManager.eventSource()
roomEvents.on('project-active', function(project_id) { roomEvents.on('project-active', function (project_id) {
const subscribePromises = Array.from(rclientSubList).map((rclient) => const subscribePromises = Array.from(rclientSubList).map((rclient) =>
ChannelManager.subscribe(rclient, "editor-events", project_id)); ChannelManager.subscribe(rclient, 'editor-events', project_id)
return RoomManager.emitOnCompletion(subscribePromises, `project-subscribed-${project_id}`); )
}); return RoomManager.emitOnCompletion(
return roomEvents.on('project-empty', project_id => Array.from(rclientSubList).map((rclient) => subscribePromises,
ChannelManager.unsubscribe(rclient, "editor-events", project_id))); `project-subscribed-${project_id}`
}, )
})
return roomEvents.on('project-empty', (project_id) =>
Array.from(rclientSubList).map((rclient) =>
ChannelManager.unsubscribe(rclient, 'editor-events', project_id)
)
)
},
_processEditorEvent(io, channel, message) { _processEditorEvent(io, channel, message) {
return SafeJsonParse.parse(message, function(error, message) { return SafeJsonParse.parse(message, function (error, message) {
let clientList; let clientList
let client; let client
if (error != null) { if (error != null) {
logger.error({err: error, channel}, "error parsing JSON"); logger.error({ err: error, channel }, 'error parsing JSON')
return; return
} }
if (message.room_id === "all") { if (message.room_id === 'all') {
return io.sockets.emit(message.message, ...Array.from(message.payload)); return io.sockets.emit(message.message, ...Array.from(message.payload))
} else if ((message.message === 'clientTracking.refresh') && (message.room_id != null)) { } else if (
clientList = io.sockets.clients(message.room_id); message.message === 'clientTracking.refresh' &&
logger.log({channel, message: message.message, room_id: message.room_id, message_id: message._id, socketIoClients: ((() => { message.room_id != null
const result = []; ) {
for (client of Array.from(clientList)) { result.push(client.id); clientList = io.sockets.clients(message.room_id)
} logger.log(
return result; {
})())}, "refreshing client list"); channel,
return (() => { message: message.message,
const result1 = []; room_id: message.room_id,
for (client of Array.from(clientList)) { message_id: message._id,
result1.push(ConnectedUsersManager.refreshClient(message.room_id, client.publicId)); socketIoClients: (() => {
} const result = []
return result1; for (client of Array.from(clientList)) {
})(); result.push(client.id)
} else if (message.room_id != null) { }
if ((message._id != null) && Settings.checkEventOrder) { return result
const status = EventLogger.checkEventOrder("editor-events", message._id, message); })()
if (status === "duplicate") { },
return; // skip duplicate events 'refreshing client list'
} )
} return (() => {
const result1 = []
for (client of Array.from(clientList)) {
result1.push(
ConnectedUsersManager.refreshClient(
message.room_id,
client.publicId
)
)
}
return result1
})()
} else if (message.room_id != null) {
if (message._id != null && Settings.checkEventOrder) {
const status = EventLogger.checkEventOrder(
'editor-events',
message._id,
message
)
if (status === 'duplicate') {
return // skip duplicate events
}
}
const is_restricted_message = !Array.from(RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST).includes(message.message); const is_restricted_message = !Array.from(
RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST
).includes(message.message)
// send messages only to unique clients (due to duplicate entries in io.sockets.clients) // send messages only to unique clients (due to duplicate entries in io.sockets.clients)
clientList = io.sockets.clients(message.room_id) clientList = io.sockets
.filter(client => !(is_restricted_message && client.ol_context.is_restricted_user)); .clients(message.room_id)
.filter(
(client) =>
!(is_restricted_message && client.ol_context.is_restricted_user)
)
// avoid unnecessary work if no clients are connected // avoid unnecessary work if no clients are connected
if (clientList.length === 0) { return; } if (clientList.length === 0) {
logger.log({ return
channel, }
message: message.message, logger.log(
room_id: message.room_id, {
message_id: message._id, channel,
socketIoClients: ((() => { message: message.message,
const result2 = []; room_id: message.room_id,
for (client of Array.from(clientList)) { result2.push(client.id); message_id: message._id,
} socketIoClients: (() => {
return result2; const result2 = []
})()) for (client of Array.from(clientList)) {
}, "distributing event to clients"); result2.push(client.id)
const seen = {}; }
return (() => { return result2
const result3 = []; })()
for (client of Array.from(clientList)) { },
if (!seen[client.id]) { 'distributing event to clients'
seen[client.id] = true; )
result3.push(client.emit(message.message, ...Array.from(message.payload))); const seen = {}
} else { return (() => {
result3.push(undefined); const result3 = []
} for (client of Array.from(clientList)) {
} if (!seen[client.id]) {
return result3; seen[client.id] = true
})(); result3.push(
} else if (message.health_check != null) { client.emit(message.message, ...Array.from(message.payload))
logger.debug({message}, "got health check message in editor events channel"); )
return HealthCheckManager.check(channel, message.key); } else {
} result3.push(undefined)
}); }
} }
}); return result3
})()
} else if (message.health_check != null) {
logger.debug(
{ message },
'got health check message in editor events channel'
)
return HealthCheckManager.check(channel, message.key)
}
})
}
}