Skip to content

Commit

Permalink
[k8sclusterreceiver] switch k8s.hpa metrics to use pdata (#23408)
Browse files Browse the repository at this point in the history
Rebase of #18250 with latest main.

---------

Signed-off-by: Bogdan Drutu <[email protected]>
Co-authored-by: Bogdan Drutu <[email protected]>
  • Loading branch information
atoulme and bogdandrutu authored Jun 15, 2023
1 parent e6e1e76 commit fd526ee
Show file tree
Hide file tree
Showing 19 changed files with 956 additions and 158 deletions.
11 changes: 11 additions & 0 deletions .chloggen/switchk8shpa.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sclusterreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Switch k8s.hpa metrics to use pdata.

# One or more tracking issues related to the change
issues: [18250]
2 changes: 1 addition & 1 deletion receiver/k8sclusterreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/census-instrumentation/opencensus-proto v0.4.1
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/iancoleman/strcase v0.2.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.79.0
Expand Down Expand Up @@ -57,7 +58,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
Expand Down
21 changes: 11 additions & 10 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
quotav1 "github.com/openshift/api/quota/v1"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
Expand Down Expand Up @@ -48,17 +49,17 @@ import (
// an interface to interact with refactored code from SignalFx Agent which is
// confined to the collection package.
type DataCollector struct {
logger *zap.Logger
settings receiver.CreateSettings
metricsStore *metricsStore
metadataStore *metadata.Store
nodeConditionsToReport []string
allocatableTypesToReport []string
}

// NewDataCollector returns a DataCollector.
func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector {
func NewDataCollector(set receiver.CreateSettings, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector {
return &DataCollector{
logger: logger,
settings: set,
metricsStore: &metricsStore{
metricsCache: make(map[types.UID]pmetric.Metrics),
},
Expand All @@ -75,7 +76,7 @@ func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store c

func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) {
if err := dc.metricsStore.remove(obj.(runtime.Object)); err != nil {
dc.logger.Error(
dc.settings.TelemetrySettings.Logger.Error(
"failed to remove from metric cache",
zap.String("obj", reflect.TypeOf(obj).String()),
zap.Error(err),
Expand All @@ -85,7 +86,7 @@ func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) {

func (dc *DataCollector) UpdateMetricsStore(obj interface{}, md pmetric.Metrics) {
if err := dc.metricsStore.update(obj.(runtime.Object), md); err != nil {
dc.logger.Error(
dc.settings.TelemetrySettings.Logger.Error(
"failed to update metric cache",
zap.String("obj", reflect.TypeOf(obj).String()),
zap.Error(err),
Expand All @@ -103,9 +104,9 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) {

switch o := obj.(type) {
case *corev1.Pod:
md = ocsToMetrics(pod.GetMetrics(o, dc.logger))
md = ocsToMetrics(pod.GetMetrics(o, dc.settings.TelemetrySettings.Logger))
case *corev1.Node:
md = ocsToMetrics(node.GetMetrics(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.logger))
md = ocsToMetrics(node.GetMetrics(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.settings.TelemetrySettings.Logger))
case *corev1.Namespace:
md = ocsToMetrics(namespace.GetMetrics(o))
case *corev1.ReplicationController:
Expand All @@ -127,9 +128,9 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) {
case *batchv1beta1.CronJob:
md = ocsToMetrics(cronjob.GetMetricsBeta(o))
case *autoscalingv2.HorizontalPodAutoscaler:
md = ocsToMetrics(hpa.GetMetrics(o))
md = hpa.GetMetrics(dc.settings, o)
case *autoscalingv2beta2.HorizontalPodAutoscaler:
md = ocsToMetrics(hpa.GetMetricsBeta(o))
md = hpa.GetMetricsBeta(dc.settings, o)
case *quotav1.ClusterResourceQuota:
md = ocsToMetrics(clusterresourcequota.GetMetrics(o))
default:
Expand All @@ -148,7 +149,7 @@ func (dc *DataCollector) SyncMetadata(obj interface{}) map[experimentalmetricmet
km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}
switch o := obj.(type) {
case *corev1.Pod:
km = pod.GetMetadata(o, dc.metadataStore, dc.logger)
km = pod.GetMetadata(o, dc.metadataStore, dc.settings.TelemetrySettings.Logger)
case *corev1.Node:
km = node.GetMetadata(o)
case *corev1.ReplicationController:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -263,10 +264,11 @@ func TestDataCollectorSyncMetadata(t *testing.T) {

for _, tt := range tests {
observedLogger, _ := observer.New(zapcore.WarnLevel)
logger := zap.New(observedLogger)
set := receivertest.NewNopCreateSettings()
set.TelemetrySettings.Logger = zap.New(observedLogger)
t.Run(tt.name, func(t *testing.T) {
dc := &DataCollector{
logger: logger,
settings: set,
metadataStore: tt.metadataStore,
nodeConditionsToReport: []string{},
}
Expand Down
2 changes: 0 additions & 2 deletions receiver/k8sclusterreceiver/internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ const (
// Resource labels keys for UID.
K8sKeyNamespaceUID = "k8s.namespace.uid"
K8sKeyReplicationControllerUID = "k8s.replicationcontroller.uid"
K8sKeyHPAUID = "k8s.hpa.uid"
K8sKeyResourceQuotaUID = "k8s.resourcequota.uid"
K8sKeyClusterResourceQuotaUID = "openshift.clusterquota.uid"

// Resource labels keys for Name.
K8sKeyReplicationControllerName = "k8s.replicationcontroller.name"
K8sKeyHPAName = "k8s.hpa.name"
K8sKeyResourceQuotaName = "k8s.resourcequota.name"
K8sKeyClusterResourceQuotaName = "openshift.clusterquota.name"

Expand Down
9 changes: 9 additions & 0 deletions receiver/k8sclusterreceiver/internal/hpa/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

//go:generate mdatagen metadata.yaml

package hpa // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa"
53 changes: 53 additions & 0 deletions receiver/k8sclusterreceiver/internal/hpa/documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)

# k8s/hpa

## Default Metrics

The following metrics are emitted by default. Each of them can be disabled by applying the following configuration:

```yaml
metrics:
<metric_name>:
enabled: false
```
### k8s.hpa.current_replicas
Current number of pod replicas managed by this autoscaler.
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |
### k8s.hpa.desired_replicas
Desired number of pod replicas managed by this autoscaler.
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |
### k8s.hpa.max_replicas
Maximum number of replicas to which the autoscaler can scale up.
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |
### k8s.hpa.min_replicas
Minimum number of replicas to which the autoscaler can scale up.
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |
## Resource Attributes
| Name | Description | Values | Enabled |
| ---- | ----------- | ------ | ------- |
| k8s.hpa.name | The k8s hpa name. | Any Str | true |
| k8s.hpa.uid | The k8s hpa uid. | Any Str | true |
| k8s.namespace.name | The name of the namespace that the pod is running in. | Any Str | true |
140 changes: 24 additions & 116 deletions receiver/k8sclusterreceiver/internal/hpa/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,129 +4,37 @@
package hpa // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa"

import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
autoscalingv2 "k8s.io/api/autoscaling/v2"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
)

var hpaMaxReplicasMetric = &metricspb.MetricDescriptor{
Name: "k8s.hpa.max_replicas",
Description: "Maximum number of replicas to which the autoscaler can scale up",
Unit: "1",
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

var hpaMinReplicasMetric = &metricspb.MetricDescriptor{
Name: "k8s.hpa.min_replicas",
Description: "Minimum number of replicas to which the autoscaler can scale down",
Unit: "1",
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

var hpaCurrentReplicasMetric = &metricspb.MetricDescriptor{
Name: "k8s.hpa.current_replicas",
Description: "Current number of pod replicas managed by this autoscaler",
Unit: "1",
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

var hpaDesiredReplicasMetric = &metricspb.MetricDescriptor{
Name: "k8s.hpa.desired_replicas",
Description: "Desired number of pod replicas managed by this autoscaler",
Unit: "1",
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

func GetMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := []*metricspb.Metric{
{
MetricDescriptor: hpaMaxReplicasMetric,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(hpa.Spec.MaxReplicas)),
},
},
{
MetricDescriptor: hpaMinReplicasMetric,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(*hpa.Spec.MinReplicas)),
},
},
{
MetricDescriptor: hpaCurrentReplicasMetric,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(hpa.Status.CurrentReplicas)),
},
},
{
MetricDescriptor: hpaDesiredReplicasMetric,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(hpa.Status.DesiredReplicas)),
},
},
}

return []*agentmetricspb.ExportMetricsServiceRequest{
{
Resource: getResourceForHPA(&hpa.ObjectMeta),
Metrics: metrics,
},
}
}

func GetMetricsBeta(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := []*metricspb.Metric{
{
MetricDescriptor: hpaMaxReplicasMetric,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(hpa.Spec.MaxReplicas)),
},
},
{
MetricDescriptor: hpaMinReplicasMetric,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(*hpa.Spec.MinReplicas)),
},
},
{
MetricDescriptor: hpaCurrentReplicasMetric,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(hpa.Status.CurrentReplicas)),
},
},
{
MetricDescriptor: hpaDesiredReplicasMetric,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(hpa.Status.DesiredReplicas)),
},
},
}

return []*agentmetricspb.ExportMetricsServiceRequest{
{
Resource: getResourceForHPA(&hpa.ObjectMeta),
Metrics: metrics,
},
}
}

func getResourceForHPA(om *v1.ObjectMeta) *resourcepb.Resource {
return &resourcepb.Resource{
Type: constants.K8sType,
Labels: map[string]string{
constants.K8sKeyHPAUID: string(om.UID),
constants.K8sKeyHPAName: om.Name,
conventions.AttributeK8SNamespaceName: om.Namespace,
},
}
func GetMetricsBeta(set receiver.CreateSettings, hpa *autoscalingv2beta2.HorizontalPodAutoscaler) pmetric.Metrics {
mb := imetadata.NewMetricsBuilder(imetadata.DefaultMetricsBuilderConfig(), set)
ts := pcommon.NewTimestampFromTime(time.Now())
mb.RecordK8sHpaMaxReplicasDataPoint(ts, int64(hpa.Spec.MaxReplicas))
mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas))
mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas))
mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas))
return mb.Emit(imetadata.WithK8sHpaUID(string(hpa.UID)), imetadata.WithK8sHpaName(hpa.Name), imetadata.WithK8sNamespaceName(hpa.Namespace))
}

func GetMetrics(set receiver.CreateSettings, hpa *autoscalingv2.HorizontalPodAutoscaler) pmetric.Metrics {
mb := imetadata.NewMetricsBuilder(imetadata.DefaultMetricsBuilderConfig(), set)
ts := pcommon.NewTimestampFromTime(time.Now())
mb.RecordK8sHpaMaxReplicasDataPoint(ts, int64(hpa.Spec.MaxReplicas))
mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas))
mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas))
mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas))
return mb.Emit(imetadata.WithK8sHpaUID(string(hpa.UID)), imetadata.WithK8sHpaName(hpa.Name), imetadata.WithK8sNamespaceName(hpa.Namespace))
}

func GetMetadata(hpa *autoscalingv2.HorizontalPodAutoscaler) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata {
Expand Down
Loading

0 comments on commit fd526ee

Please sign in to comment.