mirror of
https://github.com/overleaf/overleaf.git
synced 2024-12-23 18:21:55 +00:00
MVP for running both statsd and prom side by side
statsd code is from v1.8.1
This commit is contained in:
parent
85011ed0e7
commit
bf18c6e513
10 changed files with 369 additions and 1 deletions
|
@ -1,3 +1,9 @@
|
|||
if process.env["USE_PROM_METRICS"] != "true"
|
||||
return module.exports = require("./statsd/metrics")
|
||||
else
|
||||
console.log("using prometheus")
|
||||
|
||||
|
||||
prom = require('prom-client')
|
||||
Register = require('prom-client').register
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "metrics-sharelatex",
|
||||
"version": "2.0.14",
|
||||
"version": "2.1.0",
|
||||
"description": "A drop-in metrics and monitoring module for node.js apps",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
|
|
18
libraries/metrics/statsd/event_loop.coffee
Normal file
18
libraries/metrics/statsd/event_loop.coffee
Normal file
|
@ -0,0 +1,18 @@
|
|||
# Watches the Node event loop for lag: a repeating timer measures how far
# past its scheduled firing time each tick actually arrives and reports
# that delay as a timing metric.
module.exports = EventLoopMonitor =
  # Start monitoring. `interval` is the timer period in ms; delays larger
  # than `log_threshold` ms are additionally logged as warnings.
  monitor: (logger, interval = 1000, log_threshold = 100) ->
    Metrics = require "./metrics"
    # check for logger on startup so a missing logger fails fast here,
    # not inside the timer callback later
    throw new Error("logger is undefined") if !logger?
    lastTick = Date.now()
    timer = setInterval () ->
      now = Date.now()
      # how late this tick fired relative to the requested interval
      lag = now - lastTick - interval
      logger.warn {offset: lag}, "slow event loop" if lag > log_threshold
      lastTick = now
      Metrics.timing("event-loop-millsec", lag)
    , interval
    # allow Metrics.close() to stop the timer
    Metrics.registerDestructor () ->
      clearInterval(timer)
|
31
libraries/metrics/statsd/http.coffee
Normal file
31
libraries/metrics/statsd/http.coffee
Normal file
|
@ -0,0 +1,31 @@
|
|||
os = require("os")

