mirror of
https://github.com/overleaf/overleaf.git
synced 2025-04-10 20:27:13 +00:00
Merge branch 'bg-socket-leak-detection'
GitOrigin-RevId: 19c04cd195d0966b1f33eb4e4795db19d183dcf2
This commit is contained in:
parent
8e03a9a4f2
commit
540a5466f3
10 changed files with 271 additions and 5 deletions
|
@ -249,6 +249,7 @@ module.exports.register = promWrapper.registry
|
|||
|
||||
module.exports.http = require('./http')
|
||||
module.exports.open_sockets = require('./open_sockets')
|
||||
module.exports.leaked_sockets = require('./leaked_sockets')
|
||||
module.exports.event_loop = require('./event_loop')
|
||||
module.exports.memory = require('./memory')
|
||||
module.exports.mongodb = require('./mongodb')
|
||||
|
|
251
libraries/metrics/leaked_sockets.js
Normal file
251
libraries/metrics/leaked_sockets.js
Normal file
|
@ -0,0 +1,251 @@
|
|||
/**
|
||||
* This file monitors HTTP connections in Node.js and logs any potential socket leaks.
|
||||
* It uses the `diagnostics_channel` module to intercept requests and responses in the
|
||||
* `http` module and tracks the lifetime of each http socket. If a socket is open for
|
||||
* longer than a specified time, it is considered a potential leak and its details are
|
||||
* logged along with the corresponding information from /proc/net/tcp.
|
||||
*/
|
||||
|
||||
const fs = require('fs')
|
||||
const diagnosticsChannel = require('diagnostics_channel')
|
||||
|
||||
// How often, in milliseconds, the active handles are scanned for old sockets.
const SOCKET_MONITOR_INTERVAL = 60 * 1000

// Threshold for logging leaked sockets. It is configured in minutes through the
// LEAKED_SOCKET_AGE_THRESHOLD environment variable and defaults to 15 minutes.
// Values that are missing, non-numeric, zero or negative fall back to the
// default: a negative threshold would make every tracked socket look "old".
const DEFAULT_SOCKET_LEAK_MINUTES = 15
const configuredSocketLeakMinutes = parseInt(
  process.env.LEAKED_SOCKET_AGE_THRESHOLD,
  10
)
// The threshold converted to milliseconds.
const MIN_SOCKET_LEAK_TIME =
  (configuredSocketLeakMinutes > 0
    ? configuredSocketLeakMinutes
    : DEFAULT_SOCKET_LEAK_MINUTES) *
  60 *
  1000
|
||||
|
||||
// Record HTTP events using diagnostics_channel
|
||||
|
||||
// The diagnostics channels we listen on: the start of a request and the end of
// a response, for both the client and the server side of the `http` module.
const channels = [
  'http.client.request.start',
  'http.client.response.finish',
  'http.server.request.start',
  'http.server.response.finish',
]

// Subscribe at module load time, with one subscriber function per channel.
for (const channel of channels) {
  diagnosticsChannel.subscribe(channel, handler(channel))
}
|
||||
|
||||
/**
 * Build the subscriber function for one diagnostics channel.
 *
 * The subscriber records debugging information on the socket itself, in an
 * `_ol_debug` property:
 *  - the first time a socket is seen with a request, it stores the method,
 *    protocol, url (or `path` when `url` is not set), the request headers and
 *    the current time;
 *  - when the socket already carries `_ol_debug` and the message has a
 *    response, it stores (or overwrites) the response status, headers and the
 *    current time.
 * Messages without a socket on either the request or the response are ignored.
 *
 * @param {string} channel - name of the channel being subscribed to (not used
 *   by the subscriber itself)
 * @returns {function(Object): void} subscriber taking the published message,
 *   which may contain `request` and/or `response`
 */
