2020-06-23 13:29:38 -04:00
|
|
|
/* eslint-disable
|
|
|
|
camelcase,
|
|
|
|
no-unused-vars,
|
|
|
|
*/
|
|
|
|
// TODO: This file was created by bulk-decaffeinate.
|
|
|
|
// Fix any style issues and re-enable lint.
|
2020-06-23 13:29:34 -04:00
|
|
|
/*
|
|
|
|
* decaffeinate suggestions:
|
|
|
|
* DS101: Remove unnecessary use of Array.from
|
|
|
|
* DS102: Remove unnecessary code created because of implicit returns
|
|
|
|
* DS205: Consider reworking code to avoid use of IIFEs
|
|
|
|
* DS207: Consider shorter variations of null checks
|
|
|
|
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
|
|
|
*/
|
2020-06-23 13:29:44 -04:00
|
|
|
let DocumentUpdaterController
|
|
|
|
const logger = require('logger-sharelatex')
|
|
|
|
const settings = require('settings-sharelatex')
|
|
|
|
const RedisClientManager = require('./RedisClientManager')
|
|
|
|
const SafeJsonParse = require('./SafeJsonParse')
|
|
|
|
const EventLogger = require('./EventLogger')
|
|
|
|
const HealthCheckManager = require('./HealthCheckManager')
|
|
|
|
const RoomManager = require('./RoomManager')
|
|
|
|
const ChannelManager = require('./ChannelManager')
|
|
|
|
const metrics = require('metrics-sharelatex')
|
2014-11-14 10:30:18 -05:00
|
|
|
|
2020-06-23 13:29:44 -04:00
|
|
|
const MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 // 1Mb
|
2015-11-30 10:25:09 -05:00
|
|
|
|
2020-06-23 13:29:44 -04:00
|
|
|
module.exports = DocumentUpdaterController = {
|
|
|
|
// DocumentUpdaterController is responsible for updates that come via Redis
|
|
|
|
// Pub/Sub from the document updater.
|
|
|
|
rclientList: RedisClientManager.createClientList(settings.redis.pubsub),
|
2014-11-14 10:30:18 -05:00
|
|
|
|
2020-06-23 13:29:44 -04:00
|
|
|
listenForUpdatesFromDocumentUpdater(io) {
|
|
|
|
let i, rclient
|
|
|
|
logger.log(
|
|
|
|
{ rclients: this.rclientList.length },
|
|
|
|
'listening for applied-ops events'
|
|
|
|
)
|
|
|
|
for (i = 0; i < this.rclientList.length; i++) {
|
|
|
|
rclient = this.rclientList[i]
|
|
|
|
rclient.subscribe('applied-ops')
|
|
|
|
rclient.on('message', function (channel, message) {
|
|
|
|
metrics.inc('rclient', 0.001) // global event rate metric
|
|
|
|
if (settings.debugEvents > 0) {
|
|
|
|
EventLogger.debugEvent(channel, message)
|
|
|
|
}
|
|
|
|
return DocumentUpdaterController._processMessageFromDocumentUpdater(
|
|
|
|
io,
|
|
|
|
channel,
|
|
|
|
message
|
|
|
|
)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
// create metrics for each redis instance only when we have multiple redis clients
|
|
|
|
if (this.rclientList.length > 1) {
|
|
|
|
for (i = 0; i < this.rclientList.length; i++) {
|
|
|
|
rclient = this.rclientList[i]
|
|
|
|
;((
|
|
|
|
i // per client event rate metric
|
|
|
|
) => rclient.on('message', () => metrics.inc(`rclient-${i}`, 0.001)))(i)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return this.handleRoomUpdates(this.rclientList)
|
|
|
|
},
|
2019-07-18 06:25:10 -04:00
|
|
|
|
2020-06-23 13:29:44 -04:00
|
|
|
handleRoomUpdates(rclientSubList) {
|
|
|
|
const roomEvents = RoomManager.eventSource()
|
|
|
|
roomEvents.on('doc-active', function (doc_id) {
|
|
|
|
const subscribePromises = Array.from(rclientSubList).map((rclient) =>
|
|
|
|
ChannelManager.subscribe(rclient, 'applied-ops', doc_id)
|
|
|
|
)
|
|
|
|
return RoomManager.emitOnCompletion(
|
|
|
|
subscribePromises,
|
|
|
|
`doc-subscribed-${doc_id}`
|
|
|
|
)
|
|
|
|
})
|
|
|
|
return roomEvents.on('doc-empty', (doc_id) =>
|
|
|
|
Array.from(rclientSubList).map((rclient) =>
|
|
|
|
ChannelManager.unsubscribe(rclient, 'applied-ops', doc_id)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
},
|
2019-07-18 06:25:10 -04:00
|
|
|
|
2020-06-23 13:29:44 -04:00
|
|
|
_processMessageFromDocumentUpdater(io, channel, message) {
|
|
|
|
return SafeJsonParse.parse(message, function (error, message) {
|
|
|
|
if (error != null) {
|
|
|
|
logger.error({ err: error, channel }, 'error parsing JSON')
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if (message.op != null) {
|
|
|
|
if (message._id != null && settings.checkEventOrder) {
|
|
|
|
const status = EventLogger.checkEventOrder(
|
|
|
|
'applied-ops',
|
|
|
|
message._id,
|
|
|
|
message
|
|
|
|
)
|
|
|
|
if (status === 'duplicate') {
|
|
|
|
return // skip duplicate events
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return DocumentUpdaterController._applyUpdateFromDocumentUpdater(
|
|
|
|
io,
|
|
|
|
message.doc_id,
|
|
|
|
message.op
|
|
|
|
)
|
|
|
|
} else if (message.error != null) {
|
|
|
|
return DocumentUpdaterController._processErrorFromDocumentUpdater(
|
|
|
|
io,
|
|
|
|
message.doc_id,
|
|
|
|
message.error,
|
|
|
|
message
|
|
|
|
)
|
|
|
|
} else if (message.health_check != null) {
|
|
|
|
logger.debug(
|
|
|
|
{ message },
|
|
|
|
'got health check message in applied ops channel'
|
|
|
|
)
|
|
|
|
return HealthCheckManager.check(channel, message.key)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
},
|
2014-11-14 10:30:18 -05:00
|
|
|
|
2020-06-23 13:29:44 -04:00
|
|
|
_applyUpdateFromDocumentUpdater(io, doc_id, update) {
|
|
|
|
let client
|
|
|
|
const clientList = io.sockets.clients(doc_id)
|
|
|
|
// avoid unnecessary work if no clients are connected
|
|
|
|
if (clientList.length === 0) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// send updates to clients
|
|
|
|
logger.log(
|
|
|
|
{
|
|
|
|
doc_id,
|
|
|
|
version: update.v,
|
|
|
|
source: update.meta != null ? update.meta.source : undefined,
|
|
|
|
socketIoClients: (() => {
|
|
|
|
const result = []
|
|
|
|
for (client of Array.from(clientList)) {
|
|
|
|
result.push(client.id)
|
|
|
|
}
|
|
|
|
return result
|
|
|
|
})()
|
|
|
|
},
|
|
|
|
'distributing updates to clients'
|
|
|
|
)
|
|
|
|
const seen = {}
|
|
|
|
// send messages only to unique clients (due to duplicate entries in io.sockets.clients)
|
|
|
|
for (client of Array.from(clientList)) {
|
|
|
|
if (!seen[client.id]) {
|
|
|
|
seen[client.id] = true
|
|
|
|
if (client.publicId === update.meta.source) {
|
|
|
|
logger.log(
|
|
|
|
{
|
|
|
|
doc_id,
|
|
|
|
version: update.v,
|
|
|
|
source: update.meta != null ? update.meta.source : undefined
|
|
|
|
},
|
|
|
|
'distributing update to sender'
|
|
|
|
)
|
|
|
|
client.emit('otUpdateApplied', { v: update.v, doc: update.doc })
|
|
|
|
} else if (!update.dup) {
|
|
|
|
// Duplicate ops should just be sent back to sending client for acknowledgement
|
|
|
|
logger.log(
|
|
|
|
{
|
|
|
|
doc_id,
|
|
|
|
version: update.v,
|
|
|
|
source: update.meta != null ? update.meta.source : undefined,
|
|
|
|
client_id: client.id
|
|
|
|
},
|
|
|
|
'distributing update to collaborator'
|
|
|
|
)
|
|
|
|
client.emit('otUpdateApplied', update)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (Object.keys(seen).length < clientList.length) {
|
|
|
|
metrics.inc('socket-io.duplicate-clients', 0.1)
|
|
|
|
return logger.log(
|
|
|
|
{
|
|
|
|
doc_id,
|
|
|
|
socketIoClients: (() => {
|
|
|
|
const result1 = []
|
|
|
|
for (client of Array.from(clientList)) {
|
|
|
|
result1.push(client.id)
|
|
|
|
}
|
|
|
|
return result1
|
|
|
|
})()
|
|
|
|
},
|
|
|
|
'discarded duplicate clients'
|
|
|
|
)
|
|
|
|
}
|
|
|
|
},
|
2014-11-14 10:30:18 -05:00
|
|
|
|
2020-06-23 13:29:44 -04:00
|
|
|
_processErrorFromDocumentUpdater(io, doc_id, error, message) {
|
|
|
|
return (() => {
|
|
|
|
const result = []
|
|
|
|
for (const client of Array.from(io.sockets.clients(doc_id))) {
|
|
|
|
logger.warn(
|
|
|
|
{ err: error, doc_id, client_id: client.id },
|
|
|
|
'error from document updater, disconnecting client'
|
|
|
|
)
|
|
|
|
client.emit('otUpdateError', error, message)
|
|
|
|
result.push(client.disconnect())
|
|
|
|
}
|
|
|
|
return result
|
|
|
|
})()
|
|
|
|
}
|
|
|
|
}
|