2014-11-14 10:30:18 -05:00
|
|
|
logger = require "logger-sharelatex"
|
|
|
|
settings = require 'settings-sharelatex'
|
2019-07-09 06:45:00 -04:00
|
|
|
RedisClientManager = require "./RedisClientManager"
|
2015-12-01 06:05:49 -05:00
|
|
|
SafeJsonParse = require "./SafeJsonParse"
|
2019-03-21 10:50:27 -04:00
|
|
|
EventLogger = require "./EventLogger"
|
2019-04-15 09:05:26 -04:00
|
|
|
HealthCheckManager = require "./HealthCheckManager"
|
2019-07-18 06:25:10 -04:00
|
|
|
RoomManager = require "./RoomManager"
|
|
|
|
ChannelManager = require "./ChannelManager"
|
2019-04-09 09:48:00 -04:00
|
|
|
metrics = require "metrics-sharelatex"
|
2014-11-14 10:30:18 -05:00
|
|
|
|
2015-11-30 10:40:03 -05:00
|
|
|
# Size threshold of 1 MiB for message payloads.
# NOTE(review): not referenced anywhere in this chunk — presumably passed
# to SafeJsonParse or used when logging oversized messages; confirm usage
# elsewhere in the file before removing.
MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb
|
2015-11-30 10:25:09 -05:00
|
|
|
|
2014-11-14 10:30:18 -05:00
|
|
|
# DocumentUpdaterController is responsible for updates that come via Redis
# Pub/Sub from the document updater, fanning them out to connected
# socket.io clients.
module.exports = DocumentUpdaterController =
	# One subscriber connection per configured pub/sub endpoint.
	rclientList: RedisClientManager.createClientList(settings.redis.pubsub)
|
2014-11-14 10:30:18 -05:00
|
|
|
|
|
|
|
# Subscribe every pub/sub client to the "applied-ops" channel and route
# incoming messages through _processMessageFromDocumentUpdater. Also sets
# up per-client rate metrics (multi-client deployments only) and the
# room-based dynamic channel subscriptions.
listenForUpdatesFromDocumentUpdater: (io) ->
	logger.log {rclients: @rclientList.length}, "listening for applied-ops events"
	for subClient in @rclientList
		subClient.subscribe "applied-ops"
		subClient.on "message", (channel, message) ->
			metrics.inc "rclient", 0.001 # global event rate metric
			if settings.debugEvents > 0
				EventLogger.debugEvent(channel, message)
			DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message)
	# create metrics for each redis instance only when we have multiple redis clients
	if @rclientList.length > 1
		for subClient, index in @rclientList
			# capture index so each handler reports against its own counter;
			# subClient.on is invoked synchronously, so subClient is safe here
			do (index) ->
				subClient.on "message", () ->
					metrics.inc "rclient-#{index}", 0.001 # per client event rate metric
	@handleRoomUpdates(@rclientList)
|
2019-07-18 06:25:10 -04:00
|
|
|
|
2019-07-23 12:02:09 -04:00
|
|
|
# Track per-document room activity: when the first client joins a doc,
# subscribe every redis client to its applied-ops channel (announcing
# completion via "doc-subscribed-<doc_id>"); when the last client leaves,
# unsubscribe again.
handleRoomUpdates: (rclientSubList) ->
	events = RoomManager.eventSource()
	events.on 'doc-active', (doc_id) ->
		pending = (ChannelManager.subscribe client, "applied-ops", doc_id for client in rclientSubList)
		RoomManager.emitOnCompletion(pending, "doc-subscribed-#{doc_id}")
	events.on 'doc-empty', (doc_id) ->
		for client in rclientSubList
			ChannelManager.unsubscribe client, "applied-ops", doc_id
|
2019-07-18 06:25:10 -04:00
|
|
|
|
2014-11-14 10:30:18 -05:00
|
|
|
# Parse a raw pub/sub payload and dispatch on its shape: ops are applied
# to clients, errors are forwarded, and health-check pings are
# acknowledged. Malformed JSON is logged and dropped.
_processMessageFromDocumentUpdater: (io, channel, message) ->
	SafeJsonParse.parse message, (error, parsed) ->
		if error?
			logger.error {err: error, channel}, "error parsing JSON"
			return
		if parsed.op?
			if parsed._id? && settings.checkEventOrder
				# skip events we have already seen on another redis connection
				if EventLogger.checkEventOrder("applied-ops", parsed._id, parsed) is 'duplicate'
					return
			DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, parsed.doc_id, parsed.op)
		else if parsed.error?
			DocumentUpdaterController._processErrorFromDocumentUpdater(io, parsed.doc_id, parsed.error, parsed)
		else if parsed.health_check?
			logger.debug {message: parsed}, "got health check message in applied ops channel"
			HealthCheckManager.check channel, parsed.key
|
2014-11-14 10:30:18 -05:00
|
|
|
|
|
|
|
# Broadcast an applied op to every client connected to doc_id. The
# originating client (matched by publicId against update.meta?.source)
# gets a minimal acknowledgement (version + doc); other clients get the
# full update, unless the op is a duplicate, which only needs the ack.
# Fix: the sender check previously read update.meta.source without the
# existential operator, so an update lacking `meta` would throw; every
# other access in this method already uses update.meta?.source.
_applyUpdateFromDocumentUpdater: (io, doc_id, update) ->
	clientList = io.sockets.clients(doc_id)
	# avoid unnecessary work if no clients are connected
	if clientList.length is 0
		return
	# send updates to clients
	logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, socketIoClients: (client.id for client in clientList), "distributing updates to clients"
	seen = {}
	# send messages only to unique clients (due to duplicate entries in io.sockets.clients)
	for client in clientList when not seen[client.id]
		seen[client.id] = true
		# ?. keeps a meta-less update from crashing the fan-out; it is then
		# treated as a collaborator update below
		if client.publicId == update.meta?.source
			logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, "distributing update to sender"
			client.emit "otUpdateApplied", v: update.v, doc: update.doc
		else if !update.dup # Duplicate ops should just be sent back to sending client for acknowledgement
			logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, client_id: client.id, "distributing update to collaborator"
			client.emit "otUpdateApplied", update
	if Object.keys(seen).length < clientList.length
		metrics.inc "socket-io.duplicate-clients", 0.1
		logger.log doc_id: doc_id, socketIoClients: (client.id for client in clientList), "discarded duplicate clients"
|
2014-11-14 10:30:18 -05:00
|
|
|
|
|
|
|
# Forward a document-updater error to every client connected to doc_id
# and disconnect each of them, forcing a rejoin/resync.
_processErrorFromDocumentUpdater: (io, doc_id, error, message) ->
	for affectedClient in io.sockets.clients(doc_id)
		logger.warn err: error, doc_id: doc_id, client_id: affectedClient.id, "error from document updater, disconnecting client"
		affectedClient.emit "otUpdateError", error, message
		affectedClient.disconnect()
|
2014-11-14 10:30:18 -05:00
|
|
|
|
|
|
|
|