From 540a5466f31bb466a385d84c344c00ee6a024d69 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 23 May 2023 09:10:44 +0100 Subject: [PATCH] Merge branch 'bg-socket-leak-detection' GitOrigin-RevId: 19c04cd195d0966b1f33eb4e4795db19d183dcf2 --- libraries/metrics/index.js | 1 + libraries/metrics/leaked_sockets.js | 251 ++++++++++++++++++ .../test/unit/js/CompileControllerTests.js | 3 + .../clsi/test/unit/js/CompileManagerTests.js | 6 + .../test/unit/js/PersistorManagerTests.js | 2 + .../unit/js/LockManager/CheckingTheLock.js | 2 +- .../unit/js/LockManager/ReleasingTheLock.js | 2 +- .../test/unit/js/LockManager/getLockTests.js | 2 +- .../test/unit/js/LockManager/tryLockTests.js | 2 +- .../js/RangesManager/RangesManagerTests.js | 5 +- 10 files changed, 271 insertions(+), 5 deletions(-) create mode 100644 libraries/metrics/leaked_sockets.js diff --git a/libraries/metrics/index.js b/libraries/metrics/index.js index 43d715b37e..263f3ffc8e 100644 --- a/libraries/metrics/index.js +++ b/libraries/metrics/index.js @@ -249,6 +249,7 @@ module.exports.register = promWrapper.registry module.exports.http = require('./http') module.exports.open_sockets = require('./open_sockets') +module.exports.leaked_sockets = require('./leaked_sockets') module.exports.event_loop = require('./event_loop') module.exports.memory = require('./memory') module.exports.mongodb = require('./mongodb') diff --git a/libraries/metrics/leaked_sockets.js b/libraries/metrics/leaked_sockets.js new file mode 100644 index 0000000000..8f27513a88 --- /dev/null +++ b/libraries/metrics/leaked_sockets.js @@ -0,0 +1,251 @@ +/** + * This file monitors HTTP connections in Node.js and logs any potential socket leaks. + * It uses the `diagnostics_channel` module to intercept requests and reponses in the + * `http` module and tracks the lifetime of each http socket. If a socket is open for + * longer than a specified time, it is considered a potential leak and its details are + * logged along with the corresponding information from /proc/net/tcp. + */ + +const fs = require('fs') +const diagnosticsChannel = require('diagnostics_channel') + +const SOCKET_MONITOR_INTERVAL = 60 * 1000 +// set the threshold for logging leaked sockets in minutes, defaults to 15 +const MIN_SOCKET_LEAK_TIME = + (parseInt(process.env.LEAKED_SOCKET_AGE_THRESHOLD, 10) || 15) * 60 * 1000 + +// Record HTTP events using diagnostics_channel + +const channels = [ + 'http.client.request.start', + 'http.client.response.finish', + 'http.server.request.start', + 'http.server.response.finish', +] + +for (const channel of channels) { + diagnosticsChannel.subscribe(channel, handler(channel)) +} + +function handler(channel) { + return function ({ request: req, response: res }) { + const socket = req?.socket || res?.socket + if (!socket) { + return + } + // If we haven't seen this socket before, add a debug object from the request + if (!socket._ol_debug && req) { + const { method, protocol, path, url, rawHeaders, _header } = req + socket._ol_debug = { + method, + protocol, + url: url ?? path, + request: { headers: rawHeaders ?? _header, ts: new Date() }, + } + } else if (socket._ol_debug && res) { + // We've already seen the request, now add debug info from the response + const { statusCode, statusMessage, headers, _header } = res + Object.assign(socket._ol_debug, { + response: { + statusCode, + statusMessage, + headers: headers ?? _header, + ts: new Date(), + }, + }) + } + } +} + +// Additional functions to log request headers with sensitive information redacted + +function flattenHeaders(rawHeaders) { + // Headers can be an array [KEY, VALUE, KEY, VALUE, ..] + // an object {key:value, key:value, ...} + // or a string of the headers separated by \r\n + // Flatten the array and object headers into the string form. + if (Array.isArray(rawHeaders)) { + return rawHeaders + .map((item, index) => (index % 2 === 0 ? `${item}: ` : `${item}\r\n`)) + .join('') + } else if (typeof rawHeaders === 'object') { + return Object.entries(rawHeaders) + .map(([key, value]) => `${key}: ${value}\r\n`) + .join('') + } else if (typeof rawHeaders === 'string') { + return rawHeaders + } else { + return JSON.stringify(rawHeaders) + } +} + +const REDACT_REGEX = /^(Authorization|Set-Cookie|Cookie):.*?\r/gim + +function redactObject(obj) { + const result = {} + for (const [key, value] of Object.entries(obj)) { + if (key === 'headers') { + // remove headers with sensitive information + result[key] = flattenHeaders(value).replace( + REDACT_REGEX, + `$1: REDACTED\r` + ) + } else if ( + typeof value === 'object' && + ['request', 'response'].includes(key) + ) { + result[key] = redactObject(value) + } else { + result[key] = value + } + } + return result +} + +// Check if an old socket has crossed the threshold for logging. +// We log multiple times with an exponential backoff so we can +// see how long a socket hangs around. + +function isOldSocket(handle) { + const now = new Date() + const created = handle._ol_debug.request.ts + const lastLoggedAt = handle._ol_debug.lastLoggedAt ?? created + const nextLogTime = new Date( + created.getTime() + + Math.max(2 * (lastLoggedAt - created), MIN_SOCKET_LEAK_TIME) + ) + return now >= nextLogTime +} + +function logOldSocket(logger, handle, tcpinfo) { + const now = new Date() + const info = Object.assign( + { + localAddress: handle.localAddress, + localPort: handle.localPort, + remoteAddress: handle.remoteAddress, + remotePort: handle.remotePort, + tcpinfo, + age: Math.floor((now - handle._ol_debug.request.ts) / (60 * 1000)), // age in minutes + }, + redactObject(handle._ol_debug) + ) + handle._ol_debug.lastLoggedAt = now + if (tcpinfo) { + logger.error(info, 'old socket handle - tcp socket') + } else { + logger.warn(info, 'stale socket handle - no entry in /proc/net/tcp') + } +} + +// Correlate socket handles with /proc/net/tcp entries using a key based on the +// local and remote addresses and ports. This will allow us to distinguish between +// sockets that are still open and sockets that have been closed and removed from +// the /proc/net/tcp table but are still present in the node active handles array. + +async function getOpenSockets() { + // get open sockets remote and local address:port from /proc/net/tcp + const procNetTcp = '/proc/net/tcp' + const openSockets = new Map() + const lines = await fs.promises.readFile(procNetTcp, 'utf8') + for (const line of lines.split('\n')) { + const socket = parseProcNetTcp(line) + if (socket) { + openSockets.set(socket, line) + } + } + return openSockets +} + +function keyFromSocket(socket) { + return `${socket.localAddress}:${socket.localPort} -> ${socket.remoteAddress}:${socket.remotePort}` +} + +function decodeHexIpAddress(hex) { + // decode hex ip address to dotted decimal notation + const ip = parseInt(hex, 16) + const a = ip & 0xff + const b = (ip >> 8) & 0xff + const c = (ip >> 16) & 0xff + const d = (ip >> 24) & 0xff + return `${a}.${b}.${c}.${d}` +} + +function decodeHexPort(hex) { + // decode hex port to decimal + return parseInt(hex, 16) +} + +// Regex for extracting the local and remote addresses and ports from the /proc/net/tcp output +// Example line: +// 16: AB02A8C0:D9E2 86941864:01BB 01 00000000:00000000 02:000004BE 00000000 0 0 36802 2 0000000000000000 28 4 26 10 -1 +// ^^^^^^^^^^^^^ ^^^^^^^^^^^^^ +// local remote + +const TCP_STATE_REGEX = + /^\s*\d+:\s+(?[0-9A-F]{8}):(?[0-9A-F]{4})\s+(?[0-9A-F]{8}):(?[0-9A-F]{4})/i + +function parseProcNetTcp(line) { + const match = line.match(TCP_STATE_REGEX) + if (match) { + const { localHexAddress, localHexPort, remoteHexAddress, remoteHexPort } = + match.groups + return keyFromSocket({ + localAddress: decodeHexIpAddress(localHexAddress), + localPort: decodeHexPort(localHexPort), + remoteAddress: decodeHexIpAddress(remoteHexAddress), + remotePort: decodeHexPort(remoteHexPort), + }) + } +} + +let LeakedSocketsMonitor + +// Export the monitor and scanSockets functions + +module.exports = LeakedSocketsMonitor = { + monitor(logger) { + const interval = setInterval( + () => LeakedSocketsMonitor.scanSockets(logger), + SOCKET_MONITOR_INTERVAL + ) + const Metrics = require('./index') + return Metrics.registerDestructor(() => clearInterval(interval)) + }, + scanSockets(logger) { + const debugSockets = process._getActiveHandles().filter(handle => { + return handle._ol_debug + }) + + // Bail out if there are no sockets with the _ol_debug property + if (debugSockets.length === 0) { + return + } + + const oldSockets = debugSockets.filter(isOldSocket) + + // Bail out if there are no old sockets to log + if (oldSockets.length === 0) { + return + } + + // If there old sockets to log, get the connections from /proc/net/tcp + // to distinguish between sockets that are still open and sockets that + // have been closed and removed from the /proc/net/tcp table. + getOpenSockets() + .then(openSockets => { + oldSockets.forEach(handle => { + try { + const key = keyFromSocket(handle) + const tcpinfo = openSockets.get(key) + logOldSocket(logger, handle, tcpinfo) + } catch (err) { + logger.error({ err }, 'error in scanSockets') + } + }) + }) + .catch(err => { + logger.error({ err }, 'error getting open sockets') + }) + }, +} diff --git a/services/clsi/test/unit/js/CompileControllerTests.js b/services/clsi/test/unit/js/CompileControllerTests.js index 2b48aabbf0..041e03bbaf 100644 --- a/services/clsi/test/unit/js/CompileControllerTests.js +++ b/services/clsi/test/unit/js/CompileControllerTests.js @@ -62,6 +62,9 @@ describe('CompileController', function () { }, }, }), + '@overleaf/metrics': { + Timer: sinon.stub().returns({ done: sinon.stub() }), + }, './ProjectPersistenceManager': (this.ProjectPersistenceManager = {}), }, }) diff --git a/services/clsi/test/unit/js/CompileManagerTests.js b/services/clsi/test/unit/js/CompileManagerTests.js index 6e11161c02..c014efcfa1 100644 --- a/services/clsi/test/unit/js/CompileManagerTests.js +++ b/services/clsi/test/unit/js/CompileManagerTests.js @@ -132,6 +132,12 @@ describe('CompileManager', function () { './OutputFileFinder': this.OutputFileFinder, './OutputCacheManager': this.OutputCacheManager, '@overleaf/settings': this.Settings, + '@overleaf/metrics': { + inc: sinon.stub(), + timing: sinon.stub(), + gauge: sinon.stub(), + Timer: sinon.stub().returns({ done: sinon.stub() }), + }, child_process: this.child_process, './CommandRunner': this.CommandRunner, './DraftModeManager': this.DraftModeManager, diff --git a/services/docstore/test/unit/js/PersistorManagerTests.js b/services/docstore/test/unit/js/PersistorManagerTests.js index b9cc3c3051..8f8ddacdd8 100644 --- a/services/docstore/test/unit/js/PersistorManagerTests.js +++ b/services/docstore/test/unit/js/PersistorManagerTests.js @@ -21,6 +21,7 @@ describe('PersistorManager', function () { requires: { '@overleaf/settings': Settings, '@overleaf/object-persistor': () => new FakePersistor(), + '@overleaf/metrics': {}, }, }) @@ -41,6 +42,7 @@ describe('PersistorManager', function () { requires: { '@overleaf/settings': Settings, '@overleaf/object-persistor': () => new FakePersistor(), + '@overleaf/metrics': {}, }, }) diff --git a/services/document-updater/test/unit/js/LockManager/CheckingTheLock.js b/services/document-updater/test/unit/js/LockManager/CheckingTheLock.js index 1de15f5878..eb5b44532a 100644 --- a/services/document-updater/test/unit/js/LockManager/CheckingTheLock.js +++ b/services/document-updater/test/unit/js/LockManager/CheckingTheLock.js @@ -31,7 +31,7 @@ describe('LockManager - checking the lock', function () { } }, }, - './Metrics': { inc() {} }, + '@overleaf/metrics': { inc() {} }, './Profiler': (Profiler = (function () { Profiler = class Profiler { static initClass() { diff --git a/services/document-updater/test/unit/js/LockManager/ReleasingTheLock.js b/services/document-updater/test/unit/js/LockManager/ReleasingTheLock.js index 8d253d8899..0413e268d6 100644 --- a/services/document-updater/test/unit/js/LockManager/ReleasingTheLock.js +++ b/services/document-updater/test/unit/js/LockManager/ReleasingTheLock.js @@ -40,7 +40,7 @@ describe('LockManager - releasing the lock', function () { }, }, }, - './Metrics': { inc() {} }, + '@overleaf/metrics': { inc() {} }, './Profiler': (Profiler = (function () { Profiler = class Profiler { static initClass() { diff --git a/services/document-updater/test/unit/js/LockManager/getLockTests.js b/services/document-updater/test/unit/js/LockManager/getLockTests.js index ec7d2a7884..938c593a51 100644 --- a/services/document-updater/test/unit/js/LockManager/getLockTests.js +++ b/services/document-updater/test/unit/js/LockManager/getLockTests.js @@ -26,7 +26,7 @@ describe('LockManager - getting the lock', function () { return { auth() {} } }, }, - './Metrics': { inc() {} }, + '@overleaf/metrics': { inc() {} }, './Profiler': (Profiler = (function () { Profiler = class Profiler { static initClass() { diff --git a/services/document-updater/test/unit/js/LockManager/tryLockTests.js b/services/document-updater/test/unit/js/LockManager/tryLockTests.js index 2644da8d86..861f733979 100644 --- a/services/document-updater/test/unit/js/LockManager/tryLockTests.js +++ b/services/document-updater/test/unit/js/LockManager/tryLockTests.js @@ -28,7 +28,7 @@ describe('LockManager - trying the lock', function () { } }, }, - './Metrics': { inc() {} }, + '@overleaf/metrics': { inc() {} }, '@overleaf/settings': { redis: { lock: { diff --git a/services/document-updater/test/unit/js/RangesManager/RangesManagerTests.js b/services/document-updater/test/unit/js/RangesManager/RangesManagerTests.js index eb29b1991d..a211d133c6 100644 --- a/services/document-updater/test/unit/js/RangesManager/RangesManagerTests.js +++ b/services/document-updater/test/unit/js/RangesManager/RangesManagerTests.js @@ -18,7 +18,9 @@ const SandboxedModule = require('sandboxed-module') describe('RangesManager', function () { beforeEach(function () { this.RangesManager = SandboxedModule.require(modulePath, { - requires: { './Metrics': (this.Metrics = { histogram: sinon.stub() }) }, + requires: { + '@overleaf/metrics': (this.Metrics = { histogram: sinon.stub() }), + }, }) this.doc_id = 'doc-id-123' @@ -424,6 +426,7 @@ describe('RangesManager', function () { requires: { '@overleaf/ranges-tracker': (this.RangesTracker = SandboxedModule.require('@overleaf/ranges-tracker')), + '@overleaf/metrics': {}, }, })