diff --git a/CHANGELOG.md b/CHANGELOG.md index ae08281b8e4..011cb37f9c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `go.opentelemetry.io/otel/api/global` packages global TextMapPropagator now delegates functionality to a globally set delegate for all previously returned propagators. (#1258) - Fix condition in `label.Any`. (#1299) - Fix global `TracerProvider` to pass options to its configured provider. (#1329) +- Fix missing handler for `ExactKind` aggregator in OTLP metrics transformer (#1309) ## [0.13.0] - 2020-10-08 diff --git a/exporters/otlp/internal/transform/metric.go b/exporters/otlp/internal/transform/metric.go index 20b063c389e..ad0432a6391 100644 --- a/exporters/otlp/internal/transform/metric.go +++ b/exporters/otlp/internal/transform/metric.go @@ -289,11 +289,71 @@ func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricp } return gaugePoint(r, value, time.Time{}, tm) + case aggregation.ExactKind: + e, ok := agg.(aggregation.Points) + if !ok { + return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg) + } + pts, err := e.Points() + if err != nil { + return nil, err + } + + return gaugeArray(r, pts) + default: return nil, fmt.Errorf("%w: %T", ErrUnimplementedAgg, agg) } } +func gaugeArray(record export.Record, points []number.Number) (*metricpb.Metric, error) { + desc := record.Descriptor() + m := &metricpb.Metric{ + Name: desc.Name(), + Description: desc.Description(), + Unit: string(desc.Unit()), + } + + switch n := desc.NumberKind(); n { + case number.Int64Kind: + var pts []*metricpb.IntDataPoint + for _, p := range points { + pts = append(pts, &metricpb.IntDataPoint{ + Labels: nil, + StartTimeUnixNano: toNanos(record.StartTime()), + TimeUnixNano: toNanos(record.EndTime()), + Value: p.CoerceToInt64(n), + }) + } + m.Data = &metricpb.Metric_IntGauge{ + IntGauge: &metricpb.IntGauge{ + DataPoints: pts, + }, + } + + case number.Float64Kind: + var pts []*metricpb.DoubleDataPoint + for _, p := range points { + pts = append(pts, &metricpb.DoubleDataPoint{ + Labels: nil, + StartTimeUnixNano: toNanos(record.StartTime()), + TimeUnixNano: toNanos(record.EndTime()), + Value: p.CoerceToFloat64(n), + }) + } + m.Data = &metricpb.Metric_DoubleGauge{ + DoubleGauge: &metricpb.DoubleGauge{ + DataPoints: pts, + }, + } + + default: + return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, n) + } + + return m, nil +} + func gaugePoint(record export.Record, num number.Number, start, end time.Time) (*metricpb.Metric, error) { desc := record.Descriptor() labels := record.Labels() diff --git a/exporters/otlp/internal/transform/metric_test.go b/exporters/otlp/internal/transform/metric_test.go index d732fb1e996..f6fa64e58f3 100644 --- a/exporters/otlp/internal/transform/metric_test.go +++ b/exporters/otlp/internal/transform/metric_test.go @@ -32,7 +32,7 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/metrictest" - "go.opentelemetry.io/otel/sdk/metric/aggregator/array" + arrAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" lvAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" @@ -243,6 +243,58 @@ func TestLastValueIntDataPoints(t *testing.T) { } } +func TestExactIntDataPoints(t *testing.T) { + desc := metric.NewDescriptor("", metric.ValueRecorderInstrumentKind, number.Int64Kind) + labels := label.NewSet() + e, ckpt := metrictest.Unslice2(arrAgg.New(2)) + assert.NoError(t, e.Update(context.Background(), number.Number(100), &desc)) + require.NoError(t, e.SynchronizedMove(ckpt, &desc)) + record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd) + p, ok := ckpt.(aggregation.Points) + require.True(t, ok, "ckpt is not an aggregation.Points: %T", ckpt) + pts, err := p.Points() + require.NoError(t, err) + + if m, err := gaugeArray(record, pts); assert.NoError(t, err) { + assert.Equal(t, []*metricpb.IntDataPoint{{ + Value: 100, + StartTimeUnixNano: toNanos(intervalStart), + TimeUnixNano: toNanos(intervalEnd), + }}, m.GetIntGauge().DataPoints) + assert.Nil(t, m.GetIntHistogram()) + assert.Nil(t, m.GetIntSum()) + assert.Nil(t, m.GetDoubleGauge()) + assert.Nil(t, m.GetDoubleHistogram()) + assert.Nil(t, m.GetDoubleSum()) + } +} + +func TestExactFloatDataPoints(t *testing.T) { + desc := metric.NewDescriptor("", metric.ValueRecorderInstrumentKind, number.Float64Kind) + labels := label.NewSet() + e, ckpt := metrictest.Unslice2(arrAgg.New(2)) + assert.NoError(t, e.Update(context.Background(), number.NewFloat64Number(100), &desc)) + require.NoError(t, e.SynchronizedMove(ckpt, &desc)) + record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd) + p, ok := ckpt.(aggregation.Points) + require.True(t, ok, "ckpt is not an aggregation.Points: %T", ckpt) + pts, err := p.Points() + require.NoError(t, err) + + if m, err := gaugeArray(record, pts); assert.NoError(t, err) { + assert.Equal(t, []*metricpb.DoubleDataPoint{{ + Value: 100, + StartTimeUnixNano: toNanos(intervalStart), + TimeUnixNano: toNanos(intervalEnd), + }}, m.GetDoubleGauge().DataPoints) + assert.Nil(t, m.GetIntHistogram()) + assert.Nil(t, m.GetIntSum()) + assert.Nil(t, m.GetIntGauge()) + assert.Nil(t, m.GetDoubleHistogram()) + assert.Nil(t, m.GetDoubleSum()) + } +} + func TestSumErrUnknownValueType(t *testing.T) { desc := metric.NewDescriptor("", metric.ValueRecorderInstrumentKind, number.Kind(-1)) labels := label.NewSet() @@ -357,11 +409,11 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) { require.Nil(t, mpb) require.True(t, errors.Is(err, ErrIncompatibleAgg)) - mpb, err = makeMpb(aggregation.ExactKind, &array.New(1)[0]) + mpb, err = makeMpb(aggregation.ExactKind, &lastvalue.New(1)[0]) require.Error(t, err) require.Nil(t, mpb) - require.True(t, errors.Is(err, ErrUnimplementedAgg)) + require.True(t, errors.Is(err, ErrIncompatibleAgg)) } func TestRecordAggregatorUnexpectedErrors(t *testing.T) {