diff --git a/internal/controller/flp/flp_controller.go b/internal/controller/flp/flp_controller.go index 2f435cf774..032b2dfaca 100644 --- a/internal/controller/flp/flp_controller.go +++ b/internal/controller/flp/flp_controller.go @@ -119,7 +119,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result return ctrl.Result{}, err } - r.status.SetReady() + if fc.Spec.OnHold() { + r.status.SetUnused("FlowCollector is on hold") + } else { + r.status.SetReady() + } if r.mgr.Status.NeedsRequeue() { return ctrl.Result{RequeueAfter: 30 * time.Second}, nil } diff --git a/internal/pkg/manager/status/status_manager.go b/internal/pkg/manager/status/status_manager.go index e0754c3b57..56c222b052 100644 --- a/internal/pkg/manager/status/status_manager.go +++ b/internal/pkg/manager/status/status_manager.go @@ -158,8 +158,12 @@ func (s *Manager) getConditions() []metav1.Condition { } // populateComponentStatuses maps internal ComponentStatus instances to the CRD status fields. -// Always start fresh to avoid stale data from a previous API server fetch influencing the merge. -func (s *Manager) populateComponentStatuses(fc *flowslatest.FlowCollector) { +// Integrations are always rebuilt from the in-memory map. Component fields are rebuilt from the map, +// then agent/plugin may be merged from prevComponents (see preserveAgentOrPluginSnapshot). +// prevComponents is the FlowCollector's status.components from the API before this update; when +// another controller commits while the legacy reconciler is between ForComponent (placeholder Unknown) +// and the real status, we keep agent/plugin from prev so users do not see transient Unknown. +func (s *Manager) populateComponentStatuses(fc *flowslatest.FlowCollector, prevComponents *flowslatest.FlowCollectorComponentsStatus) { fc.Status.Components = &flowslatest.FlowCollectorComponentsStatus{} fc.Status.Integrations = &flowslatest.FlowCollectorIntegrationsStatus{} @@ -194,6 +198,26 @@ func (s *Manager) populateComponentStatuses(fc *flowslatest.FlowCollector) { return true }) fc.Status.Integrations.Exporters = exporters + + if prevComponents != nil { + fc.Status.Components.Agent = preserveAgentOrPluginSnapshot(fc.Status.Components.Agent, prevComponents.Agent) + fc.Status.Components.Plugin = preserveAgentOrPluginSnapshot(fc.Status.Components.Plugin, prevComponents.Plugin) + } +} + +// preserveAgentOrPluginSnapshot avoids publishing a transient Unknown written by ForComponent before +// the owning controller finishes, or losing agent/plugin when their keys are absent from the map. +func preserveAgentOrPluginSnapshot(cur, prev *flowslatest.FlowCollectorComponentStatus) *flowslatest.FlowCollectorComponentStatus { + if prev == nil { + return cur + } + if cur == nil { + return prev + } + if cur.State == string(StatusUnknown) && cur.Reason == "" && cur.Message == "" { + return prev + } + return cur } // mergeProcessorStatus handles FLP processor status aggregation from parent, monolith, and transformer. @@ -288,11 +312,18 @@ func (s *Manager) updateStatus(ctx context.Context, c client.Client) { conditions = append(conditions, checkValidation(ctx, &fc)) if kafkaCond := s.GetKafkaCondition(); kafkaCond != nil { conditions = append(conditions, *kafkaCond) + } else if ts := s.getStatus(FLPTransformer); ts == nil || ts.Status != StatusUnknown { + // Skip removal while FLPTransformer is placeholder Unknown (GetKafkaCondition is nil then too). + meta.RemoveStatusCondition(&fc.Status.Conditions, "KafkaReady") } for _, cond := range conditions { meta.SetStatusCondition(&fc.Status.Conditions, cond) } - s.populateComponentStatuses(&fc) + var prevComponents *flowslatest.FlowCollectorComponentsStatus + if fc.Status.Components != nil { + prevComponents = fc.Status.Components.DeepCopy() + } + s.populateComponentStatuses(&fc, prevComponents) if err := c.Status().Update(ctx, &fc); err != nil { return err } @@ -395,8 +426,9 @@ func (s *Manager) GetKafkaCondition() *metav1.Condition { Message: "Kafka transformer is rolling out", } case StatusUnknown, StatusUnused, StatusFailure, StatusDegraded: - // Failure/Degraded already handled above via hasKafkaIssue; - // Unknown/Unused mean Kafka mode is not active. + // Failure/Degraded already handled above via hasKafkaIssue. + // Unused means transformer not used (e.g. direct mode). Unknown is either not yet + // reconciled (ForComponent placeholder) or indeterminate — no KafkaReady row in either case. } } diff --git a/internal/pkg/manager/status/status_manager_test.go b/internal/pkg/manager/status/status_manager_test.go index 3d0e1216c9..c423924e47 100644 --- a/internal/pkg/manager/status/status_manager_test.go +++ b/internal/pkg/manager/status/status_manager_test.go @@ -95,7 +95,7 @@ func TestUnusedStatusInCRD(t *testing.T) { assert.Equal(t, StatusUnused, cs.Status) fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Components) require.NotNil(t, fc.Status.Components.Agent) assert.Equal(t, "Unused", fc.Status.Components.Agent.State) @@ -103,6 +103,33 @@ func TestUnusedStatusInCRD(t *testing.T) { assert.Equal(t, "FlowCollector is on hold", fc.Status.Components.Agent.Message) } +func TestPopulatePreservesAgentPluginAgainstPlaceholderUnknown(t *testing.T) { + s := NewManager() + _ = s.ForComponent(EBPFAgents) + _ = s.ForComponent(WebConsole) + + prev := &flowslatest.FlowCollectorComponentsStatus{ + Agent: &flowslatest.FlowCollectorComponentStatus{ + State: "Unused", + Reason: "ComponentUnused", + Message: "FlowCollector is on hold", + }, + Plugin: &flowslatest.FlowCollectorComponentStatus{ + State: "Unused", + Reason: "ComponentUnused", + Message: "FlowCollector is on hold", + }, + } + + fc := &flowslatest.FlowCollector{} + s.populateComponentStatuses(fc, prev) + + require.NotNil(t, fc.Status.Components.Agent) + assert.Equal(t, "Unused", fc.Status.Components.Agent.State) + require.NotNil(t, fc.Status.Components.Plugin) + assert.Equal(t, "Unused", fc.Status.Components.Plugin.State) +} + func TestReplicaCounts(t *testing.T) { s := NewManager() agent := s.ForComponent(EBPFAgents) @@ -366,7 +393,7 @@ func TestPopulateComponentStatuses(t *testing.T) { monitoring.SetFailure("DashboardError", "dashboard CM missing") fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Components) require.NotNil(t, fc.Status.Components.Agent) @@ -405,7 +432,7 @@ func TestPopulateProcessorAggregation(t *testing.T) { }) fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Components) require.NotNil(t, fc.Status.Components.Processor) @@ -424,7 +451,7 @@ func TestPopulateProcessorAggregation(t *testing.T) { transformer.SetFailure("KafkaConnectionError", "cannot connect to broker") fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Components) require.NotNil(t, fc.Status.Components.Processor) @@ -443,12 +470,30 @@ func TestPopulateProcessorAggregation(t *testing.T) { trans.SetUnused("direct mode") fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Components) require.NotNil(t, fc.Status.Components.Processor) assert.Equal(t, "Ready", fc.Status.Components.Processor.State) }) + + t.Run("on hold parent unused with unused subs yields processor unused", func(t *testing.T) { + s := NewManager() + parent := s.ForComponent(FLPParent) + mono := s.ForComponent(FLPMonolith) + trans := s.ForComponent(FLPTransformer) + + parent.SetUnused("FlowCollector is on hold") + mono.SetUnused("FlowCollector is on hold") + trans.SetUnused("FlowCollector is on hold") + + fc := &flowslatest.FlowCollector{} + s.populateComponentStatuses(fc, nil) + + require.NotNil(t, fc.Status.Components) + require.NotNil(t, fc.Status.Components.Processor) + assert.Equal(t, "Unused", fc.Status.Components.Processor.State) + }) } func TestPopulateLokiStatus(t *testing.T) { @@ -458,7 +503,7 @@ func TestPopulateLokiStatus(t *testing.T) { ls.SetReady() fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Integrations) require.NotNil(t, fc.Status.Integrations.Loki) @@ -471,7 +516,7 @@ func TestPopulateLokiStatus(t *testing.T) { demo.SetFailure("DeployFailed", "cannot create PVC") fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Integrations) require.NotNil(t, fc.Status.Integrations.Loki) @@ -485,7 +530,7 @@ func TestPopulateLokiStatus(t *testing.T) { ls.SetUnused("Loki is disabled") fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Integrations) require.NotNil(t, fc.Status.Integrations.Loki) @@ -504,7 +549,7 @@ func TestControllerComponentsNotInCRDStatus(t *testing.T) { np.SetReady() fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) assert.Nil(t, fc.Status.Components.Agent) assert.Nil(t, fc.Status.Components.Plugin) @@ -519,7 +564,7 @@ func TestExporterStatus(t *testing.T) { s.SetExporterStatus("ipfix-export-0", "IPFIX", "Failure", "ConnectionRefused", "cannot connect") fc := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc) + s.populateComponentStatuses(fc, nil) require.NotNil(t, fc.Status.Integrations) assert.Len(t, fc.Status.Integrations.Exporters, 2) @@ -539,7 +584,7 @@ func TestExporterStatus(t *testing.T) { s.ClearExporters() fc2 := &flowslatest.FlowCollector{} - s.populateComponentStatuses(fc2) + s.populateComponentStatuses(fc2, nil) assert.Empty(t, fc2.Status.Integrations.Exporters) } @@ -554,6 +599,20 @@ func TestKafkaCondition(t *testing.T) { assert.Nil(t, s.GetKafkaCondition()) }) + t.Run("transformer placeholder unknown implies do not strip kafka ready from API", func(t *testing.T) { + s := NewManager() + tr := s.ForComponent(FLPTransformer) + ts := s.getStatus(FLPTransformer) + require.NotNil(t, ts) + assert.Equal(t, StatusUnknown, ts.Status) + assert.Nil(t, s.GetKafkaCondition()) + + tr.SetUnused("direct mode") + ts = s.getStatus(FLPTransformer) + require.NotNil(t, ts) + assert.NotEqual(t, StatusUnknown, ts.Status) + }) + t.Run("healthy transformer returns KafkaReady=True", func(t *testing.T) { s := NewManager() transformer := s.ForComponent(FLPTransformer)