Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions api/flowcollector/v1beta2/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,13 +753,78 @@ type FlowCollectorFLP struct {
// +optional
Service *ProcessorServiceConfig `json:"service,omitempty"`

// `informers` configuration for centralized Kubernetes informers that push cache updates to flowlogs-pipeline processors.
// This reduces load on the Kubernetes API server by having a single component (flp-informers) query the API instead of N FLP processors.
// When enabled, a dedicated `flp-informers` deployment is created that watches Kubernetes resources and pushes updates via gRPC.
// +optional
Informers *FlowCollectorInformers `json:"informers,omitempty"`

// `advanced` allows setting some aspects of the internal configuration of the flow processor.
// This section is aimed mostly for debugging and fine-grained performance optimizations,
// such as `GOGC` and `GOMAXPROCS` environment variables. Set these values at your own risk.
// +optional
Advanced *AdvancedProcessorConfig `json:"advanced,omitempty"`
}

// `FlowCollectorInformers` defines the configuration for centralized Kubernetes informers.
// Every field is optional; the kubebuilder markers on each field declare its default.
type FlowCollectorInformers struct {
	// `enabled` controls whether to deploy centralized Kubernetes informers.
	// When `true`, a dedicated `flp-informers` deployment watches K8s resources and pushes cache updates via gRPC to FLP processors.
	// When `false`, each FLP processor uses local informers (previous behavior).
	// +kubebuilder:default:=true
	// +optional
	Enabled *bool `json:"enabled,omitempty"`

	// NOTE(review): the `Minimum=1` marker below is only the schema bound. The
	// validation webhook (`validateInformers`) additionally rejects fewer than 2
	// replicas when informers are enabled, so `replicas: 1` passes the schema but
	// can still be refused at admission time.

	// `replicas` defines the number of replicas for the flp-informers deployment.
	// For high availability, a minimum of 2 replicas is required when `enabled` is `true`.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:default:=2
	// +optional
	Replicas *int32 `json:"replicas,omitempty"`

	// NOTE(review): the `protobuf` struct tag below looks carried over from the
	// upstream `corev1.Container.Resources` declaration — confirm whether it is needed here.

	// `resources` are the compute resources required by the informers container.
	// For more information, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
	// +kubebuilder:default:={requests:{memory:"128Mi",cpu:"50m"},limits:{memory:"256Mi",cpu:"200m"}}
	// +optional
	Resources corev1.ResourceRequirements `json:"resources,omitempty" protobuf:"bytes,8,opt,name=resources"`

	// `advanced` allows setting some technical parameters of the informers component.
	// +optional
	Advanced *AdvancedInformersConfig `json:"advanced,omitempty"`
}

// NOTE(review): `resyncInterval`, `batchSize`, `sendTimeout` and `updateBufferSize`
// are declared as `*int` while `processorPort` is `*int32`, so the generated schema
// carries an `int32` format only for the port. Kubernetes API conventions favor
// fixed-size integers; consider aligning these types — confirm with the API owners.

// `AdvancedInformersConfig` defines advanced configuration for the informers component.
// Every field is optional, must be at least 1, and has a default declared by its kubebuilder marker.
type AdvancedInformersConfig struct {
	// `resyncInterval` defines the interval in seconds to rediscover processors and sync state.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:default:=60
	// +optional
	ResyncInterval *int `json:"resyncInterval,omitempty"`

	// `batchSize` defines the maximum number of cache entries to send in a single update batch.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:default:=100
	// +optional
	BatchSize *int `json:"batchSize,omitempty"`

	// `sendTimeout` defines the timeout in seconds for sending updates to processors.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:default:=10
	// +optional
	SendTimeout *int `json:"sendTimeout,omitempty"`

	// `updateBufferSize` defines the size of the internal update channel buffer.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:default:=100
	// +optional
	UpdateBufferSize *int `json:"updateBufferSize,omitempty"`

	// `processorPort` defines the gRPC port where flowlogs-pipeline processors listen for k8s cache updates.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:validation:Maximum=65535
	// +kubebuilder:default:=9090
	// +optional
	ProcessorPort *int32 `json:"processorPort,omitempty"`
}

// FLPDeduperMode is a named string type.
// NOTE(review): presumably it enumerates the flow processor's deduper modes, with the
// allowed values declared as constants of this type — confirm against the const block.
type FLPDeduperMode string