# Express middleware factory: wraps res.end so that every request is timed,
# routed requests emit a per-route/method/status timing metric, and an
# access-log entry is written for each response.
module.exports.monitor = (logger) ->
  return (req, res, next) ->
    Metrics = require("./metrics")
    startTime = new Date()
    originalEnd = res.end
    res.end = () ->
      # run the real end() first so statusCode and headers are final
      originalEnd.apply(this, arguments)
      responseTime = new Date() - startTime
      # only routed requests get a timing metric; unmatched requests have no route
      if req.route?.path?
        routePath = req.route.path.toString().replace(/\//g, '_').replace(/\:/g, '').slice(1)
        key = "http-requests.#{routePath}.#{req.method}.#{res.statusCode}"

        Metrics.timing(key, responseTime)
      logger.log
        req:
          url: req.originalUrl || req.url
          method: req.method
          referrer: req.headers['referer'] || req.headers['referrer']
          "remote-addr": req.ip || req.socket?.socket?.remoteAddress || req.socket?.remoteAddress
          "user-agent": req.headers["user-agent"]
          "content-length": req.headers["content-length"]
        res:
          "content-length": res._headers?["content-length"]
          statusCode: res.statusCode
          "response-time": responseTime
        "http request"

    next()
|
||||
|
85
libraries/metrics/statsd/memory.coffee
Normal file
85
libraries/metrics/statsd/memory.coffee
Normal file
|
@ -0,0 +1,85 @@
|
|||
# Record memory usage each minute and run a periodic gc(), keeping cpu
# usage within allowable range of 1ms per minute. Also, dynamically
# adjust the period between gc()'s to reach a target of the gc saving
# 4 megabytes each time.

oneMinute = 60 * 1000
oneMegaByte = 1024 * 1024

CpuTimeBucket = 100 # current cpu time allowance in milliseconds
CpuTimeBucketMax = 100 # maximum amount of cpu time allowed in bucket
CpuTimeBucketRate = 10 # add this many milliseconds per minute

gcInterval = 1 # how many minutes between gc (parameter is dynamically adjusted)
countSinceLastGc = 0 # how many minutes since last gc
MemoryChunkSize = 4 # how many megabytes we need to free to consider gc worth doing

# True when enough minutes have passed since the last gc AND there is cpu
# time left in the token bucket. Also performs the once-per-minute bucket
# refill and counter update, so it must be called exactly once per tick.
readyToGc = () ->
  # top up the cpu allowance, clamped to the bucket capacity
  CpuTimeBucket = Math.min(CpuTimeBucket + CpuTimeBucketRate, CpuTimeBucketMax)
  countSinceLastGc = countSinceLastGc + 1
  return (countSinceLastGc > gcInterval) && (CpuTimeBucket > 0)

# Run fn(), charge its wall-clock duration against the cpu bucket, and
# return the elapsed time in milliseconds.
executeAndTime = (fn) ->
  t0 = process.hrtime()
  fn()
  [s, ns] = process.hrtime(t0)
  elapsedMs = (s + ns * 1e-9) * 1e3 # in milliseconds
  CpuTimeBucket -= Math.ceil elapsedMs
  return elapsedMs

# Convert a process.memoryUsage() hash {rss,heapTotal,heapUsed} (bytes)
# into megabyte strings with two decimal places.
inMegaBytes = (obj) ->
  converted = {}
  converted[k] = (v / oneMegaByte).toFixed(2) for k, v of obj
  return converted

# Compare memory before/after a gc, reset the since-last-gc counter, and
# adapt gcInterval: back off when the gc freed less than MemoryChunkSize MB,
# tighten (down to a 1-minute floor) when it freed more.
updateMemoryStats = (oldMem, newMem) ->
  countSinceLastGc = 0
  delta = {}
  delta[k] = (newMem[k] - oldMem[k]).toFixed(2) for k of newMem
  # take the max of all memory measures
  savedMemory = Math.max -delta.rss, -delta.heapTotal, -delta.heapUsed
  delta.megabytesFreed = savedMemory
  if savedMemory < MemoryChunkSize
    gcInterval = gcInterval + 1 # not enough freed, so wait longer next time
  else
    gcInterval = Math.max gcInterval - 1, 1 # worthwhile, wait less time
  return delta

module.exports = MemoryMonitor =
  # Start the once-a-minute check; registers a destructor with Metrics so
  # Metrics.close() stops the timer.
  monitor: (logger) ->
    interval = setInterval () ->
      MemoryMonitor.Check(logger)
    , oneMinute
    Metrics = require "./metrics"
    Metrics.registerDestructor () ->
      clearInterval(interval)

  # One monitoring tick: gauge current memory stats and, when node was
  # started with --expose-gc and readyToGc() allows it, force a gc and
  # report how much memory it recovered.
  Check: (logger) ->
    Metrics = require "./metrics"
    memBeforeGc = mem = inMegaBytes process.memoryUsage()
    Metrics.gauge("memory.rss", mem.rss)
    Metrics.gauge("memory.heaptotal", mem.heapTotal)
    Metrics.gauge("memory.heapused", mem.heapUsed)
    Metrics.gauge("memory.gc-interval", gcInterval)
    #Metrics.gauge("memory.cpu-time-bucket", CpuTimeBucket)

    logger.log mem, "process.memoryUsage()"

    if global.gc? && readyToGc()
      gcTime = (executeAndTime global.gc).toFixed(2)
      memAfterGc = inMegaBytes process.memoryUsage()
      deltaMem = updateMemoryStats(memBeforeGc, memAfterGc)
      logger.log {gcTime, memBeforeGc, memAfterGc, deltaMem, gcInterval, CpuTimeBucket}, "global.gc() forced"
      #Metrics.timing("memory.gc-time", gcTime)
      Metrics.gauge("memory.gc-rss-freed", -deltaMem.rss)
      Metrics.gauge("memory.gc-heaptotal-freed", -deltaMem.heapTotal)
      Metrics.gauge("memory.gc-heapused-freed", -deltaMem.heapUsed)
|
62
libraries/metrics/statsd/metrics.coffee
Normal file
62
libraries/metrics/statsd/metrics.coffee
Normal file
|
@ -0,0 +1,62 @@
|
|||
console.log("using statsd")

StatsD = require('lynx')
statsd = new StatsD(process.env["STATSD_HOST"] or "localhost", 8125, {on_error:->})

# service name, set via initialize() and used as the metric key prefix
name = "unknown"
hostname = require('os').hostname()

# per-host key: <service>.<host>.<key>
buildKey = (key) -> "#{name}.#{hostname}.#{key}"
# service-wide key: <service>.global.<key>
buildGlobalKey = (key) -> "#{name}.global.#{key}"

# callbacks to run on close() (used by the interval-based monitors)
destructors = []

require "./uv_threadpool_size"

module.exports = Metrics =
  initialize: (_name) ->
    name = _name

  registerDestructor: (func) ->
    destructors.push func

  # statsd "set": counts unique values per flush period
  set: (key, value, sampleRate = 1) ->
    statsd.set buildKey(key), value, sampleRate

  inc: (key, sampleRate = 1) ->
    statsd.increment buildKey(key), sampleRate

  count: (key, count, sampleRate = 1) ->
    statsd.count buildKey(key), count, sampleRate

  timing: (key, timeSpan, sampleRate) ->
    statsd.timing(buildKey(key), timeSpan, sampleRate)

  # Stopwatch helper: construct at the start of an operation; done() sends
  # the elapsed milliseconds as a timing metric and returns them.
  Timer: class
    constructor: (key, sampleRate = 1) ->
      @start = new Date()
      @key = key
      @sampleRate = sampleRate
    done: ->
      timeSpan = new Date() - @start
      statsd.timing(buildKey(@key), timeSpan, @sampleRate)
      return timeSpan

  gauge: (key, value, sampleRate = 1) ->
    statsd.gauge buildKey(key), value, sampleRate

  globalGauge: (key, value, sampleRate = 1) ->
    statsd.gauge buildGlobalKey(key), value, sampleRate

  mongodb: require "./mongodb"
  http: require "./http"
  open_sockets: require "./open_sockets"
  event_loop: require "./event_loop"
  memory: require "./memory"

  timeAsyncMethod: require('./timeAsyncMethod')

  # run registered destructors, then flush and close the statsd socket
  close: () ->
    func() for func in destructors
    statsd.close()
|
100
libraries/metrics/statsd/mongodb.coffee
Normal file
100
libraries/metrics/statsd/mongodb.coffee
Normal file
|
@ -0,0 +1,100 @@
|
|||
# Instrument the mongodb driver: wrap its low-level command methods so each
# query is timed with Metrics.Timer and logged. Supports both the v1 driver
# (methods on Db.prototype) and the v2 driver (methods on the mongodb-core
# Server/ReplSet/Mongos topologies); which wrapper to install is detected
# from the wrapped method's arity.
module.exports =
  monitor: (mongodb_require_path, logger) ->

    try
      # for the v1 driver the methods to wrap are in the mongodb
      # module in lib/mongodb/db.js
      mongodb = require("#{mongodb_require_path}")

    try
      # for the v2 driver the relevant methods are in the mongodb-core
      # module in lib/topologies/{server,replset,mongos}.js
      corePath = mongodb_require_path.replace(/\/mongodb$/, '/mongodb-core')
      mongodbCore = require(corePath)

    Metrics = require("./metrics")

    monitorMethod = (base, method, type) ->
      return unless base?
      return unless (realMethod = base[method])?
      # v1 methods take 3 arguments, v2 methods take 4 — see dispatch below
      arity = realMethod.length

      mongo_driver_v1_wrapper = (db_command, options, callback) ->
        # options is optional; shuffle args when called as (cmd, callback)
        if (typeof callback == 'undefined')
          callback = options
          options = {}

        collection = db_command.collectionName
        if collection.match(/\$cmd$/)
          # Ignore noisy command methods like authenticating, ismaster and ping
          return realMethod.call this, db_command, options, callback

        key = "mongo-requests.#{collection}.#{type}"
        if db_command.query?
          # append the sorted query field names so different query shapes
          # on the same collection get distinct metric keys
          query = Object.keys(db_command.query).sort().join("_")
          key += "." + query

        timer = new Metrics.Timer(key)
        start = new Date()
        realMethod.call this, db_command, options, () ->
          timer.done()
          time = new Date() - start # NOTE(review): unused, kept for parity
          logger.log
            query: db_command.query
            query_type: type
            collection: collection
            "response-time": new Date() - start
            "mongo request"
          callback.apply this, arguments

      mongo_driver_v2_wrapper = (ns, ops, options, callback) ->
        # options is optional; shuffle args when called as (ns, ops, callback)
        if (typeof callback == 'undefined')
          callback = options
          options = {}

        if ns.match(/\$cmd$/)
          # Ignore noisy command methods like authenticating, ismaster and ping
          return realMethod.call this, ns, ops, options, callback

        key = "mongo-requests.#{ns}.#{type}"
        if ops[0].q? # ops[0].q
          query = Object.keys(ops[0].q).sort().join("_")
          key += "." + query

        timer = new Metrics.Timer(key)
        start = new Date()
        realMethod.call this, ns, ops, options, () ->
          timer.done()
          time = new Date() - start # NOTE(review): unused, kept for parity
          logger.log
            query: ops[0].q
            query_type: type
            collection: ns
            "response-time": new Date() - start
            "mongo request"
          callback.apply this, arguments

      if arity == 3
        base[method] = mongo_driver_v1_wrapper
      else if arity == 4
        base[method] = mongo_driver_v2_wrapper

    monitorMethod(mongodb?.Db.prototype, "_executeQueryCommand", "query")
    monitorMethod(mongodb?.Db.prototype, "_executeRemoveCommand", "remove")
    monitorMethod(mongodb?.Db.prototype, "_executeInsertCommand", "insert")
    monitorMethod(mongodb?.Db.prototype, "_executeUpdateCommand", "update")

    monitorMethod(mongodbCore?.Server.prototype, "command", "command")
    monitorMethod(mongodbCore?.Server.prototype, "remove", "remove")
    monitorMethod(mongodbCore?.Server.prototype, "insert", "insert")
    monitorMethod(mongodbCore?.Server.prototype, "update", "update")

    monitorMethod(mongodbCore?.ReplSet.prototype, "command", "command")
    monitorMethod(mongodbCore?.ReplSet.prototype, "remove", "remove")
    monitorMethod(mongodbCore?.ReplSet.prototype, "insert", "insert")
    monitorMethod(mongodbCore?.ReplSet.prototype, "update", "update")

    monitorMethod(mongodbCore?.Mongos.prototype, "command", "command")
    monitorMethod(mongodbCore?.Mongos.prototype, "remove", "remove")
    monitorMethod(mongodbCore?.Mongos.prototype, "insert", "insert")
    monitorMethod(mongodbCore?.Mongos.prototype, "update", "update")
|
28
libraries/metrics/statsd/open_sockets.coffee
Normal file
28
libraries/metrics/statsd/open_sockets.coffee
Normal file
|
@ -0,0 +1,28 @@
|
|||
URL = require "url"
seconds = 1000

# In Node 0.10 the default is 5, which means only 5 open connections at one.
# Node 0.12 has a default of Infinity. Make sure we have no limit set,
# regardless of Node version.
require("http").globalAgent.maxSockets = Infinity
require("https").globalAgent.maxSockets = Infinity

# Samples the global http/https agents and gauges the number of open
# sockets per remote host.
module.exports = OpenSocketsMonitor =
  # Start sampling every 5 seconds; registers a destructor with Metrics
  # so Metrics.close() stops the timer.
  monitor: (logger) ->
    interval = setInterval () ->
      OpenSocketsMonitor.gaugeOpenSockets()
    , 5 * seconds
    Metrics = require "./metrics"
    Metrics.registerDestructor () ->
      clearInterval(interval)

  # Emit one gauge per remote host, for http and https separately.
  gaugeOpenSockets: () ->
    Metrics = require "./metrics"
    for address, sockets of require('http').globalAgent.sockets
      # agent keys look like "host:port"; parse to extract the hostname
      parsed = URL.parse("http://#{address}")
      hostname = parsed.hostname?.replace(/\./g, "_")
      Metrics.gauge "open_connections.http.#{hostname}", sockets.length
    for address, sockets of require('https').globalAgent.sockets
      parsed = URL.parse("https://#{address}")
      hostname = parsed.hostname?.replace(/\./g, "_")
      Metrics.gauge "open_connections.https.#{hostname}", sockets.length
|
36
libraries/metrics/statsd/timeAsyncMethod.coffee
Normal file
36
libraries/metrics/statsd/timeAsyncMethod.coffee
Normal file
|
@ -0,0 +1,36 @@
|
|||
|
||||
# Wrap obj[methodName] (an async, callback-last method) so every call is
# timed with metrics.Timer and counted as "<prefix>.<methodName>.success"
# or ".failure" depending on the callback's error argument; optionally
# logs each timed call.
module.exports = (obj, methodName, prefix, logger) ->
  metrics = require('./metrics')

  if typeof obj[methodName] != 'function'
    throw new Error("[Metrics] expected object property '#{methodName}' to be a function")

  realMethod = obj[methodName]
  key = "#{prefix}.#{methodName}"

  obj[methodName] = (originalArgs...) ->

    # split off the trailing callback from the leading arguments
    [firstArgs..., callback] = originalArgs

    # no callback means nothing to time against: warn and pass straight through
    if !callback? || typeof callback != 'function'
      if logger?
        logger.log "[Metrics] expected wrapped method '#{methodName}' to be invoked with a callback"
      return realMethod.apply this, originalArgs

    timer = new metrics.Timer(key)

    realMethod.call this, firstArgs..., (callbackArgs...) ->
      elapsedTime = timer.done()
      possibleError = callbackArgs[0]
      if possibleError?
        metrics.inc "#{key}.failure"
      else
        metrics.inc "#{key}.success"
      if logger?
        # only include args that look like 24-char hex ids (mongo ObjectIds);
        # the bare try swallows toString() failures on exotic arguments
        loggableArgs = {}
        try
          for arg, idx in firstArgs
            if arg.toString().match(/^[0-9a-f]{24}$/)
              loggableArgs["#{idx}"] = arg
        logger.log {key, args: loggableArgs, elapsedTime}, "[Metrics] timed async method call"
      callback.apply this, callbackArgs
|
2
libraries/metrics/statsd/uv_threadpool_size.coffee
Normal file
2
libraries/metrics/statsd/uv_threadpool_size.coffee
Normal file
|
@ -0,0 +1,2 @@
|
|||
# Raise libuv's worker-pool size above the small default; must run before
# any module touches the thread pool. Logged so deploys can confirm it took.
process.env.UV_THREADPOOL_SIZE = 16
console.log "Set UV_THREADPOOL_SIZE=#{process.env.UV_THREADPOOL_SIZE}"
|
Loading…
Reference in a new issue