overleaf/services/real-time/app/coffee/DocumentUpdaterController.coffee

logger = require "logger-sharelatex"
settings = require 'settings-sharelatex'
RedisClientManager = require "./RedisClientManager"
SafeJsonParse = require "./SafeJsonParse"
EventLogger = require "./EventLogger"
HealthCheckManager = require "./HealthCheckManager"
RoomManager = require "./RoomManager"
ChannelManager = require "./ChannelManager"
metrics = require "metrics-sharelatex"
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb

module.exports = DocumentUpdaterController =
	# DocumentUpdaterController is responsible for updates that come via Redis
	# Pub/Sub from the document updater.
	rclientList: RedisClientManager.createClientList(settings.redis.pubsub)
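
	# listenForUpdatesFromDocumentUpdater is typically called once at start-up
	# with the socket.io instance, e.g.:
	#   DocumentUpdaterController.listenForUpdatesFromDocumentUpdater(io)
	# It subscribes every pub/sub Redis client to the "applied-ops" channel and
	# routes each incoming message through _processMessageFromDocumentUpdater.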
	listenForUpdatesFromDocumentUpdater: (io) ->
		logger.log {rclients: @rclientList.length}, "listening for applied-ops events"
		for rclient, i in @rclientList
			rclient.subscribe "applied-ops"
			rclient.on "message", (channel, message) ->
				metrics.inc "rclient", 0.001 # global event rate metric
				EventLogger.debugEvent(channel, message) if settings.debugEvents > 0
				DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message)
		# create metrics for each redis instance only when we have multiple redis clients
		if @rclientList.length > 1
			for rclient, i in @rclientList
				do (i) ->
					rclient.on "message", () ->
						metrics.inc "rclient-#{i}", 0.001 # per client event rate metric
		for rclient in @rclientList
			@handleRoomUpdates(rclient)
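
	# handleRoomUpdates wires RoomManager room events to channel membership:
	# 'doc-active' subscribes the pub/sub client to that doc's applied-ops
	# channel, and 'doc-empty' unsubscribes it again once no clients remain.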
	handleRoomUpdates: (rclientSub) ->
		roomEvents = RoomManager.eventSource()
		roomEvents.on 'doc-active', (doc_id) ->
			ChannelManager.subscribe rclientSub, "applied-ops", doc_id
		roomEvents.on 'doc-empty', (doc_id) ->
			ChannelManager.unsubscribe rclientSub, "applied-ops", doc_id
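
	# Messages arrive as JSON strings. An illustrative (not exhaustive) shape,
	# based only on the fields read below:
	#   {"doc_id": "...", "_id": "...", "op": {"v": 17, "doc": "...", "dup": false, "meta": {"source": "<client id>"}}}
	# Error and health-check messages carry "error" or "health_check"/"key"
	# fields instead of "op".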
	_processMessageFromDocumentUpdater: (io, channel, message) ->
		SafeJsonParse.parse message, (error, message) ->
			if error?
				logger.error {err: error, channel}, "error parsing JSON"
				return
			if message.op?
				if message._id? && settings.checkEventOrder
					status = EventLogger.checkEventOrder("applied-ops", message._id, message)
					if status is 'duplicate'
						return # skip duplicate events
				DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op)
			else if message.error?
				DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message)
			else if message.health_check?
				logger.debug {message}, "got health check message in applied ops channel"
				HealthCheckManager.check channel, message.key
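
	# The sending client only gets a minimal acknowledgement (the update's v and
	# doc fields); other collaborators receive the full update. Duplicate ops
	# (update.dup) are acknowledged to the sender but not re-broadcast.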
	_applyUpdateFromDocumentUpdater: (io, doc_id, update) ->
		clientList = io.sockets.clients(doc_id)
		# avoid unnecessary work if no clients are connected
		if clientList.length is 0
			return
		# send updates to clients
		logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, socketIoClients: (client.id for client in clientList), "distributing updates to clients"
		seen = {}
		# send messages only to unique clients (due to duplicate entries in io.sockets.clients)
		for client in clientList when not seen[client.id]
			seen[client.id] = true
			if client.id == update.meta.source
				logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, "distributing update to sender"
				client.emit "otUpdateApplied", v: update.v, doc: update.doc
			else if !update.dup # Duplicate ops should just be sent back to the sending client for acknowledgement
				logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, client_id: client.id, "distributing update to collaborator"
				client.emit "otUpdateApplied", update
		if Object.keys(seen).length < clientList.length
			metrics.inc "socket-io.duplicate-clients", 0.1
			logger.log doc_id: doc_id, socketIoClients: (client.id for client in clientList), "discarded duplicate clients"
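
	# When the document updater reports an error, every client connected to the
	# doc is sent "otUpdateError" and then disconnected (clients are expected to
	# reconnect and rejoin the doc).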
	_processErrorFromDocumentUpdater: (io, doc_id, error, message) ->
		for client in io.sockets.clients(doc_id)
			logger.warn err: error, doc_id: doc_id, client_id: client.id, "error from document updater, disconnecting client"
			client.emit "otUpdateError", error, message
			client.disconnect()