overleaf/services/real-time/app/js/DocumentUpdaterController.js

143 lines
5.7 KiB
JavaScript
Raw Normal View History

/* eslint-disable
camelcase,
no-unused-vars,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS205: Consider reworking code to avoid use of IIFEs
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let DocumentUpdaterController;
const logger = require("logger-sharelatex");
const settings = require('settings-sharelatex');
const RedisClientManager = require("./RedisClientManager");
const SafeJsonParse = require("./SafeJsonParse");
const EventLogger = require("./EventLogger");
const HealthCheckManager = require("./HealthCheckManager");
const RoomManager = require("./RoomManager");
const ChannelManager = require("./ChannelManager");
const metrics = require("metrics-sharelatex");
// 1024 * 1024 = 1,048,576. Not referenced anywhere in the code visible in this
// file. NOTE(review): presumably a size threshold above which a message should
// be logged -- confirm whether it is still needed before removing it.
const MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024; // 1Mb
module.exports = (DocumentUpdaterController = {
// DocumentUpdaterController is responsible for updates that come via Redis
// Pub/Sub from the document updater.
rclientList: RedisClientManager.createClientList(settings.redis.pubsub),
listenForUpdatesFromDocumentUpdater(io) {
let i, rclient;
logger.log({rclients: this.rclientList.length}, "listening for applied-ops events");
for (i = 0; i < this.rclientList.length; i++) {
rclient = this.rclientList[i];
rclient.subscribe("applied-ops");
rclient.on("message", function(channel, message) {
metrics.inc("rclient", 0.001); // global event rate metric
if (settings.debugEvents > 0) { EventLogger.debugEvent(channel, message); }
return DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message);
});
}
// create metrics for each redis instance only when we have multiple redis clients
if (this.rclientList.length > 1) {
for (i = 0; i < this.rclientList.length; i++) {
rclient = this.rclientList[i];
((i => // per client event rate metric
rclient.on("message", () => metrics.inc(`rclient-${i}`, 0.001))))(i);
}
}
return this.handleRoomUpdates(this.rclientList);
},
handleRoomUpdates(rclientSubList) {
const roomEvents = RoomManager.eventSource();
roomEvents.on('doc-active', function(doc_id) {
const subscribePromises = Array.from(rclientSubList).map((rclient) =>
ChannelManager.subscribe(rclient, "applied-ops", doc_id));
return RoomManager.emitOnCompletion(subscribePromises, `doc-subscribed-${doc_id}`);
});
return roomEvents.on('doc-empty', doc_id => Array.from(rclientSubList).map((rclient) =>
ChannelManager.unsubscribe(rclient, "applied-ops", doc_id)));
},
_processMessageFromDocumentUpdater(io, channel, message) {
return SafeJsonParse.parse(message, function(error, message) {
if (error != null) {
logger.error({err: error, channel}, "error parsing JSON");
return;
}
if (message.op != null) {
if ((message._id != null) && settings.checkEventOrder) {
const status = EventLogger.checkEventOrder("applied-ops", message._id, message);
if (status === 'duplicate') {
return; // skip duplicate events
}
}
return DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op);
} else if (message.error != null) {
return DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message);
} else if (message.health_check != null) {
logger.debug({message}, "got health check message in applied ops channel");
return HealthCheckManager.check(channel, message.key);
}
});
},
_applyUpdateFromDocumentUpdater(io, doc_id, update) {
let client;
const clientList = io.sockets.clients(doc_id);
// avoid unnecessary work if no clients are connected
if (clientList.length === 0) {
return;
}
// send updates to clients
logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined), socketIoClients: (((() => {
const result = [];
for (client of Array.from(clientList)) { result.push(client.id);
}
return result;
})()))}, "distributing updates to clients");
const seen = {};
// send messages only to unique clients (due to duplicate entries in io.sockets.clients)
for (client of Array.from(clientList)) {
if (!seen[client.id]) {
seen[client.id] = true;
if (client.publicId === update.meta.source) {
logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined)}, "distributing update to sender");
client.emit("otUpdateApplied", {v: update.v, doc: update.doc});
} else if (!update.dup) { // Duplicate ops should just be sent back to sending client for acknowledgement
logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined), client_id: client.id}, "distributing update to collaborator");
client.emit("otUpdateApplied", update);
}
}
}
if (Object.keys(seen).length < clientList.length) {
metrics.inc("socket-io.duplicate-clients", 0.1);
return logger.log({doc_id, socketIoClients: (((() => {
const result1 = [];
for (client of Array.from(clientList)) { result1.push(client.id);
}
return result1;
})()))}, "discarded duplicate clients");
}
},
_processErrorFromDocumentUpdater(io, doc_id, error, message) {
return (() => {
const result = [];
for (const client of Array.from(io.sockets.clients(doc_id))) {
logger.warn({err: error, doc_id, client_id: client.id}, "error from document updater, disconnecting client");
client.emit("otUpdateError", error, message);
result.push(client.disconnect());
}
return result;
})();
}
});