Skip to content

Commit

Permalink
fix for cluster aggregation performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Saurabh Singh authored and Saurabh Singh committed May 20, 2024
1 parent 564e467 commit 473f29c
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 39 deletions.
68 changes: 50 additions & 18 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
*/

const Registry = require('./registry');
const { Grouper } = require('./util');
const { Grouper, hashObject } = require('./util');
const { aggregators } = require('./metricAggregators');
const fs = require('fs');
const path = require('path');
const os = require('os');
// We need to lazy-load the 'cluster' module as some application servers -
// namely Passenger - crash when it is imported.
let cluster = () => {
Expand Down Expand Up @@ -117,6 +120,7 @@ class AggregatorRegistry extends Registry {

// Aggregate gathered metrics.
metricsByName.forEach(metrics => {
metrics.workerSize = metricsArr.length;
const aggregatorName = metrics[0].aggregator;
const aggregatorFn = aggregators[aggregatorName];
if (typeof aggregatorFn !== 'function') {
Expand Down Expand Up @@ -175,19 +179,29 @@ function addListeners() {
request.done(new Error(message.error));
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}

// The worker hands its metrics over through a temporary JSON file (path in
// `message.filename`) rather than through the IPC payload. Read it back, fold
// its registries into the pending request, and finalize once every worker has
// answered.
fs.readFile(message.filename, 'utf8', (readErr, data) => {
  if (readErr) {
    request.done(readErr);
    return;
  }

  // The file is single-use. A failure to delete it is only logged: it must
  // not fail a request whose data was already read.
  const removeFile = () => {
    fs.unlink(message.filename, unlinkErr => {
      if (unlinkErr) {
        console.error(`Error deleting file ${message.filename}:`, unlinkErr);
      }
    });
  };

  // A truncated or corrupt file must fail this request instead of throwing
  // out of the callback, where nothing could catch it.
  let metrics;
  try {
    metrics = JSON.parse(data);
    if (!Array.isArray(metrics)) {
      throw new TypeError(
        `Metrics file ${message.filename} does not contain an array`,
      );
    }
  } catch (parseErr) {
    removeFile();
    request.done(parseErr);
    return;
  }

  metrics.forEach(registry => request.responses.push(registry));
  removeFile();

  request.pending--;
  if (request.pending !== 0) {
    return;
  }

  // finalize
  requests.delete(message.requestId);
  clearTimeout(request.errorTimeout);

  const registry = AggregatorRegistry.aggregate(request.responses);
  const promString = registry.metrics();
  request.done(null, promString);
});
}
});
}
Expand All @@ -198,10 +212,28 @@ function addListeners() {
if (message.type === GET_METRICS_REQ) {
Promise.all(registries.map(r => r.getMetricsAsJSON()))
.then(metrics => {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
metrics,
// Precompute, in the worker, the grouping key the master-side aggregators
// look up as `value.hash`, so the master does not have to hash every sample.
// NOTE(review): assumes each registry is an array of metric objects that
// carry their samples in `metric.values` (the shape the aggregators iterate)
// — confirm against getMetricsAsJSON().
metrics.forEach(registry => {
  registry.forEach(metric => {
    if (!Array.isArray(metric.values)) {
      return;
    }
    metric.values.forEach(value => {
      // Same key the aggregators previously derived: metricName + label hash.
      value.hash = `${value.metricName}_${hashObject(value.labels)}`;
    });
  });
});

// Include the request id so overlapping requests served by the same worker
// never write to (or delete) each other's file.
const filename = path.join(
  os.tmpdir(),
  `metrics-${process.pid}-${message.requestId}.json`
);

// Hand the metrics to the master through the file; only the path (or the
// write error) travels over the IPC channel.
fs.writeFile(filename, JSON.stringify(metrics), err => {
  const reply = {
    type: GET_METRICS_RES,
    requestId: message.requestId,
  };
  if (err) {
    reply.error = err.message;
  } else {
    reply.filename = filename;
  }
  process.send(reply);
});
})
.catch(error => {
Expand Down
19 changes: 15 additions & 4 deletions lib/metricAggregators.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const { Grouper, hashObject } = require('./util');
const metricMap = new Map();

/**
* Returns a new function that applies the `aggregatorFn` to the values.
Expand All @@ -18,11 +18,22 @@ function AggregatorFactory(aggregatorFn) {
aggregator: metrics[0].aggregator,
};
// Gather metrics by metricName and labels.
const byLabels = new Grouper();
if(!metricMap.get(metrics[0].name)) {
metricMap.set(metrics[0].name, new Map());
}
let byLabels = metricMap.get(metrics[0].name);
metrics.forEach(metric => {
metric.values.forEach(value => {
const key = hashObject(value.labels);
byLabels.add(`${value.metricName}_${key}`, value);
let valuesArray = byLabels.get(value.hash);
if(!valuesArray) {
byLabels.set(value.hash, [value]);
} else {
if(valuesArray.length < metrics.workerSize) {
valuesArray.push(value);
} else {
byLabels.set(value.hash, [value]);
}
}
});
});
// Apply aggregator function to gathered metrics.
Expand Down
17 changes: 10 additions & 7 deletions test/aggregatorsTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ describe('aggregators', () => {
name: 'metric_name',
type: 'does not matter',
values: [
{ labels: [], value: 1 },
{ labels: ['label1'], value: 2 },
{ labels: [], value: 1, hash: 'h1' },
{ labels: ['label1'], value: 2, hash: 'h2'},
],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [
{ labels: [], value: 3 },
{ labels: ['label1'], value: 4 },
{ labels: [], value: 3, hash: 'h1'},
{ labels: ['label1'], value: 4, hash: 'h2'},
],
},
];

metrics.workerSize = 2;

describe('sum', () => {
it('properly sums values', () => {
const result = aggregators.sum(metrics);
Expand Down Expand Up @@ -102,21 +104,22 @@ describe('aggregators', () => {
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 1, metricName: 'abc' }],
values: [{ labels: [], value: 1, metricName: 'abc', hash: 'h1' }],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 3, metricName: 'abc' }],
values: [{ labels: [], value: 3, metricName: 'abc', hash: 'h1' }],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 5, metricName: 'def' }],
values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2'}],
},
];
metrics2.workerSize = 2;
const result = aggregators.sum(metrics2);
expect(result.values).toEqual([
{ value: 4, labels: [], metricName: 'abc' },
Expand Down
27 changes: 17 additions & 10 deletions test/clusterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const cluster = require('cluster');
const process = require('process');
const Registry = require('../lib/cluster');
const { hash } = require('crypto');

describe.each([
['Prometheus', Registry.PROMETHEUS_CONTENT_TYPE],
Expand Down Expand Up @@ -61,11 +62,13 @@ describe.each([
labels: { le: 0.1, code: '300' },
value: 0,
metricName: 'test_histogram_bucket',
hash: 'test_histogram_bucket{le="0.1",code="300"}',
},
{
labels: { le: 10, code: '300' },
value: 1.6486727018068046,
metricName: 'test_histogram_bucket',
hash: 'test_histogram_bucket{le="10",code="300"}',
},
],
aggregator: 'sum',
Expand All @@ -75,24 +78,24 @@ describe.each([
name: 'test_gauge',
type: 'gauge',
values: [
{ value: 0.47, labels: { method: 'get', code: 200 } },
{ value: 0.64, labels: {} },
{ value: 23, labels: { method: 'post', code: '300' } },
{ value: 0.47, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' },
{ value: 0.64, labels: {}, hash: 'test_gauge{}' },
{ value: 23, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' },
],
aggregator: 'sum',
},
{
help: 'Start time of the process since unix epoch in seconds.',
name: 'process_start_time_seconds',
type: 'gauge',
values: [{ value: 1502075832, labels: {} }],
values: [{ value: 1502075832, labels: {}, hash: 'process_start_time_seconds{}' }],
aggregator: 'omit',
},
{
help: 'Lag of event loop in seconds.',
name: 'nodejs_eventloop_lag_seconds',
type: 'gauge',
values: [{ value: 0.009, labels: {} }],
values: [{ value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }],
aggregator: 'average',
},
{
Expand All @@ -103,6 +106,7 @@ describe.each([
{
value: 1,
labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 },
hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}',
},
],
aggregator: 'first',
Expand All @@ -118,11 +122,13 @@ describe.each([
labels: { le: 0.1, code: '300' },
value: 0.235151,
metricName: 'test_histogram_bucket',
hash: 'test_histogram_bucket{le="0.1",code="300"}',
},
{
labels: { le: 10, code: '300' },
value: 1.192591,
metricName: 'test_histogram_bucket',
hash: 'test_histogram_bucket{le="10",code="300"}',
},
],
aggregator: 'sum',
Expand All @@ -132,24 +138,24 @@ describe.each([
name: 'test_gauge',
type: 'gauge',
values: [
{ value: 0.02, labels: { method: 'get', code: 200 } },
{ value: 0.24, labels: {} },
{ value: 51, labels: { method: 'post', code: '300' } },
{ value: 0.02, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' },
{ value: 0.24, labels: {}, hash: 'test_gauge{}' },
{ value: 51, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' },
],
aggregator: 'sum',
},
{
help: 'Start time of the process since unix epoch in seconds.',
name: 'process_start_time_seconds',
type: 'gauge',
values: [{ value: 1502075849, labels: {} }],
values: [{ value: 1502075849, labels: {}, hash: 'process_start_time_seconds{}' }],
aggregator: 'omit',
},
{
help: 'Lag of event loop in seconds.',
name: 'nodejs_eventloop_lag_seconds',
type: 'gauge',
values: [{ value: 0.008, labels: {} }],
values: [{ value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }],
aggregator: 'average',
},
{
Expand All @@ -160,6 +166,7 @@ describe.each([
{
value: 1,
labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 },
hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}',
},
],
aggregator: 'first',
Expand Down

0 comments on commit 473f29c

Please sign in to comment.