Skip to content

Commit

Permalink
feat: add a status condition to ic for instrumentation details (#2408)
Browse files Browse the repository at this point in the history
This adds a condition that describes why the workload is instrumented
(which kind of source), along with it's name for better troubleshooting
and traceability.

Examples:


![image](https://github.com/user-attachments/assets/16193fc3-3f95-4a5c-aa9b-1ab4572035b4)

The function in k8sutils now returns more info about the decision, which
can later be used for odigos describe and possibly be also persisted
with ic delete if we ever need finalizers for it.
  • Loading branch information
blumamir authored Feb 9, 2025
1 parent 86369de commit c3c9f2d
Show file tree
Hide file tree
Showing 13 changed files with 288 additions and 51 deletions.
42 changes: 42 additions & 0 deletions api/odigos/v1alpha1/instrumentationconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,55 @@ type InstrumentationConfig struct {

// conditions for the InstrumentationConfigStatus
const (
// Define a status condition type that describes why the workload is marked for instrumentation or not.
MarkedForInstrumentationStatusConditionType = "MarkedForInstrumentation"
// Describe the runtime detection status of this workload.
RuntimeDetectionStatusConditionType = "RuntimeDetection" // TODO: placeholder, not yet implemented
// this const is the Type field in the conditions of the InstrumentationConfigStatus.
AgentEnabledStatusConditionType = "AgentEnabled"
// reports whether the workload associated with the InstrumentationConfig has been rolled out.
// the rollout is needed to update the instrumentation done by the Pods webhook.
WorkloadRolloutStatusConditionType = "WorkloadRollout"
)

func StatusConditionTypeLogicalOrder(condType string) int {
switch condType {
case MarkedForInstrumentationStatusConditionType:
return 1
case RuntimeDetectionStatusConditionType:
// TODO: placeholder, not yet implemented
return 2
case AgentEnabledStatusConditionType:
return 3
case WorkloadRolloutStatusConditionType:
return 4
default:
return 5
}
}

// +kubebuilder:validation:Enum=WorkloadSource;NamespaceSource;WorkloadSourceDisabled;NoSource;RetirableError
type MarkedForInstrumentationReason string

const (
// denotes that the workload is instrumented because of a source CR exists for this workload.
// and the source is not disabled..
MarkedForInstrumentationReasonWorkloadSource MarkedForInstrumentationReason = "WorkloadSource"

// denotes that the workload does not have a source CR, but the namespace has a source CR,
// so the workload is instrumented as inherited from the namespace.
MarkedForInstrumentationReasonNamespaceSource MarkedForInstrumentationReason = "NamespaceSource"

// the source object for workload exists, and it is disabled, thus uninstrumented.
MarkedForInstrumentationReasonWorkloadSourceDisabled MarkedForInstrumentationReason = "WorkloadSourceDisabled"

// this workload is not instrumented because no source CR exists for it or its namespace.
MarkedForInstrumentationReasonNoSource MarkedForInstrumentationReason = "NoSource"

// cannot determine the reason for the instrumentation due to a possible transient error.
MarkedForInstrumentationReasonError MarkedForInstrumentationReason = "RetirableError"
)

// +kubebuilder:validation:Enum=EnabledSuccessfully;WaitingForRuntimeInspection;WaitingForNodeCollector;UnsupportedProgrammingLanguage;IgnoredContainer;NoAvailableAgent;UnsupportedRuntimeVersion;MissingDistroParameter;OtherAgentDetected
type AgentEnabledReason string

Expand Down
2 changes: 1 addition & 1 deletion instrumentor/controllers/agentenabled/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func reconcileWorkload(ctx context.Context, c client.Client, icName string, name
return ctrl.Result{}, err
}

logger.Info("Reconciling workload for InstrumentationConfig object agent enabling", "name", ic.Name, "namespace", ic.Namespace, "instrumentationConfig", ic)
logger.Info("Reconciling workload for InstrumentationConfig object agent enabling", "name", ic.Name, "namespace", ic.Namespace, "instrumentationConfigName", ic.Name)

condition, err := updateInstrumentationConfigSpec(ctx, c, pw, &ic)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
func reconcileWorkloadObject(ctx context.Context, kubeClient client.Client, workloadObject client.Object) error {
logger := log.FromContext(ctx)

instrumented, err := sourceutils.IsObjectInstrumentedBySource(ctx, kubeClient, workloadObject)
instrumented, _, _, err := sourceutils.IsObjectInstrumentedBySource(ctx, kubeClient, workloadObject)
if err != nil {
return err
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func syncGenericWorkloadListToNs(ctx context.Context, c client.Client, kind k8sc
}
}

instrumented, err := sourceutils.IsObjectInstrumentedBySource(ctx, c, freshWorkloadCopy)
instrumented, _, _, err := sourceutils.IsObjectInstrumentedBySource(ctx, c, freshWorkloadCopy)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *InstrumentationConfigReconciler) Reconcile(ctx context.Context, req ctr
return ctrl.Result{}, err
}

enabled, err := sourceutils.IsObjectInstrumentedBySource(ctx, r.Client, workloadObject)
enabled, _, _, err := sourceutils.IsObjectInstrumentedBySource(ctx, r.Client, workloadObject)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

enabled, err := sourceutils.IsObjectInstrumentedBySource(ctx, r.Client, &ns)
enabled, _, _, err := sourceutils.IsObjectInstrumentedBySource(ctx, r.Client, &ns)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (r *SourceReconciler) syncWorkload(ctx context.Context, source *v1alpha1.So
return err
}

instrumented, err := sourceutils.IsObjectInstrumentedBySource(ctx, r.Client, obj)
instrumented, _, _, err := sourceutils.IsObjectInstrumentedBySource(ctx, r.Client, obj)
if err != nil {
return err
}
Expand Down
63 changes: 46 additions & 17 deletions instrumentor/controllers/startlangdetection/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,33 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

func syncNamespaceWorkloads(ctx context.Context, k8sClient client.Client, runtimeScheme *runtime.Scheme, namespace string) error {
var err error
func syncNamespaceWorkloads(ctx context.Context, k8sClient client.Client, runtimeScheme *runtime.Scheme, namespace string) (ctrl.Result, error) {
collectiveRes := ctrl.Result{}
var errs error
for _, kind := range []k8sconsts.WorkloadKind{
k8sconsts.WorkloadKindDaemonSet,
k8sconsts.WorkloadKindDeployment,
k8sconsts.WorkloadKindStatefulSet,
} {
err = errors.Join(err, listAndReconcileWorkloadList(ctx, k8sClient, runtimeScheme, namespace, kind))
res, err := listAndReconcileWorkloadList(ctx, k8sClient, runtimeScheme, namespace, kind)
errs = errors.Join(errs, err)
collectiveRes = joinCtrlResultToList(res, collectiveRes)
}
return err
return collectiveRes, errs
}

func listAndReconcileWorkloadList(ctx context.Context,
k8sClient client.Client,
runtimeScheme *runtime.Scheme,
namespace string,
kind k8sconsts.WorkloadKind) error {
kind k8sconsts.WorkloadKind) (ctrl.Result, error) {

// pre-process existing Sources for specific workloads so we don't have to make a bunch of API calls
// This is used to check if a workload already has an explicit Source, so we don't overwrite its InstrumentationConfig
sourceList := v1alpha1.SourceList{}
err := k8sClient.List(ctx, &sourceList, client.InNamespace(namespace))
if err != nil {
return err
return ctrl.Result{}, err
}
namespaceKindSources := make(map[k8sconsts.WorkloadKind]map[string]struct{})
for _, s := range sourceList.Items {
Expand All @@ -52,47 +55,73 @@ func listAndReconcileWorkloadList(ctx context.Context,
deps := workload.ClientListObjectFromWorkloadKind(kind)
err = k8sClient.List(ctx, deps, client.InNamespace(namespace))
if client.IgnoreNotFound(err) != nil {
return err
return ctrl.Result{}, err
}

collectiveRes := ctrl.Result{}
switch obj := deps.(type) {
case *v1.DeploymentList:
for _, dep := range obj.Items {
err = reconcileWorkloadList(ctx, k8sClient, runtimeScheme, ctrl.Request{NamespacedName: client.ObjectKey{Name: dep.Name, Namespace: dep.Namespace}}, kind, namespaceKindSources)
res, err := reconcileWorkloadList(ctx, k8sClient, runtimeScheme, ctrl.Request{NamespacedName: client.ObjectKey{Name: dep.Name, Namespace: dep.Namespace}}, kind, namespaceKindSources)
// make sure we pass any requeue request up the chain to our caller.
if err != nil {
return err
return ctrl.Result{}, err
}
collectiveRes = joinCtrlResultToList(res, collectiveRes)
}
case *v1.DaemonSetList:
for _, dep := range obj.Items {
err = reconcileWorkloadList(ctx, k8sClient, runtimeScheme, ctrl.Request{NamespacedName: client.ObjectKey{Name: dep.Name, Namespace: dep.Namespace}}, kind, namespaceKindSources)
res, err := reconcileWorkloadList(ctx, k8sClient, runtimeScheme, ctrl.Request{NamespacedName: client.ObjectKey{Name: dep.Name, Namespace: dep.Namespace}}, kind, namespaceKindSources)
if err != nil {
return err
return ctrl.Result{}, err
}
collectiveRes = joinCtrlResultToList(res, collectiveRes)
}
case *v1.StatefulSetList:
for _, dep := range obj.Items {
err = reconcileWorkloadList(ctx, k8sClient, runtimeScheme, ctrl.Request{NamespacedName: client.ObjectKey{Name: dep.Name, Namespace: dep.Namespace}}, kind, namespaceKindSources)
res, err := reconcileWorkloadList(ctx, k8sClient, runtimeScheme, ctrl.Request{NamespacedName: client.ObjectKey{Name: dep.Name, Namespace: dep.Namespace}}, kind, namespaceKindSources)
if err != nil {
return err
return ctrl.Result{}, err
}
collectiveRes = joinCtrlResultToList(res, collectiveRes)
}
}
return nil
return collectiveRes, nil
}

func joinCtrlResultToList(res ctrl.Result, collectiveRes ctrl.Result) ctrl.Result {

// if the current res is not for requeue, we simply ignore it for the collectiveRes
if res.IsZero() {
return collectiveRes
}

// if the collectiveRes is not set it, set it to the current res
if collectiveRes.IsZero() {
return res
}

// notice - we ignore the requeueAfter value of the res, and only use the requeue value.

// else, the new res is less meaningful than the collectiveRes, so keep the collectiveRes
return collectiveRes
}

func reconcileWorkloadList(ctx context.Context,
k8sClient client.Client,
runtimeScheme *runtime.Scheme,
req ctrl.Request,
kind k8sconsts.WorkloadKind,
namespaceKindSources map[k8sconsts.WorkloadKind]map[string]struct{}) error {
namespaceKindSources map[k8sconsts.WorkloadKind]map[string]struct{}) (ctrl.Result, error) {
logger := log.FromContext(ctx)
if _, exists := namespaceKindSources[kind][req.Name]; !exists {
_, err := reconcileWorkload(ctx, k8sClient, kind, req, runtimeScheme)
res, err := reconcileWorkload(ctx, k8sClient, kind, req, runtimeScheme)
if err != nil {
logger.Error(err, "error requesting runtime details from odiglets", "name", req.Name, "namespace", req.Namespace, "kind", kind)
}
if !res.IsZero() {
return res, err
}
}
return nil
return ctrl.Result{}, nil
}
30 changes: 30 additions & 0 deletions instrumentor/controllers/startlangdetection/conditionutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package startlangdetection

import (
"slices"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Checks if the conditions array in the status is currently sorted by logical order
// this can be used to check if sorting is needed.
func areConditionsLogicallySorted(conditions []metav1.Condition) bool {
var lastTypeLogicalOrder int = 0
for _, condition := range conditions {
currentLogicalOrder := odigosv1.StatusConditionTypeLogicalOrder(condition.Type)
if currentLogicalOrder <= lastTypeLogicalOrder {
return false
}
lastTypeLogicalOrder = currentLogicalOrder
}
return true
}

// giving the input conditions array, this function will return a new array with the conditions sorted by logical order
func sortIcConditionsByLogicalOrder(conditions []metav1.Condition) []metav1.Condition {
slices.SortFunc(conditions, func(i, j metav1.Condition) int {
return odigosv1.StatusConditionTypeLogicalOrder(i.Type) - odigosv1.StatusConditionTypeLogicalOrder(j.Type)
})
return conditions
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (n *NamespacesReconciler) Reconcile(ctx context.Context, request ctrl.Reque
return ctrl.Result{}, client.IgnoreNotFound(err)
}

enabled, err := sourceutils.IsObjectInstrumentedBySource(ctx, n.Client, &ns)
enabled, _, _, err := sourceutils.IsObjectInstrumentedBySource(ctx, n.Client, &ns)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -34,5 +34,5 @@ func (n *NamespacesReconciler) Reconcile(ctx context.Context, request ctrl.Reque
}

logger.V(0).Info("Namespace enabled for instrumentation, recalculating runtime details of relevant workloads")
return ctrl.Result{}, syncNamespaceWorkloads(ctx, n.Client, n.Scheme, ns.GetName())
return syncNamespaceWorkloads(ctx, n.Client, n.Scheme, ns.GetName())
}
14 changes: 12 additions & 2 deletions instrumentor/controllers/startlangdetection/source_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,15 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

if source.Spec.Workload.Kind == k8sconsts.WorkloadKindNamespace {
err = errors.Join(err, syncNamespaceWorkloads(ctx, r.Client, r.Scheme, source.Spec.Workload.Name))
res, reconcileErr := syncNamespaceWorkloads(ctx, r.Client, r.Scheme, source.Spec.Workload.Name)
if reconcileErr != nil {
err = errors.Join(err, reconcileErr)
}
if !res.IsZero() {
return res, err
}
} else {
_, reconcileErr := reconcileWorkload(ctx,
res, reconcileErr := reconcileWorkload(ctx,
r.Client,
source.Spec.Workload.Kind,
ctrl.Request{
Expand All @@ -76,6 +82,10 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if reconcileErr != nil {
err = errors.Join(err, reconcileErr)
}
if !res.IsZero() {
// propagate the requeue result to the caller if there is one
return res, err
}
}

if client.IgnoreNotFound(err) != nil {
Expand Down
Loading

0 comments on commit c3c9f2d

Please sign in to comment.