
This commit is contained in:
Brian Gough 2020-07-17 16:01:58 +01:00
parent e31a819636
commit 747a80b545
23 changed files with 833 additions and 776 deletions

View file

@ -1,29 +0,0 @@
module.exports = (grunt) ->
expand: true
cwd: "test/unit/coffee"
src: ["**/*.coffee"]
dest: "test/unit/js/"
ext: ".js"
unit_tests: ["test/unit/js"]
reporter: grunt.option('reporter') or 'spec'
grep: grunt.option("grep")
src: ["test/unit/js/**/*.js"]
grunt.loadNpmTasks 'grunt-contrib-coffee'
grunt.loadNpmTasks 'grunt-contrib-clean'
grunt.loadNpmTasks 'grunt-mocha-test'
grunt.loadNpmTasks 'grunt-execute'
grunt.loadNpmTasks 'grunt-bunyan'
grunt.registerTask 'compile:unit_tests', ['clean:unit_tests', 'coffee:unit_tests']
grunt.registerTask 'test:unit', ['compile:unit_tests', 'mochaTest:unit']

View file

@ -0,0 +1,41 @@
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* Full docs:
module.exports = function(grunt) {
coffee: {
unit_tests: {
expand: true,
cwd: "test/unit/coffee",
src: ["**/*.coffee"],
dest: "test/unit/js/",
ext: ".js"
clean: {
unit_tests: ["test/unit/js"]
mochaTest: {
unit: {
options: {
reporter: grunt.option('reporter') || 'spec',
grep: grunt.option("grep")
src: ["test/unit/js/**/*.js"]
grunt.registerTask('compile:unit_tests', ['clean:unit_tests', 'coffee:unit_tests']);
return grunt.registerTask('test:unit', ['compile:unit_tests', 'mochaTest:unit']);

View file

@ -1,18 +0,0 @@
module.exports = EventLoopMonitor =
monitor: (logger, interval = 1000, log_threshold = 100) ->
Metrics = require "./metrics"
# check for logger on startup to avoid exceptions later if undefined
throw new Error("logger is undefined") if !logger?
# monitor delay in setInterval to detect event loop blocking
previous =
intervalId = setInterval () ->
now =
offset = now - previous - interval
if offset > log_threshold
logger.warn {offset: offset}, "slow event loop"
previous = now
Metrics.timing("event-loop-millsec", offset)
, interval
Metrics.registerDestructor () ->

View file

@ -0,0 +1,30 @@
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS207: Consider shorter variations of null checks
* Full docs:
let EventLoopMonitor;
module.exports = (EventLoopMonitor = {
monitor(logger, interval, log_threshold) {
if (interval == null) { interval = 1000; }
if (log_threshold == null) { log_threshold = 100; }
const Metrics = require("./metrics");
// check for logger on startup to avoid exceptions later if undefined
if ((logger == null)) { throw new Error("logger is undefined"); }
// monitor delay in setInterval to detect event loop blocking
let previous =;
const intervalId = setInterval(function() {
const now =;
const offset = now - previous - interval;
if (offset > log_threshold) {
logger.warn({offset}, "slow event loop");
previous = now;
return Metrics.timing("event-loop-millsec", offset);
, interval);
return Metrics.registerDestructor(() => clearInterval(intervalId));

View file

@ -1,53 +0,0 @@
os = require("os")
yn = require("yn")
module.exports.monitor = (logger) ->
return (req, res, next) ->
Metrics = require("./metrics")
startTime = process.hrtime()
end = res.end
res.end = () ->
end.apply(this, arguments)
responseTime = process.hrtime(startTime)
responseTimeMs = Math.round(responseTime[0] * 1000 + responseTime[1] / 1000000)
requestSize = parseInt(req.headers["content-length"], 10)
if req.route?.path?
routePath = req.route.path.toString().replace(/\//g, '_').replace(/\:/g, '').slice(1)
Metrics.timing("http_request", responseTimeMs, null, {method:req.method, status_code: res.statusCode, path:routePath})
if requestSize
Metrics.summary("http_request_size_bytes", requestSize, {method:req.method, status_code: res.statusCode, path:routePath})
remoteIp = req.ip || req.socket?.socket?.remoteAddress || req.socket?.remoteAddress
reqUrl = req.originalUrl || req.url
referrer = req.headers['referer'] || req.headers['referrer']
info =
requestMethod: req.method
requestUrl: reqUrl
requestSize: requestSize
status: res.statusCode
responseSize: res._headers?["content-length"]
userAgent: req.headers["user-agent"]
remoteIp: remoteIp
referer: referrer
seconds: responseTime[0]
nanos: responseTime[1]
protocol: req.protocol
info =
url: reqUrl
method: req.method
referrer: referrer
"remote-addr": remoteIp
"user-agent": req.headers["user-agent"]
"content-length": req.headers["content-length"]
"content-length": res._headers?["content-length"]
statusCode: res.statusCode
"response-time": responseTimeMs, "%s %s", req.method, reqUrl)

libraries/metrics/http.js Normal file
View file

@ -0,0 +1,77 @@
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS103: Rewrite code to no longer use __guard__
* DS207: Consider shorter variations of null checks
* Full docs:
const os = require("os");
const yn = require("yn");
module.exports.monitor = logger => (function(req, res, next) {
const Metrics = require("./metrics");
const startTime = process.hrtime();
const {
} = res;
res.end = function() {
let info;
end.apply(this, arguments);
const responseTime = process.hrtime(startTime);
const responseTimeMs = Math.round((responseTime[0] * 1000) + (responseTime[1] / 1000000));
const requestSize = parseInt(req.headers["content-length"], 10);
if ((req.route != null ? req.route.path : undefined) != null) {
const routePath = req.route.path.toString().replace(/\//g, '_').replace(/\:/g, '').slice(1);
Metrics.timing("http_request", responseTimeMs, null, {method:req.method, status_code: res.statusCode, path:routePath});
if (requestSize) {
Metrics.summary("http_request_size_bytes", requestSize, {method:req.method, status_code: res.statusCode, path:routePath});
const remoteIp = req.ip || __guard__(req.socket != null ? req.socket.socket : undefined, x => x.remoteAddress) || (req.socket != null ? req.socket.remoteAddress : undefined);
const reqUrl = req.originalUrl || req.url;
const referrer = req.headers['referer'] || req.headers['referrer'];
info = {
httpRequest: {
requestMethod: req.method,
requestUrl: reqUrl,
status: res.statusCode,
responseSize: (res._headers != null ? res._headers["content-length"] : undefined),
userAgent: req.headers["user-agent"],
referer: referrer,
latency: {
seconds: responseTime[0],
nanos: responseTime[1]
protocol: req.protocol
} else {
info = {
req: {
url: reqUrl,
method: req.method,
"remote-addr": remoteIp,
"user-agent": req.headers["user-agent"],
"content-length": req.headers["content-length"]
res: {
"content-length": (res._headers != null ? res._headers["content-length"] : undefined),
statusCode: res.statusCode
"response-time": responseTimeMs
return, "%s %s", req.method, reqUrl);
return next();
function __guard__(value, transform) {
return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined;

View file

@ -1,2 +0,0 @@
module.exports = require('./metrics');

View file

@ -1,85 +0,0 @@
# record memory usage each minute and run a periodic gc(), keeping cpu
# usage within allowable range of 1ms per minute. Also, dynamically
# adjust the period between gc()'s to reach a target of the gc saving
# 4 megabytes each time.
oneMinute = 60 * 1000
oneMegaByte = 1024 * 1024
CpuTimeBucket = 100 # current cpu time allowance in milliseconds
CpuTimeBucketMax = 100 # maximum amount of cpu time allowed in bucket
CpuTimeBucketRate = 10 # add this many milliseconds per minute
gcInterval = 1 # how many minutes between gc (parameter is dynamically adjusted)
countSinceLastGc = 0 # how many minutes since last gc
MemoryChunkSize = 4 # how many megabytes we need to free to consider gc worth doing
readyToGc = () ->
# update allowed cpu time
CpuTimeBucket = CpuTimeBucket + CpuTimeBucketRate
CpuTimeBucket = if CpuTimeBucket < CpuTimeBucketMax then CpuTimeBucket else CpuTimeBucketMax
# update counts since last gc
countSinceLastGc = countSinceLastGc + 1
# check there is enough time since last gc and we have enough cpu
return (countSinceLastGc > gcInterval) && (CpuTimeBucket > 0)
executeAndTime = (fn) ->
# time the execution of fn() and subtract from cpu allowance
t0 = process.hrtime()
dt = process.hrtime(t0)
timeTaken = (dt[0] + dt[1]*1e-9) * 1e3 # in milliseconds
CpuTimeBucket -= Math.ceil timeTaken
return timeTaken
inMegaBytes = (obj) ->
# convert process.memoryUsage hash {rss,heapTotal,heapFreed} into megabytes
result = {}
for k, v of obj
result[k] = (v / oneMegaByte).toFixed(2)
return result
updateMemoryStats = (oldMem, newMem) ->
countSinceLastGc = 0
delta = {}
for k of newMem
delta[k] = (newMem[k] - oldMem[k]).toFixed(2)
# take the max of all memory measures
savedMemory = Math.max -delta.rss, -delta.heapTotal, -delta.heapUsed
delta.megabytesFreed = savedMemory
# did it do any good?
if savedMemory < MemoryChunkSize
gcInterval = gcInterval + 1 # no, so wait longer next time
gcInterval = Math.max gcInterval - 1, 1 # yes, wait less time
return delta
module.exports = MemoryMonitor =
monitor: (logger) ->
interval = setInterval () ->
, oneMinute
Metrics = require "./metrics"
Metrics.registerDestructor () ->
Check: (logger) ->
Metrics = require "./metrics"
memBeforeGc = mem = inMegaBytes process.memoryUsage()
Metrics.gauge("memory.rss", mem.rss)
Metrics.gauge("memory.heaptotal", mem.heapTotal)
Metrics.gauge("memory.heapused", mem.heapUsed)
Metrics.gauge("memory.gc-interval", gcInterval)
#Metrics.gauge("memory.cpu-time-bucket", CpuTimeBucket)
logger.log mem, "process.memoryUsage()"
if global.gc? && readyToGc()
gcTime = (executeAndTime global.gc).toFixed(2)
memAfterGc = inMegaBytes process.memoryUsage()
deltaMem = updateMemoryStats(memBeforeGc, memAfterGc)
logger.log {gcTime, memBeforeGc, memAfterGc, deltaMem, gcInterval, CpuTimeBucket}, "global.gc() forced"
#Metrics.timing("memory.gc-time", gcTime)
Metrics.gauge("memory.gc-rss-freed", -deltaMem.rss)
Metrics.gauge("memory.gc-heaptotal-freed", -deltaMem.heapTotal)
Metrics.gauge("memory.gc-heapused-freed", -deltaMem.heapUsed)

libraries/metrics/memory.js Normal file
View file

@ -0,0 +1,103 @@
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS207: Consider shorter variations of null checks
* Full docs:
// record memory usage each minute and run a periodic gc(), keeping cpu
// usage within allowable range of 1ms per minute. Also, dynamically
// adjust the period between gc()'s to reach a target of the gc saving
// 4 megabytes each time.
let MemoryMonitor;
const oneMinute = 60 * 1000;
const oneMegaByte = 1024 * 1024;
let CpuTimeBucket = 100; // current cpu time allowance in milliseconds
const CpuTimeBucketMax = 100; // maximum amount of cpu time allowed in bucket
const CpuTimeBucketRate = 10; // add this many milliseconds per minute
let gcInterval = 1; // how many minutes between gc (parameter is dynamically adjusted)
let countSinceLastGc = 0; // how many minutes since last gc
const MemoryChunkSize = 4; // how many megabytes we need to free to consider gc worth doing
const readyToGc = function() {
// update allowed cpu time
CpuTimeBucket = CpuTimeBucket + CpuTimeBucketRate;
CpuTimeBucket = CpuTimeBucket < CpuTimeBucketMax ? CpuTimeBucket : CpuTimeBucketMax;
// update counts since last gc
countSinceLastGc = countSinceLastGc + 1;
// check there is enough time since last gc and we have enough cpu
return (countSinceLastGc > gcInterval) && (CpuTimeBucket > 0);
const executeAndTime = function(fn) {
// time the execution of fn() and subtract from cpu allowance
const t0 = process.hrtime();
const dt = process.hrtime(t0);
const timeTaken = (dt[0] + (dt[1]*1e-9)) * 1e3; // in milliseconds
CpuTimeBucket -= Math.ceil(timeTaken);
return timeTaken;
const inMegaBytes = function(obj) {
// convert process.memoryUsage hash {rss,heapTotal,heapFreed} into megabytes
const result = {};
for (let k in obj) {
const v = obj[k];
result[k] = (v / oneMegaByte).toFixed(2);
return result;
const updateMemoryStats = function(oldMem, newMem) {
countSinceLastGc = 0;
const delta = {};
for (let k in newMem) {
delta[k] = (newMem[k] - oldMem[k]).toFixed(2);
// take the max of all memory measures
const savedMemory = Math.max(-delta.rss, -delta.heapTotal, -delta.heapUsed);
delta.megabytesFreed = savedMemory;
// did it do any good?
if (savedMemory < MemoryChunkSize) {
gcInterval = gcInterval + 1; // no, so wait longer next time
} else {
gcInterval = Math.max(gcInterval - 1, 1); // yes, wait less time
return delta;
module.exports = (MemoryMonitor = {
monitor(logger) {
const interval = setInterval(() => MemoryMonitor.Check(logger)
, oneMinute);
const Metrics = require("./metrics");
return Metrics.registerDestructor(() => clearInterval(interval));
Check(logger) {
let mem;
const Metrics = require("./metrics");
const memBeforeGc = (mem = inMegaBytes(process.memoryUsage()));
Metrics.gauge("memory.rss", mem.rss);
Metrics.gauge("memory.heaptotal", mem.heapTotal);
Metrics.gauge("memory.heapused", mem.heapUsed);
Metrics.gauge("memory.gc-interval", gcInterval);
//Metrics.gauge("memory.cpu-time-bucket", CpuTimeBucket)
logger.log(mem, "process.memoryUsage()");
if ((global.gc != null) && readyToGc()) {
const gcTime = (executeAndTime(global.gc)).toFixed(2);
const memAfterGc = inMegaBytes(process.memoryUsage());
const deltaMem = updateMemoryStats(memBeforeGc, memAfterGc);
logger.log({gcTime, memBeforeGc, memAfterGc, deltaMem, gcInterval, CpuTimeBucket}, "global.gc() forced");
//Metrics.timing("memory.gc-time", gcTime)
Metrics.gauge("memory.gc-rss-freed", -deltaMem.rss);
Metrics.gauge("memory.gc-heaptotal-freed", -deltaMem.heapTotal);
return Metrics.gauge("memory.gc-heapused-freed", -deltaMem.heapUsed);

View file

@ -1,140 +0,0 @@
console.log("using prometheus")
prom = require('./prom_wrapper')
collectDefaultMetrics = prom.collectDefaultMetrics
appname = "unknown"
hostname = require('os').hostname()
destructors = []
require "./uv_threadpool_size"
module.exports = Metrics =
register: prom.registry
initialize: (_name, opts = {}) ->
appname = _name
collectDefaultMetrics({ timeout: 5000, prefix: Metrics.buildPromKey()})
if opts.ttlInMinutes
prom.ttlInMinutes = opts.ttlInMinutes
console.log("ENABLE_TRACE_AGENT set to #{process.env['ENABLE_TRACE_AGENT']}")
if process.env['ENABLE_TRACE_AGENT'] == "true"
console.log("starting google trace agent")
traceAgent = require('@google-cloud/trace-agent')
traceOpts =
ignoreUrls: [/^\/status/, /^\/health_check/]
console.log("ENABLE_DEBUG_AGENT set to #{process.env['ENABLE_DEBUG_AGENT']}")
if process.env['ENABLE_DEBUG_AGENT'] == "true"
console.log("starting google debug agent")
debugAgent = require('@google-cloud/debug-agent')
allowExpressions: true,
serviceContext: {
service: appname,
version: process.env['BUILD_VERSION']
console.log("ENABLE_PROFILE_AGENT set to #{process.env['ENABLE_PROFILE_AGENT']}")
if process.env['ENABLE_PROFILE_AGENT'] == "true"
console.log("starting google profile agent")
profiler = require('@google-cloud/profiler')
serviceContext: {
service: appname,
version: process.env['BUILD_VERSION']
registerDestructor: (func) ->
destructors.push func
injectMetricsRoute: (app) ->
app.get('/metrics', (req, res) ->
res.set('Content-Type', prom.registry.contentType)
buildPromKey: (key = "")->
key.replace /[^a-zA-Z0-9]/g, "_"
sanitizeValue: (value) ->
set : (key, value, sampleRate = 1)->
console.log("counts are not currently supported")
inc : (key, sampleRate = 1, opts = {})->
key = Metrics.buildPromKey(key) = appname = hostname
prom.metric('counter', key).inc(opts)
if process.env['DEBUG_METRICS']
console.log("doing inc", key, opts)
count : (key, count, sampleRate = 1, opts = {})->
key = Metrics.buildPromKey(key) = appname = hostname
prom.metric('counter', key).inc(opts, count)
if process.env['DEBUG_METRICS']
console.log("doing count/inc", key, opts)
summary : (key, value, opts = {})->
key = Metrics.buildPromKey(key) = appname = hostname
prom.metric('summary', key).observe(opts, value)
if process.env['DEBUG_METRICS']
console.log("doing summary", key, value, opts)
timing: (key, timeSpan, sampleRate, opts = {})->
key = Metrics.buildPromKey("timer_" + key) = appname = hostname
prom.metric('summary', key).observe(opts, timeSpan)
if process.env['DEBUG_METRICS']
console.log("doing timing", key, opts)
Timer : class
constructor :(key, sampleRate = 1, opts)->
this.start = new Date()
key = Metrics.buildPromKey(key)
this.key = key
this.sampleRate = sampleRate
this.opts = opts
timeSpan = new Date - this.start
Metrics.timing(this.key, timeSpan, this.sampleRate, this.opts)
return timeSpan
gauge : (key, value, sampleRate = 1, opts)->
key = Metrics.buildPromKey(key)
prom.metric('gauge', key).set({app: appname, host: hostname, status: opts?.status}, this.sanitizeValue(value))
if process.env['DEBUG_METRICS']
console.log("doing gauge", key, opts)
globalGauge: (key, value, sampleRate = 1, opts)->
key = Metrics.buildPromKey(key)
prom.metric('gauge', key).set({app: appname, status: opts?.status},this.sanitizeValue(value))
mongodb: require "./mongodb"
http: require "./http"
open_sockets: require "./open_sockets"
event_loop: require "./event_loop"
memory: require "./memory"
timeAsyncMethod: require('./timeAsyncMethod')
close: () ->
for func in destructors

View file

@ -0,0 +1,188 @@
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS207: Consider shorter variations of null checks
* Full docs:
let Metrics;
console.log("using prometheus");
const prom = require('./prom_wrapper');
const {
} = prom;
let appname = "unknown";
const hostname = require('os').hostname();
const destructors = [];
module.exports = (Metrics = {
register: prom.registry,
initialize(_name, opts) {
if (opts == null) { opts = {}; }
appname = _name;
collectDefaultMetrics({ timeout: 5000, prefix: Metrics.buildPromKey()});
if (opts.ttlInMinutes) {
prom.ttlInMinutes = opts.ttlInMinutes;
console.log(`ENABLE_TRACE_AGENT set to ${process.env['ENABLE_TRACE_AGENT']}`);
if (process.env['ENABLE_TRACE_AGENT'] === "true") {
console.log("starting google trace agent");
const traceAgent = require('@google-cloud/trace-agent');
const traceOpts =
{ignoreUrls: [/^\/status/, /^\/health_check/]};
console.log(`ENABLE_DEBUG_AGENT set to ${process.env['ENABLE_DEBUG_AGENT']}`);
if (process.env['ENABLE_DEBUG_AGENT'] === "true") {
console.log("starting google debug agent");
const debugAgent = require('@google-cloud/debug-agent');
allowExpressions: true,
serviceContext: {
service: appname,
version: process.env['BUILD_VERSION']
console.log(`ENABLE_PROFILE_AGENT set to ${process.env['ENABLE_PROFILE_AGENT']}`);
if (process.env['ENABLE_PROFILE_AGENT'] === "true") {
console.log("starting google profile agent");
const profiler = require('@google-cloud/profiler');
serviceContext: {
service: appname,
version: process.env['BUILD_VERSION']
registerDestructor(func) {
return destructors.push(func);
injectMetricsRoute(app) {
return app.get('/metrics', function(req, res) {
res.set('Content-Type', prom.registry.contentType);
return res.end(prom.registry.metrics());
if (key == null) { key = ""; }
return key.replace(/[^a-zA-Z0-9]/g, "_");
sanitizeValue(value) {
return parseFloat(value);
set(key, value, sampleRate){
if (sampleRate == null) { sampleRate = 1; }
return console.log("counts are not currently supported");
inc(key, sampleRate, opts){
if (sampleRate == null) { sampleRate = 1; }
if (opts == null) { opts = {}; }
key = Metrics.buildPromKey(key); = appname; = hostname;
prom.metric('counter', key).inc(opts);
if (process.env['DEBUG_METRICS']) {
return console.log("doing inc", key, opts);
count(key, count, sampleRate, opts){
if (sampleRate == null) { sampleRate = 1; }
if (opts == null) { opts = {}; }
key = Metrics.buildPromKey(key); = appname; = hostname;
prom.metric('counter', key).inc(opts, count);
if (process.env['DEBUG_METRICS']) {
return console.log("doing count/inc", key, opts);
summary(key, value, opts){
if (opts == null) { opts = {}; }
key = Metrics.buildPromKey(key); = appname; = hostname;
prom.metric('summary', key).observe(opts, value);
if (process.env['DEBUG_METRICS']) {
return console.log("doing summary", key, value, opts);
timing(key, timeSpan, sampleRate, opts){
if (opts == null) { opts = {}; }
key = Metrics.buildPromKey("timer_" + key); = appname; = hostname;
prom.metric('summary', key).observe(opts, timeSpan);
if (process.env['DEBUG_METRICS']) {
return console.log("doing timing", key, opts);
Timer : class {
constructor(key, sampleRate, opts){
if (sampleRate == null) { sampleRate = 1; }
this.start = new Date();
key = Metrics.buildPromKey(key);
this.key = key;
this.sampleRate = sampleRate;
this.opts = opts;
done() {
const timeSpan = new Date - this.start;
Metrics.timing(this.key, timeSpan, this.sampleRate, this.opts);
return timeSpan;
gauge(key, value, sampleRate, opts){
if (sampleRate == null) { sampleRate = 1; }
key = Metrics.buildPromKey(key);
prom.metric('gauge', key).set({app: appname, host: hostname, status: (opts != null ? opts.status : undefined)}, this.sanitizeValue(value));
if (process.env['DEBUG_METRICS']) {
return console.log("doing gauge", key, opts);
globalGauge(key, value, sampleRate, opts){
if (sampleRate == null) { sampleRate = 1; }
key = Metrics.buildPromKey(key);
return prom.metric('gauge', key).set({app: appname, status: (opts != null ? opts.status : undefined)},this.sanitizeValue(value));
mongodb: require("./mongodb"),
http: require("./http"),
open_sockets: require("./open_sockets"),
event_loop: require("./event_loop"),
memory: require("./memory"),
timeAsyncMethod: require('./timeAsyncMethod'),
close() {
return Array.from(destructors).map((func) =>

View file

@ -1,100 +0,0 @@
module.exports =
monitor: (mongodb_require_path, logger) ->
# for the v1 driver the methods to wrap are in the mongodb
# module in lib/mongodb/db.js
mongodb = require("#{mongodb_require_path}")
# for the v2 driver the relevant methods are in the mongodb-core
# module in lib/topologies/{server,replset,mongos}.js
v2_path = mongodb_require_path.replace(/\/mongodb$/, '/mongodb-core')
mongodbCore = require(v2_path)
Metrics = require("./metrics")
monitorMethod = (base, method, type) ->
return unless base?
return unless (_method = base[method])?
arglen = _method.length
mongo_driver_v1_wrapper = (db_command, options, callback) ->
if (typeof callback == 'undefined')
callback = options
options = {}
collection = db_command.collectionName
if collection.match(/\$cmd$/)
# Ignore noisy command methods like authenticating, ismaster and ping
return this, db_command, options, callback
key = "mongo-requests.#{collection}.#{type}"
if db_command.query?
query = Object.keys(db_command.query).sort().join("_")
key += "." + query
timer = new Metrics.Timer("mongo", {collection: collection, query:query})
start = new Date() this, db_command, options, () ->
time = new Date() - start
query: db_command.query
query_type: type
collection: collection
"response-time": new Date() - start
"mongo request"
callback.apply this, arguments
mongo_driver_v2_wrapper = (ns, ops, options, callback) ->
if (typeof callback == 'undefined')
callback = options
options = {}
if ns.match(/\$cmd$/)
# Ignore noisy command methods like authenticating, ismaster and ping
return this, ns, ops, options, callback
key = "mongo-requests.#{ns}.#{type}"
if ops[0].q? # ops[0].q
query = Object.keys(ops[0].q).sort().join("_")
key += "." + query
timer = new Metrics.Timer(key)
start = new Date() this, ns, ops, options, () ->
time = new Date() - start
query: ops[0].q
query_type: type
collection: ns
"response-time": new Date() - start
"mongo request"
callback.apply this, arguments
if arglen == 3
base[method] = mongo_driver_v1_wrapper
else if arglen == 4
base[method] = mongo_driver_v2_wrapper
monitorMethod(mongodb?.Db.prototype, "_executeQueryCommand", "query")
monitorMethod(mongodb?.Db.prototype, "_executeRemoveCommand", "remove")
monitorMethod(mongodb?.Db.prototype, "_executeInsertCommand", "insert")
monitorMethod(mongodb?.Db.prototype, "_executeUpdateCommand", "update")
monitorMethod(mongodbCore?.Server.prototype, "command", "command")
monitorMethod(mongodbCore?.Server.prototype, "remove", "remove")
monitorMethod(mongodbCore?.Server.prototype, "insert", "insert")
monitorMethod(mongodbCore?.Server.prototype, "update", "update")
monitorMethod(mongodbCore?.ReplSet.prototype, "command", "command")
monitorMethod(mongodbCore?.ReplSet.prototype, "remove", "remove")
monitorMethod(mongodbCore?.ReplSet.prototype, "insert", "insert")
monitorMethod(mongodbCore?.ReplSet.prototype, "update", "update")
monitorMethod(mongodbCore?.Mongos.prototype, "command", "command")
monitorMethod(mongodbCore?.Mongos.prototype, "remove", "remove")
monitorMethod(mongodbCore?.Mongos.prototype, "insert", "insert")
monitorMethod(mongodbCore?.Mongos.prototype, "update", "update")

View file

@ -0,0 +1,128 @@
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS207: Consider shorter variations of null checks
* Full docs:
module.exports = {
monitor(mongodb_require_path, logger) {
let mongodb, mongodbCore;
try {
// for the v1 driver the methods to wrap are in the mongodb
// module in lib/mongodb/db.js
mongodb = require(`${mongodb_require_path}`);
} catch (error) {}
try {
// for the v2 driver the relevant methods are in the mongodb-core
// module in lib/topologies/{server,replset,mongos}.js
const v2_path = mongodb_require_path.replace(/\/mongodb$/, '/mongodb-core');
mongodbCore = require(v2_path);
} catch (error1) {}
const Metrics = require("./metrics");
const monitorMethod = function(base, method, type) {
let _method;
if (base == null) { return; }
if ((_method = base[method]) == null) { return; }
const arglen = _method.length;
const mongo_driver_v1_wrapper = function(db_command, options, callback) {
let query;
if (typeof callback === 'undefined') {
callback = options;
options = {};
const collection = db_command.collectionName;
if (collection.match(/\$cmd$/)) {
// Ignore noisy command methods like authenticating, ismaster and ping
return, db_command, options, callback);
let key = `mongo-requests.${collection}.${type}`;
if (db_command.query != null) {
query = Object.keys(db_command.query).sort().join("_");
key += "." + query;
const timer = new Metrics.Timer("mongo", {collection, query});
const start = new Date();
return, db_command, options, function() {
const time = new Date() - start;
query: db_command.query,
query_type: type,
"response-time": new Date() - start
"mongo request");
return callback.apply(this, arguments);
const mongo_driver_v2_wrapper = function(ns, ops, options, callback) {
let query;
if (typeof callback === 'undefined') {
callback = options;
options = {};
if (ns.match(/\$cmd$/)) {
// Ignore noisy command methods like authenticating, ismaster and ping
return, ns, ops, options, callback);
let key = `mongo-requests.${ns}.${type}`;
if (ops[0].q != null) { // ops[0].q
query = Object.keys(ops[0].q).sort().join("_");
key += "." + query;
const timer = new Metrics.Timer(key);
const start = new Date();
return, ns, ops, options, function() {
const time = new Date() - start;
query: ops[0].q,
query_type: type,
collection: ns,
"response-time": new Date() - start
"mongo request");
return callback.apply(this, arguments);
if (arglen === 3) {
return base[method] = mongo_driver_v1_wrapper;
} else if (arglen === 4) {
return base[method] = mongo_driver_v2_wrapper;
monitorMethod(mongodb != null ? mongodb.Db.prototype : undefined, "_executeQueryCommand", "query");
monitorMethod(mongodb != null ? mongodb.Db.prototype : undefined, "_executeRemoveCommand", "remove");
monitorMethod(mongodb != null ? mongodb.Db.prototype : undefined, "_executeInsertCommand", "insert");
monitorMethod(mongodb != null ? mongodb.Db.prototype : undefined, "_executeUpdateCommand", "update");
monitorMethod(mongodbCore != null ? mongodbCore.Server.prototype : undefined, "command", "command");
monitorMethod(mongodbCore != null ? mongodbCore.Server.prototype : undefined, "remove", "remove");
monitorMethod(mongodbCore != null ? mongodbCore.Server.prototype : undefined, "insert", "insert");
monitorMethod(mongodbCore != null ? mongodbCore.Server.prototype : undefined, "update", "update");
monitorMethod(mongodbCore != null ? mongodbCore.ReplSet.prototype : undefined, "command", "command");
monitorMethod(mongodbCore != null ? mongodbCore.ReplSet.prototype : undefined, "remove", "remove");
monitorMethod(mongodbCore != null ? mongodbCore.ReplSet.prototype : undefined, "insert", "insert");
monitorMethod(mongodbCore != null ? mongodbCore.ReplSet.prototype : undefined, "update", "update");
monitorMethod(mongodbCore != null ? mongodbCore.Mongos.prototype : undefined, "command", "command");
monitorMethod(mongodbCore != null ? mongodbCore.Mongos.prototype : undefined, "remove", "remove");
monitorMethod(mongodbCore != null ? mongodbCore.Mongos.prototype : undefined, "insert", "insert");
return monitorMethod(mongodbCore != null ? mongodbCore.Mongos.prototype : undefined, "update", "update");

View file

@ -1,28 +0,0 @@
URL = require "url"
seconds = 1000
# In Node 0.10 the default is 5, which means only 5 open connections at one.
# Node 0.12 has a default of Infinity. Make sure we have no limit set,
# regardless of Node version.
require("http").globalAgent.maxSockets = Infinity
require("https").globalAgent.maxSockets = Infinity
module.exports = OpenSocketsMonitor =
monitor: (logger) ->
interval = setInterval () ->
, 5 * seconds
Metrics = require "./metrics"
Metrics.registerDestructor () ->
gaugeOpenSockets: () ->
Metrics = require "./metrics"
for url, agents of require('http').globalAgent.sockets
url = URL.parse("http://#{url}")
hostname = url.hostname?.replace(/\./g, "_")
Metrics.gauge "open_connections.http.#{hostname}", agents.length
for url, agents of require('https').globalAgent.sockets
url = URL.parse("https://#{url}")
hostname = url.hostname?.replace(/\./g, "_")
Metrics.gauge "open_connections.https.#{hostname}", agents.length

View file

@ -0,0 +1,48 @@
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS205: Consider reworking code to avoid use of IIFEs
* DS207: Consider shorter variations of null checks
* Full docs:
let OpenSocketsMonitor;
const URL = require("url");
const seconds = 1000;
// In Node 0.10 the default is 5, which means only 5 open connections at one.
// Node 0.12 has a default of Infinity. Make sure we have no limit set,
// regardless of Node version.
require("http").globalAgent.maxSockets = Infinity;
require("https").globalAgent.maxSockets = Infinity;
module.exports = (OpenSocketsMonitor = {
monitor(logger) {
const interval = setInterval(() => OpenSocketsMonitor.gaugeOpenSockets()
, 5 * seconds);
const Metrics = require("./metrics");
return Metrics.registerDestructor(() => clearInterval(interval));
gaugeOpenSockets() {
let agents, hostname, url;
const Metrics = require("./metrics");
const object = require('http').globalAgent.sockets;
for (url in object) {
agents = object[url];
url = URL.parse(`http://${url}`);
hostname = url.hostname != null ? url.hostname.replace(/\./g, "_") : undefined;
Metrics.gauge(`open_connections.http.${hostname}`, agents.length);
return (() => {
const result = [];
const object1 = require('https').globalAgent.sockets;
for (url in object1) {
agents = object1[url];
url = URL.parse(`https://${url}`);
hostname = url.hostname != null ? url.hostname.replace(/\./g, "_") : undefined;
result.push(Metrics.gauge(`open_connections.https.${hostname}`, agents.length));
return result;

View file

@ -1,114 +0,0 @@
prom = require('prom-client')
registry = require('prom-client').register
metrics = new Map()
optsKey = (opts) ->
keys = Object.keys(opts)
return '' if keys.length == 0
keys = keys.sort()
hash = '';
for key in keys
hash += "," if hash.length
hash += "#{key}:#{opts[key]}"
return hash
extendOpts = (opts, labelNames) ->
for label in labelNames
opts[label] ||= ''
return opts
optsAsArgs = (opts, labelNames) ->
args = []
for label in labelNames
args.push(opts[label] || '')
return args
PromWrapper =
ttlInMinutes: 0
registry: registry
metric: (type, name) ->
metrics.get(name) || new MetricWrapper(type, name)
collectDefaultMetrics: prom.collectDefaultMetrics
class MetricWrapper
constructor: (type, name) ->
metrics.set(name, this)
@name = name
@instances = new Map()
@lastAccess = new Date()
@metric = switch type
when "counter"
new prom.Counter({
name: name,
help: name,
labelNames: ['app','host','status','method', 'path']
when "summary"
new prom.Summary({
name: name,
help: name,
maxAgeSeconds: 60,
ageBuckets: 10,
labelNames: ['app', 'host', 'path', 'status_code', 'method', 'collection', 'query']
when "gauge"
new prom.Gauge({
name: name,
help: name,
labelNames: ['app','host', 'status']
inc: (opts, value) ->
@_execMethod 'inc', opts, value
observe: (opts, value) ->
@_execMethod 'observe', opts, value
set: (opts, value) ->
@_execMethod 'set', opts, value
sweep: () ->
thresh = new Date( - 1000 * 60 * PromWrapper.ttlInMinutes)
@instances.forEach (instance, key) =>
if thresh > instance.time
if process.env['DEBUG_METRICS']
console.log("Sweeping stale metric instance", @name, opts: instance.opts, key)
@metric.remove(optsAsArgs(instance.opts, @metric.labelNames)...)
if thresh > @lastAccess
if process.env['DEBUG_METRICS']
console.log("Sweeping stale metric", @name, thresh, @lastAccess)
_execMethod: (method, opts, value) ->
opts = extendOpts(opts, @metric.labelNames)
key = optsKey(opts)
@instances.set(key, { time: new Date(), opts }) unless key == ''
@lastAccess = new Date()
@metric[method](opts, value)
unless PromWrapper.sweepRegistered
if process.env['DEBUG_METRICS']
console.log("Registering sweep method")
PromWrapper.sweepRegistered = true
() ->
if PromWrapper.ttlInMinutes
if process.env['DEBUG_METRICS']
console.log("Sweeping metrics")
metrics.forEach (metric, key) =>
module.exports = PromWrapper

View file

@ -0,0 +1,148 @@
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS205: Consider reworking code to avoid use of IIFEs
* Full docs:
const prom = require('prom-client');
const registry = require('prom-client').register;
const metrics = new Map();
const optsKey = function(opts) {
let keys = Object.keys(opts);
if (keys.length === 0) { return ''; }
keys = keys.sort();
let hash = '';
for (let key of Array.from(keys)) {
if (hash.length) { hash += ","; }
hash += `${key}:${opts[key]}`;
return hash;
const extendOpts = function(opts, labelNames) {
for (let label of Array.from(labelNames)) {
if (!opts[label]) { opts[label] = ''; }
return opts;
const optsAsArgs = function(opts, labelNames) {
const args = [];
for (let label of Array.from(labelNames)) {
args.push(opts[label] || '');
return args;
const PromWrapper = {
ttlInMinutes: 0,
metric(type, name) {
return metrics.get(name) || new MetricWrapper(type, name);
collectDefaultMetrics: prom.collectDefaultMetrics
class MetricWrapper {
constructor(type, name) {
metrics.set(name, this); = name;
this.instances = new Map();
this.lastAccess = new Date();
this.metric = (() => { switch (type) {
case "counter":
return new prom.Counter({
help: name,
labelNames: ['app','host','status','method', 'path']
case "summary":
return new prom.Summary({
help: name,
maxAgeSeconds: 60,
ageBuckets: 10,
labelNames: ['app', 'host', 'path', 'status_code', 'method', 'collection', 'query']
case "gauge":
return new prom.Gauge({
help: name,
labelNames: ['app','host', 'status']
} })();
inc(opts, value) {
return this._execMethod('inc', opts, value);
observe(opts, value) {
return this._execMethod('observe', opts, value);
set(opts, value) {
return this._execMethod('set', opts, value);
sweep() {
const thresh = new Date( - (1000 * 60 * PromWrapper.ttlInMinutes));
this.instances.forEach((instance, key) => {
if (thresh > instance.time) {
if (process.env['DEBUG_METRICS']) {
console.log("Sweeping stale metric instance",, {opts: instance.opts}, key);
return this.metric.remove(...Array.from(optsAsArgs(instance.opts, this.metric.labelNames) || []));
if (thresh > this.lastAccess) {
if (process.env['DEBUG_METRICS']) {
console.log("Sweeping stale metric",, thresh, this.lastAccess);
return registry.removeSingleMetric(;
_execMethod(method, opts, value) {
opts = extendOpts(opts, this.metric.labelNames);
const key = optsKey(opts);
if (key !== '') { this.instances.set(key, { time: new Date(), opts }); }
this.lastAccess = new Date();
return this.metric[method](opts, value);
if (!PromWrapper.sweepRegistered) {
if (process.env['DEBUG_METRICS']) {
console.log("Registering sweep method");
PromWrapper.sweepRegistered = true;
function() {
if (PromWrapper.ttlInMinutes) {
if (process.env['DEBUG_METRICS']) {
console.log("Sweeping metrics");
return metrics.forEach((metric, key) => {
return metric.sweep();
module.exports = PromWrapper;

View file

@ -1,34 +0,0 @@
chai = require('chai')
should = chai.should()
expect = chai.expect
path = require('path')
modulePath = path.join __dirname, '../../../'
SandboxedModule = require('sandboxed-module')
sinon = require("sinon")
describe 'event_loop', ->
before ->
@metrics = {
timing: sinon.stub()
registerDestructor: sinon.stub()
@logger = {
warn: sinon.stub()
@event_loop = SandboxedModule.require modulePath, requires:
'./metrics': @metrics
describe 'with a logger provided', ->
before ->
it 'should register a destructor with metrics', ->
@metrics.registerDestructor.called.should.equal true
describe 'without a logger provided', ->
it 'should throw an exception', ->
expect(@event_loop.monitor).to.throw('logger is undefined')

View file

@ -1,126 +0,0 @@
chai = require('chai')
should = chai.should()
expect = chai.expect
path = require('path')
modulePath = path.join __dirname, '../../../'
SandboxedModule = require('sandboxed-module')
sinon = require("sinon")
describe 'timeAsyncMethod', ->
beforeEach ->
@Timer = {done: sinon.stub()}
@TimerConstructor = sinon.stub().returns(@Timer)
@metrics = {
Timer: @TimerConstructor
inc: sinon.stub()
@timeAsyncMethod = SandboxedModule.require modulePath, requires:
'./metrics': @metrics
@testObject = {
nextNumber: (n, callback=(err, result)->) ->
() ->
callback(null, n+1)
, 100
it 'should have the testObject behave correctly before wrapping', (done) ->
@testObject.nextNumber 2, (err, result) ->
expect(result).to.equal 3
it 'should wrap method without error', (done) ->
@timeAsyncMethod @testObject, 'nextNumber', 'someContext.TestObject'
it 'should transparently wrap method invocation in timer', (done) ->
@timeAsyncMethod @testObject, 'nextNumber', 'someContext.TestObject'
@testObject.nextNumber 2, (err, result) =>
expect(result).to.equal 3
expect(@TimerConstructor.callCount).to.equal 1
expect(@Timer.done.callCount).to.equal 1
it 'should increment success count', (done) -> = sinon.stub()
@timeAsyncMethod @testObject, 'nextNumber', 'someContext.TestObject'
@testObject.nextNumber 2, (err, result) =>
expect( 1
expect('someContext_result', 1, { method: 'TestObject_nextNumber', status: 'success'})).to.equal true
describe 'when base method produces an error', ->
beforeEach -> = sinon.stub()
@testObject.nextNumber = (n, callback=(err, result)->) ->
() ->
callback(new Error('woops'))
, 100
it 'should propagate the error transparently', (done) ->
@timeAsyncMethod @testObject, 'nextNumber', 'someContext.TestObject'
@testObject.nextNumber 2, (err, result) =>
expect(err) Error
it 'should increment failure count', (done) ->
@timeAsyncMethod @testObject, 'nextNumber', 'someContext.TestObject'
@testObject.nextNumber 2, (err, result) =>
expect( 1
expect('someContext_result', 1, { method: 'TestObject_nextNumber', status: 'failed'})).to.equal true
describe 'when a logger is supplied', ->
beforeEach ->
@logger = {log: sinon.stub()}
it 'should also call logger.log', (done) ->
@timeAsyncMethod @testObject, 'nextNumber', 'someContext.TestObject', @logger
@testObject.nextNumber 2, (err, result) =>
expect(result).to.equal 3
expect(@TimerConstructor.callCount).to.equal 1
expect(@Timer.done.callCount).to.equal 1
expect(@logger.log.callCount).to.equal 1
describe 'when the wrapper cannot be applied', ->
beforeEach ->
it 'should raise an error', ->
badWrap = () =>
@timeAsyncMethod @testObject, 'DEFINITELY_NOT_A_REAL_METHOD', 'someContext.TestObject'
/^.*expected object property 'DEFINITELY_NOT_A_REAL_METHOD' to be a function.*$/
describe 'when the wrapped function is not using a callback', ->
beforeEach ->
@realMethod = sinon.stub().returns(42)
@testObject.nextNumber = @realMethod
it 'should not throw an error', ->
@timeAsyncMethod @testObject, 'nextNumber', 'someContext.TestObject'
badCall = () =>
@testObject.nextNumber 2
it 'should call the underlying method', ->
@timeAsyncMethod @testObject, 'nextNumber', 'someContext.TestObject'
result = @testObject.nextNumber(12)
expect(@realMethod.callCount).to.equal 1
expect(@realMethod.calledWith(12)).to.equal true
expect(result).to.equal 42

View file

@ -1,44 +0,0 @@
module.exports = (obj, methodName, prefix, logger) ->
metrics = require('./metrics')
if typeof obj[methodName] != 'function'
throw new Error("[Metrics] expected object property '#{methodName}' to be a function")
key = "#{prefix}.#{methodName}"
realMethod = obj[methodName]
splitPrefix = prefix.split(".")
startPrefix = splitPrefix[0]
if splitPrefix[1]?
modifedMethodName = "#{splitPrefix[1]}_#{methodName}"
modifedMethodName = methodName
obj[methodName] = (originalArgs...) ->
[firstArgs..., callback] = originalArgs
if !callback? || typeof callback != 'function'
if logger?
logger.log "[Metrics] expected wrapped method '#{methodName}' to be invoked with a callback"
return realMethod.apply this, originalArgs
timer = new metrics.Timer(startPrefix, 1, {method: modifedMethodName}) this, firstArgs..., (callbackArgs...) ->
elapsedTime = timer.done()
possibleError = callbackArgs[0]
if possibleError? "#{startPrefix}_result", 1, {status:"failed", method: modifedMethodName}
else "#{startPrefix}_result", 1, {status:"success", method: modifedMethodName}
if logger?
loggableArgs = {}
for arg, idx in firstArgs
if arg.toString().match(/^[0-9a-f]{24}$/)
loggableArgs["#{idx}"] = arg
logger.log {key, args: loggableArgs, elapsedTime}, "[Metrics] timed async method call"
callback.apply this, callbackArgs

View file

@ -0,0 +1,66 @@
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS201: Simplify complex destructure assignments
* DS207: Consider shorter variations of null checks
* Full docs:
module.exports = function(obj, methodName, prefix, logger) {
let modifedMethodName;
const metrics = require('./metrics');
if (typeof obj[methodName] !== 'function') {
throw new Error(`[Metrics] expected object property '${methodName}' to be a function`);
const key = `${prefix}.${methodName}`;
const realMethod = obj[methodName];
const splitPrefix = prefix.split(".");
const startPrefix = splitPrefix[0];
if (splitPrefix[1] != null) {
modifedMethodName = `${splitPrefix[1]}_${methodName}`;
} else {
modifedMethodName = methodName;
return obj[methodName] = function(...originalArgs) {
const adjustedLength = Math.max(originalArgs.length, 1), firstArgs = originalArgs.slice(0, adjustedLength - 1), callback = originalArgs[adjustedLength - 1];
if ((callback == null) || (typeof callback !== 'function')) {
if (logger != null) {
logger.log(`[Metrics] expected wrapped method '${methodName}' to be invoked with a callback`);
return realMethod.apply(this, originalArgs);
const timer = new metrics.Timer(startPrefix, 1, {method: modifedMethodName});
return, ...Array.from(firstArgs), function(...callbackArgs) {
const elapsedTime = timer.done();
const possibleError = callbackArgs[0];
if (possibleError != null) {`${startPrefix}_result`, 1, {status:"failed", method: modifedMethodName});
} else {`${startPrefix}_result`, 1, {status:"success", method: modifedMethodName});
if (logger != null) {
const loggableArgs = {};
try {
for (let idx = 0; idx < firstArgs.length; idx++) {
const arg = firstArgs[idx];
if (arg.toString().match(/^[0-9a-f]{24}$/)) {
loggableArgs[`${idx}`] = arg;
} catch (error) {}
logger.log({key, args: loggableArgs, elapsedTime}, "[Metrics] timed async method call");
return callback.apply(this, callbackArgs);

View file

@ -1,3 +0,0 @@
unless process.env.UV_THREADPOOL_SIZE
console.log "Set UV_THREADPOOL_SIZE=#{process.env.UV_THREADPOOL_SIZE}"

View file

@ -0,0 +1,4 @@
if (!process.env.UV_THREADPOOL_SIZE) {
console.log(`Set UV_THREADPOOL_SIZE=${process.env.UV_THREADPOOL_SIZE}`);