mirror of
https://github.com/overleaf/overleaf.git
synced 2025-01-20 11:01:18 +00:00
0f0d562786
[misc] add metrics for document processing/broadcasting GitOrigin-RevId: d81de0dfb7a91863547631580f3c85f569718130
251 lines
7.9 KiB
JavaScript
251 lines
7.9 KiB
JavaScript
const Settings = require('@overleaf/settings')
|
|
const logger = require('@overleaf/logger')
|
|
const Metrics = require('@overleaf/metrics')
|
|
const RedisClientManager = require('./RedisClientManager')
|
|
const SafeJsonParse = require('./SafeJsonParse')
|
|
const EventLogger = require('./EventLogger')
|
|
const HealthCheckManager = require('./HealthCheckManager')
|
|
const RoomManager = require('./RoomManager')
|
|
const ChannelManager = require('./ChannelManager')
|
|
const ConnectedUsersManager = require('./ConnectedUsersManager')
|
|
|
|
// Event names that may still be delivered to restricted users. Any event
// whose name is not listed here is hidden from clients flagged as restricted.
// The entries are compared verbatim against `message.message`, so the
// spelling of each name (including the `recive...` ones) must stay as it is.
const RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST = [
  // document updates
  'otUpdateApplied',
  'otUpdateError',
  'joinDoc',
  // file tree changes
  'reciveNewDoc',
  'reciveNewFile',
  'reciveNewFolder',
  'reciveEntityMove',
  'reciveEntityRename',
  'removeEntity',
  // project level changes
  'accept-changes',
  'projectNameUpdated',
  'rootDocUpdated',
  'toggle-track-changes',
  'projectRenamedOrDeletedByExternalSource',
]
|
|
// Bucket list handed to Metrics.histogram for the bandwidth estimate:
// a leading 0, followed by every power of two from 64 bytes (2^6) up to
// 8MB (2^23).
const BANDWIDTH_BUCKETS = [
  0,
  ...Array.from({ length: 18 }, (_, offset) => 2 ** (offset + 6)),
]
|
|
|
|
// Ask ConnectedUsersManager to refresh every client currently in the room.
function refreshClientsInRoom(io, channel, message) {
  const clientList = io.sockets.clients(message.room_id)
  logger.debug(
    {
      channel,
      message: message.message,
      roomId: message.room_id,
      messageId: message._id,
      socketIoClients: clientList.map(client => client.id),
    },
    'refreshing client list'
  )
  for (const client of clientList) {
    ConnectedUsersManager.refreshClient(message.room_id, client.publicId)
  }
}

// Record a per-client bandwidth estimate for an applied op, once for the
// project room and once for the doc room.
function recordCanaryBandwidth(
  io,
  { ack, broadcast, source, projectId, docId }
) {
  // The length of this literal is added on top of the reported size;
  // presumably it is the frame that wraps each update on the wire.
  const envelopeLength = '5:::{"name":"otUpdateApplied","args":[]}'.length

  const estimateBandwidth = (room, path) => {
    // io.sockets.clients may list the same client more than once.
    const seen = new Set()
    for (const client of io.sockets.clients(room)) {
      if (seen.has(client.id)) continue
      seen.add(client.id)

      // The client that sent the update is counted with the `ack` size, all
      // other clients with the `broadcast` size.
      const size = client.id === source ? ack : broadcast
      if (size === 0) {
        // Nothing is sent to this client (per the upstream note: updates with
        // dup===true are only acknowledged, not sent to other clients).
        continue
      }
      Metrics.histogram(
        'estimated-applied-ops-bandwidth',
        size + envelopeLength,
        BANDWIDTH_BUCKETS,
        { path }
      )
    }
  }

  estimateBandwidth(projectId, 'per-project')
  estimateBandwidth(docId, 'per-doc')
}

// Deliver a room event to each unique client in the room, disconnecting
// clients that lost access and hiding restricted events from restricted users.
function distributeToRoom(io, channel, message) {
  if (message._id && Settings.checkEventOrder) {
    const status = EventLogger.checkEventOrder(
      'editor-events',
      message._id,
      message
    )
    if (status === 'duplicate') {
      return // skip duplicate events
    }
  }

  const isRestrictedMessage = !RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST.includes(
    message.message
  )

  const clientList = io.sockets.clients(message.room_id)
  // avoid unnecessary work if no clients are connected
  if (clientList.length === 0) {
    return
  }
  logger.debug(
    {
      channel,
      message: message.message,
      roomId: message.room_id,
      messageId: message._id,
      socketIoClients: clientList.map(client => client.id),
    },
    'distributing event to clients'
  )

  // send messages only to unique clients (due to duplicate entries in
  // io.sockets.clients)
  const seen = new Set()
  for (const client of clientList) {
    if (seen.has(client.id)) continue
    seen.add(client.id)

    if (WebsocketLoadBalancer.shouldDisconnectClient(client, message)) {
      logger.debug(
        {
          message,
          userId: client?.ol_context?.user_id,
          projectId: client?.ol_context?.project_id,
        },
        'disconnecting client'
      )
      // A collaborator access level change disconnects the client without
      // announcing that access was revoked.
      if (message.message !== 'project:collaboratorAccessLevel:changed') {
        client.emit('project:access:revoked')
      }
      client.disconnect()
    } else if (isRestrictedMessage && client.ol_context.is_restricted_user) {
      // hide restricted message
      logger.debug(
        {
          message,
          clientId: client.id,
          userId: client.ol_context.user_id,
          projectId: client.ol_context.project_id,
        },
        'hiding restricted message from client'
      )
    } else {
      client.emit(message.message, ...message.payload)
    }
  }
}

