Skip to content

Commit

Permalink
Initial structure for new pdata metrics exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Dec 9, 2021
1 parent 3a8287f commit cf630b7
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 142 deletions.
4 changes: 2 additions & 2 deletions exporter/collector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func createTracesExporter(

// createMetricsExporter creates a metrics exporter based on this config.
func createMetricsExporter(
_ context.Context,
ctx context.Context,
params component.ExporterCreateSettings,
cfg config.Exporter) (component.MetricsExporter, error) {
eCfg := cfg.(*Config)
return newGoogleCloudMetricsExporter(eCfg, params)
return newGoogleCloudMetricsExporter(ctx, eCfg, params)
}
1 change: 1 addition & 0 deletions exporter/collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
)

require (
cloud.google.com/go/monitoring v1.1.0
github.com/aws/aws-sdk-go v1.42.14 // indirect
github.com/google/go-cmp v0.5.6
)
Expand Down
139 changes: 0 additions & 139 deletions exporter/collector/googlecloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,25 @@ import (
"fmt"
"strings"

"contrib.go.opencensus.io/exporter/stackdriver"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/model/pdata"
conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/api/option"
"google.golang.org/grpc"

cloudtrace "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"

internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus"
)

// traceExporter is a wrapper struct of OT cloud trace exporter
type traceExporter struct {
texporter *cloudtrace.Exporter
}

// metricsExporter is a wrapper struct of OC stackdriver exporter
type metricsExporter struct {
mexporter *stackdriver.Exporter
}

func (te *traceExporter) Shutdown(ctx context.Context) error {
return te.texporter.Shutdown(ctx)
}

func (me *metricsExporter) Shutdown(context.Context) error {
me.mexporter.Flush()
me.mexporter.StopMetricsExporter()
return me.mexporter.Close()
}

func setVersionInUserAgent(cfg *Config, version string) {
cfg.UserAgent = strings.ReplaceAll(cfg.UserAgent, "{{version}}", version)
}
Expand Down Expand Up @@ -123,117 +106,6 @@ func newGoogleCloudTracesExporter(cfg *Config, set component.ExporterCreateSetti
exporterhelper.WithRetry(cfg.RetrySettings))
}

func newGoogleCloudMetricsExporter(cfg *Config, set component.ExporterCreateSettings) (component.MetricsExporter, error) {
setVersionInUserAgent(cfg, set.BuildInfo.Version)

// TODO: For each ProjectID, create a different exporter
// or at least a unique Google Cloud client per ProjectID.
options := stackdriver.Options{
// If the project ID is an empty string, it will be set by default based on
// the project this is running on in GCP.
ProjectID: cfg.ProjectID,

MetricPrefix: cfg.MetricConfig.Prefix,

// Set DefaultMonitoringLabels to an empty map to avoid getting the "opencensus_task" label
DefaultMonitoringLabels: &stackdriver.Labels{},

Timeout: cfg.Timeout,
}

// note options.UserAgent overrides the option.WithUserAgent client option in the Metric exporter
if cfg.UserAgent != "" {
options.UserAgent = cfg.UserAgent
}

copts, err := generateClientOptions(cfg)
if err != nil {
return nil, err
}
options.TraceClientOptions = copts
options.MonitoringClientOptions = copts

if cfg.MetricConfig.SkipCreateMetricDescriptor {
options.SkipCMD = true
}
if len(cfg.ResourceMappings) > 0 {
rm := resourceMapper{
mappings: cfg.ResourceMappings,
}
options.MapResource = rm.mapResource
}

sde, serr := stackdriver.NewExporter(options)
if serr != nil {
return nil, fmt.Errorf("cannot configure Google Cloud metric exporter: %w", serr)
}
mExp := &metricsExporter{mexporter: sde}

return exporterhelper.NewMetricsExporter(
cfg,
set,
mExp.pushMetrics,
exporterhelper.WithShutdown(mExp.Shutdown),
// Disable exporterhelper Timeout, since we are using a custom mechanism
// within exporter itself
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithRetry(cfg.RetrySettings))
}