function handler(channel) {
  return function ({ request: req, response: res }) {
    const socket = req?.socket || res?.socket
    if (!socket) {
      return
    }
    // If we haven't seen this socket before, add a debug object from the request
    if (!socket._ol_debug && req) {
      const { method, protocol, path, url, rawHeaders, _header } = req
      socket._ol_debug = {
        method,
        protocol,
        url: url ?? path,
        // fall back to the `_header` string when `rawHeaders` is not set
        request: { headers: rawHeaders ?? _header, ts: new Date() },
      }
    } else if (socket._ol_debug && res) {
      // We've already seen the request, now add debug info from the response
      const { statusCode, statusMessage, headers, _header } = res
      Object.assign(socket._ol_debug, {
        response: {
          statusCode,
          statusMessage,
          // fall back to the `_header` string when `headers` is not set
          headers: headers ?? _header,
          ts: new Date(),
        },
      })
    }
  }
}
|
||||
|
||||
// Additional functions to log request headers with sensitive information redacted
|
||||
|
||||
/**
 * Convert headers into a single string of `Key: value\r\n` lines.
 *
 * Headers can be an array [KEY, VALUE, KEY, VALUE, ..],
 * an object {key:value, key:value, ...},
 * or a string of the headers separated by \r\n.
 * Arrays and objects are flattened into the string form and strings are
 * returned unchanged. Missing headers (null/undefined) give an empty string,
 * and any other value is JSON-encoded, so the result is always a string that
 * the caller can safely run `.replace` on.
 *
 * @param {Array<string>|Object|string|null|undefined} rawHeaders
 * @returns {string}
 */
function flattenHeaders(rawHeaders) {
  // `typeof null` is 'object', so handle missing headers before the object branch
  if (rawHeaders == null) {
    return ''
  }
  if (Array.isArray(rawHeaders)) {
    // even indexes are keys, odd indexes are values
    return rawHeaders
      .map((item, index) => (index % 2 === 0 ? `${item}: ` : `${item}\r\n`))
      .join('')
  }
  if (typeof rawHeaders === 'object') {
    return Object.entries(rawHeaders)
      .map(([key, value]) => `${key}: ${value}\r\n`)
      .join('')
  }
  if (typeof rawHeaders === 'string') {
    return rawHeaders
  }
  // JSON.stringify returns undefined for some inputs (e.g. functions)
  return JSON.stringify(rawHeaders) ?? ''
}
|
||||
|
||||
// Matches a whole header line carrying credentials. The match runs to the end
// of the line (`.` never crosses \r or \n), so a header is redacted whether it
// is terminated by \r\n, by \n, or is the last, unterminated line.
const REDACT_REGEX =
  /^(Authorization|Proxy-Authorization|Set-Cookie|Cookie):.*$/gim

/**
 * Return a copy of a debug object that is safe to log.
 *
 * - a `headers` property is flattened to a string and the values of the
 *   sensitive headers are replaced by `REDACTED`; missing headers become ''
 * - `request` and `response` properties holding an object are redacted
 *   recursively
 * - every other property is copied as is
 *
 * @param {Object} obj - debug object, e.g. the `_ol_debug` property of a socket
 * @returns {Object} a new object; the input is not modified
 */
function redactObject(obj) {
  const result = {}
  for (const [key, value] of Object.entries(obj)) {
    if (key === 'headers') {
      // remove headers with sensitive information
      const flattened = value == null ? '' : flattenHeaders(value) ?? ''
      result[key] = String(flattened).replace(REDACT_REGEX, '$1: REDACTED')
    } else if (
      value !== null && // `typeof null` is 'object'
      typeof value === 'object' &&
      ['request', 'response'].includes(key)
    ) {
      result[key] = redactObject(value)
    } else {
      result[key] = value
    }
  }
  return result
}
|
||||
|
||||
// Check if an old socket has crossed the threshold for logging.
|
||||
// We log multiple times with an exponential backoff so we can
|
||||
// see how long a socket hangs around.
|
||||
|
||||
/**
 * Decide whether a tracked socket is due to be logged.
 *
 * The first log is due once the socket is MIN_SOCKET_LEAK_TIME old. After
 * that, the next log is due when the socket is twice as old as it was at the
 * previous log, so the interval between logs doubles each time.
 *
 * @param {Object} handle - an active handle carrying an `_ol_debug` property
 * @returns {boolean} true when the socket should be logged now
 */