const (
Expand Down
24 changes: 24 additions & 0 deletions api/flowcollector/v1beta2/flowcollector_validation_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (v *validator) validateFLP() {
v.validateFLPMetricsForAlerts()
v.validateFLPMetricsIncludeLists()
v.validateFLPTLS()
v.validateInformers()
}

func (v *validator) validateScheduling() {
Expand Down Expand Up @@ -466,6 +467,29 @@ func (v *validator) validateFLPTLS() {
}
}

// validateInformers checks the `spec.processor.informers` section.
//
// It does nothing when the section is absent or when informers are explicitly
// disabled. Otherwise it appends an error to v.errors if fewer than 2 replicas
// are requested.
//
// Unset fields are resolved to the kubebuilder defaults declared on
// FlowCollectorInformers (`enabled: true`, `replicas: 2`), so the check gives
// the same answer whether or not defaulting has been applied to the object.
func (v *validator) validateInformers() {
	informers := v.fc.Processor.Informers
	if informers == nil {
		return
	}

	// `enabled` defaults to true: only an explicit `false` turns the check off.
	if informers.Enabled != nil && !*informers.Enabled {
		return
	}

	// When enabled, replicas must be at least 2 for high availability.
	replicas := int32(2) // declared default
	if informers.Replicas != nil {
		replicas = *informers.Replicas
	}
	if replicas < 2 {
		v.errors = append(
			v.errors,
			fmt.Errorf("spec.processor.informers.replicas must be at least 2 when informers are enabled (got %d). Centralized informers require high availability to avoid losing the entire flow collection pipeline in case of failure", replicas),
		)
	}
}
Comment on lines +470 to +491
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Default-enabled nil check may skip validation.

Enabled has kubebuilder default true, but this code treats nil as disabled. If the webhook runs before CRD defaulting (or on an object that bypassed defaulting), a user with informers: {} sneaks past the replica check while effectively running enabled. Consider treating nil as true to match the declared default:

🔧 Suggested tweak
-	// Check if enabled
-	enabled := v.fc.Processor.Informers.Enabled != nil && *v.fc.Processor.Informers.Enabled
+	// Check if enabled (defaults to true per CRD)
+	enabled := v.fc.Processor.Informers.Enabled == nil || *v.fc.Processor.Informers.Enabled

Also note: the CRD declares +kubebuilder:validation:Minimum=1 on Replicas, but this webhook enforces >=2 when enabled. Worth aligning the docstring on Replicas or the CRD marker so users aren't surprised by a "1 is valid here, but not really" situation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/flowcollector/v1beta2/flowcollector_validation_webhook.go` around lines
460 - 481, In validateInformers(), treat a nil Enabled pointer as true (respect
the kubebuilder default) by changing the enabled calculation to consider nil as
enabled instead of disabled (refer to v.fc.Processor.Informers.Enabled and the
validateInformers function); keep the replicas >=2 enforcement when enabled but
update the error message or coordinate with the CRD (Replicas'
+kubebuilder:validation:Minimum) so documentation/CRD marker and the webhook
behavior are aligned (either relax webhook to match CRD or change CRD/marker to
indicate minimum 2).


func GetFirstRequiredMetrics(anyRequired, actual []string) string {
for _, m := range anyRequired {
if slices.Contains(actual, m) {
Expand Down
76 changes: 76 additions & 0 deletions api/flowcollector/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

127 changes: 127 additions & 0 deletions bundle/manifests/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5529,6 +5529,133 @@ spec:
- Always
- Never
type: string
informers:
description: |-
`informers` configuration for centralized Kubernetes informers that push cache updates to flowlogs-pipeline processors.
This reduces load on the Kubernetes API server by having a single component (flp-informers) query the API instead of N FLP processors.
When enabled, a dedicated `flp-informers` deployment is created that watches Kubernetes resources and pushes updates via gRPC.
properties:
advanced:
description: '`advanced` allows setting some technical parameters
of the informers component.'
properties:
batchSize:
default: 100
description: '`batchSize` defines the maximum number of
cache entries to send in a single update batch.'
minimum: 1
type: integer
processorPort:
default: 9090
description: '`processorPort` defines the gRPC port where
flowlogs-pipeline processors listen for k8s cache updates.'
format: int32
maximum: 65535
minimum: 1
type: integer
resyncInterval:
default: 60
description: '`resyncInterval` defines the interval in
seconds to rediscover processors and sync state.'
minimum: 1
type: integer
sendTimeout:
default: 10
description: '`sendTimeout` defines the timeout in seconds
for sending updates to processors.'
minimum: 1
type: integer
updateBufferSize:
default: 100
description: '`updateBufferSize` defines the size of the
internal update channel buffer.'
minimum: 1
type: integer
type: object
enabled:
default: true
description: |-
`enabled` controls whether to deploy centralized Kubernetes informers.
When `true`, a dedicated `flp-informers` deployment watches K8s resources and pushes cache updates via gRPC to FLP processors.
When `false`, each FLP processor uses local informers (previous behavior).
type: boolean
replicas:
default: 2
description: |-
`replicas` defines the number of replicas for the flp-informers deployment.
For high availability, a minimum of 2 replicas is required when `enabled` is `true`.
format: int32
minimum: 1
type: integer
Comment on lines +5582 to +5589
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Enforce the HA replica rule in schema (or relax the wording).

Line 5556 says a minimum of 2 replicas is required when enabled, but the schema only enforces minimum: 1 (Line 5558) with no conditional rule. enabled: true + replicas: 1 would still validate.

Suggested CRD fix
                   informers:
@@
                     properties:
@@
                       replicas:
                         default: 2
                         description: |-
                           `replicas` defines the number of replicas for the flp-informers deployment.
                           For high availability, a minimum of 2 replicas is required when `enabled` is `true`.
                         format: int32
                         minimum: 1
                         type: integer
@@
-                    type: object
+                    type: object
+                    x-kubernetes-validations:
+                    - message: replicas must be at least 2 when informers are enabled
+                      rule: self.enabled != true || !has(self.replicas) || self.replicas >= 2
#!/bin/bash
set -euo pipefail

# Verify whether webhook already enforces "enabled => replicas >= 2"
fd -i 'flowcollector_validation_webhook.go' -x sh -c '
  echo "==> $1"
  rg -n "informers|replicas|enabled|at least 2|>=\\s*2" "$1"
' sh {}

# Verify current CRD validation presence for informers replicas
fd -i 'flows.netobserv.io_flowcollectors.yaml' -x sh -c '
  echo "==> $1"
  rg -n "informers:|replicas:|x-kubernetes-validations|enabled:" "$1"
' sh {}

Also applies to: 5628-5628

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@bundle/manifests/flows.netobserv.io_flowcollectors.yaml` around lines 5552 -
5559, The YAML's `replicas` schema currently sets `minimum: 1` but the comment
requires `replicas >= 2` when `enabled` is true; update the CRD validation to
enforce that conditional rule (or relax the comment). Add an
x-kubernetes-validations rule for the relevant `informers`/`flp-informers`
section that asserts "if .enabled == true then .replicas >= 2" (using the CRD
validation expression language), or alternatively change `minimum: 1` to
`minimum: 2` and adjust the description; target the `replicas` and `enabled`
fields in the same schema block so the validation is applied correctly.

resources:
default:
limits:
cpu: 200m
memory: 256Mi
requests:
cpu: 50m
memory: 128Mi
description: |-
`resources` are the compute resources required by the informers container.
For more information, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
properties:
claims:
description: |-
Claims lists the names of resources, defined in spec.resourceClaims,
that are used by this container.

This field depends on the
DynamicResourceAllocation feature gate.

This field is immutable. It can only be set for containers.
items:
description: ResourceClaim references one entry in PodSpec.ResourceClaims.
properties:
name:
description: |-
Name must match the name of one entry in pod.spec.resourceClaims of
the Pod where this field is used. It makes that resource available
inside a container.
type: string
request:
description: |-
Request is the name chosen for a request in the referenced claim.
If empty, everything from the claim is made available, otherwise
only the result of this request.
type: string
required:
- name
type: object
type: array
x-kubernetes-list-map-keys:
- name
x-kubernetes-list-type: map
limits:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: |-
Limits describes the maximum amount of compute resources allowed.
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
type: object
requests:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: |-
Requests describes the minimum amount of compute resources required.
If Requests is omitted for a container, it defaults to Limits if that is explicitly specified,
otherwise to an implementation-defined value. Requests cannot exceed Limits.
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
type: object
type: object
type: object
kafkaConsumerAutoscaler:
description: |-
`kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,12 @@ rules:
- get
- list
- watch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- get
- list
- update
Loading
Loading