// pushMetrics calls StackdriverExporter.PushMetricsProto on each element of the given metrics
func (me *metricsExporter) pushMetrics(ctx context.Context, m pdata.Metrics) error {
rms := m.ResourceMetrics()
mds := make([]*agentmetricspb.ExportMetricsServiceRequest, 0, rms.Len())
for i := 0; i < rms.Len(); i++ {
emsr := &agentmetricspb.ExportMetricsServiceRequest{}
emsr.Node, emsr.Resource, emsr.Metrics = internaldata.ResourceMetricsToOC(rms.At(i))
mds = append(mds, emsr)
}
// PushMetricsProto doesn't bundle subsequent calls, so we need to
// combine the data here to avoid generating too many RPC calls.
mds = exportAdditionalLabels(mds)

count := 0
for _, md := range mds {
count += len(md.Metrics)
}
metrics := make([]*metricspb.Metric, 0, count)
for _, md := range mds {
if md.Resource == nil {
metrics = append(metrics, md.Metrics...)
continue
}
for _, metric := range md.Metrics {
if metric.Resource == nil {
metric.Resource = md.Resource
}
metrics = append(metrics, metric)
}
}
points := numPoints(metrics)
// The two nil args here are: node (which is ignored) and resource
// (which we just moved to individual metrics).
dropped, err := me.mexporter.PushMetricsProto(ctx, nil, nil, metrics)
recordPointCount(ctx, points-dropped, dropped, err)
return err
}

func exportAdditionalLabels(mds []*agentmetricspb.ExportMetricsServiceRequest) []*agentmetricspb.ExportMetricsServiceRequest {
for _, md := range mds {
if md.Resource == nil ||
md.Resource.Labels == nil ||
md.Node == nil ||
md.Node.Identifier == nil ||
len(md.Node.Identifier.HostName) == 0 {
continue
}
// MetricsToOC removes `host.name` label and writes it to node indentifier, here we reintroduce it.
md.Resource.Labels[conventions.AttributeHostName] = md.Node.Identifier.HostName
}
return mds
}

// pushTraces calls texporter.ExportSpan for each span in the given traces
func (te *traceExporter) pushTraces(ctx context.Context, td pdata.Traces) error {
resourceSpans := td.ResourceSpans()
Expand All @@ -245,14 +117,3 @@ func (te *traceExporter) pushTraces(ctx context.Context, td pdata.Traces) error

return te.texporter.ExportSpans(ctx, spans)
}

func numPoints(metrics []*metricspb.Metric) int {
numPoints := 0
for _, metric := range metrics {
tss := metric.GetTimeseries()
for _, ts := range tss {
numPoints += len(ts.GetPoints())
}
}
return numPoints
}
4 changes: 3 additions & 1 deletion exporter/collector/googlecloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ func (ms *mockMetricServer) CreateTimeSeries(ctx context.Context, req *cloudmoni
}

func TestGoogleCloudMetricExport(t *testing.T) {
// TODO
t.Skip("Skipping until metrics rewrite is finished")
srv := grpc.NewServer()

descriptorReqCh := make(chan *requestWithMetadata)
Expand All @@ -190,7 +192,7 @@ func TestGoogleCloudMetricExport(t *testing.T) {
Version: "v0.0.1",
}

sde, err := newGoogleCloudMetricsExporter(&Config{
sde, err := newGoogleCloudMetricsExporter(context.Background(), &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
ProjectID: "idk",
Endpoint: "127.0.0.1:8080",
Expand Down
80 changes: 80 additions & 0 deletions exporter/collector/metricsexporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file contains the rewritten googlecloud metrics exporter which no longer takes
// dependency on the OpenCensus stackdriver exporter.

package collector

import (
"context"

monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/model/pdata"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

// metricsExporter is the GCM exporter that uses pdata directly
type metricsExporter struct {
cfg *Config
client *monitoring.MetricClient
}

func (me *metricsExporter) Shutdown(context.Context) error {
return me.client.Close()
}

func newGoogleCloudMetricsExporter(
ctx context.Context,
cfg *Config,
set component.ExporterCreateSettings,
) (component.MetricsExporter, error) {
setVersionInUserAgent(cfg, set.BuildInfo.Version)

client, err := monitoring.NewMetricClient(ctx)
if err != nil {
return nil, err
}

mExp := &metricsExporter{
cfg: cfg,
client: client,
}

return exporterhelper.NewMetricsExporter(
cfg,
set,
mExp.pushMetrics,
exporterhelper.WithShutdown(mExp.Shutdown),
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: defaultTimeout}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithRetry(cfg.RetrySettings))
}

// pushMetrics calls pushes pdata metrics to GCM, creating metric descriptors if necessary
func (me *metricsExporter) pushMetrics(ctx context.Context, m pdata.Metrics) error {
// TODO: self observability
points := numPoints(nil)
dropped := 0
var err error = nil
recordPointCount(ctx, points-dropped, dropped, err)
return nil
}

func numPoints(req []*monitoringpb.CreateTimeSeriesRequest) int {
// TODO
return 0
}

0 comments on commit cf630b7

Please sign in to comment.