function isOldSocket(handle) {
  const now = new Date()
  const created = handle._ol_debug.request.ts
  // before the first log, behave as if the socket was logged at creation time
  const lastLoggedAt = handle._ol_debug.lastLoggedAt ?? created
  const nextLogTime = new Date(
    created.getTime() +
      Math.max(2 * (lastLoggedAt - created), MIN_SOCKET_LEAK_TIME)
  )
  return now >= nextLogTime
}
|
||||
|
||||
/**
 * Log the details of an old socket and record when it was logged.
 *
 * The log entry contains the local/remote address and port of the handle, the
 * matching /proc/net/tcp line (if any), the age of the socket in whole minutes
 * and a redacted copy of the debug information stored on the handle.
 *
 * @param {Object} logger - logger providing `error(info, message)` and
 *   `warn(info, message)`
 * @param {Object} handle - an active handle carrying an `_ol_debug` property
 * @param {string} [tcpinfo] - the /proc/net/tcp line for this socket, or
 *   undefined when there is no entry for it
 */
function logOldSocket(logger, handle, tcpinfo) {
  const now = new Date()
  const info = Object.assign(
    {
      localAddress: handle.localAddress,
      localPort: handle.localPort,
      remoteAddress: handle.remoteAddress,
      remotePort: handle.remotePort,
      tcpinfo,
      age: Math.floor((now - handle._ol_debug.request.ts) / (60 * 1000)), // age in minutes
    },
    redactObject(handle._ol_debug)
  )
  // updated after `info` is built, so the entry shows the previous log time
  handle._ol_debug.lastLoggedAt = now
  if (tcpinfo) {
    logger.error(info, 'old socket handle - tcp socket')
  } else {
    logger.warn(info, 'stale socket handle - no entry in /proc/net/tcp')
  }
}
|
||||
|
||||
// Correlate socket handles with /proc/net/tcp entries using a key based on the
|
||||
// local and remote addresses and ports. This will allow us to distinguish between
|
||||
// sockets that are still open and sockets that have been closed and removed from
|
||||
// the /proc/net/tcp table but are still present in the node active handles array.
|
||||
|
||||
/**
 * Read /proc/net/tcp and index its entries by connection.
 *
 * @returns {Promise<Map<string, string>>} map from the connection key (see
 *   `keyFromSocket`) to the raw /proc/net/tcp line; lines that do not parse
 *   as a connection entry are skipped. The promise rejects when the file
 *   cannot be read.
 */
async function getOpenSockets() {
  // get open sockets remote and local address:port from /proc/net/tcp
  const procNetTcp = '/proc/net/tcp'
  const openSockets = new Map()
  const lines = await fs.promises.readFile(procNetTcp, 'utf8')
  for (const line of lines.split('\n')) {
    const socket = parseProcNetTcp(line)
    if (socket) {
      openSockets.set(socket, line)
    }
  }
  return openSockets
}
|
||||
|
||||
/**
 * Build the key identifying a connection by its two endpoints.
 *
 * @param {Object} socket - anything with `localAddress`, `localPort`,
 *   `remoteAddress` and `remotePort` properties
 * @returns {string} key of the form `local:port -> remote:port`
 */
function keyFromSocket(socket) {
  return `${socket.localAddress}:${socket.localPort} -> ${socket.remoteAddress}:${socket.remotePort}`
}
|
||||
|
||||
/**
 * Decode a hex IPv4 address to dotted decimal notation.
 *
 * The least significant byte of the hex value becomes the first octet, e.g.
 * `0100007F` decodes to `127.0.0.1`.
 *
 * @param {string} hex - 8 hex digits
 * @returns {string} dotted decimal address
 */
function decodeHexIpAddress(hex) {
  const ip = parseInt(hex, 16)
  const a = ip & 0xff
  const b = (ip >>> 8) & 0xff
  const c = (ip >>> 16) & 0xff
  const d = (ip >>> 24) & 0xff
  return `${a}.${b}.${c}.${d}`
}
|
||||
|
||||
/**
 * Decode a hex port to decimal.
 *
 * @param {string} hex - hex digits of the port, e.g. `01BB`
 * @returns {number} the port number, e.g. 443
 */
