overleaf/services/real-time/app/js/DocumentUpdaterController.js
Brian Gough 618cf99548 Merge pull request #4950 from overleaf/bg-realtime-log-relevelling
realtime log re-levelling

GitOrigin-RevId: 81d86ba648e7fc1436b638b67371b35fd831a62f
2021-09-15 08:03:25 +00:00

176 lines
5.4 KiB
JavaScript

/* eslint-disable
camelcase,
*/
const logger = require('logger-sharelatex')
const settings = require('@overleaf/settings')
const RedisClientManager = require('./RedisClientManager')
const SafeJsonParse = require('./SafeJsonParse')
const EventLogger = require('./EventLogger')
const HealthCheckManager = require('./HealthCheckManager')
const RoomManager = require('./RoomManager')
const ChannelManager = require('./ChannelManager')
const metrics = require('@overleaf/metrics')
let DocumentUpdaterController
module.exports = DocumentUpdaterController = {
// DocumentUpdaterController is responsible for updates that come via Redis
// Pub/Sub from the document updater.
rclientList: RedisClientManager.createClientList(settings.redis.pubsub),
listenForUpdatesFromDocumentUpdater(io) {
logger.debug(
{ rclients: this.rclientList.length },
'listening for applied-ops events'
)
for (const rclient of this.rclientList) {
rclient.subscribe('applied-ops')
rclient.on('message', function (channel, message) {
metrics.inc('rclient', 0.001) // global event rate metric
if (settings.debugEvents > 0) {
EventLogger.debugEvent(channel, message)
}
DocumentUpdaterController._processMessageFromDocumentUpdater(
io,
channel,
message
)
})
}
// create metrics for each redis instance only when we have multiple redis clients
if (this.rclientList.length > 1) {
this.rclientList.forEach((rclient, i) => {
// per client event rate metric
const metricName = `rclient-${i}`
rclient.on('message', () => metrics.inc(metricName, 0.001))
})
}
this.handleRoomUpdates(this.rclientList)
},
handleRoomUpdates(rclientSubList) {
const roomEvents = RoomManager.eventSource()
roomEvents.on('doc-active', function (doc_id) {
const subscribePromises = rclientSubList.map(rclient =>
ChannelManager.subscribe(rclient, 'applied-ops', doc_id)
)
RoomManager.emitOnCompletion(
subscribePromises,
`doc-subscribed-${doc_id}`
)
})
roomEvents.on('doc-empty', doc_id =>
rclientSubList.map(rclient =>
ChannelManager.unsubscribe(rclient, 'applied-ops', doc_id)
)
)
},
_processMessageFromDocumentUpdater(io, channel, message) {
SafeJsonParse.parse(message, function (error, message) {
if (error) {
logger.error({ err: error, channel }, 'error parsing JSON')
return
}
if (message.op) {
if (message._id && settings.checkEventOrder) {
const status = EventLogger.checkEventOrder(
'applied-ops',
message._id,
message
)
if (status === 'duplicate') {
return // skip duplicate events
}
}
DocumentUpdaterController._applyUpdateFromDocumentUpdater(
io,
message.doc_id,
message.op
)
} else if (message.error) {
DocumentUpdaterController._processErrorFromDocumentUpdater(
io,
message.doc_id,
message.error,
message
)
} else if (message.health_check) {
logger.debug(
{ message },
'got health check message in applied ops channel'
)
HealthCheckManager.check(channel, message.key)
}
})
},
_applyUpdateFromDocumentUpdater(io, doc_id, update) {
let client
const clientList = io.sockets.clients(doc_id)
// avoid unnecessary work if no clients are connected
if (clientList.length === 0) {
return
}
// send updates to clients
logger.debug(
{
doc_id,
version: update.v,
source: update.meta && update.meta.source,
socketIoClients: clientList.map(client => client.id),
},
'distributing updates to clients'
)
const seen = {}
// send messages only to unique clients (due to duplicate entries in io.sockets.clients)
for (client of clientList) {
if (!seen[client.id]) {
seen[client.id] = true
if (client.publicId === update.meta.source) {
logger.debug(
{
doc_id,
version: update.v,
source: update.meta.source,
},
'distributing update to sender'
)
client.emit('otUpdateApplied', { v: update.v, doc: update.doc })
} else if (!update.dup) {
// Duplicate ops should just be sent back to sending client for acknowledgement
logger.debug(
{
doc_id,
version: update.v,
source: update.meta.source,
client_id: client.id,
},
'distributing update to collaborator'
)
client.emit('otUpdateApplied', update)
}
}
}
if (Object.keys(seen).length < clientList.length) {
metrics.inc('socket-io.duplicate-clients', 0.1)
logger.debug(
{
doc_id,
socketIoClients: clientList.map(client => client.id),
},
'discarded duplicate clients'
)
}
},
_processErrorFromDocumentUpdater(io, doc_id, error, message) {
for (const client of io.sockets.clients(doc_id)) {
logger.warn(
{ err: error, doc_id, client_id: client.id },
'error from document updater, disconnecting client'
)
client.emit('otUpdateError', error, message)
client.disconnect()
}
},
}