From bf18c6e513fdfeb56d7a5a7e9592aa2a8ff68cf7 Mon Sep 17 00:00:00 2001 From: Henry Oswald Date: Wed, 30 Jan 2019 10:29:12 +0000 Subject: [PATCH] MVP for running both statsd and prom side by side statsd code is from v1.8.1 --- libraries/metrics/metrics.coffee | 6 ++ libraries/metrics/package.json | 2 +- libraries/metrics/statsd/event_loop.coffee | 18 ++++ libraries/metrics/statsd/http.coffee | 31 ++++++ libraries/metrics/statsd/memory.coffee | 85 +++++++++++++++ libraries/metrics/statsd/metrics.coffee | 62 +++++++++++ libraries/metrics/statsd/mongodb.coffee | 100 ++++++++++++++++++ libraries/metrics/statsd/open_sockets.coffee | 28 +++++ .../metrics/statsd/timeAsyncMethod.coffee | 36 +++++++ .../metrics/statsd/uv_threadpool_size.coffee | 2 + 10 files changed, 369 insertions(+), 1 deletion(-) create mode 100644 libraries/metrics/statsd/event_loop.coffee create mode 100644 libraries/metrics/statsd/http.coffee create mode 100644 libraries/metrics/statsd/memory.coffee create mode 100644 libraries/metrics/statsd/metrics.coffee create mode 100644 libraries/metrics/statsd/mongodb.coffee create mode 100644 libraries/metrics/statsd/open_sockets.coffee create mode 100644 libraries/metrics/statsd/timeAsyncMethod.coffee create mode 100644 libraries/metrics/statsd/uv_threadpool_size.coffee diff --git a/libraries/metrics/metrics.coffee b/libraries/metrics/metrics.coffee index c322f2efd9..94313d3aec 100644 --- a/libraries/metrics/metrics.coffee +++ b/libraries/metrics/metrics.coffee @@ -1,3 +1,9 @@ +if process.env["USE_PROM_METRICS"] != "true" + return module.exports = require("./statsd/metrics") +else + console.log("using prometheus") + + prom = require('prom-client') Register = require('prom-client').register diff --git a/libraries/metrics/package.json b/libraries/metrics/package.json index 002a92fc06..97f6a3ab9d 100644 --- a/libraries/metrics/package.json +++ b/libraries/metrics/package.json @@ -1,6 +1,6 @@ { "name": "metrics-sharelatex", - "version": "2.0.14", + "version": "2.1.0", "description": "A drop-in metrics and monitoring module for node.js apps", "repository": { "type": "git", diff --git a/libraries/metrics/statsd/event_loop.coffee b/libraries/metrics/statsd/event_loop.coffee new file mode 100644 index 0000000000..01e37b96a1 --- /dev/null +++ b/libraries/metrics/statsd/event_loop.coffee @@ -0,0 +1,18 @@ +module.exports = EventLoopMonitor = + monitor: (logger, interval = 1000, log_threshold = 100) -> + Metrics = require "./metrics" + # check for logger on startup to avoid exceptions later if undefined + throw new Error("logger is undefined") if !logger? + # monitor delay in setInterval to detect event loop blocking + previous = Date.now() + intervalId = setInterval () -> + now = Date.now() + offset = now - previous - interval + if offset > log_threshold + logger.warn {offset: offset}, "slow event loop" + previous = now + Metrics.timing("event-loop-millsec", offset) + , interval + + Metrics.registerDestructor () -> + clearInterval(intervalId) diff --git a/libraries/metrics/statsd/http.coffee b/libraries/metrics/statsd/http.coffee new file mode 100644 index 0000000000..c175f26a14 --- /dev/null +++ b/libraries/metrics/statsd/http.coffee @@ -0,0 +1,31 @@ +os = require("os") + +module.exports.monitor = (logger) -> + return (req, res, next) -> + Metrics = require("./metrics") + startTime = new Date() + end = res.end + res.end = () -> + end.apply(this, arguments) + responseTime = new Date() - startTime + if req.route?.path? + routePath = req.route.path.toString().replace(/\//g, '_').replace(/\:/g, '').slice(1) + key = "http-requests.#{routePath}.#{req.method}.#{res.statusCode}" + + Metrics.timing(key, responseTime) + logger.log + req: + url: req.originalUrl || req.url + method: req.method + referrer: req.headers['referer'] || req.headers['referrer'] + "remote-addr": req.ip || req.socket?.socket?.remoteAddress || req.socket?.remoteAddress + "user-agent": req.headers["user-agent"] + "content-length": req.headers["content-length"] + res: + "content-length": res._headers?["content-length"] + statusCode: res.statusCode + "response-time": responseTime + "http request" + + next() + diff --git a/libraries/metrics/statsd/memory.coffee b/libraries/metrics/statsd/memory.coffee new file mode 100644 index 0000000000..a1e157a5bf --- /dev/null +++ b/libraries/metrics/statsd/memory.coffee @@ -0,0 +1,85 @@ +# record memory usage each minute and run a periodic gc(), keeping cpu +# usage within allowable range of 1ms per minute. Also, dynamically +# adjust the period between gc()'s to reach a target of the gc saving +# 4 megabytes each time. + +oneMinute = 60 * 1000 +oneMegaByte = 1024 * 1024 + +CpuTimeBucket = 100 # current cpu time allowance in milliseconds +CpuTimeBucketMax = 100 # maximum amount of cpu time allowed in bucket +CpuTimeBucketRate = 10 # add this many milliseconds per minute + +gcInterval = 1 # how many minutes between gc (parameter is dynamically adjusted) +countSinceLastGc = 0 # how many minutes since last gc +MemoryChunkSize = 4 # how many megabytes we need to free to consider gc worth doing + +readyToGc = () -> + # update allowed cpu time + CpuTimeBucket = CpuTimeBucket + CpuTimeBucketRate + CpuTimeBucket = if CpuTimeBucket < CpuTimeBucketMax then CpuTimeBucket else CpuTimeBucketMax + # update counts since last gc + countSinceLastGc = countSinceLastGc + 1 + # check there is enough time since last gc and we have enough cpu + return (countSinceLastGc > gcInterval) && (CpuTimeBucket > 0) + +executeAndTime = (fn) -> + # time the execution of fn() and subtract from cpu allowance + t0 = process.hrtime() + fn() + dt = process.hrtime(t0) + timeTaken = (dt[0] + dt[1]*1e-9) * 1e3 # in milliseconds + CpuTimeBucket -= Math.ceil timeTaken + return timeTaken + +inMegaBytes = (obj) -> + # convert process.memoryUsage hash {rss,heapTotal,heapFreed} into megabytes + result = {} + for k, v of obj + result[k] = (v / oneMegaByte).toFixed(2) + return result + +updateMemoryStats = (oldMem, newMem) -> + countSinceLastGc = 0 + delta = {} + for k of newMem + delta[k] = (newMem[k] - oldMem[k]).toFixed(2) + # take the max of all memory measures + savedMemory = Math.max -delta.rss, -delta.heapTotal, -delta.heapUsed + delta.megabytesFreed = savedMemory + # did it do any good? + if savedMemory < MemoryChunkSize + gcInterval = gcInterval + 1 # no, so wait longer next time + else + gcInterval = Math.max gcInterval - 1, 1 # yes, wait less time + return delta + +module.exports = MemoryMonitor = + monitor: (logger) -> + interval = setInterval () -> + MemoryMonitor.Check(logger) + , oneMinute + Metrics = require "./metrics" + Metrics.registerDestructor () -> + clearInterval(interval) + + Check: (logger) -> + Metrics = require "./metrics" + memBeforeGc = mem = inMegaBytes process.memoryUsage() + Metrics.gauge("memory.rss", mem.rss) + Metrics.gauge("memory.heaptotal", mem.heapTotal) + Metrics.gauge("memory.heapused", mem.heapUsed) + Metrics.gauge("memory.gc-interval", gcInterval) + #Metrics.gauge("memory.cpu-time-bucket", CpuTimeBucket) + + logger.log mem, "process.memoryUsage()" + + if global.gc? && readyToGc() + gcTime = (executeAndTime global.gc).toFixed(2) + memAfterGc = inMegaBytes process.memoryUsage() + deltaMem = updateMemoryStats(memBeforeGc, memAfterGc) + logger.log {gcTime, memBeforeGc, memAfterGc, deltaMem, gcInterval, CpuTimeBucket}, "global.gc() forced" + #Metrics.timing("memory.gc-time", gcTime) + Metrics.gauge("memory.gc-rss-freed", -deltaMem.rss) + Metrics.gauge("memory.gc-heaptotal-freed", -deltaMem.heapTotal) + Metrics.gauge("memory.gc-heapused-freed", -deltaMem.heapUsed) diff --git a/libraries/metrics/statsd/metrics.coffee b/libraries/metrics/statsd/metrics.coffee new file mode 100644 index 0000000000..e632ebc565 --- /dev/null +++ b/libraries/metrics/statsd/metrics.coffee @@ -0,0 +1,62 @@ +console.log("using statsd") + +StatsD = require('lynx') +statsd = new StatsD(process.env["STATSD_HOST"] or "localhost", 8125, {on_error:->}) + +name = "unknown" +hostname = require('os').hostname() + +buildKey = (key)-> "#{name}.#{hostname}.#{key}" +buildGlobalKey = (key)-> "#{name}.global.#{key}" + +destructors = [] + +require "./uv_threadpool_size" + +module.exports = Metrics = + initialize: (_name) -> + name = _name + + registerDestructor: (func) -> + destructors.push func + + set : (key, value, sampleRate = 1)-> + statsd.set buildKey(key), value, sampleRate + + inc : (key, sampleRate = 1)-> + statsd.increment buildKey(key), sampleRate + + count : (key, count, sampleRate = 1)-> + statsd.count buildKey(key), count, sampleRate + + timing: (key, timeSpan, sampleRate)-> + statsd.timing(buildKey(key), timeSpan, sampleRate) + + Timer : class + constructor :(key, sampleRate = 1)-> + this.start = new Date() + this.key = key + this.sampleRate = sampleRate + done:-> + timeSpan = new Date - this.start + statsd.timing(buildKey(this.key), timeSpan, this.sampleRate) + return timeSpan + + gauge : (key, value, sampleRate = 1)-> + statsd.gauge buildKey(key), value, sampleRate + + globalGauge: (key, value, sampleRate = 1)-> + statsd.gauge buildGlobalKey(key), value, sampleRate + + mongodb: require "./mongodb" + http: require "./http" + open_sockets: require "./open_sockets" + event_loop: require "./event_loop" + memory: require "./memory" + + timeAsyncMethod: require('./timeAsyncMethod') + + close: () -> + for func in destructors + func() + statsd.close() diff --git a/libraries/metrics/statsd/mongodb.coffee b/libraries/metrics/statsd/mongodb.coffee new file mode 100644 index 0000000000..ce1ca71385 --- /dev/null +++ b/libraries/metrics/statsd/mongodb.coffee @@ -0,0 +1,100 @@ +module.exports = + monitor: (mongodb_require_path, logger) -> + + try + # for the v1 driver the methods to wrap are in the mongodb + # module in lib/mongodb/db.js + mongodb = require("#{mongodb_require_path}") + + try + # for the v2 driver the relevant methods are in the mongodb-core + # module in lib/topologies/{server,replset,mongos}.js + v2_path = mongodb_require_path.replace(/\/mongodb$/, '/mongodb-core') + mongodbCore = require(v2_path) + + Metrics = require("./metrics") + + monitorMethod = (base, method, type) -> + return unless base? + return unless (_method = base[method])? + arglen = _method.length + + mongo_driver_v1_wrapper = (db_command, options, callback) -> + if (typeof callback == 'undefined') + callback = options + options = {} + + collection = db_command.collectionName + if collection.match(/\$cmd$/) + # Ignore noisy command methods like authenticating, ismaster and ping + return _method.call this, db_command, options, callback + + key = "mongo-requests.#{collection}.#{type}" + if db_command.query? + query = Object.keys(db_command.query).sort().join("_") + key += "." + query + + timer = new Metrics.Timer(key) + start = new Date() + _method.call this, db_command, options, () -> + timer.done() + time = new Date() - start + logger.log + query: db_command.query + query_type: type + collection: collection + "response-time": new Date() - start + "mongo request" + callback.apply this, arguments + + mongo_driver_v2_wrapper = (ns, ops, options, callback) -> + if (typeof callback == 'undefined') + callback = options + options = {} + + if ns.match(/\$cmd$/) + # Ignore noisy command methods like authenticating, ismaster and ping + return _method.call this, ns, ops, options, callback + + key = "mongo-requests.#{ns}.#{type}" + if ops[0].q? # ops[0].q + query = Object.keys(ops[0].q).sort().join("_") + key += "." + query + + timer = new Metrics.Timer(key) + start = new Date() + _method.call this, ns, ops, options, () -> + timer.done() + time = new Date() - start + logger.log + query: ops[0].q + query_type: type + collection: ns + "response-time": new Date() - start + "mongo request" + callback.apply this, arguments + + if arglen == 3 + base[method] = mongo_driver_v1_wrapper + else if arglen == 4 + base[method] = mongo_driver_v2_wrapper + + monitorMethod(mongodb?.Db.prototype, "_executeQueryCommand", "query") + monitorMethod(mongodb?.Db.prototype, "_executeRemoveCommand", "remove") + monitorMethod(mongodb?.Db.prototype, "_executeInsertCommand", "insert") + monitorMethod(mongodb?.Db.prototype, "_executeUpdateCommand", "update") + + monitorMethod(mongodbCore?.Server.prototype, "command", "command") + monitorMethod(mongodbCore?.Server.prototype, "remove", "remove") + monitorMethod(mongodbCore?.Server.prototype, "insert", "insert") + monitorMethod(mongodbCore?.Server.prototype, "update", "update") + + monitorMethod(mongodbCore?.ReplSet.prototype, "command", "command") + monitorMethod(mongodbCore?.ReplSet.prototype, "remove", "remove") + monitorMethod(mongodbCore?.ReplSet.prototype, "insert", "insert") + monitorMethod(mongodbCore?.ReplSet.prototype, "update", "update") + + monitorMethod(mongodbCore?.Mongos.prototype, "command", "command") + monitorMethod(mongodbCore?.Mongos.prototype, "remove", "remove") + monitorMethod(mongodbCore?.Mongos.prototype, "insert", "insert") + monitorMethod(mongodbCore?.Mongos.prototype, "update", "update") diff --git a/libraries/metrics/statsd/open_sockets.coffee b/libraries/metrics/statsd/open_sockets.coffee new file mode 100644 index 0000000000..9af019dfc8 --- /dev/null +++ b/libraries/metrics/statsd/open_sockets.coffee @@ -0,0 +1,28 @@ +URL = require "url" +seconds = 1000 + +# In Node 0.10 the default is 5, which means only 5 open connections at one. +# Node 0.12 has a default of Infinity. Make sure we have no limit set, +# regardless of Node version. +require("http").globalAgent.maxSockets = Infinity +require("https").globalAgent.maxSockets = Infinity + +module.exports = OpenSocketsMonitor = + monitor: (logger) -> + interval = setInterval () -> + OpenSocketsMonitor.gaugeOpenSockets() + , 5 * seconds + Metrics = require "./metrics" + Metrics.registerDestructor () -> + clearInterval(interval) + + gaugeOpenSockets: () -> + Metrics = require "./metrics" + for url, agents of require('http').globalAgent.sockets + url = URL.parse("http://#{url}") + hostname = url.hostname?.replace(/\./g, "_") + Metrics.gauge "open_connections.http.#{hostname}", agents.length + for url, agents of require('https').globalAgent.sockets + url = URL.parse("https://#{url}") + hostname = url.hostname?.replace(/\./g, "_") + Metrics.gauge "open_connections.https.#{hostname}", agents.length diff --git a/libraries/metrics/statsd/timeAsyncMethod.coffee b/libraries/metrics/statsd/timeAsyncMethod.coffee new file mode 100644 index 0000000000..27e21e6e09 --- /dev/null +++ b/libraries/metrics/statsd/timeAsyncMethod.coffee @@ -0,0 +1,36 @@ + +module.exports = (obj, methodName, prefix, logger) -> + metrics = require('./metrics') + + if typeof obj[methodName] != 'function' + throw new Error("[Metrics] expected object property '#{methodName}' to be a function") + + realMethod = obj[methodName] + key = "#{prefix}.#{methodName}" + + obj[methodName] = (originalArgs...) -> + + [firstArgs..., callback] = originalArgs + + if !callback? || typeof callback != 'function' + if logger? + logger.log "[Metrics] expected wrapped method '#{methodName}' to be invoked with a callback" + return realMethod.apply this, originalArgs + + timer = new metrics.Timer(key) + + realMethod.call this, firstArgs..., (callbackArgs...) -> + elapsedTime = timer.done() + possibleError = callbackArgs[0] + if possibleError? + metrics.inc "#{key}.failure" + else + metrics.inc "#{key}.success" + if logger? + loggableArgs = {} + try + for arg, idx in firstArgs + if arg.toString().match(/^[0-9a-f]{24}$/) + loggableArgs["#{idx}"] = arg + logger.log {key, args: loggableArgs, elapsedTime}, "[Metrics] timed async method call" + callback.apply this, callbackArgs diff --git a/libraries/metrics/statsd/uv_threadpool_size.coffee b/libraries/metrics/statsd/uv_threadpool_size.coffee new file mode 100644 index 0000000000..c0947fee31 --- /dev/null +++ b/libraries/metrics/statsd/uv_threadpool_size.coffee @@ -0,0 +1,2 @@ +process.env.UV_THREADPOOL_SIZE=16 +console.log "Set UV_THREADPOOL_SIZE=#{process.env.UV_THREADPOOL_SIZE}"