From 2bc33522a2a9bb2d2e52bacd2f1d3561af8d2250 Mon Sep 17 00:00:00 2001 From: legendecas Date: Fri, 4 Dec 2020 12:35:56 +0800 Subject: [PATCH] feat: processor export phases interface Add start/finish method to processor interface --- packages/opentelemetry-metrics/src/Meter.ts | 15 ++--- .../src/export/Controller.ts | 3 + .../src/export/Processor.ts | 51 +++----------- .../src/export/UngroupedProcessor.ts | 66 +++++++++++++++++++ .../opentelemetry-metrics/test/Meter.test.ts | 8 +++ 5 files changed, 94 insertions(+), 49 deletions(-) create mode 100644 packages/opentelemetry-metrics/src/export/UngroupedProcessor.ts diff --git a/packages/opentelemetry-metrics/src/Meter.ts b/packages/opentelemetry-metrics/src/Meter.ts index 7d5da2434f1..307f6136f08 100644 --- a/packages/opentelemetry-metrics/src/Meter.ts +++ b/packages/opentelemetry-metrics/src/Meter.ts @@ -29,7 +29,7 @@ import { Metric } from './Metric'; import { ValueObserverMetric } from './ValueObserverMetric'; import { SumObserverMetric } from './SumObserverMetric'; import { DEFAULT_METRIC_OPTIONS, DEFAULT_CONFIG, MeterConfig } from './types'; -import { UngroupedProcessor } from './export/Processor'; +import { UngroupedProcessor } from './export/UngroupedProcessor'; import { PushController } from './export/Controller'; import { NoopExporter } from './export/NoopExporter'; @@ -318,15 +318,14 @@ export class Meter implements api.Meter { .filter(metric => { return metric.getKind() !== MetricKind.BATCH_OBSERVER; }) - .map(metric => { - return metric.getMetricRecord(); + .map(async metric => { + const records = await metric.getMetricRecord(); + for (const record of records) { + this._processor.process(record); + } }); - await Promise.all(metrics).then(records => { - records.forEach(metrics => { - metrics.forEach(metric => this._processor.process(metric)); - }); - }); + await Promise.all(metrics); } getProcessor(): Processor { diff --git a/packages/opentelemetry-metrics/src/export/Controller.ts b/packages/opentelemetry-metrics/src/export/Controller.ts index 239080a4e24..f0c3e9bf03a 100644 --- a/packages/opentelemetry-metrics/src/export/Controller.ts +++ b/packages/opentelemetry-metrics/src/export/Controller.ts @@ -48,7 +48,10 @@ export class PushController extends Controller { } private async _collect(): Promise { + const processor = this._meter.getProcessor(); + processor.start(); await this._meter.collect(); + processor.finish(); return new Promise(resolve => { this._exporter.export( this._meter.getProcessor().checkPointSet(), diff --git a/packages/opentelemetry-metrics/src/export/Processor.ts b/packages/opentelemetry-metrics/src/export/Processor.ts index 03d2cbcf52d..ca2be83611e 100644 --- a/packages/opentelemetry-metrics/src/export/Processor.ts +++ b/packages/opentelemetry-metrics/src/export/Processor.ts @@ -14,13 +14,7 @@ * limitations under the License. */ -import * as aggregators from './aggregators'; -import { - MetricRecord, - MetricKind, - Aggregator, - MetricDescriptor, -} from './types'; +import { MetricRecord, Aggregator, MetricDescriptor } from './types'; /** * Base class for all processor types. @@ -35,44 +29,19 @@ export abstract class Processor { /** Returns an aggregator based off metric descriptor. */ abstract aggregatorFor(metricKind: MetricDescriptor): Aggregator; + /** Prepare for new collection. */ + abstract start(): void; + /** Stores record information to be ready for exporting. */ abstract process(record: MetricRecord): void; + /** + * Indicates a collection finishes. The processor can aggregate aggregations, + * e.g. handle delta to cumulative set. + */ + abstract finish(): void; + checkPointSet(): MetricRecord[] { return Array.from(this._batchMap.values()); } } - -/** - * Processor which retains all dimensions/labels. It accepts all records and - * passes them for exporting. - */ -export class UngroupedProcessor extends Processor { - aggregatorFor(metricDescriptor: MetricDescriptor): Aggregator { - switch (metricDescriptor.metricKind) { - case MetricKind.COUNTER: - case MetricKind.UP_DOWN_COUNTER: - return new aggregators.SumAggregator(); - - case MetricKind.SUM_OBSERVER: - case MetricKind.UP_DOWN_SUM_OBSERVER: - case MetricKind.VALUE_OBSERVER: - return new aggregators.LastValueAggregator(); - - case MetricKind.VALUE_RECORDER: - return new aggregators.HistogramAggregator( - metricDescriptor.boundaries || [Infinity] - ); - - default: - return new aggregators.LastValueAggregator(); - } - } - - process(record: MetricRecord): void { - const labels = Object.keys(record.labels) - .map(k => `${k}=${record.labels[k]}`) - .join(','); - this._batchMap.set(record.descriptor.name + labels, record); - } -} diff --git a/packages/opentelemetry-metrics/src/export/UngroupedProcessor.ts b/packages/opentelemetry-metrics/src/export/UngroupedProcessor.ts new file mode 100644 index 00000000000..6f62013c221 --- /dev/null +++ b/packages/opentelemetry-metrics/src/export/UngroupedProcessor.ts @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Processor } from './Processor'; +import * as aggregators from './aggregators'; +import { + MetricRecord, + MetricKind, + Aggregator, + MetricDescriptor, +} from './types'; + +/** + * Processor which retains all dimensions/labels. It accepts all records and + * passes them for exporting. + */ +export class UngroupedProcessor extends Processor { + aggregatorFor(metricDescriptor: MetricDescriptor): Aggregator { + switch (metricDescriptor.metricKind) { + case MetricKind.COUNTER: + case MetricKind.UP_DOWN_COUNTER: + return new aggregators.SumAggregator(); + + case MetricKind.SUM_OBSERVER: + case MetricKind.UP_DOWN_SUM_OBSERVER: + case MetricKind.VALUE_OBSERVER: + return new aggregators.LastValueAggregator(); + + case MetricKind.VALUE_RECORDER: + return new aggregators.HistogramAggregator( + metricDescriptor.boundaries || [Infinity] + ); + + default: + return new aggregators.LastValueAggregator(); + } + } + + start() { + /** Nothing to do with UngroupedProcessor on start */ + } + + process(record: MetricRecord): void { + const labels = Object.keys(record.labels) + .map(k => `${k}=${record.labels[k]}`) + .join(','); + this._batchMap.set(record.descriptor.name + labels, record); + } + + finish() { + /** Nothing to do with UngroupedProcessor on finish */ + } +} diff --git a/packages/opentelemetry-metrics/test/Meter.test.ts b/packages/opentelemetry-metrics/test/Meter.test.ts index 957d21a497c..ceaebcf451e 100644 --- a/packages/opentelemetry-metrics/test/Meter.test.ts +++ b/packages/opentelemetry-metrics/test/Meter.test.ts @@ -1372,10 +1372,18 @@ describe('Meter', () => { }); class CustomProcessor extends Processor { + start(): void { + throw new Error('start method not implemented.'); + } + process(record: MetricRecord): void { throw new Error('process method not implemented.'); } + finish(): void { + throw new Error('finish method not implemented.'); + } + aggregatorFor(metricKind: MetricDescriptor): Aggregator { throw new Error('aggregatorFor method not implemented.'); }