Merge pull request #19455 from overleaf/jpa-metrics

[misc] add metrics for document processing/broadcasting

GitOrigin-RevId: d81de0dfb7a91863547631580f3c85f569718130
Jakob Ackermann 2024-07-18 15:01:09 +02:00 committed by Copybot
parent 6afb067737
commit 0f0d562786
23 changed files with 280 additions and 71 deletions

View file

@@ -205,7 +205,7 @@ app.use((error, req, res, next) => {
if (error instanceof Errors.NotFoundError) {
return res.sendStatus(404)
} else if (error instanceof Errors.OpRangeNotAvailableError) {
return res.sendStatus(422) // Unprocessable Entity
return res.status(422).json(error.info)
} else if (error instanceof Errors.FileTooLargeError) {
return res.sendStatus(413)
} else if (error.statusCode === 413) {
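
With this change a missed-ops request no longer gets an empty 422 body: the error's info object is returned as JSON. A minimal sketch of the payload, assuming the info fields added in the RedisManager hunk below (values are illustrative):

const error = new Errors.OpRangeNotAvailableError(
  'doc ops range is not loaded in redis',
  { firstVersionInRedis: 100, version: 130, ttlInS: 3600 } // illustrative values
)
// res.status(422).json(error.info) now responds with:
// { "firstVersionInRedis": 100, "version": 130, "ttlInS": 3600 }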

View file

@@ -1,51 +1,12 @@
/* eslint-disable
no-proto,
no-unused-vars,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
let Errors
function NotFoundError(message) {
const error = new Error(message)
error.name = 'NotFoundError'
error.__proto__ = NotFoundError.prototype
return error
}
NotFoundError.prototype.__proto__ = Error.prototype
const OError = require('@overleaf/o-error')
function OpRangeNotAvailableError(message) {
const error = new Error(message)
error.name = 'OpRangeNotAvailableError'
error.__proto__ = OpRangeNotAvailableError.prototype
return error
}
OpRangeNotAvailableError.prototype.__proto__ = Error.prototype
class NotFoundError extends OError {}
class OpRangeNotAvailableError extends OError {}
class ProjectStateChangedError extends OError {}
class DeleteMismatchError extends OError {}
class FileTooLargeError extends OError {}
function ProjectStateChangedError(message) {
const error = new Error(message)
error.name = 'ProjectStateChangedError'
error.__proto__ = ProjectStateChangedError.prototype
return error
}
ProjectStateChangedError.prototype.__proto__ = Error.prototype
function DeleteMismatchError(message) {
const error = new Error(message)
error.name = 'DeleteMismatchError'
error.__proto__ = DeleteMismatchError.prototype
return error
}
DeleteMismatchError.prototype.__proto__ = Error.prototype
function FileTooLargeError(message) {
const error = new Error(message)
error.name = 'FileTooLargeError'
error.__proto__ = FileTooLargeError.prototype
return error
}
FileTooLargeError.prototype.__proto__ = Error.prototype
module.exports = Errors = {
module.exports = {
NotFoundError,
OpRangeNotAvailableError,
ProjectStateChangedError,
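
A minimal sketch of how the rewritten errors behave, assuming the usual @overleaf/o-error contract (message first, an info object second):

const OError = require('@overleaf/o-error')
class OpRangeNotAvailableError extends OError {}

const err = new OpRangeNotAvailableError('doc ops range is not loaded in redis', { ttlInS: 3600 })
err instanceof OpRangeNotAvailableError // true, as with the old __proto__ juggling
err instanceof Error                    // true
err.name                                // 'OpRangeNotAvailableError', derived from the class name
err.info                                // { ttlInS: 3600 }, available to callers such as the 422 handler above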

View file

@@ -44,6 +44,7 @@ function getDoc(req, res, next) {
ops,
ranges,
pathname,
ttlInS: RedisManager.DOC_OPS_TTL,
})
}
)
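
A sketch of the extended getDoc response body; DOC_OPS_TTL is how long doc ops are retained in redis, in seconds (values illustrative, other fields elided):

// GET /project/:project_id/doc/:doc_id?fromVersion=N
// {
//   "ops": [ ... ],
//   "ranges": { ... },
//   "pathname": "/main.tex",
//   "ttlInS": 3600
// }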

View file

@@ -149,7 +149,7 @@ function getProjectDocsAndFlushIfOld(
// we can't return docs if project structure has changed
if (projectStateChanged) {
return callback(
Errors.ProjectStateChangedError('project state changed')
new Errors.ProjectStateChangedError('project state changed')
)
}
// project structure hasn't changed, return doc content from redis
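
The added `new` is required by the class-based errors above: an ES2015 class constructor throws when invoked as a plain function, whereas the old factory functions did not. For illustration:

class ProjectStateChangedError extends Error {}

ProjectStateChangedError('project state changed')     // TypeError: Class constructor ProjectStateChangedError cannot be invoked without 'new'
new ProjectStateChangedError('project state changed') // ok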

View file

@@ -38,6 +38,7 @@ const RealTimeRedisManager = {
// the MULTI's operations are all done on the same node in a
// cluster environment.
const multi = rclient.multi()
multi.llen(Keys.pendingUpdates({ doc_id: docId }))
multi.lrange(
Keys.pendingUpdates({ doc_id: docId }),
0,
@@ -49,19 +50,23 @@
-1
)
return multi.exec(function (error, replys) {
let jsonUpdate
if (error != null) {
return callback(error)
}
const jsonUpdates = replys[0]
for (jsonUpdate of Array.from(jsonUpdates)) {
const [llen, jsonUpdates, _trimResult] = replys
metrics.histogram(
'redis.pendingUpdates.llen',
llen,
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 75, 100]
)
for (const jsonUpdate of jsonUpdates) {
// record metric for each update removed from queue
metrics.summary('redis.pendingUpdates', jsonUpdate.length, {
status: 'pop',
})
}
const updates = []
for (jsonUpdate of Array.from(jsonUpdates)) {
for (const jsonUpdate of jsonUpdates) {
let update
try {
update = JSON.parse(jsonUpdate)
@@ -78,6 +83,33 @@ const RealTimeRedisManager = {
return rclient.llen(Keys.pendingUpdates({ doc_id: docId }), callback)
},
sendCanaryAppliedOp({ projectId, docId, op }) {
const ack = JSON.stringify({ v: op.v, doc: docId }).length
// Updates with op.dup===true will not get sent to other clients, they only get acked.
const broadcast = op.dup ? 0 : JSON.stringify(op).length
const payload = JSON.stringify({
message: 'canary-applied-op',
payload: {
ack,
broadcast,
docId,
projectId,
source: op.meta.source,
},
})
// Publish on the editor-events channel of the project as real-time already listens to that before completing the connection startup.
// publish on separate channels for individual projects and docs when
// configured (needs realtime to be configured for this too).
if (Settings.publishOnIndividualChannels) {
return pubsubClient.publish(`editor-events:${projectId}`, payload)
} else {
return pubsubClient.publish('editor-events', payload)
}
},
sendData(data) {
// create a unique message id using a counter
const messageId = `doc:${HOST}:${RND}-${COUNT++}`
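
For reference, a sketch of the canary message as it reaches the pubsub channel (field values illustrative); the WebsocketLoadBalancer hunk further down consumes it in real-time:

pubsubClient.publish(
  'editor-events', // or `editor-events:${projectId}` when publishOnIndividualChannels is set
  JSON.stringify({
    message: 'canary-applied-op',
    payload: {
      ack: 28,        // bytes of the {"v":...,"doc":...} acknowledgement sent back to the author
      broadcast: 142, // bytes of the op broadcast to other clients, 0 when op.dup === true
      docId: 'doc-id',
      projectId: 'project-id',
      source: 'public-id-of-the-sending-client',
    },
  })
)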

View file

@@ -396,7 +396,8 @@ const RedisManager = {
if (start < firstVersionInRedis || end > version) {
error = new Errors.OpRangeNotAvailableError(
'doc ops range is not loaded in redis'
'doc ops range is not loaded in redis',
{ firstVersionInRedis, version, ttlInS: RedisManager.DOC_OPS_TTL }
)
logger.debug(
{ err: error, docId, length, version, start, end },

View file

@@ -11,6 +11,8 @@
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let ShareJsDB
const logger = require('@overleaf/logger')
const Metrics = require('@overleaf/metrics')
const Keys = require('./UpdateKeys')
const RedisManager = require('./RedisManager')
const Errors = require('./Errors')
@@ -25,10 +27,15 @@ module.exports = ShareJsDB = class ShareJsDB {
// ShareJS calls this detacted from the instance, so we need
// bind it to keep our context that can access @appliedOps
this.writeOp = this._writeOp.bind(this)
this.startTimeShareJsDB = performance.now()
}
getOps(docKey, start, end, callback) {
if (start === end) {
Metrics.inc('transform-updates', 1, {
status: 'is-up-to-date',
path: 'sharejs',
})
return callback(null, [])
}
@@ -40,7 +47,60 @@
}
const [projectId, docId] = Array.from(Keys.splitProjectIdAndDocId(docKey))
return RedisManager.getPreviousDocOps(docId, start, end, callback)
const timer = new Metrics.Timer(
'transform-updates.timing',
1,
{ path: 'sharejs' },
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 50, 100, 200, 500, 1000]
)
RedisManager.getPreviousDocOps(docId, start, end, (err, ops) => {
let status
if (err) {
if (err instanceof Errors.OpRangeNotAvailableError) {
status = 'out-of-range'
} else {
status = 'error'
}
} else {
if (ops.length === 0) {
status = 'fetched-zero'
} else {
status = 'fetched'
if (start === this.version && end === -1) {
// The sharejs processing is happening under a lock.
// this.version is the version as read from redis under lock.
// In case there are any new ops available, something bypassed the lock (or we overran it).
logger.warn(
{
projectId,
docId,
start,
nOps: ops.length,
timeSinceShareJsDBInit:
performance.now() - this.startTimeShareJsDB,
},
'concurrent update of docOps while transforming update'
)
}
}
Metrics.histogram(
'transform-updates.count',
ops.length,
[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 75, 100,
// prepare buckets for full-project history/larger buffer experiments
150,
200, 300, 400,
],
{ path: 'sharejs', status }
)
}
timer.done({ status })
Metrics.inc('transform-updates', 1, { status, path: 'sharejs' })
callback(err, ops)
})
}
_writeOp(docKey, opData, callback) {
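
A minimal sketch of the timer pattern used in getOps above, assuming @overleaf/metrics' Timer records the elapsed milliseconds into the given buckets when done() is called:

const timer = new Metrics.Timer(
  'transform-updates.timing',
  1,                     // sample rate
  { path: 'sharejs' },   // static labels
  [0, 1, 2, 5, 10, 100]  // bucket boundaries in ms (illustrative subset)
)
RedisManager.getPreviousDocOps(docId, start, end, (err, ops) => {
  timer.done({ status: err ? 'error' : 'fetched' }) // extra labels attached at completion
})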

View file

@@ -128,11 +128,16 @@ const ShareJsUpdateManager = {
},
_sendOp(projectId, docId, op) {
return RealTimeRedisManager.sendData({
RealTimeRedisManager.sendData({
project_id: projectId,
doc_id: docId,
op,
})
RealTimeRedisManager.sendCanaryAppliedOp({
projectId,
docId,
op,
})
},
_computeHash(content) {

View file

@@ -15,7 +15,9 @@ describe('HttpController', function () {
'./ProjectManager': (this.ProjectManager = {}),
'./ProjectFlusher': { flushAllProjects() {} },
'./DeleteQueueManager': (this.DeleteQueueManager = {}),
'./RedisManager': (this.RedisManager = {}),
'./RedisManager': (this.RedisManager = {
DOC_OPS_TTL: 42,
}),
'./Metrics': (this.Metrics = {}),
'./Errors': Errors,
'@overleaf/settings': { max_doc_length: 2 * 1024 * 1024 },
@@ -84,6 +86,7 @@ describe('HttpController', function () {
ops: [],
ranges: this.ranges,
pathname: this.pathname,
ttlInS: 42,
})
.should.equal(true)
})
@@ -134,6 +137,7 @@ describe('HttpController', function () {
ops: this.ops,
ranges: this.ranges,
pathname: this.pathname,
ttlInS: 42,
})
.should.equal(true)
})

View file

@@ -49,7 +49,10 @@ describe('RealTimeRedisManager', function () {
.returns(Buffer.from([0x1, 0x2, 0x3, 0x4])),
}),
os: (this.os = { hostname: sinon.stub().returns('somehost') }),
'./Metrics': (this.metrics = { summary: sinon.stub() }),
'./Metrics': (this.metrics = {
summary: sinon.stub(),
histogram: sinon.stub(),
}),
},
})
@@ -60,6 +63,7 @@
describe('getPendingUpdatesForDoc', function () {
beforeEach(function () {
this.rclient.llen = sinon.stub()
this.rclient.lrange = sinon.stub()
return (this.rclient.ltrim = sinon.stub())
})
@@ -71,9 +75,7 @@
{ op: [{ i: 'foo', p: 4 }] },
]
this.jsonUpdates = this.updates.map(update => JSON.stringify(update))
this.rclient.exec = sinon
.stub()
.callsArgWith(0, null, [this.jsonUpdates])
this.rclient.exec = sinon.stub().yields(null, [2, this.jsonUpdates])
return this.RealTimeRedisManager.getPendingUpdatesForDoc(
this.doc_id,
this.callback
@@ -103,9 +105,7 @@
JSON.stringify({ op: [{ i: 'foo', p: 4 }] }),
'broken json',
]
this.rclient.exec = sinon
.stub()
.callsArgWith(0, null, [this.jsonUpdates])
this.rclient.exec = sinon.stub().yields(null, [2, this.jsonUpdates])
return this.RealTimeRedisManager.getPendingUpdatesForDoc(
this.doc_id,
this.callback

View file

@@ -407,6 +407,18 @@ describe('RedisManager', function () {
.should.equal(true)
})
it('should send details for metrics', function () {
this.callback.should.have.been.calledWith(
sinon.match({
info: {
firstVersionInRedis: this.first_version_in_redis,
version: this.version,
ttlInS: this.RedisManager.DOC_OPS_TTL,
},
})
)
})
it('should log out the problem as a debug message', function () {
this.logger.debug.called.should.equal(true)
})

View file

@@ -26,6 +26,13 @@ describe('ShareJsDB', function () {
requires: {
'./RedisManager': (this.RedisManager = {}),
'./Errors': Errors,
'@overleaf/metrics': {
inc: sinon.stub(),
histogram: sinon.stub(),
Timer: class Timer {
done() {}
},
},
},
})
@@ -89,6 +96,7 @@
beforeEach(function () {
this.start = 35
this.end = 42
this.ops = new Array(this.end - this.start)
this.RedisManager.getPreviousDocOps = sinon
.stub()
.callsArgWith(3, null, this.ops)
@@ -110,6 +118,7 @@
beforeEach(function () {
this.start = 35
this.end = null
this.ops = []
this.RedisManager.getPreviousDocOps = sinon
.stub()
.callsArgWith(3, null, this.ops)

View file

@@ -33,7 +33,9 @@ describe('ShareJsUpdateManager', function () {
return (this.rclient = { auth() {} })
},
},
'./RealTimeRedisManager': (this.RealTimeRedisManager = {}),
'./RealTimeRedisManager': (this.RealTimeRedisManager = {
sendCanaryAppliedOp: sinon.stub(),
}),
'./Metrics': (this.metrics = { inc: sinon.stub() }),
},
globals: {

View file

@@ -105,6 +105,11 @@ module.exports = DocumentUpdaterController = {
if (clientList.length === 0) {
return
}
update.meta = update.meta || {}
const { tsRT: realTimeIngestionTime } = update.meta
delete update.meta.tsRT
// send updates to clients
logger.debug(
{
@@ -129,6 +134,15 @@
},
'distributing update to sender'
)
metrics.histogram(
'update-processing-time',
performance.now() - realTimeIngestionTime,
[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 50, 100, 200, 500, 1000,
2000, 5000, 10000,
],
{ path: 'sharejs' }
)
client.emit('otUpdateApplied', { v: update.v, doc: update.doc })
} else if (!update.dup) {
// Duplicate ops should just be sent back to sending client for acknowledgement
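
Together with the meta.tsRT stamp added in real-time's WebsocketController below, this records the round trip from update ingestion to the acknowledgement reaching the sender's pod; both timestamps come from the same process, so the performance.now() delta is meaningful. A sketch of the two ends:

// 1. WebsocketController stamps the update as it enters real-time:
update.meta.tsRT = performance.now()
// 2. document-updater applies the op and broadcasts it back with meta intact.
// 3. The handler above pulls tsRT out of meta (so it is not re-broadcast) and,
//    when acknowledging the sending client, records:
metrics.histogram('update-processing-time', performance.now() - realTimeIngestionTime, BUCKETS, { path: 'sharejs' }) // BUCKETS = the ms buckets listed above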

View file

@@ -42,7 +42,16 @@ const DocumentUpdaterManager = {
return callback(error)
}
body = body || {}
callback(null, body.lines, body.version, body.ranges, body.ops)
callback(
null,
body.lines,
body.version,
body.ranges,
body.ops,
body.ttlInS
)
} else if (res.statusCode === 422 && body?.firstVersionInRedis) {
callback(new ClientRequestedMissingOpsError(422, body))
} else if ([404, 422].includes(res.statusCode)) {
callback(new ClientRequestedMissingOpsError(res.statusCode))
} else {

View file

@@ -1,9 +1,10 @@
const OError = require('@overleaf/o-error')
class ClientRequestedMissingOpsError extends OError {
constructor(statusCode) {
constructor(statusCode, info = {}) {
super('doc updater could not load requested ops', {
statusCode,
...info,
})
}
}
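
A sketch of how the widened constructor is used, assuming OError merges the object into err.info (values illustrative):

const err = new ClientRequestedMissingOpsError(422, {
  firstVersionInRedis: 100,
  version: 130,
  ttlInS: 3600,
})
err.info // { statusCode: 422, firstVersionInRedis: 100, version: 130, ttlInS: 3600 }
// WebsocketController below reads these fields via emitJoinDocCatchUpMetrics('missing', error.info)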

View file

@@ -11,8 +11,28 @@ const {
JoinLeaveEpochMismatchError,
NotAuthorizedError,
NotJoinedError,
ClientRequestedMissingOpsError,
} = require('./Errors')
const JOIN_DOC_CATCH_UP_LENGTH_BUCKETS = [
0, 5, 10, 25, 50, 100, 150, 200, 250, 500, 1000,
]
const JOIN_DOC_CATCH_UP_AGE = [
0,
1,
2,
5,
10,
20,
30,
60,
120,
240,
600,
60 * 60,
24 * 60 * 60,
].map(x => x * 1000)
let WebsocketController
module.exports = WebsocketController = {
// If the protocol version changes when the client reconnects,
@@ -195,6 +215,38 @@
'client joining doc'
)
const emitJoinDocCatchUpMetrics = (
status,
{ firstVersionInRedis, version, ttlInS }
) => {
if (fromVersion === -1) return // full joinDoc call
if (typeof options.age !== 'number') return // old frontend
if (!ttlInS) return // old document-updater pod
const isStale = options.age > ttlInS * 1000
const method = isStale ? 'stale' : 'recent'
metrics.histogram(
'join-doc-catch-up-length',
version - fromVersion,
JOIN_DOC_CATCH_UP_LENGTH_BUCKETS,
{ status, method, path: client.transport }
)
if (firstVersionInRedis) {
metrics.histogram(
'join-doc-catch-up-length-extra-needed',
firstVersionInRedis - fromVersion,
JOIN_DOC_CATCH_UP_LENGTH_BUCKETS,
{ status, method, path: client.transport }
)
}
metrics.histogram(
'join-doc-catch-up-age',
options.age,
JOIN_DOC_CATCH_UP_AGE,
{ status, path: client.transport }
)
}
WebsocketController._assertClientAuthorization(
client,
docId,
@@ -231,10 +283,14 @@
projectId,
docId,
fromVersion,
function (error, lines, version, ranges, ops) {
function (error, lines, version, ranges, ops, ttlInS) {
if (error) {
if (error instanceof ClientRequestedMissingOpsError) {
emitJoinDocCatchUpMetrics('missing', error.info)
}
return callback(error)
}
emitJoinDocCatchUpMetrics('success', { version, ttlInS })
if (client.disconnected) {
metrics.inc('editor.join-doc.disconnected', 1, {
status: 'after-doc-updater-call',
@@ -503,6 +559,7 @@ module.exports = WebsocketController = {
}
update.meta.source = client.publicId
update.meta.user_id = userId
update.meta.tsRT = performance.now()
metrics.inc('editor.doc-update', 0.3, { status: client.transport })
logger.debug(
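
A sketch of the stale/recent split performed by emitJoinDocCatchUpMetrics, with illustrative numbers; options.age is the client-reported time since it last heard from the server (see the frontend hunks below) and ttlInS is document-updater's DOC_OPS_TTL:

const ttlInS = 3600            // seconds doc ops stay in redis (illustrative)
const age = 2 * 60 * 60 * 1000 // 2h in ms since the client's last server activity
const method = age > ttlInS * 1000 ? 'stale' : 'recent' // -> 'stale'
// 'stale' reconnects can legitimately find their ops expired from redis; 'recent'
// ones should not, so the two are tracked under separate histogram labels.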

View file

@@ -1,5 +1,6 @@
const Settings = require('@overleaf/settings')
const logger = require('@overleaf/logger')
const Metrics = require('@overleaf/metrics')
const RedisClientManager = require('./RedisClientManager')
const SafeJsonParse = require('./SafeJsonParse')
const EventLogger = require('./EventLogger')
@@ -24,6 +25,11 @@ const RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST = [
'toggle-track-changes',
'projectRenamedOrDeletedByExternalSource',
]
const BANDWIDTH_BUCKETS = [0]
// 64 bytes ... 8MB
for (let i = 5; i <= 22; i++) {
BANDWIDTH_BUCKETS.push(2 << i)
}
let WebsocketLoadBalancer
module.exports = WebsocketLoadBalancer = {
@@ -139,6 +145,31 @@ module.exports = WebsocketLoadBalancer = {
for (const client of clientList) {
ConnectedUsersManager.refreshClient(message.room_id, client.publicId)
}
} else if (message.message === 'canary-applied-op') {
const { ack, broadcast, source, projectId, docId } = message.payload
const estimateBandwidth = (room, path) => {
const seen = new Set()
for (const client of io.sockets.clients(room)) {
if (seen.has(client.id)) continue
seen.add(client.id)
let v = client.id === source ? ack : broadcast
if (v === 0) {
// Acknowledgements with update.dup===true will not get sent to other clients.
continue
}
v += `5:::{"name":"otUpdateApplied","args":[]}`.length
Metrics.histogram(
'estimated-applied-ops-bandwidth',
v,
BANDWIDTH_BUCKETS,
{ path }
)
}
}
estimateBandwidth(projectId, 'per-project')
estimateBandwidth(docId, 'per-doc')
} else if (message.room_id) {
if (message._id && Settings.checkEventOrder) {
const status = EventLogger.checkEventOrder(
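
The bucket loop above doubles from 64 bytes up to 8 MiB (2 << i equals 2^(i+1)); a worked example of the resulting boundaries and of the per-recipient size that gets recorded:

const BANDWIDTH_BUCKETS = [0]
for (let i = 5; i <= 22; i++) BANDWIDTH_BUCKETS.push(2 << i)
// [0, 64, 128, 256, 512, ..., 4194304, 8388608]

// per recipient: ack or broadcast bytes plus the socket.io frame overhead
const frameOverhead = '5:::{"name":"otUpdateApplied","args":[]}'.length // 40
const estimatedBytes = 142 + frameOverhead // 142 = illustrative broadcast payload size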

View file

@@ -129,7 +129,7 @@ describe('applyOtUpdate', function () {
let [update] = Array.from(rest[0])
update = JSON.parse(update)
update.op.should.deep.equal(this.update.op)
update.meta.should.deep.equal({
update.meta.should.include({
source: this.client.publicId,
user_id: this.user_id,
})
@@ -408,7 +408,7 @@ describe('applyOtUpdate', function () {
let [update] = Array.from(rest[0])
update = JSON.parse(update)
update.op.should.deep.equal(this.comment_update.op)
update.meta.should.deep.equal({
update.meta.should.include({
source: this.client.publicId,
user_id: this.user_id,
})

View file

@@ -55,7 +55,10 @@ describe('DocumentUpdaterController', function () {
}),
'./EventLogger': (this.EventLogger = { checkEventOrder: sinon.stub() }),
'./HealthCheckManager': { check: sinon.stub() },
'@overleaf/metrics': (this.metrics = { inc: sinon.stub() }),
'@overleaf/metrics': (this.metrics = {
inc: sinon.stub(),
histogram: sinon.stub(),
}),
'./RoomManager': (this.RoomManager = {
eventSource: sinon.stub().returns(this.RoomEvents),
}),

View file

@@ -459,7 +459,7 @@ export class DocumentContainer extends EventEmitter {
'joinDoc',
this.doc_id,
this.doc.getVersion(),
{ encodeRanges: true },
{ encodeRanges: true, age: this.doc.getTimeSinceLastServerActivity() },
(error, docLines, version, updates, ranges) => {
if (error) {
callback?.(error)

View file

@@ -257,6 +257,10 @@ export class ShareJsDoc extends EventEmitter {
return this._doc.version
}
getTimeSinceLastServerActivity() {
return Math.floor(performance.now() - this._doc.lastServerActivity)
}
getType() {
return this.type
}

View file

@@ -909,6 +909,7 @@ export const { Doc } = (() => {
openData = {};
}
this.version = openData.v;
this.lastServerActivity = performance.now()
this.snapshot = openData.snaphot;
if (openData.type) {
this._setType(openData.type);
@@ -1169,6 +1170,7 @@ export const { Doc } = (() => {
}
this.version++;
this.lastServerActivity = performance.now()
this.emit('acknowledge', oldInflightOp);
var _iteratorNormalCompletion13 = true;
var _didIteratorError13 = false;
@@ -1236,6 +1238,7 @@ export const { Doc } = (() => {
}
this.version++;
this.lastServerActivity = performance.now()
// Finally, apply the op to @snapshot and trigger any event listeners
return this._otApply(docOp, true, msg);
} else if (msg.meta) {