Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Add option to provide resource based on metric descriptor. #231

Merged
merged 4 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/resource"
)
Expand Down Expand Up @@ -154,12 +155,26 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M
// TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil.
continue
}

var rsc *monitoredrespb.MonitoredResource
var mr monitoredresource.Interface
if se.o.ResourceByDescriptor != nil {
labels, mr = se.o.ResourceByDescriptor(&metric.Descriptor, labels)
// TODO(rghetia): optimize this. It is inefficient to convert this for all metrics.
rsc = convertMonitoredResourceToPB(mr)
if rsc.Type == "" {
rsc.Type = "global"
rsc.Labels = nil
}
} else {
rsc = resource
}
timeSeries = append(timeSeries, &monitoringpb.TimeSeries{
Metric: &googlemetricpb.Metric{
Type: metricType,
Labels: labels,
},
Resource: resource,
Resource: rsc,
Points: sdPoints,
})
}
Expand Down
311 changes: 311 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/resource"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -559,3 +560,313 @@ func TestMetricsToMonitoringMetrics_fromProtoPoint(t *testing.T) {
}
}
}

func TestResourceByDescriptor(t *testing.T) {
startTimestamp := &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
}
startTime, _ := ptypes.Timestamp(startTimestamp)
endTimestamp := &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
}
endTime, _ := ptypes.Timestamp(endTimestamp)

tests := []struct {
in *metricdata.Metric
want []*monitoringpb.CreateTimeSeriesRequest
wantErr string
}{
{
in: &metricdata.Metric{
Descriptor: metricdata.Descriptor{
Name: "custom_resource_one",
Description: "This is a test",
Unit: metricdata.UnitBytes,
Type: metricdata.TypeCumulativeInt64,
LabelKeys: []metricdata.LabelKey{
{
Key: "k11",
},
{
Key: "k12",
},
},
},
Resource: nil,
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: startTime,
Points: []metricdata.Point{
{
Time: endTime,
Value: int64(5),
},
},
LabelValues: []metricdata.LabelValue{
{
Value: "v11",
},
{
Value: "v12",
},
},
},
},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/custom_resource_one",
Labels: map[string]string{
"k12": "v12",
},
},
Resource: &monitoredrespb.MonitoredResource{
Type: "one",
Labels: map[string]string{
"k11": "v11",
},
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: 5,
},
},
},
},
},
},
},
},
},
{
in: &metricdata.Metric{
Descriptor: metricdata.Descriptor{
Name: "custom_resource_two",
Description: "This is a test",
Unit: metricdata.UnitBytes,
Type: metricdata.TypeCumulativeInt64,
LabelKeys: []metricdata.LabelKey{
{
Key: "k21",
},
{
Key: "k22",
},
},
},
Resource: nil,
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: startTime,
Points: []metricdata.Point{
{
Time: endTime,
Value: int64(5),
},
},
LabelValues: []metricdata.LabelValue{
{
Value: "v21",
},
{
Value: "v22",
},
},
},
},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/custom_resource_two",
Labels: map[string]string{
"k21": "v21",
},
},
Resource: &monitoredrespb.MonitoredResource{
Type: "two",
Labels: map[string]string{
"k22": "v22",
},
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: 5,
},
},
},
},
},
},
},
},
},
{
in: &metricdata.Metric{
Descriptor: metricdata.Descriptor{
Name: "custom_resource_other",
Description: "This is a test",
Unit: metricdata.UnitBytes,
Type: metricdata.TypeCumulativeInt64,
LabelKeys: []metricdata.LabelKey{
{
Key: "k31",
},
{
Key: "k32",
},
},
},
Resource: nil,
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: startTime,
Points: []metricdata.Point{
{
Time: endTime,
Value: int64(5),
},
},
LabelValues: []metricdata.LabelValue{
{
Value: "v31",
},
{
Value: "v32",
},
},
},
},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/custom_resource_other",
Labels: map[string]string{
"k31": "v31",
"k32": "v32",
},
},
Resource: &monitoredrespb.MonitoredResource{
Type: "global",
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: 5,
},
},
},
},
},
},
},
},
},
}

var se = &statsExporter{
o: Options{
ProjectID: "foo",
ResourceByDescriptor: getResourceByDescriptor,
},
}

for i, tt := range tests {
tsl, err := se.metricToMpbTs(context.Background(), tt.in)
if tt.wantErr != "" {
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr)
}
continue
}
if err != nil {
t.Errorf("#%d: unexpected error: %v", i, err)
continue
}

got := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
// Our saving grace is serialization equality since some
// unexported fields could be present in the various values.
if diff := cmpTSReqs(got, tt.want); diff != "" {
t.Fatalf("Test %d failed. Unexpected CreateTimeSeriesRequests -got +want: %s", i, diff)
}
}
}

type customResource struct {
rt string
rm map[string]string
}

var _ monitoredresource.Interface = (*customResource)(nil)

func (cr *customResource) MonitoredResource() (resType string, labels map[string]string) {
return cr.rt, cr.rm
}

var crEmpty = &customResource{rt: ""}

func getResourceByDescriptor(md *metricdata.Descriptor, labels map[string]string) (map[string]string, monitoredresource.Interface) {
switch md.Name {
case "custom_resource_one":
cr := &customResource{
rt: "one",
rm: map[string]string{
"k11": labels["k11"],
},
}
newLabels := removeLabel(labels, cr.rm)
return newLabels, cr
case "custom_resource_two":
cr := &customResource{
rt: "two",
rm: map[string]string{
"k22": labels["k22"],
},
}
newLabels := removeLabel(labels, cr.rm)
return newLabels, cr
default:
return labels, crEmpty
}
}

func removeLabel(m map[string]string, remove map[string]string) map[string]string {
newM := make(map[string]string)
for k, v := range m {
if _, ok := remove[k]; !ok {
newM[k] = v
}
}
return newM
}
18 changes: 18 additions & 0 deletions stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,24 @@ type Options struct {
// to Stackdriver Monitoring. This is only used for Proto metrics export
// for now. The minimum number of workers is 1.
NumberOfWorkers int

// ResourceByDescriptor may be provided to supply monitored resource dynamically
// based on the metric Descriptor. Most users will not need to set this,
// but should instead set ResourceDetector.
//
// The MonitoredResource and ResourceDetector fields are ignored if this
// field is set to a non-nil value.
//
// The ResourceByDescriptor is called to derive monitored resources from
// metric.Descriptor and the label map associated with the time-series.
// If any label is used for the derived resource then it will be removed
// from the label map. The remaining labels in the map are returned to
// be used with the time-series.
//
// If the func set to this field does not return valid resource even for one
// time-series then it will result into an error for the entire CreateTimeSeries request
// which may contain more than one time-series.
ResourceByDescriptor func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface)
}

const defaultTimeout = 5 * time.Second
Expand Down