Skip to content

Commit

Permalink
feat: processor export phases interface
Browse files Browse the repository at this point in the history
Add start/finish method to processor interface
  • Loading branch information
legendecas committed Dec 4, 2020
1 parent 781b30f commit ac70b14
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ export class ExactProcessor<T, R extends Aggregator> extends Processor {
return aggregator;
}

start() {}

process(record: MetricRecord): void {
const labels = Object.keys(record.labels)
.map(k => `${k}=${record.labels[k]}`)
.join(',');
this._batchMap.set(record.descriptor.name + labels, record);
}

finish() {}
}
15 changes: 7 additions & 8 deletions packages/opentelemetry-metrics/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { Metric } from './Metric';
import { ValueObserverMetric } from './ValueObserverMetric';
import { SumObserverMetric } from './SumObserverMetric';
import { DEFAULT_METRIC_OPTIONS, DEFAULT_CONFIG, MeterConfig } from './types';
import { UngroupedProcessor } from './export/Processor';
import { UngroupedProcessor } from './export/UngroupedProcessor';
import { PushController } from './export/Controller';
import { NoopExporter } from './export/NoopExporter';

Expand Down Expand Up @@ -318,15 +318,14 @@ export class Meter implements api.Meter {
.filter(metric => {
return metric.getKind() !== MetricKind.BATCH_OBSERVER;
})
.map(metric => {
return metric.getMetricRecord();
.map(async metric => {
const records = await metric.getMetricRecord();
for (const record of records) {
this._processor.process(record);
}
});

await Promise.all(metrics).then(records => {
records.forEach(metrics => {
metrics.forEach(metric => this._processor.process(metric));
});
});
await Promise.all(metrics);
}

getProcessor(): Processor {
Expand Down
3 changes: 3 additions & 0 deletions packages/opentelemetry-metrics/src/export/Controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ export class PushController extends Controller {
}

private async _collect(): Promise<void> {
const processor = this._meter.getProcessor();
processor.start();
await this._meter.collect();
processor.finish();
return new Promise(resolve => {
this._exporter.export(
this._meter.getProcessor().checkPointSet(),
Expand Down
51 changes: 10 additions & 41 deletions packages/opentelemetry-metrics/src/export/Processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@
* limitations under the License.
*/

import * as aggregators from './aggregators';
import {
MetricRecord,
MetricKind,
Aggregator,
MetricDescriptor,
} from './types';
import { MetricRecord, Aggregator, MetricDescriptor } from './types';

/**
* Base class for all processor types.
Expand All @@ -35,44 +29,19 @@ export abstract class Processor {
/** Returns an aggregator based off metric descriptor. */
abstract aggregatorFor(metricKind: MetricDescriptor): Aggregator;

/** Prepare for new collection. */
abstract start(): void;

/** Stores record information to be ready for exporting. */
abstract process(record: MetricRecord): void;

/**
* Indicates a collection finishes. The processor can aggregate aggregations,
* e.g. handle delta to cumulative set.
*/
abstract finish(): void;

checkPointSet(): MetricRecord[] {
return Array.from(this._batchMap.values());
}
}

/**
* Processor which retains all dimensions/labels. It accepts all records and
* passes them for exporting.
*/
export class UngroupedProcessor extends Processor {
aggregatorFor(metricDescriptor: MetricDescriptor): Aggregator {
switch (metricDescriptor.metricKind) {
case MetricKind.COUNTER:
case MetricKind.UP_DOWN_COUNTER:
return new aggregators.SumAggregator();

case MetricKind.SUM_OBSERVER:
case MetricKind.UP_DOWN_SUM_OBSERVER:
case MetricKind.VALUE_OBSERVER:
return new aggregators.LastValueAggregator();

case MetricKind.VALUE_RECORDER:
return new aggregators.HistogramAggregator(
metricDescriptor.boundaries || [Infinity]
);

default:
return new aggregators.LastValueAggregator();
}
}

process(record: MetricRecord): void {
const labels = Object.keys(record.labels)
.map(k => `${k}=${record.labels[k]}`)
.join(',');
this._batchMap.set(record.descriptor.name + labels, record);
}
}
66 changes: 66 additions & 0 deletions packages/opentelemetry-metrics/src/export/UngroupedProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Processor } from './Processor';
import * as aggregators from './aggregators';
import {
MetricRecord,
MetricKind,
Aggregator,
MetricDescriptor,
} from './types';

/**
* Processor which retains all dimensions/labels. It accepts all records and
* passes them for exporting.
*/
export class UngroupedProcessor extends Processor {
aggregatorFor(metricDescriptor: MetricDescriptor): Aggregator {
switch (metricDescriptor.metricKind) {
case MetricKind.COUNTER:
case MetricKind.UP_DOWN_COUNTER:
return new aggregators.SumAggregator();

case MetricKind.SUM_OBSERVER:
case MetricKind.UP_DOWN_SUM_OBSERVER:
case MetricKind.VALUE_OBSERVER:
return new aggregators.LastValueAggregator();

case MetricKind.VALUE_RECORDER:
return new aggregators.HistogramAggregator(
metricDescriptor.boundaries || [Infinity]
);

default:
return new aggregators.LastValueAggregator();
}
}

start() {
/** Nothing to do with UngroupedProcessor on start */
}

process(record: MetricRecord): void {
const labels = Object.keys(record.labels)
.map(k => `${k}=${record.labels[k]}`)
.join(',');
this._batchMap.set(record.descriptor.name + labels, record);
}

finish() {
/** Nothing to do with UngroupedProcessor on finish */
}
}
1 change: 1 addition & 0 deletions packages/opentelemetry-metrics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export * from './ValueObserverMetric';
export * from './export/aggregators';
export * from './export/ConsoleMetricExporter';
export * from './export/Processor';
export * from './export/UngroupedProcessor';
export * from './export/types';
export * from './UpDownCounterMetric';
export { MeterConfig } from './types';
8 changes: 8 additions & 0 deletions packages/opentelemetry-metrics/test/Meter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1372,10 +1372,18 @@ describe('Meter', () => {
});

class CustomProcessor extends Processor {
start(): void {
throw new Error('start method not implemented.');
}

process(record: MetricRecord): void {
throw new Error('process method not implemented.');
}

finish(): void {
throw new Error('finish method not implemented.');
}

aggregatorFor(metricKind: MetricDescriptor): Aggregator {
throw new Error('aggregatorFor method not implemented.');
}
Expand Down

0 comments on commit ac70b14

Please sign in to comment.