function decodeHexPort(hex) {
  return parseInt(hex, 16)
}
|
||||
|
||||
// Regex for extracting the local and remote addresses and ports from the /proc/net/tcp output
|
||||
// Example line:
|
||||
// 16: AB02A8C0:D9E2 86941864:01BB 01 00000000:00000000 02:000004BE 00000000 0 0 36802 2 0000000000000000 28 4 26 10 -1
|
||||
// ^^^^^^^^^^^^^ ^^^^^^^^^^^^^
|
||||
// local remote
|
||||
|
||||
// Captures the hex address and port of the local and remote end of an entry:
// an entry number followed by a colon, then two `ADDRESS:PORT` pairs of 8 and
// 4 hex digits. Matching is case-insensitive.
const TCP_STATE_REGEX =
  /^\s*\d+:\s+(?<localHexAddress>[0-9A-F]{8}):(?<localHexPort>[0-9A-F]{4})\s+(?<remoteHexAddress>[0-9A-F]{8}):(?<remoteHexPort>[0-9A-F]{4})/i

/**
 * Parse one line of /proc/net/tcp into a connection key.
 *
 * @param {string} line - a single line of /proc/net/tcp
 * @returns {string|undefined} the key built by `keyFromSocket` from the decoded
 *   addresses and ports, or undefined when the line is not a connection entry
 *   (e.g. the column header or an empty line)
 */
