From 76948742ff19aa9d6581b2ac7c864cdb4b4be0db Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Thu, 9 Apr 2026 10:45:26 -0300 Subject: [PATCH 01/10] flp-informers deployment and wiring --- bundle.Dockerfile | 2 +- .../flows.netobserv.io_flowcollectors.yaml | 2 +- ...lows.netobserv.io_flowcollectorslices.yaml | 2 +- .../flows.netobserv.io_flowmetrics.yaml | 2 +- ...observ-operator.clusterserviceversion.yaml | 2 +- bundle/metadata/annotations.yaml | 2 +- .../flows.netobserv.io_flowcollectors.yaml | 2 +- ...lows.netobserv.io_flowcollectorslices.yaml | 2 +- .../bases/flows.netobserv.io_flowmetrics.yaml | 2 +- .../flows.netobserv.io_flowcollectors.yaml | 2 +- ...lows.netobserv.io_flowcollectorslices.yaml | 2 +- helm/crds/flows.netobserv.io_flowmetrics.yaml | 2 +- internal/controller/flp/flp_common_objects.go | 11 +- internal/controller/flp/flp_controller.go | 1 + .../controller/flp/flp_controller_test.go | 28 +++-- .../controller/flp/flp_informer_builder.go | 116 +++++++++++++++++ .../controller/flp/flp_informer_reconciler.go | 119 ++++++++++++++++++ .../controller/flp/flp_monolith_objects.go | 20 ++- .../controller/flp/flp_monolith_reconciler.go | 8 -- .../controller/flp/flp_transfo_reconciler.go | 8 -- internal/pkg/manager/status/status_manager.go | 1 + 21 files changed, 289 insertions(+), 47 deletions(-) create mode 100644 internal/controller/flp/flp_informer_builder.go create mode 100644 internal/controller/flp/flp_informer_reconciler.go diff --git a/bundle.Dockerfile b/bundle.Dockerfile index 55fea91d3e..0061406146 100644 --- a/bundle.Dockerfile +++ b/bundle.Dockerfile @@ -7,7 +7,7 @@ LABEL operators.operatorframework.io.bundle.metadata.v1=metadata/ LABEL operators.operatorframework.io.bundle.package.v1=netobserv-operator LABEL operators.operatorframework.io.bundle.channels.v1=latest,community LABEL operators.operatorframework.io.bundle.channel.default.v1=community -LABEL operators.operatorframework.io.metrics.builder=operator-sdk-v1.42.0 +LABEL operators.operatorframework.io.metrics.builder=operator-sdk-v1.40.0 LABEL operators.operatorframework.io.metrics.mediatype.v1=metrics+v1 LABEL operators.operatorframework.io.metrics.project_layout=go.kubebuilder.io/v4 diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml index 5e49ae372a..d1938e86bb 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 creationTimestamp: null name: flowcollectors.flows.netobserv.io spec: diff --git a/bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml b/bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml index 81704a1ed1..0e57318c67 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 creationTimestamp: null name: flowcollectorslices.flows.netobserv.io spec: diff --git a/bundle/manifests/flows.netobserv.io_flowmetrics.yaml b/bundle/manifests/flows.netobserv.io_flowmetrics.yaml index 9f028f12bf..b97bec0af5 100644 --- a/bundle/manifests/flows.netobserv.io_flowmetrics.yaml +++ b/bundle/manifests/flows.netobserv.io_flowmetrics.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 creationTimestamp: null name: flowmetrics.flows.netobserv.io spec: diff --git a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml index 190e598f09..97049f01fd 100644 --- a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml +++ b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml @@ -96,7 +96,7 @@ metadata: operatorframework.io/initialization-resource: '{"apiVersion":"flows.netobserv.io/v1beta2", "kind":"FlowCollector","metadata":{"name":"cluster"},"spec": {}}' operatorframework.io/suggested-namespace: openshift-netobserv-operator - operators.operatorframework.io/builder: operator-sdk-v1.42.0 + operators.operatorframework.io/builder: operator-sdk-v1.40.0 operators.operatorframework.io/project_layout: go.kubebuilder.io/v4 repository: https://github.com/netobserv/netobserv-operator support: NetObserv team diff --git a/bundle/metadata/annotations.yaml b/bundle/metadata/annotations.yaml index 4aefd671e1..7d96f3b3b6 100644 --- a/bundle/metadata/annotations.yaml +++ b/bundle/metadata/annotations.yaml @@ -6,7 +6,7 @@ annotations: operators.operatorframework.io.bundle.package.v1: netobserv-operator operators.operatorframework.io.bundle.channels.v1: latest,community operators.operatorframework.io.bundle.channel.default.v1: community - operators.operatorframework.io.metrics.builder: operator-sdk-v1.42.0 + operators.operatorframework.io.metrics.builder: operator-sdk-v1.40.0 operators.operatorframework.io.metrics.mediatype.v1: metrics+v1 operators.operatorframework.io.metrics.project_layout: go.kubebuilder.io/v4 diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml index cfc3410785..fdbe2a46d5 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 name: flowcollectors.flows.netobserv.io spec: group: flows.netobserv.io diff --git a/config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml b/config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml index e241609ca3..d93ffa53d7 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 name: flowcollectorslices.flows.netobserv.io spec: group: flows.netobserv.io diff --git a/config/crd/bases/flows.netobserv.io_flowmetrics.yaml b/config/crd/bases/flows.netobserv.io_flowmetrics.yaml index fb0e0aa83a..024a0cd9f1 100644 --- a/config/crd/bases/flows.netobserv.io_flowmetrics.yaml +++ b/config/crd/bases/flows.netobserv.io_flowmetrics.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 name: flowmetrics.flows.netobserv.io spec: group: flows.netobserv.io diff --git a/helm/crds/flows.netobserv.io_flowcollectors.yaml b/helm/crds/flows.netobserv.io_flowcollectors.yaml index 0762c1ad99..7c19330ee1 100644 --- a/helm/crds/flows.netobserv.io_flowcollectors.yaml +++ b/helm/crds/flows.netobserv.io_flowcollectors.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 creationTimestamp: null name: flowcollectors.flows.netobserv.io spec: diff --git a/helm/crds/flows.netobserv.io_flowcollectorslices.yaml b/helm/crds/flows.netobserv.io_flowcollectorslices.yaml index d476970227..11cc67a21e 100644 --- a/helm/crds/flows.netobserv.io_flowcollectorslices.yaml +++ b/helm/crds/flows.netobserv.io_flowcollectorslices.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 creationTimestamp: null name: flowcollectorslices.flows.netobserv.io spec: diff --git a/helm/crds/flows.netobserv.io_flowmetrics.yaml b/helm/crds/flows.netobserv.io_flowmetrics.yaml index db4f48473a..3865d3210c 100644 --- a/helm/crds/flows.netobserv.io_flowmetrics.yaml +++ b/helm/crds/flows.netobserv.io_flowmetrics.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.0 + controller-gen.kubebuilder.io/version: v0.16.2 creationTimestamp: null name: flowmetrics.flows.netobserv.io spec: diff --git a/internal/controller/flp/flp_common_objects.go b/internal/controller/flp/flp_common_objects.go index 3a3f10b63b..ddd606ff97 100644 --- a/internal/controller/flp/flp_common_objects.go +++ b/internal/controller/flp/flp_common_objects.go @@ -128,6 +128,11 @@ func podTemplate( Name: prometheusPortName, ContainerPort: desired.Processor.GetMetricsPort(), }) + ports = append(ports, corev1.ContainerPort{ + Name: "k8scache", + ContainerPort: 9090, + Protocol: corev1.ProtocolTCP, + }) if advancedConfig.ProfilePort != nil { ports = append(ports, corev1.ContainerPort{ @@ -166,7 +171,11 @@ func podTemplate( Name: constants.FLPName, Image: imageName, ImagePullPolicy: corev1.PullPolicy(desired.Processor.ImagePullPolicy), - Args: []string{fmt.Sprintf(`--config=%s/%s`, configPath, configFile)}, + Args: []string{ + fmt.Sprintf(`--config=%s/%s`, configPath, configFile), + "--k8scache.port=9090", + "--k8scache.address=0.0.0.0", + }, Resources: *desired.Processor.Resources.DeepCopy(), VolumeMounts: volumeMounts, Ports: ports, diff --git a/internal/controller/flp/flp_controller.go b/internal/controller/flp/flp_controller.go index 032b2dfaca..eaa05a34c3 100644 --- a/internal/controller/flp/flp_controller.go +++ b/internal/controller/flp/flp_controller.go @@ -179,6 +179,7 @@ func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, fc *flow // `reconcilers.Common` is dependent on the FlowCollector object, which isn't known at start time. images := map[reconcilers.ImageRef]string{reconcilers.MainImage: r.mgr.Config.FlowlogsPipelineImage} reconcilers := []subReconciler{ + newInformerReconciler(cmn.NewInstance(images, r.mgr.Status.ForComponent(status.FLPInformers))), newMonolithReconciler(cmn.NewInstance(images, r.mgr.Status.ForComponent(status.FLPMonolith))), newTransformerReconciler(cmn.NewInstance(images, r.mgr.Status.ForComponent(status.FLPTransformer))), } diff --git a/internal/controller/flp/flp_controller_test.go b/internal/controller/flp/flp_controller_test.go index 7b1efe8842..ab99692697 100644 --- a/internal/controller/flp/flp_controller_test.go +++ b/internal/controller/flp/flp_controller_test.go @@ -59,14 +59,17 @@ func ControllerSpecs() { Name: transfoName, Namespace: operatorNamespace, } + flpKeyInformer := types.NamespacedName{ + Name: informerName, + Namespace: operatorNamespace, + } rbKeyConfigWatcherMono := types.NamespacedName{Name: resources.GetRoleBindingName(monoShortName, constants.ConfigWatcherRole), Namespace: operatorNamespace} rbKeyHostNetworkMono := types.NamespacedName{Name: resources.GetClusterRoleBindingName(monoShortName, constants.HostNetworkRole)} rbKeyLokiWriterMono := types.NamespacedName{Name: resources.GetClusterRoleBindingName(monoShortName, constants.LokiWriterRole)} - rbKeyInformerMono := types.NamespacedName{Name: resources.GetClusterRoleBindingName(monoShortName, constants.FLPInformersRole)} rbKeyConfigWatcherTransfo := types.NamespacedName{Name: resources.GetRoleBindingName(transfoShortName, constants.ConfigWatcherRole), Namespace: operatorNamespace} rbKeyHostNetworkTransfo := types.NamespacedName{Name: resources.GetClusterRoleBindingName(transfoShortName, constants.HostNetworkRole)} rbKeyLokiWriterTransfo := types.NamespacedName{Name: resources.GetClusterRoleBindingName(transfoShortName, constants.LokiWriterRole)} - rbKeyInformerTransfo := types.NamespacedName{Name: resources.GetClusterRoleBindingName(transfoShortName, constants.FLPInformersRole)} + rbKeyInformer := types.NamespacedName{Name: resources.GetClusterRoleBindingName(informerShortName, constants.FLPInformersRole)} // Created objects to cleanup cleanupList := []client.Object{} @@ -128,6 +131,12 @@ func ControllerSpecs() { return nil }, timeout, interval).Should(Succeed()) + By("Expecting to create the flp-informers Deployment") + Eventually(func() interface{} { + deploy := appsv1.Deployment{} + return k8sClient.Get(ctx, flpKeyInformer, &deploy) + }, timeout, interval).Should(Succeed()) + By("Expecting to create the flowlogs-pipeline ServiceAccount") Eventually(func() interface{} { svcAcc := v1.ServiceAccount{} @@ -158,10 +167,10 @@ func ControllerSpecs() { rb3 := rbacv1.ClusterRoleBinding{} Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyInformerMono, &rb3) + return k8sClient.Get(ctx, rbKeyInformer, &rb3) }, timeout, interval).Should(Succeed()) Expect(rb3.Subjects).Should(HaveLen(1)) - Expect(rb3.Subjects[0].Name).Should(Equal("flowlogs-pipeline")) + Expect(rb3.Subjects[0].Name).Should(Equal("flp-informers")) Expect(rb3.RoleRef.Name).Should(Equal("netobserv-informers")) By("Not expecting Loki role (requires LokiStack)") @@ -176,9 +185,6 @@ func ControllerSpecs() { Eventually(func() interface{} { return k8sClient.Get(ctx, rbKeyHostNetworkTransfo, &rbacv1.ClusterRoleBinding{}) }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "netobserv-hostnetwork-flptransfo" not found`)) - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyInformerTransfo, &rbacv1.ClusterRoleBinding{}) - }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "netobserv-informers-flptransfo" not found`)) Eventually(func() interface{} { return k8sClient.Get(ctx, rbKeyLokiWriterTransfo, &rbacv1.ClusterRoleBinding{}) }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "netobserv-loki-writer-flptransfo" not found`)) @@ -321,12 +327,13 @@ func ControllerSpecs() { Expect(rb1.Subjects[0].Name).Should(Equal("flowlogs-pipeline-transformer")) Expect(rb1.RoleRef.Name).Should(Equal("netobserv-config-watcher")) + By("Expecting informer role binding (independent deployment)") rb2 := rbacv1.ClusterRoleBinding{} Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyInformerTransfo, &rb2) + return k8sClient.Get(ctx, rbKeyInformer, &rb2) }, timeout, interval).Should(Succeed()) Expect(rb2.Subjects).Should(HaveLen(1)) - Expect(rb2.Subjects[0].Name).Should(Equal("flowlogs-pipeline-transformer")) + Expect(rb2.Subjects[0].Name).Should(Equal("flp-informers")) Expect(rb2.RoleRef.Name).Should(Equal("netobserv-informers")) By("Not expecting hostnetwork role (not needed with Kafka)") @@ -346,9 +353,6 @@ func ControllerSpecs() { Eventually(func() interface{} { return k8sClient.Get(ctx, rbKeyHostNetworkMono, &rbacv1.ClusterRoleBinding{}) }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "netobserv-hostnetwork-flp" not found`)) - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyInformerMono, &rbacv1.ClusterRoleBinding{}) - }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "netobserv-informers-flp" not found`)) Eventually(func() interface{} { return k8sClient.Get(ctx, rbKeyLokiWriterMono, &rbacv1.ClusterRoleBinding{}) }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "netobserv-loki-writer-flp" not found`)) diff --git a/internal/controller/flp/flp_informer_builder.go b/internal/controller/flp/flp_informer_builder.go new file mode 100644 index 0000000000..d76b3848bb --- /dev/null +++ b/internal/controller/flp/flp_informer_builder.go @@ -0,0 +1,116 @@ +package flp + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" + "github.com/netobserv/netobserv-operator/internal/controller/constants" + "github.com/netobserv/netobserv-operator/internal/controller/reconcilers" + "github.com/netobserv/netobserv-operator/internal/pkg/helper" +) + +type informerBuilder struct { + *reconcilers.Instance + desired *flowslatest.FlowCollectorSpec +} + +func newInformerBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec) informerBuilder { + return informerBuilder{ + Instance: info, + desired: desired, + } +} + +func (b *informerBuilder) deployment() (*appsv1.Deployment, error) { + var replicas int32 = 1 + version := helper.MaxLabelLength(helper.ExtractVersion(b.Images[reconcilers.MainImage])) + + // Determine the correct processor selector based on deployment model + processorSelector := "app=flowlogs-pipeline" + if b.desired.UseKafka() { + processorSelector = "app=flowlogs-pipeline-transformer" + } + + container := corev1.Container{ + Name: informerName, + Image: b.Images[reconcilers.MainImage], + ImagePullPolicy: corev1.PullPolicy(b.desired.Processor.ImagePullPolicy), + Command: []string{"/app/flp-informers"}, + Args: []string{ + fmt.Sprintf("--processor-selector=%s", processorSelector), + "--processor-port=9090", + "--resync-interval=60", + fmt.Sprintf("--log-level=%s", b.desired.Processor.LogLevel), + }, + Env: []corev1.EnvVar{ + { + Name: "POD_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("128Mi"), + corev1.ResourceCPU: resource.MustParse("50m"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("256Mi"), + corev1.ResourceCPU: resource.MustParse("200m"), + }, + }, + } + + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: informerName, + Namespace: b.Namespace, + Labels: map[string]string{ + "part-of": constants.OperatorName, + "app": informerName, + "version": version, + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": informerName, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": informerName, + "version": version, + }, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: informerName, + Containers: []corev1.Container{container}, + }, + }, + }, + }, nil +} + +func (b *informerBuilder) serviceAccount() *corev1.ServiceAccount { + return &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: informerName, + Namespace: b.Namespace, + Labels: map[string]string{ + "part-of": constants.OperatorName, + "app": informerName, + }, + }, + } +} diff --git a/internal/controller/flp/flp_informer_reconciler.go b/internal/controller/flp/flp_informer_reconciler.go new file mode 100644 index 0000000000..16f13621a5 --- /dev/null +++ b/internal/controller/flp/flp_informer_reconciler.go @@ -0,0 +1,119 @@ +package flp + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "sigs.k8s.io/controller-runtime/pkg/log" + + flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" + sliceslatest "github.com/netobserv/netobserv-operator/api/flowcollectorslice/v1alpha1" + metricslatest "github.com/netobserv/netobserv-operator/api/flowmetrics/v1alpha1" + "github.com/netobserv/netobserv-operator/internal/controller/constants" + "github.com/netobserv/netobserv-operator/internal/controller/reconcilers" + "github.com/netobserv/netobserv-operator/internal/pkg/helper" + "github.com/netobserv/netobserv-operator/internal/pkg/manager/status" + "github.com/netobserv/netobserv-operator/internal/pkg/resources" +) + +const ( + informerName = "flp-informers" + informerShortName = "informers" +) + +type informerReconciler struct { + *reconcilers.Instance + deployment *appsv1.Deployment + serviceAccount *corev1.ServiceAccount + rbInformer *rbacv1.ClusterRoleBinding +} + +func newInformerReconciler(cmn *reconcilers.Instance) *informerReconciler { + rec := informerReconciler{ + Instance: cmn, + deployment: cmn.Managed.NewDeployment(informerName), + serviceAccount: cmn.Managed.NewServiceAccount(informerName), + rbInformer: cmn.Managed.NewCRB(resources.GetClusterRoleBindingName(informerShortName, constants.FLPInformersRole)), + } + return &rec +} + +func (r *informerReconciler) context(ctx context.Context) context.Context { + l := log.FromContext(ctx).WithName("informers") + return log.IntoContext(ctx, l) +} + +func (r *informerReconciler) getStatus() *status.Instance { + return &r.Status +} + +func (r *informerReconciler) reconcile(ctx context.Context, desired *flowslatest.FlowCollector, _ *metricslatest.FlowMetricList, _ []sliceslatest.FlowCollectorSlice, _ []flowslatest.SubnetLabel) error { + // Retrieve current owned objects + err := r.Managed.FetchAll(ctx) + if err != nil { + return err + } + + if desired.Spec.OnHold() { + r.Status.SetUnused("FlowCollector is on hold") + r.Managed.TryDeleteAll(ctx) + return nil + } + + builder := newInformerBuilder(r.Instance, &desired.Spec) + + // Reconcile ServiceAccount + if err := r.reconcileServiceAccount(ctx, &builder); err != nil { + return err + } + + // Reconcile RBAC + if err := r.reconcilePermissions(ctx, &builder); err != nil { + return err + } + + // Reconcile Deployment + if err := r.reconcileDeployment(ctx, &builder); err != nil { + return err + } + + return nil +} + +func (r *informerReconciler) reconcileServiceAccount(ctx context.Context, builder *informerBuilder) error { + if !r.Managed.Exists(r.serviceAccount) { + return r.CreateOwned(ctx, builder.serviceAccount()) + } // We only configure name, update is not needed for now + return nil +} + +func (r *informerReconciler) reconcilePermissions(ctx context.Context, builder *informerBuilder) error { + // Informers + r.rbInformer = resources.GetClusterRoleBinding(r.Namespace, informerShortName, informerName, informerName, constants.FLPInformersRole) + if err := r.ReconcileClusterRoleBinding(ctx, r.rbInformer); err != nil { + return err + } + return nil +} + +func (r *informerReconciler) reconcileDeployment(ctx context.Context, builder *informerBuilder) error { + report := helper.NewChangeReport("FLP informers Deployment") + defer report.LogIfNeeded(ctx) + + desiredDep, err := builder.deployment() + if err != nil { + return err + } + + return reconcilers.ReconcileDeployment( + ctx, + r.Instance, + r.deployment, + desiredDep, + informerName, + false, + &report, + ) +} diff --git a/internal/controller/flp/flp_monolith_objects.go b/internal/controller/flp/flp_monolith_objects.go index ee930c7ae9..f6e2fbcc1e 100644 --- a/internal/controller/flp/flp_monolith_objects.go +++ b/internal/controller/flp/flp_monolith_objects.go @@ -168,12 +168,20 @@ func (b *monolithBuilder) service() *corev1.Service { }, Spec: corev1.ServiceSpec{ Selector: map[string]string{"app": monoName}, - Ports: []corev1.ServicePort{{ - Name: constants.FLPPortName, - Port: port, - Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt32(port), - }}, + Ports: []corev1.ServicePort{ + { + Name: constants.FLPPortName, + Port: port, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt32(port), + }, + { + Name: "k8scache", + Port: 9090, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(9090), + }, + }, }, } if b.info.ClusterInfo.IsOpenShift() && (b.desired.Processor.Service == nil || b.desired.Processor.Service.TLSType == flowslatest.TLSAuto) { diff --git a/internal/controller/flp/flp_monolith_reconciler.go b/internal/controller/flp/flp_monolith_reconciler.go index a4dd98eb6c..a0ad88b5c6 100644 --- a/internal/controller/flp/flp_monolith_reconciler.go +++ b/internal/controller/flp/flp_monolith_reconciler.go @@ -33,7 +33,6 @@ type monolithReconciler struct { rbConfigWatcher *rbacv1.RoleBinding rbHostNetwork *rbacv1.ClusterRoleBinding rbLokiWriter *rbacv1.ClusterRoleBinding - rbInformer *rbacv1.ClusterRoleBinding serviceMonitor *monitoringv1.ServiceMonitor prometheusRule *monitoringv1.PrometheusRule } @@ -51,7 +50,6 @@ func newMonolithReconciler(cmn *reconcilers.Instance) *monolithReconciler { rbConfigWatcher: cmn.Managed.NewRB(resources.GetRoleBindingName(monoShortName, constants.ConfigWatcherRole)), rbHostNetwork: cmn.Managed.NewCRB(resources.GetClusterRoleBindingName(monoShortName, constants.HostNetworkRole)), rbLokiWriter: cmn.Managed.NewCRB(resources.GetClusterRoleBindingName(monoShortName, constants.LokiWriterRole)), - rbInformer: cmn.Managed.NewCRB(resources.GetClusterRoleBindingName(monoShortName, constants.FLPInformersRole)), } if cmn.ClusterInfo.HasSvcMonitor() { rec.serviceMonitor = cmn.Managed.NewServiceMonitor(monoServiceMonitor) @@ -241,12 +239,6 @@ func (r *monolithReconciler) reconcilePermissions(ctx context.Context, builder * return r.CreateOwned(ctx, builder.serviceAccount()) } // We only configure name, update is not needed for now - // Informers - r.rbInformer = resources.GetClusterRoleBinding(r.Namespace, monoShortName, monoName, monoName, constants.FLPInformersRole) - if err := r.ReconcileClusterRoleBinding(ctx, r.rbInformer); err != nil { - return err - } - // Host network if r.ClusterInfo.IsOpenShift() && builder.desired.UseHostNetwork() { r.rbHostNetwork = resources.GetClusterRoleBinding(r.Namespace, monoShortName, monoName, monoName, constants.HostNetworkRole) diff --git a/internal/controller/flp/flp_transfo_reconciler.go b/internal/controller/flp/flp_transfo_reconciler.go index f15518af55..dcb8c9f6fa 100644 --- a/internal/controller/flp/flp_transfo_reconciler.go +++ b/internal/controller/flp/flp_transfo_reconciler.go @@ -32,7 +32,6 @@ type transformerReconciler struct { dynamicConfigMap *corev1.ConfigMap rbConfigWatcher *rbacv1.RoleBinding rbLokiWriter *rbacv1.ClusterRoleBinding - rbInformer *rbacv1.ClusterRoleBinding serviceMonitor *monitoringv1.ServiceMonitor prometheusRule *monitoringv1.PrometheusRule } @@ -48,7 +47,6 @@ func newTransformerReconciler(cmn *reconcilers.Instance) *transformerReconciler dynamicConfigMap: cmn.Managed.NewConfigMap(transfoDynConfigMap), rbConfigWatcher: cmn.Managed.NewRB(resources.GetRoleBindingName(transfoShortName, constants.ConfigWatcherRole)), rbLokiWriter: cmn.Managed.NewCRB(resources.GetClusterRoleBindingName(transfoShortName, constants.LokiWriterRole)), - rbInformer: cmn.Managed.NewCRB(resources.GetClusterRoleBindingName(transfoShortName, constants.FLPInformersRole)), } if cmn.ClusterInfo.HasSvcMonitor() { rec.serviceMonitor = cmn.Managed.NewServiceMonitor(transfoServiceMonitor) @@ -219,12 +217,6 @@ func (r *transformerReconciler) reconcilePermissions(ctx context.Context, builde return r.CreateOwned(ctx, builder.serviceAccount()) } // We only configure name, update is not needed for now - // Informers - r.rbInformer = resources.GetClusterRoleBinding(r.Namespace, transfoShortName, transfoName, transfoName, constants.FLPInformersRole) - if err := r.ReconcileClusterRoleBinding(ctx, r.rbInformer); err != nil { - return err - } - // Loki writer if builder.desired.UseLoki() && builder.desired.Loki.Mode == flowslatest.LokiModeLokiStack { r.rbLokiWriter = resources.GetClusterRoleBinding(r.Namespace, transfoShortName, transfoName, transfoName, constants.LokiWriterRole) diff --git a/internal/pkg/manager/status/status_manager.go b/internal/pkg/manager/status/status_manager.go index 56c222b052..d59371dea6 100644 --- a/internal/pkg/manager/status/status_manager.go +++ b/internal/pkg/manager/status/status_manager.go @@ -27,6 +27,7 @@ const ( EBPFAgents ComponentName = "EBPFAgents" WebConsole ComponentName = "WebConsole" FLPParent ComponentName = "FLPParent" + FLPInformers ComponentName = "FLPInformers" FLPMonolith ComponentName = "FLPMonolith" FLPTransformer ComponentName = "FLPTransformer" Monitoring ComponentName = "Monitoring" From e9332d75ed20fd037a0013ad47474924be146ee2 Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Thu, 9 Apr 2026 10:59:29 -0300 Subject: [PATCH 02/10] fix unused parameter --- internal/controller/flp/flp_informer_reconciler.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/controller/flp/flp_informer_reconciler.go b/internal/controller/flp/flp_informer_reconciler.go index 16f13621a5..a412d7e2b1 100644 --- a/internal/controller/flp/flp_informer_reconciler.go +++ b/internal/controller/flp/flp_informer_reconciler.go @@ -70,7 +70,7 @@ func (r *informerReconciler) reconcile(ctx context.Context, desired *flowslatest } // Reconcile RBAC - if err := r.reconcilePermissions(ctx, &builder); err != nil { + if err := r.reconcilePermissions(ctx); err != nil { return err } @@ -89,8 +89,7 @@ func (r *informerReconciler) reconcileServiceAccount(ctx context.Context, builde return nil } -func (r *informerReconciler) reconcilePermissions(ctx context.Context, builder *informerBuilder) error { - // Informers +func (r *informerReconciler) reconcilePermissions(ctx context.Context) error { r.rbInformer = resources.GetClusterRoleBinding(r.Namespace, informerShortName, informerName, informerName, constants.FLPInformersRole) if err := r.ReconcileClusterRoleBinding(ctx, r.rbInformer); err != nil { return err From 5960d2adb6f97c1e75fc02548fa849fd2ba32f3b Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Thu, 9 Apr 2026 15:28:53 -0300 Subject: [PATCH 03/10] fix bundle --- bundle.Dockerfile | 2 +- bundle/manifests/netobserv-operator.clusterserviceversion.yaml | 2 +- bundle/metadata/annotations.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bundle.Dockerfile b/bundle.Dockerfile index 0061406146..55fea91d3e 100644 --- a/bundle.Dockerfile +++ b/bundle.Dockerfile @@ -7,7 +7,7 @@ LABEL operators.operatorframework.io.bundle.metadata.v1=metadata/ LABEL operators.operatorframework.io.bundle.package.v1=netobserv-operator LABEL operators.operatorframework.io.bundle.channels.v1=latest,community LABEL operators.operatorframework.io.bundle.channel.default.v1=community -LABEL operators.operatorframework.io.metrics.builder=operator-sdk-v1.40.0 +LABEL operators.operatorframework.io.metrics.builder=operator-sdk-v1.42.0 LABEL operators.operatorframework.io.metrics.mediatype.v1=metrics+v1 LABEL operators.operatorframework.io.metrics.project_layout=go.kubebuilder.io/v4 diff --git a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml index 97049f01fd..190e598f09 100644 --- a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml +++ b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml @@ -96,7 +96,7 @@ metadata: operatorframework.io/initialization-resource: '{"apiVersion":"flows.netobserv.io/v1beta2", "kind":"FlowCollector","metadata":{"name":"cluster"},"spec": {}}' operatorframework.io/suggested-namespace: openshift-netobserv-operator - operators.operatorframework.io/builder: operator-sdk-v1.40.0 + operators.operatorframework.io/builder: operator-sdk-v1.42.0 operators.operatorframework.io/project_layout: go.kubebuilder.io/v4 repository: https://github.com/netobserv/netobserv-operator support: NetObserv team diff --git a/bundle/metadata/annotations.yaml b/bundle/metadata/annotations.yaml index 7d96f3b3b6..4aefd671e1 100644 --- a/bundle/metadata/annotations.yaml +++ b/bundle/metadata/annotations.yaml @@ -6,7 +6,7 @@ annotations: operators.operatorframework.io.bundle.package.v1: netobserv-operator operators.operatorframework.io.bundle.channels.v1: latest,community operators.operatorframework.io.bundle.channel.default.v1: community - operators.operatorframework.io.metrics.builder: operator-sdk-v1.40.0 + operators.operatorframework.io.metrics.builder: operator-sdk-v1.42.0 operators.operatorframework.io.metrics.mediatype.v1: metrics+v1 operators.operatorframework.io.metrics.project_layout: go.kubebuilder.io/v4 From d926d6c39f9f9f25db98a9471b575b636928c6a1 Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Fri, 10 Apr 2026 09:33:00 -0300 Subject: [PATCH 04/10] fix bundle --- bundle/manifests/flows.netobserv.io_flowcollectors.yaml | 2 +- bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml | 2 +- bundle/manifests/flows.netobserv.io_flowmetrics.yaml | 2 +- config/crd/bases/flows.netobserv.io_flowcollectors.yaml | 2 +- config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml | 2 +- config/crd/bases/flows.netobserv.io_flowmetrics.yaml | 2 +- helm/crds/flows.netobserv.io_flowcollectors.yaml | 2 +- helm/crds/flows.netobserv.io_flowcollectorslices.yaml | 2 +- helm/crds/flows.netobserv.io_flowmetrics.yaml | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml index d1938e86bb..5e49ae372a 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 creationTimestamp: null name: flowcollectors.flows.netobserv.io spec: diff --git a/bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml b/bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml index 0e57318c67..81704a1ed1 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectorslices.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 creationTimestamp: null name: flowcollectorslices.flows.netobserv.io spec: diff --git a/bundle/manifests/flows.netobserv.io_flowmetrics.yaml b/bundle/manifests/flows.netobserv.io_flowmetrics.yaml index b97bec0af5..9f028f12bf 100644 --- a/bundle/manifests/flows.netobserv.io_flowmetrics.yaml +++ b/bundle/manifests/flows.netobserv.io_flowmetrics.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 creationTimestamp: null name: flowmetrics.flows.netobserv.io spec: diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml index fdbe2a46d5..cfc3410785 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 name: flowcollectors.flows.netobserv.io spec: group: flows.netobserv.io diff --git a/config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml b/config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml index d93ffa53d7..e241609ca3 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectorslices.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 name: flowcollectorslices.flows.netobserv.io spec: group: flows.netobserv.io diff --git a/config/crd/bases/flows.netobserv.io_flowmetrics.yaml b/config/crd/bases/flows.netobserv.io_flowmetrics.yaml index 024a0cd9f1..fb0e0aa83a 100644 --- a/config/crd/bases/flows.netobserv.io_flowmetrics.yaml +++ b/config/crd/bases/flows.netobserv.io_flowmetrics.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 name: flowmetrics.flows.netobserv.io spec: group: flows.netobserv.io diff --git a/helm/crds/flows.netobserv.io_flowcollectors.yaml b/helm/crds/flows.netobserv.io_flowcollectors.yaml index 7c19330ee1..0762c1ad99 100644 --- a/helm/crds/flows.netobserv.io_flowcollectors.yaml +++ b/helm/crds/flows.netobserv.io_flowcollectors.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 creationTimestamp: null name: flowcollectors.flows.netobserv.io spec: diff --git a/helm/crds/flows.netobserv.io_flowcollectorslices.yaml b/helm/crds/flows.netobserv.io_flowcollectorslices.yaml index 11cc67a21e..d476970227 100644 --- a/helm/crds/flows.netobserv.io_flowcollectorslices.yaml +++ b/helm/crds/flows.netobserv.io_flowcollectorslices.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 creationTimestamp: null name: flowcollectorslices.flows.netobserv.io spec: diff --git a/helm/crds/flows.netobserv.io_flowmetrics.yaml b/helm/crds/flows.netobserv.io_flowmetrics.yaml index 3865d3210c..db4f48473a 100644 --- a/helm/crds/flows.netobserv.io_flowmetrics.yaml +++ b/helm/crds/flows.netobserv.io_flowmetrics.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.20.0 creationTimestamp: null name: flowmetrics.flows.netobserv.io spec: From 41fccaec49627b5389094d2dc423a275eba8ac86 Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Fri, 10 Apr 2026 10:18:41 -0300 Subject: [PATCH 05/10] fix possible port conflict --- internal/controller/flp/flp_common_objects.go | 33 ++++++++++++++- .../controller/flp/flp_monolith_objects.go | 5 +++ internal/controller/flp/flp_test.go | 40 ++++++++++++++++++- .../controller/flp/flp_transfo_objects.go | 5 +++ 4 files changed, 80 insertions(+), 3 deletions(-) diff --git a/internal/controller/flp/flp_common_objects.go b/internal/controller/flp/flp_common_objects.go index ddd606ff97..53b085ed7c 100644 --- a/internal/controller/flp/flp_common_objects.go +++ b/internal/controller/flp/flp_common_objects.go @@ -27,6 +27,7 @@ const ( healthPortName = "health" prometheusPortName = "prometheus" profilePortName = "pprof" + k8scachePort = 9090 healthTimeoutSeconds = 5 livenessPeriodSeconds = 10 startupFailureThreshold = 5 @@ -94,6 +95,34 @@ const ( pull ) +// validatePortConflicts checks if any user-configured ports conflict with the hardcoded k8scache port +func validatePortConflicts(desired *flowslatest.FlowCollectorSpec) error { + advancedConfig := helper.GetAdvancedProcessorConfig(desired) + + // Check FLP port + if advancedConfig.Port != nil && *advancedConfig.Port == k8scachePort { + return fmt.Errorf("flowlogs-pipeline port %d conflicts with reserved k8scache port %d", *advancedConfig.Port, k8scachePort) + } + + // Check health port + if advancedConfig.HealthPort != nil && *advancedConfig.HealthPort == k8scachePort { + return fmt.Errorf("flowlogs-pipeline health port %d conflicts with reserved k8scache port %d", *advancedConfig.HealthPort, k8scachePort) + } + + // Check metrics port + metricsPort := desired.Processor.GetMetricsPort() + if metricsPort == k8scachePort { + return fmt.Errorf("flowlogs-pipeline metrics port %d conflicts with reserved k8scache port %d", metricsPort, k8scachePort) + } + + // Check profile port (optional) + if advancedConfig.ProfilePort != nil && *advancedConfig.ProfilePort == k8scachePort { + return fmt.Errorf("flowlogs-pipeline profile port %d conflicts with reserved k8scache port %d", *advancedConfig.ProfilePort, k8scachePort) + } + + return nil +} + func podTemplate( appName, version, imageName, cmName string, desired *flowslatest.FlowCollectorSpec, @@ -130,7 +159,7 @@ func podTemplate( }) ports = append(ports, corev1.ContainerPort{ Name: "k8scache", - ContainerPort: 9090, + ContainerPort: k8scachePort, Protocol: corev1.ProtocolTCP, }) @@ -173,7 +202,7 @@ func podTemplate( ImagePullPolicy: corev1.PullPolicy(desired.Processor.ImagePullPolicy), Args: []string{ fmt.Sprintf(`--config=%s/%s`, configPath, configFile), - "--k8scache.port=9090", + fmt.Sprintf("--k8scache.port=%d", k8scachePort), "--k8scache.address=0.0.0.0", }, Resources: *desired.Processor.Resources.DeepCopy(), diff --git a/internal/controller/flp/flp_monolith_objects.go b/internal/controller/flp/flp_monolith_objects.go index f6e2fbcc1e..04d158f845 100644 --- a/internal/controller/flp/flp_monolith_objects.go +++ b/internal/controller/flp/flp_monolith_objects.go @@ -37,6 +37,11 @@ type monolithBuilder struct { } func newMonolithBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec, flowMetrics *metricslatest.FlowMetricList, fcSlices []sliceslatest.FlowCollectorSlice, detectedSubnets []flowslatest.SubnetLabel) (monolithBuilder, error) { + // Validate port conflicts early + if err := validatePortConflicts(desired); err != nil { + return monolithBuilder{}, err + } + version := helper.ExtractVersion(info.Images[reconcilers.MainImage]) promTLS, err := getPromTLS(desired, constants.FLPMetricsSvcName) if err != nil { diff --git a/internal/controller/flp/flp_test.go b/internal/controller/flp/flp_test.go index 6ce8776cd2..3c1089cd13 100644 --- a/internal/controller/flp/flp_test.go +++ b/internal/controller/flp/flp_test.go @@ -52,7 +52,7 @@ func getConfig() flowslatest.FlowCollectorSpec { Resources: rs, Metrics: flowslatest.FLPMetrics{ Server: flowslatest.MetricsServerConfig{ - Port: ptr.To(int32(9090)), + Port: ptr.To(int32(9401)), TLS: flowslatest.ServerTLS{ Type: flowslatest.TLSDisabled, }, @@ -810,3 +810,41 @@ func TestToleration(t *testing.T) { assert.Len(ds.Spec.Template.Spec.Tolerations, 1) assert.Equal(corev1.Toleration{Operator: "Exists"}, ds.Spec.Template.Spec.Tolerations[0]) } + +func TestPortConflictValidation(t *testing.T) { + assert := assert.New(t) + info := reconcilers.Common{Namespace: "ns", ClusterInfo: &cluster.Info{}} + + // Test FLP port conflict + cfg := getConfig() + cfg.Processor.Advanced.Port = ptr.To(int32(9090)) + _, err := newMonolithBuilder(info.NewInstance(image, status.Instance{}), &cfg, &metricslatest.FlowMetricList{}, nil, nil) + assert.Error(err) + assert.Contains(err.Error(), "flowlogs-pipeline port 9090 conflicts with reserved k8scache port") + + // Test health port conflict + cfg = getConfig() + cfg.Processor.Advanced.HealthPort = ptr.To(int32(9090)) + _, err = newMonolithBuilder(info.NewInstance(image, status.Instance{}), &cfg, &metricslatest.FlowMetricList{}, nil, nil) + assert.Error(err) + assert.Contains(err.Error(), "flowlogs-pipeline health port 9090 conflicts with reserved k8scache port") + + // Test metrics port conflict + cfg = getConfig() + cfg.Processor.Metrics.Server.Port = ptr.To(int32(9090)) + _, err = newMonolithBuilder(info.NewInstance(image, status.Instance{}), &cfg, &metricslatest.FlowMetricList{}, nil, nil) + assert.Error(err) + assert.Contains(err.Error(), "flowlogs-pipeline metrics port 9090 conflicts with reserved k8scache port") + + // Test profile port conflict + cfg = getConfig() + cfg.Processor.Advanced.ProfilePort = ptr.To(int32(9090)) + _, err = newMonolithBuilder(info.NewInstance(image, status.Instance{}), &cfg, &metricslatest.FlowMetricList{}, nil, nil) + assert.Error(err) + assert.Contains(err.Error(), "flowlogs-pipeline profile port 9090 conflicts with reserved k8scache port") + + // Test valid configuration (no conflicts) + cfg = getConfig() + _, err = newMonolithBuilder(info.NewInstance(image, status.Instance{}), &cfg, &metricslatest.FlowMetricList{}, nil, nil) + assert.NoError(err) +} diff --git a/internal/controller/flp/flp_transfo_objects.go b/internal/controller/flp/flp_transfo_objects.go index 0b2a886b83..e9b865cee6 100644 --- a/internal/controller/flp/flp_transfo_objects.go +++ b/internal/controller/flp/flp_transfo_objects.go @@ -37,6 +37,11 @@ type transfoBuilder struct { } func newTransfoBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec, flowMetrics *metricslatest.FlowMetricList, fcSlices []sliceslatest.FlowCollectorSlice, detectedSubnets []flowslatest.SubnetLabel) (transfoBuilder, error) { + // Validate port conflicts early + if err := validatePortConflicts(desired); err != nil { + return transfoBuilder{}, err + } + version := helper.ExtractVersion(info.Images[reconcilers.MainImage]) promTLS, err := getPromTLS(desired, constants.FLPTransfoMetricsSvcName) if err != nil { From 29f8516ce53f83ab9122fb9ab8c22146c806848a Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Fri, 10 Apr 2026 10:22:26 -0300 Subject: [PATCH 06/10] add missing scc --- internal/controller/flp/flp_informer_builder.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/controller/flp/flp_informer_builder.go b/internal/controller/flp/flp_informer_builder.go index d76b3848bb..482639d1eb 100644 --- a/internal/controller/flp/flp_informer_builder.go +++ b/internal/controller/flp/flp_informer_builder.go @@ -67,6 +67,7 @@ func (b *informerBuilder) deployment() (*appsv1.Deployment, error) { corev1.ResourceCPU: resource.MustParse("200m"), }, }, + SecurityContext: helper.ContainerDefaultSecurityContext(), } return &appsv1.Deployment{ From 3d006ac14946350328c847a244f90902a8f945c5 Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Fri, 10 Apr 2026 10:26:08 -0300 Subject: [PATCH 07/10] improve error handling --- .../controller/flp/flp_informer_reconciler.go | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/internal/controller/flp/flp_informer_reconciler.go b/internal/controller/flp/flp_informer_reconciler.go index a412d7e2b1..30b0ef1cb3 100644 --- a/internal/controller/flp/flp_informer_reconciler.go +++ b/internal/controller/flp/flp_informer_reconciler.go @@ -2,6 +2,7 @@ package flp import ( "context" + "fmt" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -53,7 +54,7 @@ func (r *informerReconciler) reconcile(ctx context.Context, desired *flowslatest // Retrieve current owned objects err := r.Managed.FetchAll(ctx) if err != nil { - return err + return fmt.Errorf("failed to fetch all managed resources: %w", err) } if desired.Spec.OnHold() { @@ -66,17 +67,17 @@ func (r *informerReconciler) reconcile(ctx context.Context, desired *flowslatest // Reconcile ServiceAccount if err := r.reconcileServiceAccount(ctx, &builder); err != nil { - return err + return fmt.Errorf("failed to reconcile service account: %w", err) } // Reconcile RBAC if err := r.reconcilePermissions(ctx); err != nil { - return err + return fmt.Errorf("failed to reconcile permissions: %w", err) } // Reconcile Deployment if err := r.reconcileDeployment(ctx, &builder); err != nil { - return err + return fmt.Errorf("failed to reconcile deployment: %w", err) } return nil @@ -84,7 +85,9 @@ func (r *informerReconciler) reconcile(ctx context.Context, desired *flowslatest func (r *informerReconciler) reconcileServiceAccount(ctx context.Context, builder *informerBuilder) error { if !r.Managed.Exists(r.serviceAccount) { - return r.CreateOwned(ctx, builder.serviceAccount()) + if err := r.CreateOwned(ctx, builder.serviceAccount()); err != nil { + return fmt.Errorf("failed to create service account: %w", err) + } } // We only configure name, update is not needed for now return nil } @@ -92,7 +95,7 @@ func (r *informerReconciler) reconcileServiceAccount(ctx context.Context, builde func (r *informerReconciler) reconcilePermissions(ctx context.Context) error { r.rbInformer = resources.GetClusterRoleBinding(r.Namespace, informerShortName, informerName, informerName, constants.FLPInformersRole) if err := r.ReconcileClusterRoleBinding(ctx, r.rbInformer); err != nil { - return err + return fmt.Errorf("failed to reconcile cluster role binding: %w", err) } return nil } @@ -103,10 +106,10 @@ func (r *informerReconciler) reconcileDeployment(ctx context.Context, builder *i desiredDep, err := builder.deployment() if err != nil { - return err + return fmt.Errorf("failed to build deployment: %w", err) } - return reconcilers.ReconcileDeployment( + if err := reconcilers.ReconcileDeployment( ctx, r.Instance, r.deployment, @@ -114,5 +117,8 @@ func (r *informerReconciler) reconcileDeployment(ctx context.Context, builder *i informerName, false, &report, - ) + ); err != nil { + return fmt.Errorf("failed to reconcile deployment: %w", err) + } + return nil } From 2d230f0d15e737da3c301f0a12786cae1083aad8 Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Wed, 15 Apr 2026 13:34:15 -0300 Subject: [PATCH 08/10] feedback addressed --- .../v1beta2/flowcollector_types.go | 65 +++++ .../flowcollector_validation_webhook.go | 24 ++ .../v1beta2/zz_generated.deepcopy.go | 76 ++++++ .../flows.netobserv.io_flowcollectors.yaml | 127 ++++++++++ ...c.authorization.k8s.io_v1_clusterrole.yaml | 9 + ...observ-operator.clusterserviceversion.yaml | 28 +++ .../flows.netobserv.io_flowcollectors.yaml | 121 +++++++++ config/rbac/component_roles.yaml | 9 + config/rbac/role.yaml | 10 + .../samples/flows_v1beta2_flowcollector.yaml | 17 ++ docs/FlowCollector.md | 233 ++++++++++++++++++ .../flows.netobserv.io_flowcollectors.yaml | 121 +++++++++ helm/templates/clusterrole.yaml | 10 + ...c.authorization.k8s.io_v1_clusterrole.yaml | 9 + ...rization.k8s.io_v1_clusterrolebinding.yaml | 12 + .../controller/flp/flp_informer_builder.go | 138 +++++++++-- .../controller/flp/flp_informer_reconciler.go | 15 ++ .../controller/flp/flp_monolith_objects.go | 4 +- internal/pkg/manager/manager.go | 1 + 19 files changed, 1011 insertions(+), 18 deletions(-) create mode 100644 helm/templates/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrolebinding.yaml diff --git a/api/flowcollector/v1beta2/flowcollector_types.go b/api/flowcollector/v1beta2/flowcollector_types.go index 49f9ce3e73..4ab5876484 100644 --- a/api/flowcollector/v1beta2/flowcollector_types.go +++ b/api/flowcollector/v1beta2/flowcollector_types.go @@ -753,6 +753,12 @@ type FlowCollectorFLP struct { // +optional Service *ProcessorServiceConfig `json:"service,omitempty"` + // `informers` configuration for centralized Kubernetes informers that push cache updates to flowlogs-pipeline processors. + // This reduces load on the Kubernetes API server by having a single component (flp-informers) query the API instead of N FLP processors. + // When enabled, a dedicated `flp-informers` deployment is created that watches Kubernetes resources and pushes updates via gRPC. + // +optional + Informers *FlowCollectorInformers `json:"informers,omitempty"` + // `advanced` allows setting some aspects of the internal configuration of the flow processor. // This section is aimed mostly for debugging and fine-grained performance optimizations, // such as `GOGC` and `GOMAXPROCS` environment variables. Set these values at your own risk. @@ -760,6 +766,65 @@ type FlowCollectorFLP struct { Advanced *AdvancedProcessorConfig `json:"advanced,omitempty"` } +// `FlowCollectorInformers` defines the configuration for centralized Kubernetes informers +type FlowCollectorInformers struct { + // `enabled` controls whether to deploy centralized Kubernetes informers. + // When `true`, a dedicated `flp-informers` deployment watches K8s resources and pushes cache updates via gRPC to FLP processors. + // When `false`, each FLP processor uses local informers (previous behavior). + // +kubebuilder:default:=true + Enabled *bool `json:"enabled,omitempty"` + + // `replicas` defines the number of replicas for the flp-informers deployment. + // For high availability, a minimum of 2 replicas is required when `enabled` is `true`. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:default:=2 + Replicas *int32 `json:"replicas,omitempty"` + + // `resources` are the compute resources required by the informers container. + // For more information, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + // +kubebuilder:default:={requests:{memory:"128Mi",cpu:"50m"},limits:{memory:"256Mi",cpu:"200m"}} + // +optional + Resources corev1.ResourceRequirements `json:"resources,omitempty" protobuf:"bytes,8,opt,name=resources"` + + // `advanced` allows setting some technical parameters of the informers component. + // +optional + Advanced *AdvancedInformersConfig `json:"advanced,omitempty"` +} + +// `AdvancedInformersConfig` defines advanced configuration for the informers component +type AdvancedInformersConfig struct { + // `resyncInterval` defines the interval in seconds to rediscover processors and sync state. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:default:=60 + // +optional + ResyncInterval *int `json:"resyncInterval,omitempty"` + + // `batchSize` defines the maximum number of cache entries to send in a single update batch. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:default:=100 + // +optional + BatchSize *int `json:"batchSize,omitempty"` + + // `sendTimeout` defines the timeout in seconds for sending updates to processors. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:default:=10 + // +optional + SendTimeout *int `json:"sendTimeout,omitempty"` + + // `updateBufferSize` defines the size of the internal update channel buffer. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:default:=100 + // +optional + UpdateBufferSize *int `json:"updateBufferSize,omitempty"` + + // `processorPort` defines the gRPC port where flowlogs-pipeline processors listen for k8s cache updates. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:default:=9090 + // +optional + ProcessorPort *int32 `json:"processorPort,omitempty"` +} + type FLPDeduperMode string const ( diff --git a/api/flowcollector/v1beta2/flowcollector_validation_webhook.go b/api/flowcollector/v1beta2/flowcollector_validation_webhook.go index 26205d80d1..6dd23693fa 100644 --- a/api/flowcollector/v1beta2/flowcollector_validation_webhook.go +++ b/api/flowcollector/v1beta2/flowcollector_validation_webhook.go @@ -262,6 +262,7 @@ func (v *validator) validateFLP() { v.validateFLPMetricsForAlerts() v.validateFLPMetricsIncludeLists() v.validateFLPTLS() + v.validateInformers() } func (v *validator) validateScheduling() { @@ -466,6 +467,29 @@ func (v *validator) validateFLPTLS() { } } +func (v *validator) validateInformers() { + if v.fc.Processor.Informers == nil { + return + } + + // Check if enabled + enabled := v.fc.Processor.Informers.Enabled != nil && *v.fc.Processor.Informers.Enabled + + if enabled { + // When enabled, replicas must be at least 2 for high availability + replicas := int32(2) // default + if v.fc.Processor.Informers.Replicas != nil { + replicas = *v.fc.Processor.Informers.Replicas + } + if replicas < 2 { + v.errors = append( + v.errors, + fmt.Errorf("spec.processor.informers.replicas must be at least 2 when informers are enabled (got %d). Centralized informers require high availability to avoid losing the entire flow collection pipeline in case of failure", replicas), + ) + } + } +} + func GetFirstRequiredMetrics(anyRequired, actual []string) string { for _, m := range anyRequired { if slices.Contains(actual, m) { diff --git a/api/flowcollector/v1beta2/zz_generated.deepcopy.go b/api/flowcollector/v1beta2/zz_generated.deepcopy.go index 2f1d5aeb5e..5eafcd07da 100644 --- a/api/flowcollector/v1beta2/zz_generated.deepcopy.go +++ b/api/flowcollector/v1beta2/zz_generated.deepcopy.go @@ -59,6 +59,46 @@ func (in *AdvancedAgentConfig) DeepCopy() *AdvancedAgentConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdvancedInformersConfig) DeepCopyInto(out *AdvancedInformersConfig) { + *out = *in + if in.ResyncInterval != nil { + in, out := &in.ResyncInterval, &out.ResyncInterval + *out = new(int) + **out = **in + } + if in.BatchSize != nil { + in, out := &in.BatchSize, &out.BatchSize + *out = new(int) + **out = **in + } + if in.SendTimeout != nil { + in, out := &in.SendTimeout, &out.SendTimeout + *out = new(int) + **out = **in + } + if in.UpdateBufferSize != nil { + in, out := &in.UpdateBufferSize, &out.UpdateBufferSize + *out = new(int) + **out = **in + } + if in.ProcessorPort != nil { + in, out := &in.ProcessorPort, &out.ProcessorPort + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdvancedInformersConfig. +func (in *AdvancedInformersConfig) DeepCopy() *AdvancedInformersConfig { + if in == nil { + return nil + } + out := new(AdvancedInformersConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AdvancedLokiConfig) DeepCopyInto(out *AdvancedLokiConfig) { *out = *in @@ -837,6 +877,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) { *out = new(ProcessorServiceConfig) (*in).DeepCopyInto(*out) } + if in.Informers != nil { + in, out := &in.Informers, &out.Informers + *out = new(FlowCollectorInformers) + (*in).DeepCopyInto(*out) + } if in.Advanced != nil { in, out := &in.Advanced, &out.Advanced *out = new(AdvancedProcessorConfig) @@ -913,6 +958,37 @@ func (in *FlowCollectorIPFIXReceiver) DeepCopy() *FlowCollectorIPFIXReceiver { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlowCollectorInformers) DeepCopyInto(out *FlowCollectorInformers) { + *out = *in + if in.Enabled != nil { + in, out := &in.Enabled, &out.Enabled + *out = new(bool) + **out = **in + } + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int32) + **out = **in + } + in.Resources.DeepCopyInto(&out.Resources) + if in.Advanced != nil { + in, out := &in.Advanced, &out.Advanced + *out = new(AdvancedInformersConfig) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorInformers. +func (in *FlowCollectorInformers) DeepCopy() *FlowCollectorInformers { + if in == nil { + return nil + } + out := new(FlowCollectorInformers) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FlowCollectorIntegrationsStatus) DeepCopyInto(out *FlowCollectorIntegrationsStatus) { *out = *in diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml index 5e49ae372a..b1fe61f191 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml @@ -5529,6 +5529,133 @@ spec: - Always - Never type: string + informers: + description: |- + `informers` configuration for centralized Kubernetes informers that push cache updates to flowlogs-pipeline processors. + This reduces load on the Kubernetes API server by having a single component (flp-informers) query the API instead of N FLP processors. + When enabled, a dedicated `flp-informers` deployment is created that watches Kubernetes resources and pushes updates via gRPC. + properties: + advanced: + description: '`advanced` allows setting some technical parameters + of the informers component.' + properties: + batchSize: + default: 100 + description: '`batchSize` defines the maximum number of + cache entries to send in a single update batch.' + minimum: 1 + type: integer + processorPort: + default: 9090 + description: '`processorPort` defines the gRPC port where + flowlogs-pipeline processors listen for k8s cache updates.' + format: int32 + maximum: 65535 + minimum: 1 + type: integer + resyncInterval: + default: 60 + description: '`resyncInterval` defines the interval in + seconds to rediscover processors and sync state.' + minimum: 1 + type: integer + sendTimeout: + default: 10 + description: '`sendTimeout` defines the timeout in seconds + for sending updates to processors.' + minimum: 1 + type: integer + updateBufferSize: + default: 100 + description: '`updateBufferSize` defines the size of the + internal update channel buffer.' + minimum: 1 + type: integer + type: object + enabled: + default: true + description: |- + `enabled` controls whether to deploy centralized Kubernetes informers. + When `true`, a dedicated `flp-informers` deployment watches K8s resources and pushes cache updates via gRPC to FLP processors. + When `false`, each FLP processor uses local informers (previous behavior). + type: boolean + replicas: + default: 2 + description: |- + `replicas` defines the number of replicas for the flp-informers deployment. + For high availability, a minimum of 2 replicas is required when `enabled` is `true`. + format: int32 + minimum: 1 + type: integer + resources: + default: + limits: + cpu: 200m + memory: 256Mi + requests: + cpu: 50m + memory: 128Mi + description: |- + `resources` are the compute resources required by the informers container. + For more information, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + properties: + claims: + description: |- + Claims lists the names of resources, defined in spec.resourceClaims, + that are used by this container. + + This field depends on the + DynamicResourceAllocation feature gate. + + This field is immutable. It can only be set for containers. + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: |- + Name must match the name of one entry in pod.spec.resourceClaims of + the Pod where this field is used. It makes that resource available + inside a container. + type: string + request: + description: |- + Request is the name chosen for a request in the referenced claim. + If empty, everything from the claim is made available, otherwise + only the result of this request. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Limits describes the maximum amount of compute resources allowed. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Requests describes the minimum amount of compute resources required. + If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, + otherwise to an implementation-defined value. Requests cannot exceed Limits. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + type: object + type: object kafkaConsumerAutoscaler: description: |- `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. diff --git a/bundle/manifests/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrole.yaml b/bundle/manifests/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrole.yaml index c763f52310..c207b8d568 100644 --- a/bundle/manifests/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrole.yaml +++ b/bundle/manifests/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrole.yaml @@ -32,3 +32,12 @@ rules: - get - list - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - list + - update diff --git a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml index 190e598f09..5762328409 100644 --- a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml +++ b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml @@ -466,6 +466,24 @@ spec: path: processor.deduper.sampling - displayName: Filters path: processor.filters + - displayName: Informers + path: processor.informers + - displayName: Advanced + path: processor.informers.advanced + - displayName: Batch size + path: processor.informers.advanced.batchSize + - displayName: Processor port + path: processor.informers.advanced.processorPort + - displayName: Resync interval + path: processor.informers.advanced.resyncInterval + - displayName: Send timeout + path: processor.informers.advanced.sendTimeout + - displayName: Update buffer size + path: processor.informers.advanced.updateBufferSize + - displayName: Enabled + path: processor.informers.enabled + - displayName: Replicas + path: processor.informers.replicas - displayName: Log types path: processor.logTypes - displayName: Additional include list @@ -822,6 +840,16 @@ spec: - patch - update - watch + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - list + - update + - watch - apiGroups: - discovery.k8s.io resources: diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml index cfc3410785..e5e26048e5 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml @@ -5103,6 +5103,127 @@ spec: - Always - Never type: string + informers: + description: |- + `informers` configuration for centralized Kubernetes informers that push cache updates to flowlogs-pipeline processors. + This reduces load on the Kubernetes API server by having a single component (flp-informers) query the API instead of N FLP processors. + When enabled, a dedicated `flp-informers` deployment is created that watches Kubernetes resources and pushes updates via gRPC. + properties: + advanced: + description: '`advanced` allows setting some technical parameters of the informers component.' + properties: + batchSize: + default: 100 + description: '`batchSize` defines the maximum number of cache entries to send in a single update batch.' + minimum: 1 + type: integer + processorPort: + default: 9090 + description: '`processorPort` defines the gRPC port where flowlogs-pipeline processors listen for k8s cache updates.' + format: int32 + maximum: 65535 + minimum: 1 + type: integer + resyncInterval: + default: 60 + description: '`resyncInterval` defines the interval in seconds to rediscover processors and sync state.' + minimum: 1 + type: integer + sendTimeout: + default: 10 + description: '`sendTimeout` defines the timeout in seconds for sending updates to processors.' + minimum: 1 + type: integer + updateBufferSize: + default: 100 + description: '`updateBufferSize` defines the size of the internal update channel buffer.' + minimum: 1 + type: integer + type: object + enabled: + default: true + description: |- + `enabled` controls whether to deploy centralized Kubernetes informers. + When `true`, a dedicated `flp-informers` deployment watches K8s resources and pushes cache updates via gRPC to FLP processors. + When `false`, each FLP processor uses local informers (previous behavior). + type: boolean + replicas: + default: 2 + description: |- + `replicas` defines the number of replicas for the flp-informers deployment. + For high availability, a minimum of 2 replicas is required when `enabled` is `true`. + format: int32 + minimum: 1 + type: integer + resources: + default: + limits: + cpu: 200m + memory: 256Mi + requests: + cpu: 50m + memory: 128Mi + description: |- + `resources` are the compute resources required by the informers container. + For more information, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + properties: + claims: + description: |- + Claims lists the names of resources, defined in spec.resourceClaims, + that are used by this container. + + This field depends on the + DynamicResourceAllocation feature gate. + + This field is immutable. It can only be set for containers. + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: |- + Name must match the name of one entry in pod.spec.resourceClaims of + the Pod where this field is used. It makes that resource available + inside a container. + type: string + request: + description: |- + Request is the name chosen for a request in the referenced claim. + If empty, everything from the claim is made available, otherwise + only the result of this request. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Limits describes the maximum amount of compute resources allowed. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Requests describes the minimum amount of compute resources required. + If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, + otherwise to an implementation-defined value. Requests cannot exceed Limits. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + type: object + type: object kafkaConsumerAutoscaler: description: |- `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. diff --git a/config/rbac/component_roles.yaml b/config/rbac/component_roles.yaml index d746a83138..7fee59f3ca 100644 --- a/config/rbac/component_roles.yaml +++ b/config/rbac/component_roles.yaml @@ -96,6 +96,15 @@ rules: - get - list - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - list + - update --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index ec3cf668d4..6412fca33a 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -135,6 +135,16 @@ rules: - patch - update - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - list + - update + - watch - apiGroups: - discovery.k8s.io resources: diff --git a/config/samples/flows_v1beta2_flowcollector.yaml b/config/samples/flows_v1beta2_flowcollector.yaml index 675b55974d..17c23c3746 100644 --- a/config/samples/flows_v1beta2_flowcollector.yaml +++ b/config/samples/flows_v1beta2_flowcollector.yaml @@ -90,6 +90,23 @@ spec: # customLabels: # - cidrs: [] # name: "" + # Centralized Kubernetes informers (enabled by default for reduced API server load) + # informers: + # enabled: true + # replicas: 2 # Minimum 2 for HA with leader election + # resources: + # requests: + # memory: 128Mi + # cpu: 50m + # limits: + # memory: 256Mi + # cpu: 200m + # advanced: + # resyncInterval: 60 # Seconds to rediscover processors + # batchSize: 100 # Max entries per update batch + # sendTimeout: 10 # Seconds timeout for sending updates + # updateBufferSize: 100 # Update channel buffer size + # processorPort: 9090 # gRPC port for k8s cache updates metrics: # server: # port: 9401 diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md index bd7a56c47d..2aaadf3e3e 100644 --- a/docs/FlowCollector.md +++ b/docs/FlowCollector.md @@ -8556,6 +8556,15 @@ but with a lesser improvement in performance.
Default: IfNotPresent
false + + informers + object + + `informers` configuration for centralized Kubernetes informers that push cache updates to flowlogs-pipeline processors. +This reduces load on the Kubernetes API server by having a single component (flp-informers) query the API instead of N FLP processors. +When enabled, a dedicated `flp-informers` deployment is created that watches Kubernetes resources and pushes updates via gRPC.
+ + false kafkaConsumerAutoscaler object @@ -10679,6 +10688,230 @@ Fields absent from the 'k8s.v1.cni.cncf.io/network-status' annotation must not b +### FlowCollector.spec.processor.informers +[↩ Parent](#flowcollectorspecprocessor) + + + +`informers` configuration for centralized Kubernetes informers that push cache updates to flowlogs-pipeline processors. +This reduces load on the Kubernetes API server by having a single component (flp-informers) query the API instead of N FLP processors. +When enabled, a dedicated `flp-informers` deployment is created that watches Kubernetes resources and pushes updates via gRPC. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
NameTypeDescriptionRequired
advancedobject + `advanced` allows setting some technical parameters of the informers component.
+
false
enabledboolean + `enabled` controls whether to deploy centralized Kubernetes informers. +When `true`, a dedicated `flp-informers` deployment watches K8s resources and pushes cache updates via gRPC to FLP processors. +When `false`, each FLP processor uses local informers (previous behavior).
+
+ Default: true
+
false
replicasinteger + `replicas` defines the number of replicas for the flp-informers deployment. +For high availability, a minimum of 2 replicas is required when `enabled` is `true`.
+
+ Format: int32
+ Default: 2
+ Minimum: 1
+
false
resourcesobject + `resources` are the compute resources required by the informers container. +For more information, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
+
+ Default: map[limits:map[cpu:200m memory:256Mi] requests:map[cpu:50m memory:128Mi]]
+
false
+ + +### FlowCollector.spec.processor.informers.advanced +[↩ Parent](#flowcollectorspecprocessorinformers) + + + +`advanced` allows setting some technical parameters of the informers component. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
NameTypeDescriptionRequired
batchSizeinteger + `batchSize` defines the maximum number of cache entries to send in a single update batch.
+
+ Default: 100
+ Minimum: 1
+
false
processorPortinteger + `processorPort` defines the gRPC port where flowlogs-pipeline processors listen for k8s cache updates.
+
+ Format: int32
+ Default: 9090
+ Minimum: 1
+ Maximum: 65535
+
false
resyncIntervalinteger + `resyncInterval` defines the interval in seconds to rediscover processors and sync state.
+
+ Default: 60
+ Minimum: 1
+
false
sendTimeoutinteger + `sendTimeout` defines the timeout in seconds for sending updates to processors.
+
+ Default: 10
+ Minimum: 1
+
false
updateBufferSizeinteger + `updateBufferSize` defines the size of the internal update channel buffer.
+
+ Default: 100
+ Minimum: 1
+
false
+ + +### FlowCollector.spec.processor.informers.resources +[↩ Parent](#flowcollectorspecprocessorinformers) + + + +`resources` are the compute resources required by the informers container. +For more information, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + + + + + + + + + + + + + + + + + + + + + + + + + + +
NameTypeDescriptionRequired
claims[]object + Claims lists the names of resources, defined in spec.resourceClaims, +that are used by this container. + +This field depends on the +DynamicResourceAllocation feature gate. + +This field is immutable. It can only be set for containers.
+
false
limitsmap[string]int or string + Limits describes the maximum amount of compute resources allowed. +More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
+
false
requestsmap[string]int or string + Requests describes the minimum amount of compute resources required. +If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, +otherwise to an implementation-defined value. Requests cannot exceed Limits. +More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
+
false
+ + +### FlowCollector.spec.processor.informers.resources.claims[index] +[↩ Parent](#flowcollectorspecprocessorinformersresources) + + + +ResourceClaim references one entry in PodSpec.ResourceClaims. + + + + + + + + + + + + + + + + + + + + + +
NameTypeDescriptionRequired
namestring + Name must match the name of one entry in pod.spec.resourceClaims of +the Pod where this field is used. It makes that resource available +inside a container.
+
true
requeststring + Request is the name chosen for a request in the referenced claim. +If empty, everything from the claim is made available, otherwise +only the result of this request.
+
false
+ + ### FlowCollector.spec.processor.kafkaConsumerAutoscaler [↩ Parent](#flowcollectorspecprocessor) diff --git a/helm/crds/flows.netobserv.io_flowcollectors.yaml b/helm/crds/flows.netobserv.io_flowcollectors.yaml index 0762c1ad99..a843d47736 100644 --- a/helm/crds/flows.netobserv.io_flowcollectors.yaml +++ b/helm/crds/flows.netobserv.io_flowcollectors.yaml @@ -5107,6 +5107,127 @@ spec: - Always - Never type: string + informers: + description: |- + `informers` configuration for centralized Kubernetes informers that push cache updates to flowlogs-pipeline processors. + This reduces load on the Kubernetes API server by having a single component (flp-informers) query the API instead of N FLP processors. + When enabled, a dedicated `flp-informers` deployment is created that watches Kubernetes resources and pushes updates via gRPC. + properties: + advanced: + description: '`advanced` allows setting some technical parameters of the informers component.' + properties: + batchSize: + default: 100 + description: '`batchSize` defines the maximum number of cache entries to send in a single update batch.' + minimum: 1 + type: integer + processorPort: + default: 9090 + description: '`processorPort` defines the gRPC port where flowlogs-pipeline processors listen for k8s cache updates.' + format: int32 + maximum: 65535 + minimum: 1 + type: integer + resyncInterval: + default: 60 + description: '`resyncInterval` defines the interval in seconds to rediscover processors and sync state.' + minimum: 1 + type: integer + sendTimeout: + default: 10 + description: '`sendTimeout` defines the timeout in seconds for sending updates to processors.' + minimum: 1 + type: integer + updateBufferSize: + default: 100 + description: '`updateBufferSize` defines the size of the internal update channel buffer.' + minimum: 1 + type: integer + type: object + enabled: + default: true + description: |- + `enabled` controls whether to deploy centralized Kubernetes informers. + When `true`, a dedicated `flp-informers` deployment watches K8s resources and pushes cache updates via gRPC to FLP processors. + When `false`, each FLP processor uses local informers (previous behavior). + type: boolean + replicas: + default: 2 + description: |- + `replicas` defines the number of replicas for the flp-informers deployment. + For high availability, a minimum of 2 replicas is required when `enabled` is `true`. + format: int32 + minimum: 1 + type: integer + resources: + default: + limits: + cpu: 200m + memory: 256Mi + requests: + cpu: 50m + memory: 128Mi + description: |- + `resources` are the compute resources required by the informers container. + For more information, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + properties: + claims: + description: |- + Claims lists the names of resources, defined in spec.resourceClaims, + that are used by this container. + + This field depends on the + DynamicResourceAllocation feature gate. + + This field is immutable. It can only be set for containers. + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: |- + Name must match the name of one entry in pod.spec.resourceClaims of + the Pod where this field is used. It makes that resource available + inside a container. + type: string + request: + description: |- + Request is the name chosen for a request in the referenced claim. + If empty, everything from the claim is made available, otherwise + only the result of this request. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Limits describes the maximum amount of compute resources allowed. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Requests describes the minimum amount of compute resources required. + If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, + otherwise to an implementation-defined value. Requests cannot exceed Limits. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + type: object + type: object kafkaConsumerAutoscaler: description: |- `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. diff --git a/helm/templates/clusterrole.yaml b/helm/templates/clusterrole.yaml index d1c69c8e6f..5bee522316 100644 --- a/helm/templates/clusterrole.yaml +++ b/helm/templates/clusterrole.yaml @@ -134,6 +134,16 @@ rules: - patch - update - watch + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - list + - update + - watch - apiGroups: - discovery.k8s.io resources: diff --git a/helm/templates/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrole.yaml b/helm/templates/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrole.yaml index c763f52310..c207b8d568 100644 --- a/helm/templates/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrole.yaml +++ b/helm/templates/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrole.yaml @@ -32,3 +32,12 @@ rules: - get - list - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - list + - update diff --git a/helm/templates/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrolebinding.yaml b/helm/templates/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrolebinding.yaml new file mode 100644 index 0000000000..bb8a225f67 --- /dev/null +++ b/helm/templates/netobserv-informers_rbac.authorization.k8s.io_v1_clusterrolebinding.yaml @@ -0,0 +1,12 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: netobserv-informers-informers +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: netobserv-informers +subjects: + - kind: ServiceAccount + name: flp-informers + namespace: '{{ .Release.Namespace }}' diff --git a/internal/controller/flp/flp_informer_builder.go b/internal/controller/flp/flp_informer_builder.go index 482639d1eb..987c3bf8a8 100644 --- a/internal/controller/flp/flp_informer_builder.go +++ b/internal/controller/flp/flp_informer_builder.go @@ -7,6 +7,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" "github.com/netobserv/netobserv-operator/internal/controller/constants" @@ -27,7 +28,57 @@ func newInformerBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCol } func (b *informerBuilder) deployment() (*appsv1.Deployment, error) { - var replicas int32 = 1 + // Get configuration from FlowCollector spec + config := b.desired.Processor.Informers + if config == nil { + config = &flowslatest.FlowCollectorInformers{} + } + + // Replicas: default 2 for HA + replicas := int32(2) + if config.Replicas != nil { + replicas = *config.Replicas + } + + // Resources: use configured or defaults + resources := corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("128Mi"), + corev1.ResourceCPU: resource.MustParse("50m"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("256Mi"), + corev1.ResourceCPU: resource.MustParse("200m"), + }, + } + if config.Resources.Requests != nil || config.Resources.Limits != nil { + resources = *config.Resources.DeepCopy() + } + + // Advanced config with defaults + resyncInterval := 60 + batchSize := 100 + sendTimeout := 10 + updateBufferSize := 100 + processorPort := int32(k8scachePort) + if config.Advanced != nil { + if config.Advanced.ResyncInterval != nil { + resyncInterval = *config.Advanced.ResyncInterval + } + if config.Advanced.BatchSize != nil { + batchSize = *config.Advanced.BatchSize + } + if config.Advanced.SendTimeout != nil { + sendTimeout = *config.Advanced.SendTimeout + } + if config.Advanced.UpdateBufferSize != nil { + updateBufferSize = *config.Advanced.UpdateBufferSize + } + if config.Advanced.ProcessorPort != nil { + processorPort = *config.Advanced.ProcessorPort + } + } + version := helper.MaxLabelLength(helper.ExtractVersion(b.Images[reconcilers.MainImage])) // Determine the correct processor selector based on deployment model @@ -36,17 +87,69 @@ func (b *informerBuilder) deployment() (*appsv1.Deployment, error) { processorSelector = "app=flowlogs-pipeline-transformer" } + // Build container args + args := []string{ + fmt.Sprintf("--processor-selector=%s", processorSelector), + fmt.Sprintf("--processor-port=%d", processorPort), + fmt.Sprintf("--resync-interval=%d", resyncInterval), + fmt.Sprintf("--batch-size=%d", batchSize), + fmt.Sprintf("--send-timeout=%d", sendTimeout), + fmt.Sprintf("--update-buffer-size=%d", updateBufferSize), + fmt.Sprintf("--log-level=%s", b.desired.Processor.LogLevel), + } + + // Define container ports + ports := []corev1.ContainerPort{ + { + Name: "grpc", + ContainerPort: processorPort, + Protocol: corev1.ProtocolTCP, + }, + { + Name: "health", + ContainerPort: 8080, + Protocol: corev1.ProtocolTCP, + }, + { + Name: "metrics", + ContainerPort: 9091, + Protocol: corev1.ProtocolTCP, + }, + } + + // Health probes - using HTTP endpoints + livenessProbe := &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromInt(8080), + }, + }, + InitialDelaySeconds: 10, + PeriodSeconds: 10, + TimeoutSeconds: 5, + FailureThreshold: 3, + } + + readinessProbe := &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/ready", + Port: intstr.FromInt(8080), + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 5, + TimeoutSeconds: 3, + FailureThreshold: 2, + } + container := corev1.Container{ Name: informerName, Image: b.Images[reconcilers.MainImage], ImagePullPolicy: corev1.PullPolicy(b.desired.Processor.ImagePullPolicy), Command: []string{"/app/flp-informers"}, - Args: []string{ - fmt.Sprintf("--processor-selector=%s", processorSelector), - "--processor-port=9090", - "--resync-interval=60", - fmt.Sprintf("--log-level=%s", b.desired.Processor.LogLevel), - }, + Args: args, Env: []corev1.EnvVar{ { Name: "POD_NAMESPACE", @@ -56,17 +159,19 @@ func (b *informerBuilder) deployment() (*appsv1.Deployment, error) { }, }, }, - }, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceMemory: resource.MustParse("128Mi"), - corev1.ResourceCPU: resource.MustParse("50m"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceMemory: resource.MustParse("256Mi"), - corev1.ResourceCPU: resource.MustParse("200m"), + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, }, }, + Ports: ports, + Resources: resources, + LivenessProbe: livenessProbe, + ReadinessProbe: readinessProbe, SecurityContext: helper.ContainerDefaultSecurityContext(), } @@ -90,6 +195,7 @@ func (b *informerBuilder) deployment() (*appsv1.Deployment, error) { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ + "part-of": constants.OperatorName, "app": informerName, "version": version, }, diff --git a/internal/controller/flp/flp_informer_reconciler.go b/internal/controller/flp/flp_informer_reconciler.go index 30b0ef1cb3..b86ec888ba 100644 --- a/internal/controller/flp/flp_informer_reconciler.go +++ b/internal/controller/flp/flp_informer_reconciler.go @@ -63,6 +63,20 @@ func (r *informerReconciler) reconcile(ctx context.Context, desired *flowslatest return nil } + // Check if informers are enabled (default: true) + enabled := true + if desired.Spec.Processor.Informers != nil && desired.Spec.Processor.Informers.Enabled != nil { + enabled = *desired.Spec.Processor.Informers.Enabled + } + + if !enabled { + // Informers disabled - cleanup resources and use local informers mode + r.Status.SetUnused("Centralized informers disabled - using local informers mode") + r.Managed.TryDeleteAll(ctx) + return nil + } + + // Informers enabled - proceed with reconciliation builder := newInformerBuilder(r.Instance, &desired.Spec) // Reconcile ServiceAccount @@ -80,6 +94,7 @@ func (r *informerReconciler) reconcile(ctx context.Context, desired *flowslatest return fmt.Errorf("failed to reconcile deployment: %w", err) } + r.Status.SetReady() return nil } diff --git a/internal/controller/flp/flp_monolith_objects.go b/internal/controller/flp/flp_monolith_objects.go index 04d158f845..01e0a1c93d 100644 --- a/internal/controller/flp/flp_monolith_objects.go +++ b/internal/controller/flp/flp_monolith_objects.go @@ -182,9 +182,9 @@ func (b *monolithBuilder) service() *corev1.Service { }, { Name: "k8scache", - Port: 9090, + Port: k8scachePort, Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt(9090), + TargetPort: intstr.FromInt(k8scachePort), }, }, }, diff --git a/internal/pkg/manager/manager.go b/internal/pkg/manager/manager.go index 052e0aaf67..76f03f6fd2 100644 --- a/internal/pkg/manager/manager.go +++ b/internal/pkg/manager/manager.go @@ -27,6 +27,7 @@ import ( //+kubebuilder:rbac:groups=apps,resources=deployments;daemonsets,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=replicasets,verbs=get;list;watch //+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings;rolebindings,verbs=get;list;create;delete;update;watch +//+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update //+kubebuilder:rbac:groups=console.openshift.io,resources=consoleplugins,verbs=get;create;delete;update;patch;list;watch //+kubebuilder:rbac:groups=operator.openshift.io,resources=consoles,verbs=get;list;update;watch //+kubebuilder:rbac:groups=operator.openshift.io,resources=networks,verbs=get;list;watch From 5b7d3e298f2e0895b270be385d688debc0795513 Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Thu, 16 Apr 2026 19:45:02 -0300 Subject: [PATCH 09/10] fix linter --- internal/pkg/manager/status/status_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/manager/status/status_manager.go b/internal/pkg/manager/status/status_manager.go index d59371dea6..8fc7fa23c9 100644 --- a/internal/pkg/manager/status/status_manager.go +++ b/internal/pkg/manager/status/status_manager.go @@ -173,7 +173,7 @@ func (s *Manager) populateComponentStatuses(fc *flowslatest.FlowCollector, prevC switch cs.Name { case EBPFAgents: fc.Status.Components.Agent = cs.toCRDStatus() - case FLPParent, FLPMonolith, FLPTransformer: + case FLPParent, FLPMonolith, FLPTransformer, FLPInformers: fc.Status.Components.Processor = mergeProcessorStatus(fc.Status.Components.Processor, &cs) case WebConsole: fc.Status.Components.Plugin = cs.toCRDStatus() From d1de54ce96ae036380af7ee1a5ee3f553518de62 Mon Sep 17 00:00:00 2001 From: Leandro Beretta Date: Tue, 28 Apr 2026 10:22:59 -0300 Subject: [PATCH 10/10] fix inf loop --- internal/controller/flp/flp_informer_builder.go | 6 ++++-- internal/controller/flp/flp_pipeline_builder_test.go | 8 ++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/controller/flp/flp_informer_builder.go b/internal/controller/flp/flp_informer_builder.go index 987c3bf8a8..353685096f 100644 --- a/internal/controller/flp/flp_informer_builder.go +++ b/internal/controller/flp/flp_informer_builder.go @@ -155,7 +155,8 @@ func (b *informerBuilder) deployment() (*appsv1.Deployment, error) { Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", + APIVersion: "v1", + FieldPath: "metadata.namespace", }, }, }, @@ -163,7 +164,8 @@ func (b *informerBuilder) deployment() (*appsv1.Deployment, error) { Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", + APIVersion: "v1", + FieldPath: "metadata.name", }, }, }, diff --git a/internal/controller/flp/flp_pipeline_builder_test.go b/internal/controller/flp/flp_pipeline_builder_test.go index 90f7c58e6c..41035da062 100644 --- a/internal/controller/flp/flp_pipeline_builder_test.go +++ b/internal/controller/flp/flp_pipeline_builder_test.go @@ -270,10 +270,10 @@ func TestMergeMetricsConfiguration_WithAdditionalList_Dedup(t *testing.T) { cfg := getConfig() cfg.Processor.Metrics.AdditionalIncludeList = &[]flowslatest.FLPMetric{ - "node_ingress_bytes_total", // Already in defaults - "namespace_egress_bytes_total", // New metric - "namespace_egress_bytes_total", // Duplicate in additional - "node_ingress_bytes_total", // Duplicate overlap with default + "node_ingress_bytes_total", // Already in defaults + "namespace_egress_bytes_total", // New metric + "namespace_egress_bytes_total", // Duplicate in additional + "node_ingress_bytes_total", // Duplicate overlap with default } names, cfs := buildAndGetMetricNames(t, &cfg)