Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Metrics SDK - aggregator, batcher, controller #738

Merged
merged 7 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 43 additions & 46 deletions packages/opentelemetry-exporter-prometheus/src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
import { ExportResult } from '@opentelemetry/base';
import { NoopLogger } from '@opentelemetry/core';
import {
LabelValue,
MetricDescriptor,
MetricDescriptorType,
MetricExporter,
ReadableMetric,
MetricRecord,
MetricDescriptor,
LastValue,
MetricKind,
Sum,
} from '@opentelemetry/metrics';
import * as types from '@opentelemetry/api';
import { createServer, IncomingMessage, Server, ServerResponse } from 'http';
import { Counter, Gauge, labelValues, Metric, Registry } from 'prom-client';
import * as url from 'url';
import { ExporterConfig } from './export/types';
import { LabelSet } from '@opentelemetry/metrics/build/src/LabelSet';
import { CounterSumAggregator } from '@opentelemetry/metrics/build/src/export/Aggregator';

export class PrometheusExporter implements MetricExporter {
static readonly DEFAULT_OPTIONS = {
Expand Down Expand Up @@ -81,13 +84,10 @@ export class PrometheusExporter implements MetricExporter {
* be a no-op and the exporter should reach into the metrics when the export endpoint is
* called. As there is currently no interface to do this, this is our only option.
*
* @param readableMetrics Metrics to be sent to the prometheus backend
* @param records Metrics to be sent to the prometheus backend
* @param cb result callback to be called on finish
*/
export(
readableMetrics: ReadableMetric[],
cb: (result: ExportResult) => void
) {
export(records: MetricRecord[], cb: (result: ExportResult) => void) {
if (!this._server) {
// It is conceivable that the _server may not be started as it is an async startup
// However unlikely, if this happens the caller may retry the export
Expand All @@ -97,8 +97,8 @@ export class PrometheusExporter implements MetricExporter {

this._logger.debug('Prometheus exporter export');

for (const readableMetric of readableMetrics) {
this._updateMetric(readableMetric);
for (const record of records) {
this._updateMetric(record);
}

cb(ExportResult.SUCCESS);
Expand All @@ -117,51 +117,53 @@ export class PrometheusExporter implements MetricExporter {
/**
* Updates the value of a single metric in the registry
*
* @param readableMetric Metric value to be saved
* @param record Metric value to be saved
*/
private _updateMetric(readableMetric: ReadableMetric) {
const metric = this._registerMetric(readableMetric);
private _updateMetric(record: MetricRecord) {
const metric = this._registerMetric(record);
if (!metric) return;

const labelKeys = readableMetric.descriptor.labelKeys;
const labelKeys = record.descriptor.labelKeys;
const value = record.aggregator.value();

if (metric instanceof Counter) {
for (const ts of readableMetric.timeseries) {
// Prometheus counter saves internal state and increments by given value.
// ReadableMetric value is the current state, not the delta to be incremented by.
// Currently, _registerMetric creates a new counter every time the value changes,
// so the increment here behaves as a set value (increment from 0)
metric.inc(
this._getLabelValues(labelKeys, ts.labelValues),
ts.points[0].value as number
);
}
// Prometheus counter saves internal state and increments by given value.
// ReadableMetric value is the current state, not the delta to be incremented by.
// Currently, _registerMetric creates a new counter every time the value changes,
// so the increment here behaves as a set value (increment from 0)
metric.inc(this._getLabelValues(labelKeys, record.labels), value as Sum);
}

if (metric instanceof Gauge) {
for (const ts of readableMetric.timeseries) {
if (record.aggregator instanceof CounterSumAggregator) {
metric.set(
this._getLabelValues(labelKeys, record.labels),
value as Sum
);
} else {
metric.set(
this._getLabelValues(labelKeys, ts.labelValues),
ts.points[0].value as number
this._getLabelValues(labelKeys, record.labels),
(value as LastValue).value
);
}
}

// TODO: only counter and gauge are implemented in metrics so far
}

private _getLabelValues(keys: string[], values: LabelValue[]) {
private _getLabelValues(keys: string[], values: LabelSet) {
const labelValues: labelValues = {};
const labels = values.labels;
for (let i = 0; i < keys.length; i++) {
if (values[i].value !== null) {
labelValues[keys[i]] = values[i].value!;
if (labels[keys[i]] !== null) {
labelValues[keys[i]] = labels[keys[i]];
}
}
return labelValues;
}

private _registerMetric(readableMetric: ReadableMetric): Metric | undefined {
const metricName = this._getPrometheusMetricName(readableMetric.descriptor);
private _registerMetric(record: MetricRecord): Metric | undefined {
const metricName = this._getPrometheusMetricName(record.descriptor);
const metric = this._registry.getSingleMetric(metricName);

/**
Expand All @@ -177,31 +179,26 @@ export class PrometheusExporter implements MetricExporter {
this._registry.removeSingleMetric(metricName);
} else if (metric) return metric;

return this._newMetric(readableMetric, metricName);
return this._newMetric(record, metricName);
}

private _newMetric(
readableMetric: ReadableMetric,
name: string
): Metric | undefined {
private _newMetric(record: MetricRecord, name: string): Metric | undefined {
const metricObject = {
name,
// prom-client throws with empty description which is our default
help: readableMetric.descriptor.description || 'description missing',
labelNames: readableMetric.descriptor.labelKeys,
help: record.descriptor.description || 'description missing',
labelNames: record.descriptor.labelKeys,
// list of registries to register the newly created metric
registers: [this._registry],
};

switch (readableMetric.descriptor.type) {
case MetricDescriptorType.COUNTER_DOUBLE:
case MetricDescriptorType.COUNTER_INT64:
switch (record.descriptor.metricKind) {
case MetricKind.COUNTER:
// there is no such thing as a non-monotonic counter in prometheus
return readableMetric.descriptor.monotonic
return record.descriptor.monotonic
? new Counter(metricObject)
: new Gauge(metricObject);
case MetricDescriptorType.GAUGE_DOUBLE:
case MetricDescriptorType.GAUGE_INT64:
case MetricKind.GAUGE:
return new Gauge(metricObject);
default:
// Other metric types are currently unimplemented
Expand Down
33 changes: 21 additions & 12 deletions packages/opentelemetry-exporter-prometheus/test/prometheus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,13 @@ describe('PrometheusExporter', () => {

const boundCounter = counter.bind(meter.labels({ key1: 'labelValue1' }));
boundCounter.add(10);
exporter.export(meter.getMetrics(), () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
// This is to test the special case where counters are destroyed
// and recreated in the exporter in order to get around prom-client's
// aggregation and use ours.
boundCounter.add(10);
exporter.export(meter.getMetrics(), () => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -227,7 +228,8 @@ describe('PrometheusExporter', () => {

const boundGauge = gauge.bind(meter.labels({ key1: 'labelValue1' }));
boundGauge.set(10);
exporter.export([gauge.get()!], () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -259,9 +261,10 @@ describe('PrometheusExporter', () => {
labelKeys: ['counterKey1'],
}) as CounterMetric;

gauge.bind(meter.labels({ key1: 'labelValue1' })).set(10);
counter.bind(meter.labels({ key1: 'labelValue1' })).add(10);
exporter.export([gauge.get()!, counter.get()!], () => {
gauge.bind(meter.labels({ gaugeKey1: 'labelValue1' })).set(10);
counter.bind(meter.labels({ counterKey1: 'labelValue1' })).add(10);
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -308,7 +311,8 @@ describe('PrometheusExporter', () => {

const boundGauge = gauge.bind(meter.labels({ key1: 'labelValue1' }));
boundGauge.set(10);
exporter.export([gauge.get()!], () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand All @@ -333,7 +337,8 @@ describe('PrometheusExporter', () => {
const gauge = meter.createGauge('gauge.bad-name') as GaugeMetric;
const boundGauge = gauge.bind(meter.labels({ key1: 'labelValue1' }));
boundGauge.set(10);
exporter.export([gauge.get()!], () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -362,7 +367,8 @@ describe('PrometheusExporter', () => {
});

counter.bind(meter.labels({ key1: 'labelValue1' })).add(20);
exporter.export(meter.getMetrics(), () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -407,7 +413,8 @@ describe('PrometheusExporter', () => {
});

exporter.startServer(() => {
exporter!.export(meter.getMetrics(), () => {
meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -435,7 +442,8 @@ describe('PrometheusExporter', () => {
});

exporter.startServer(() => {
exporter!.export(meter.getMetrics(), () => {
meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:8080/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -463,7 +471,8 @@ describe('PrometheusExporter', () => {
});

exporter.startServer(() => {
exporter!.export(meter.getMetrics(), () => {
meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/test', res => {
res.on('data', chunk => {
Expand Down
Loading