function parseProcNetTcp(line) {
  const match = line.match(TCP_STATE_REGEX)
  if (match) {
    const { localHexAddress, localHexPort, remoteHexAddress, remoteHexPort } =
      match.groups
    return keyFromSocket({
      localAddress: decodeHexIpAddress(localHexAddress),
      localPort: decodeHexPort(localHexPort),
      remoteAddress: decodeHexIpAddress(remoteHexAddress),
      remotePort: decodeHexPort(remoteHexPort),
    })
  }
}
|
||||
|
||||
let LeakedSocketsMonitor
|
||||
|
||||
// Export the monitor and scanSockets functions
|
||||
|
||||
module.exports = LeakedSocketsMonitor = {
|
||||
monitor(logger) {
|
||||
const interval = setInterval(
|
||||
() => LeakedSocketsMonitor.scanSockets(logger),
|
||||
SOCKET_MONITOR_INTERVAL
|
||||
)
|
||||
const Metrics = require('./index')
|
||||
return Metrics.registerDestructor(() => clearInterval(interval))
|
||||
},
|
||||
scanSockets(logger) {
|
||||
const debugSockets = process._getActiveHandles().filter(handle => {
|
||||
return handle._ol_debug
|
||||
})
|
||||
|
||||
// Bail out if there are no sockets with the _ol_debug property
|
||||
if (debugSockets.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
const oldSockets = debugSockets.filter(isOldSocket)
|
||||
|
||||
// Bail out if there are no old sockets to log
|
||||
if (oldSockets.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
// If there old sockets to log, get the connections from /proc/net/tcp
|
||||
// to distinguish between sockets that are still open and sockets that
|
||||
// have been closed and removed from the /proc/net/tcp table.
|
||||
getOpenSockets()
|
||||
.then(openSockets => {
|
||||
oldSockets.forEach(handle => {
|
||||
try {
|
||||
const key = keyFromSocket(handle)
|
||||
const tcpinfo = openSockets.get(key)
|
||||
logOldSocket(logger, handle, tcpinfo)
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'error in scanSockets')
|
||||
}
|
||||
})
|
||||
})
|
||||
.catch(err => {
|
||||
logger.error({ err }, 'error getting open sockets')
|
||||
})
|
||||
},
|
||||
}
|
|
@ -62,6 +62,9 @@ describe('CompileController', function () {
|
|||
},
|
||||
},
|
||||
}),
|
||||
'@overleaf/metrics': {
|
||||
Timer: sinon.stub().returns({ done: sinon.stub() }),
|
||||
},
|
||||
'./ProjectPersistenceManager': (this.ProjectPersistenceManager = {}),
|
||||
},
|
||||
})
|
||||
|
|
|
@ -132,6 +132,12 @@ describe('CompileManager', function () {
|
|||
'./OutputFileFinder': this.OutputFileFinder,
|
||||
'./OutputCacheManager': this.OutputCacheManager,
|
||||
'@overleaf/settings': this.Settings,
|
||||
'@overleaf/metrics': {
|
||||
inc: sinon.stub(),
|
||||
timing: sinon.stub(),
|
||||
gauge: sinon.stub(),
|
||||
Timer: sinon.stub().returns({ done: sinon.stub() }),
|
||||
},
|
||||
child_process: this.child_process,
|
||||
'./CommandRunner': this.CommandRunner,
|
||||
'./DraftModeManager': this.DraftModeManager,
|
||||
|
|
|
@ -21,6 +21,7 @@ describe('PersistorManager', function () {
|
|||
requires: {
|
||||
'@overleaf/settings': Settings,
|
||||
'@overleaf/object-persistor': () => new FakePersistor(),
|
||||
'@overleaf/metrics': {},
|
||||
},
|
||||
})
|
||||
|
||||
|
@ -41,6 +42,7 @@ describe('PersistorManager', function () {
|
|||
requires: {
|
||||
'@overleaf/settings': Settings,
|
||||
'@overleaf/object-persistor': () => new FakePersistor(),
|
||||
'@overleaf/metrics': {},
|
||||
},
|
||||
})
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ describe('LockManager - checking the lock', function () {
|
|||
}
|
||||
},
|
||||
},
|
||||
'./Metrics': { inc() {} },
|
||||
'@overleaf/metrics': { inc() {} },
|
||||
'./Profiler': (Profiler = (function () {
|
||||
Profiler = class Profiler {
|
||||
static initClass() {
|
||||
|
|
|
@ -40,7 +40,7 @@ describe('LockManager - releasing the lock', function () {
|
|||
},
|
||||
},
|
||||
},
|
||||
'./Metrics': { inc() {} },
|
||||
'@overleaf/metrics': { inc() {} },
|
||||
'./Profiler': (Profiler = (function () {
|
||||
Profiler = class Profiler {
|
||||
static initClass() {
|
||||
|
|
|
@ -26,7 +26,7 @@ describe('LockManager - getting the lock', function () {
|
|||
return { auth() {} }
|
||||
},
|
||||
},
|
||||
'./Metrics': { inc() {} },
|
||||
'@overleaf/metrics': { inc() {} },
|
||||
'./Profiler': (Profiler = (function () {
|
||||
Profiler = class Profiler {
|
||||
static initClass() {
|
||||
|
|
|
@ -28,7 +28,7 @@ describe('LockManager - trying the lock', function () {
|
|||
}
|
||||
},
|
||||
},
|
||||
'./Metrics': { inc() {} },
|
||||
'@overleaf/metrics': { inc() {} },
|
||||
'@overleaf/settings': {
|
||||
redis: {
|
||||
lock: {
|
||||
|
|
|
@ -18,7 +18,9 @@ const SandboxedModule = require('sandboxed-module')
|
|||
describe('RangesManager', function () {
|
||||
beforeEach(function () {
|
||||
this.RangesManager = SandboxedModule.require(modulePath, {
|
||||
requires: { './Metrics': (this.Metrics = { histogram: sinon.stub() }) },
|
||||
requires: {
|
||||
'@overleaf/metrics': (this.Metrics = { histogram: sinon.stub() }),
|
||||
},
|
||||
})
|
||||
|
||||
this.doc_id = 'doc-id-123'
|
||||
|
@ -424,6 +426,7 @@ describe('RangesManager', function () {
|
|||
requires: {
|
||||
'@overleaf/ranges-tracker': (this.RangesTracker =
|
||||
SandboxedModule.require('@overleaf/ranges-tracker')),
|
||||
'@overleaf/metrics': {},
|
||||
},
|
||||
})
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue