Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: processor export phases interface #1717

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,18 @@ export class ExactProcessor<T, R extends Aggregator> extends Processor {
return aggregator;
}

start() {
/** Nothing to do with ExactProcessor on start */
}

process(record: MetricRecord): void {
const labels = Object.keys(record.labels)
.map(k => `${k}=${record.labels[k]}`)
.join(',');
this._batchMap.set(record.descriptor.name + labels, record);
}

finish() {
/** Nothing to do with ExactProcessor on finish */
}
}
16 changes: 8 additions & 8 deletions packages/opentelemetry-metrics/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { Metric } from './Metric';
import { ValueObserverMetric } from './ValueObserverMetric';
import { SumObserverMetric } from './SumObserverMetric';
import { DEFAULT_METRIC_OPTIONS, DEFAULT_CONFIG, MeterConfig } from './types';
import { UngroupedProcessor } from './export/Processor';
import { UngroupedProcessor } from './export/UngroupedProcessor';
import { PushController } from './export/Controller';
import { NoopExporter } from './export/NoopExporter';

Expand Down Expand Up @@ -318,15 +318,15 @@ export class Meter implements api.Meter {
.filter(metric => {
return metric.getKind() !== MetricKind.BATCH_OBSERVER;
})
.map(metric => {
return metric.getMetricRecord();
.map(async metric => {
const records = await metric.getMetricRecord();
// process the records in place to reduce required memory footprints.
for (const record of records) {
this._processor.process(record);
}
});

await Promise.all(metrics).then(records => {
records.forEach(metrics => {
metrics.forEach(metric => this._processor.process(metric));
});
});
await Promise.all(metrics);
}

getProcessor(): Processor {
Expand Down
22 changes: 11 additions & 11 deletions packages/opentelemetry-metrics/src/export/Controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,20 @@ export class PushController extends Controller {
}

private async _collect(): Promise<void> {
const processor = this._meter.getProcessor();
processor.start();
await this._meter.collect();
processor.finish();
return new Promise(resolve => {
this._exporter.export(
this._meter.getProcessor().checkPointSet(),
result => {
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
result.error ??
new Error('PushController: export failed in _collect')
);
}
resolve();
this._exporter.export(processor.checkPointSet(), result => {
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
result.error ??
new Error('PushController: export failed in _collect')
);
}
);
resolve();
});
});
}
}
51 changes: 10 additions & 41 deletions packages/opentelemetry-metrics/src/export/Processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@
* limitations under the License.
*/

import * as aggregators from './aggregators';
import {
MetricRecord,
MetricKind,
Aggregator,
MetricDescriptor,
} from './types';
import { MetricRecord, Aggregator, MetricDescriptor } from './types';

/**
* Base class for all processor types.
Expand All @@ -35,44 +29,19 @@ export abstract class Processor {
/** Returns an aggregator based off metric descriptor. */
abstract aggregatorFor(metricKind: MetricDescriptor): Aggregator;

/** Prepare for new collection. */
abstract start(): void;

/** Stores record information to be ready for exporting. */
abstract process(record: MetricRecord): void;

/**
* Indicates a collection finishes. The processor can aggregate aggregations,
* e.g. handle delta to cumulative set.
*/
abstract finish(): void;

checkPointSet(): MetricRecord[] {
return Array.from(this._batchMap.values());
}
}

/**
* Processor which retains all dimensions/labels. It accepts all records and
* passes them for exporting.
*/
export class UngroupedProcessor extends Processor {
aggregatorFor(metricDescriptor: MetricDescriptor): Aggregator {
switch (metricDescriptor.metricKind) {
case MetricKind.COUNTER:
case MetricKind.UP_DOWN_COUNTER:
return new aggregators.SumAggregator();

case MetricKind.SUM_OBSERVER:
case MetricKind.UP_DOWN_SUM_OBSERVER:
case MetricKind.VALUE_OBSERVER:
return new aggregators.LastValueAggregator();

case MetricKind.VALUE_RECORDER:
return new aggregators.HistogramAggregator(
metricDescriptor.boundaries || [Infinity]
);

default:
return new aggregators.LastValueAggregator();
}
}

process(record: MetricRecord): void {
const labels = Object.keys(record.labels)
.map(k => `${k}=${record.labels[k]}`)
.join(',');
this._batchMap.set(record.descriptor.name + labels, record);
}
}
66 changes: 66 additions & 0 deletions packages/opentelemetry-metrics/src/export/UngroupedProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Processor } from './Processor';
import * as aggregators from './aggregators';
import {
MetricRecord,
MetricKind,
Aggregator,
MetricDescriptor,
} from './types';

/**
* Processor which retains all dimensions/labels. It accepts all records and
* passes them for exporting.
*/
export class UngroupedProcessor extends Processor {
aggregatorFor(metricDescriptor: MetricDescriptor): Aggregator {
switch (metricDescriptor.metricKind) {
case MetricKind.COUNTER:
case MetricKind.UP_DOWN_COUNTER:
return new aggregators.SumAggregator();

case MetricKind.SUM_OBSERVER:
case MetricKind.UP_DOWN_SUM_OBSERVER:
case MetricKind.VALUE_OBSERVER:
return new aggregators.LastValueAggregator();

case MetricKind.VALUE_RECORDER:
return new aggregators.HistogramAggregator(
metricDescriptor.boundaries || [Infinity]
);

default:
return new aggregators.LastValueAggregator();
}
}

start() {
/** Nothing to do with UngroupedProcessor on start */
}

process(record: MetricRecord): void {
const labels = Object.keys(record.labels)
.map(k => `${k}=${record.labels[k]}`)
.join(',');
this._batchMap.set(record.descriptor.name + labels, record);
}

finish() {
/** Nothing to do with UngroupedProcessor on finish */
}
}
1 change: 1 addition & 0 deletions packages/opentelemetry-metrics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export * from './ValueObserverMetric';
export * from './export/aggregators';
export * from './export/ConsoleMetricExporter';
export * from './export/Processor';
export * from './export/UngroupedProcessor';
export * from './export/types';
export * from './UpDownCounterMetric';
export { MeterConfig } from './types';
8 changes: 8 additions & 0 deletions packages/opentelemetry-metrics/test/Meter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1372,10 +1372,18 @@ describe('Meter', () => {
});

class CustomProcessor extends Processor {
start(): void {
throw new Error('start method not implemented.');
}

process(record: MetricRecord): void {
throw new Error('process method not implemented.');
}

finish(): void {
throw new Error('finish method not implemented.');
}

aggregatorFor(metricKind: MetricDescriptor): Aggregator {
throw new Error('aggregatorFor method not implemented.');
}
Expand Down
35 changes: 34 additions & 1 deletion packages/opentelemetry-metrics/test/export/Controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,43 @@

import * as assert from 'assert';
import * as sinon from 'sinon';
import { MeterProvider, MetricExporter, MetricRecord } from '../../src';
import {
MeterProvider,
MetricExporter,
MetricRecord,
UngroupedProcessor,
} from '../../src';
import {
ExportResult,
ExportResultCode,
setGlobalErrorHandler,
} from '@opentelemetry/core';

class MockProcessor extends UngroupedProcessor {
private _started = false;
private _finished = false;

start() {
assert(!this._started);
this._started = true;
}

process(record: MetricRecord) {
assert(this._started);
super.process(record);
}

finish() {
assert(this._started);
this._started = false;
this._finished = true;
}

get finished() {
return this._finished;
}
}

class MockExporter implements MetricExporter {
constructor(private _result: ExportResult) {}

Expand All @@ -44,7 +74,9 @@ describe('Controller', () => {
const errorHandlerSpy = sinon.spy();
setGlobalErrorHandler(errorHandlerSpy);
const expectedError = new Error('Failed to export');
const processor = new MockProcessor();
const meter = new MeterProvider({
processor,
exporter: new MockExporter({
code: ExportResultCode.FAILED,
error: expectedError,
Expand All @@ -65,6 +97,7 @@ describe('Controller', () => {
errorHandlerSpy.args[0][0].message,
expectedError.message
);
assert(processor.finished);
setGlobalErrorHandler(() => {});
return done();
}, 0);
Expand Down