/**
 * Publishes editor events to Redis pub/sub and distributes the events
 * received from Redis to the socket.io clients connected to this process.
 */
const WebsocketLoadBalancer = {
  rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub),
  rclientSubList: RedisClientManager.createClientList(Settings.redis.pubsub),

  /**
   * Decide whether an event means the given client must be disconnected.
   *
   * @param {object} client - socket.io client; `client.ol_context` is read
   * @param {object} [message] - parsed editor event (`message`, `payload`)
   * @returns {boolean} true when the client should be disconnected
   */
  shouldDisconnectClient(client, message) {
    const userId = client.ol_context.user_id

    if (message?.message === 'userRemovedFromProject') {
      // Disconnect when the client's user id is listed in the payload.
      return Boolean(message.payload?.includes(userId))
    }

    if (message?.message === 'project:publicAccessLevel:changed') {
      // A missing or empty payload must not throw inside the event handler.
      const info = message.payload?.[0]
      return (
        info?.newAccessLevel === 'private' &&
        !client.ol_context.is_invited_member
      )
    }

    if (message?.message === 'project:collaboratorAccessLevel:changed') {
      const changedUserId = message.payload?.[0]?.userId
      // Require an actual id: a payload without one must not match clients
      // whose own user id is undefined.
      return changedUserId != null && userId === changedUserId
    }

    return false
  },

  /**
   * Publish an event for a room on every pub client.
   *
   * @param {string} roomId - target room; the call is ignored when falsy
   * @param {string} message - event name
   * @param {...*} payload - event arguments, serialized as a JSON array
   */
  emitToRoom(roomId, message, ...payload) {
    if (!roomId) {
      logger.warn(
        { message, payload },
        'no room_id provided, ignoring emitToRoom'
      )
      return
    }
    const data = JSON.stringify({
      room_id: roomId,
      message,
      payload,
    })
    logger.debug(
      { roomId, message, payload, length: data.length },
      'emitting to room'
    )

    for (const rclientPub of this.rclientPubList) {
      ChannelManager.publish(rclientPub, 'editor-events', roomId, data)
    }
  },

  /**
   * Publish an event to the special room `all`.
   *
   * @param {string} message - event name
   * @param {...*} payload - event arguments
   */
  emitToAll(message, ...payload) {
    this.emitToRoom('all', message, ...payload)
  },

  /**
   * Subscribe every sub client to the `editor-events` channel, route the
   * received messages to `_processEditorEvent`, and start handling room
   * updates.
   *
   * @param {object} io - socket.io server used to reach connected clients
   */
  listenForEditorEvents(io) {
    logger.debug(
      { rclients: this.rclientSubList.length },
      'listening for editor events'
    )
    for (const rclientSub of this.rclientSubList) {
      rclientSub.subscribe('editor-events')
      rclientSub.on('message', (channel, message) => {
        if (Settings.debugEvents > 0) {
          EventLogger.debugEvent(channel, message)
        }
        WebsocketLoadBalancer._processEditorEvent(io, channel, message)
      })
    }
    this.handleRoomUpdates(this.rclientSubList)
  },

  /**
   * Follow RoomManager events: subscribe to a project's `editor-events`
   * channel when the project becomes active and unsubscribe when it is empty.
   *
   * @param {Array<object>} rclientSubList - Redis subscriber clients
   */
  handleRoomUpdates(rclientSubList) {
    const roomEvents = RoomManager.eventSource()

    roomEvents.on('project-active', projectId => {
      const subscribePromises = rclientSubList.map(rclient =>
        ChannelManager.subscribe(rclient, 'editor-events', projectId)
      )
      // Emits `project-subscribed-<projectId>` once the subscriptions settle.
      RoomManager.emitOnCompletion(
        subscribePromises,
        `project-subscribed-${projectId}`
      )
    })

    roomEvents.on('project-empty', projectId => {
      for (const rclient of rclientSubList) {
        ChannelManager.unsubscribe(rclient, 'editor-events', projectId)
      }
    })
  },

  /**
   * Parse a raw pub/sub message and dispatch it by type: broadcast to all
   * clients, client tracking refresh, bandwidth canary, room event, or
   * health check.
   *
   * @param {object} io - socket.io server
   * @param {string} channel - Redis channel the message arrived on
   * @param {string} message - raw JSON message
   */
  _processEditorEvent(io, channel, message) {
    SafeJsonParse.parse(message, (error, parsed) => {
      if (error) {
        logger.error({ err: error, channel }, 'error parsing JSON')
        return
      }
      // Valid JSON is not necessarily an object (e.g. `null`); reading its
      // properties below would throw inside the Redis message handler.
      if (parsed === null || typeof parsed !== 'object') {
        logger.warn(
          { channel },
          'ignoring editor event that is not a JSON object'
        )
        return
      }

      if (parsed.room_id === 'all') {
        io.sockets.emit(parsed.message, ...parsed.payload)
      } else if (
        parsed.message === 'clientTracking.refresh' &&
        parsed.room_id
      ) {
        refreshClientsInRoom(io, channel, parsed)
      } else if (parsed.message === 'canary-applied-op') {
        recordCanaryBandwidth(io, parsed.payload)
      } else if (parsed.room_id) {
        distributeToRoom(io, channel, parsed)
      } else if (parsed.health_check) {
        logger.debug(
          { message: parsed },
          'got health check message in editor events channel'
        )
        HealthCheckManager.check(channel, parsed.key)
      }
    })
  },
}

module.exports = WebsocketLoadBalancer
|