Skip to content

Commit

Permalink
Show pods in component health message (#2496)
Browse files Browse the repository at this point in the history
* Show pods with health

* changelog

* fix unit test

* update example, add note
  • Loading branch information
jaronoff97 authored Jan 9, 2024
1 parent 71634d6 commit 168c90f
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 38 deletions.
16 changes: 16 additions & 0 deletions .chloggen/pods-in-bridge-message.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: bridge

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Sets pods in the component health map

# One or more tracking issues related to the change
issues: [2489]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: this change adds a requirement for a new permission for the bridge to list and get pods.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ container-push:
container-target-allocator-push:
docker push ${TARGETALLOCATOR_IMG}

.PHONY: container-operator-opamp-bridge-push
container-operator-opamp-bridge-push:
docker push ${OPERATOROPAMPBRIDGE_IMG}

.PHONY: container-target-allocator
container-target-allocator: GOOS = linux
container-target-allocator: targetallocator
Expand Down Expand Up @@ -333,7 +337,7 @@ endif


.PHONY: load-image-operator-opamp-bridge
load-image-operator-opamp-bridge:
load-image-operator-opamp-bridge: container-operator-opamp-bridge
kind load --name $(KIND_CLUSTER_NAME) docker-image ${OPERATOROPAMPBRIDGE_IMG}

.PHONY: cert-manager
Expand Down
70 changes: 61 additions & 9 deletions cmd/operator-opamp-bridge/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
Expand All @@ -29,6 +30,7 @@ import (
"k8s.io/utils/clock"
"sigs.k8s.io/yaml"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/config"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/metrics"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/operator"
Expand All @@ -37,7 +39,7 @@ import (
type Agent struct {
logger logr.Logger

appliedKeys map[collectorKey]bool
appliedKeys map[kubeResourceKey]bool
clock clock.Clock
startTime uint64
lastHash []byte
Expand Down Expand Up @@ -65,7 +67,7 @@ func NewAgent(logger logr.Logger, applier operator.ConfigApplier, config *config
config: config,
applier: applier,
logger: logger,
appliedKeys: map[collectorKey]bool{},
appliedKeys: map[kubeResourceKey]bool{},
instanceId: config.GetNewInstanceId(),
agentDescription: config.GetDescription(),
remoteConfigEnabled: config.RemoteConfigEnabled(),
Expand All @@ -85,7 +87,7 @@ func NewAgent(logger logr.Logger, applier operator.ConfigApplier, config *config

// getHealth is called every heartbeat interval to report health.
func (agent *Agent) getHealth() *protobufs.ComponentHealth {
healthMap, err := agent.generateComponentHealthMap()
healthMap, err := agent.generateCollectorPoolHealth()
if err != nil {
return &protobufs.ComponentHealth{
Healthy: false,
Expand All @@ -102,20 +104,70 @@ func (agent *Agent) getHealth() *protobufs.ComponentHealth {
}
}

// generateComponentHealthMap allows the bridge to report the status of the collector pools it owns.
// TODO: implement enhanced health messaging.
func (agent *Agent) generateComponentHealthMap() (map[string]*protobufs.ComponentHealth, error) {
// generateCollectorPoolHealth allows the bridge to report the status of the collector pools it owns.
// TODO: implement enhanced health messaging using the collector's new healthcheck extension:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26661
func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.ComponentHealth, error) {
cols, err := agent.applier.ListInstances()
if err != nil {
return nil, err
}
healthMap := map[string]*protobufs.ComponentHealth{}
for _, col := range cols {
key := newCollectorKey(col.GetNamespace(), col.GetName())
key := newKubeResourceKey(col.GetNamespace(), col.GetName())
podMap, err := agent.generateCollectorHealth(agent.getCollectorSelector(col), col.GetNamespace())
if err != nil {
return nil, err
}
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(col.ObjectMeta.GetCreationTimestamp().UnixNano()),
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
Status: col.Status.Scale.StatusReplicas,
ComponentHealthMap: podMap,
}
}
return healthMap, nil
}

// getCollectorSelector destructures the collectors scale selector if present, if uses the labelmap from the operator.
func (agent *Agent) getCollectorSelector(col v1alpha1.OpenTelemetryCollector) map[string]string {
if len(col.Status.Scale.Selector) > 0 {
selMap := map[string]string{}
for _, kvPair := range strings.Split(col.Status.Scale.Selector, ",") {
kv := strings.Split(kvPair, "=")
// skip malformed pairs
if len(kv) != 2 {
continue
}
selMap[kv[0]] = kv[1]
}
return selMap
}
return map[string]string{
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", col.GetNamespace(), col.GetName()),
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
}
}

func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, namespace string) (map[string]*protobufs.ComponentHealth, error) {
pods, err := agent.applier.GetCollectorPods(selectorLabels, namespace)
if err != nil {
return nil, err
}
healthMap := map[string]*protobufs.ComponentHealth{}
for _, item := range pods.Items {
key := newKubeResourceKey(item.GetNamespace(), item.GetName())
healthy := true
if item.Status.Phase != "Running" {
healthy = false
}
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(item.Status.StartTime.UnixNano()),
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
Status: string(item.Status.Phase),
Healthy: healthy,
}
}
return healthMap, nil
Expand Down Expand Up @@ -232,7 +284,7 @@ func (agent *Agent) getEffectiveConfig(ctx context.Context) (*protobufs.Effectiv
agent.logger.Error(err, "failed to marhsal config")
return nil, err
}
mapKey := newCollectorKey(instance.GetNamespace(), instance.GetName())
mapKey := newKubeResourceKey(instance.GetNamespace(), instance.GetName())
instanceMap[mapKey.String()] = &protobufs.AgentConfigFile{
Body: marshaled,
ContentType: "yaml",
Expand Down Expand Up @@ -277,7 +329,7 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*pro
if len(key) == 0 || len(file.Body) == 0 {
continue
}
colKey, err := collectorKeyFromKey(key)
colKey, err := kubeResourceFromKey(key)
if err != nil {
multiErr = multierr.Append(multiErr, err)
continue
Expand Down
78 changes: 75 additions & 3 deletions cmd/operator-opamp-bridge/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
testingclock "k8s.io/utils/clock/testing"
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
Expand All @@ -49,9 +51,11 @@ const (
testNamespace = "testnamespace"
testCollectorName = "collector"
otherCollectorName = "other"
thirdCollectorName = "third"
emptyConfigHash = ""
testCollectorKey = testNamespace + "/" + testCollectorName
otherCollectorKey = testNamespace + "/" + otherCollectorName
thirdCollectorKey = otherCollectorName + "/" + thirdCollectorName

agentTestFileName = "testdata/agent.yaml"
agentTestFileHttpName = "testdata/agenthttpbasic.yaml"
Expand All @@ -73,6 +77,32 @@ var (
invalidYamlConfigHash = getConfigHash(testCollectorKey, collectorInvalidFile)
updatedYamlConfigHash = getConfigHash(testCollectorKey, collectorUpdatedFile)
otherUpdatedYamlConfigHash = getConfigHash(otherCollectorKey, collectorUpdatedFile)

podTime = metav1.NewTime(time.UnixMicro(1704748549000000))
mockPodList = &v1.PodList{
TypeMeta: metav1.TypeMeta{
Kind: "PodList",
APIVersion: "v1",
},
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: thirdCollectorName + "-1",
Namespace: otherCollectorName,
Labels: map[string]string{
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", otherCollectorName, thirdCollectorName),
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
},
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
StartTime: &podTime,
Phase: v1.PodRunning,
},
},
}}
)

func getConfigHash(key, file string) string {
Expand Down Expand Up @@ -130,16 +160,17 @@ func (m *mockOpampClient) SetPackageStatuses(_ *protobufs.PackageStatuses) error
return nil
}

func getFakeApplier(t *testing.T, conf *config.Config) *operator.Client {
func getFakeApplier(t *testing.T, conf *config.Config, lists ...runtimeClient.ObjectList) *operator.Client {
schemeBuilder := runtime.NewSchemeBuilder(func(s *runtime.Scheme) error {
s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.OpenTelemetryCollector{}, &v1alpha1.OpenTelemetryCollectorList{})
s.AddKnownTypes(v1.SchemeGroupVersion, &v1.Pod{}, &v1.PodList{})
metav1.AddToGroupVersion(s, v1alpha1.GroupVersion)
return nil
})
scheme := runtime.NewScheme()
err := schemeBuilder.AddToScheme(scheme)
require.NoError(t, err, "Should be able to add custom types")
c := fake.NewClientBuilder().WithScheme(scheme)
c := fake.NewClientBuilder().WithLists(lists...).WithScheme(scheme)
return operator.NewClient("test-bridge", l, c.Build(), conf.GetComponentsAllowed())
}

Expand Down Expand Up @@ -205,6 +236,7 @@ func TestAgent_getHealth(t *testing.T) {
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
},
Expand Down Expand Up @@ -236,13 +268,53 @@ func TestAgent_getHealth(t *testing.T) {
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
"testnamespace/other": {
Healthy: false, // we're working with mocks so the status will never be reconciled.
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
},
},
},
{
name: "with pod health",
fields: fields{
configFile: agentTestFileName,
},
args: args{
ctx: context.Background(),
configs: []map[string]string{
{
thirdCollectorKey: collectorBasicFile,
},
},
},
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"other/third": {
Healthy: false, // we're working with mocks so the status will never be reconciled.
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
otherCollectorName + "/" + thirdCollectorName + "-1": {
Healthy: true,
Status: "Running",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: uint64(podTime.UnixNano()),
},
},
},
},
},
Expand All @@ -255,7 +327,7 @@ func TestAgent_getHealth(t *testing.T) {
conf := config.NewConfig(logr.Discard())
loadErr := config.LoadFromFile(conf, tt.fields.configFile)
require.NoError(t, loadErr, "should be able to load config")
applier := getFakeApplier(t, conf)
applier := getFakeApplier(t, conf, mockPodList)
agent := NewAgent(l, applier, conf, mockClient)
agent.clock = fakeClock
err := agent.Start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ import (
"strings"
)

type collectorKey struct {
type kubeResourceKey struct {
name string
namespace string
}

func newCollectorKey(namespace string, name string) collectorKey {
return collectorKey{name: name, namespace: namespace}
func newKubeResourceKey(namespace string, name string) kubeResourceKey {
return kubeResourceKey{name: name, namespace: namespace}
}

func collectorKeyFromKey(key string) (collectorKey, error) {
func kubeResourceFromKey(key string) (kubeResourceKey, error) {
s := strings.Split(key, "/")
// We expect map keys to be of the form name/namespace
if len(s) != 2 {
return collectorKey{}, errors.New("invalid key")
return kubeResourceKey{}, errors.New("invalid key")
}
return newCollectorKey(s[0], s[1]), nil
return newKubeResourceKey(s[0], s[1]), nil
}

func (k collectorKey) String() string {
func (k kubeResourceKey) String() string {
return fmt.Sprintf("%s/%s", k.namespace, k.name)
}
Loading

0 comments on commit 168c90f

Please sign in to comment.