diff --git a/internal/gatewayapi/extensionserverpolicy.go b/internal/gatewayapi/extensionserverpolicy.go index e378c270ba..9a1b9ff2fc 100644 --- a/internal/gatewayapi/extensionserverpolicy.go +++ b/internal/gatewayapi/extensionserverpolicy.go @@ -81,7 +81,7 @@ func (t *Translator) ProcessExtensionServerPolicies(policies []unstructured.Unst } if accepted { res = append(res, *policy) - policy.Object["status"] = policyStatusToUnstructured(policyStatus) + policy.Object["status"] = PolicyStatusToUnstructured(policyStatus) } } @@ -108,14 +108,6 @@ func extractTargetRefs(policy *unstructured.Unstructured, gateways []*GatewayCon return ret, nil } -func policyStatusToUnstructured(policyStatus gwapiv1.PolicyStatus) map[string]any { - ret := map[string]any{} - // No need to check the marshal/unmarshal error here - d, _ := json.Marshal(policyStatus) - _ = json.Unmarshal(d, &ret) - return ret -} - func resolveExtServerPolicyGatewayTargetRef(policy *unstructured.Unstructured, target gwapiv1.LocalPolicyTargetReferenceWithSectionName, gateways map[types.NamespacedName]*policyGatewayTargetContext) *GatewayContext { // Check if the gateway exists key := types.NamespacedName{ @@ -132,6 +124,29 @@ func resolveExtServerPolicyGatewayTargetRef(policy *unstructured.Unstructured, t return gateway.GatewayContext } +func PolicyStatusToUnstructured(policyStatus gwapiv1.PolicyStatus) map[string]any { + ret := map[string]any{} + // No need to check the marshal/unmarshal error here + d, _ := json.Marshal(policyStatus) + _ = json.Unmarshal(d, &ret) + return ret +} + +func ExtServerPolicyStatusAsPolicyStatus(policy *unstructured.Unstructured) gwapiv1.PolicyStatus { + statusObj := policy.Object["status"] + status := gwapiv1.PolicyStatus{} + if _, ok := statusObj.(map[string]any); ok { + // No need to check the json marshal/unmarshal error, the policyStatus was + // created via a typed object so the marshalling/unmarshalling will always + // work + d, _ := json.Marshal(statusObj) + _ = json.Unmarshal(d, &status) + } else if _, ok := statusObj.(gwapiv1.PolicyStatus); ok { + status = statusObj.(gwapiv1.PolicyStatus) + } + return status +} + func (t *Translator) translateExtServerPolicyForGateway( policy *unstructured.Unstructured, gateway *GatewayContext, diff --git a/internal/gatewayapi/extensionserverpolicy_test.go b/internal/gatewayapi/extensionserverpolicy_test.go index fbfc8418d6..751a76af99 100644 --- a/internal/gatewayapi/extensionserverpolicy_test.go +++ b/internal/gatewayapi/extensionserverpolicy_test.go @@ -123,3 +123,123 @@ func TestExtractTargetRefs(t *testing.T) { }) } } + +func TestMergeAncestorsForExtensionServerPolicies(t *testing.T) { + tests := []struct { + aggStatus *gwapiv1.PolicyStatus + newStatus *gwapiv1.PolicyStatus + noStatus bool + }{ + { + aggStatus: &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{ + Name: "gateway-1", + }, + }, + }, + }, + newStatus: &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{ + Name: "gateway-2", + }, + }, + }, + }, + }, + { + aggStatus: &gwapiv1.PolicyStatus{}, + newStatus: &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{ + Name: "gateway-2", + }, + }, + }, + }, + }, + { + aggStatus: &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{ + Name: "gateway-1", + }, + }, + }, + }, + newStatus: &gwapiv1.PolicyStatus{}, + }, + { + aggStatus: &gwapiv1.PolicyStatus{}, + newStatus: &gwapiv1.PolicyStatus{}, + }, + { + aggStatus: nil, + newStatus: &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{ + Name: "gateway-1", + }, + }, + }, + }, + }, + { + aggStatus: &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{ + Name: "gateway-1", + }, + }, + }, + }, + newStatus: nil, + }, + { + aggStatus: nil, + newStatus: nil, + }, + } + + for _, test := range tests { + aggPolicy := unstructured.Unstructured{Object: make(map[string]interface{})} + newPolicy := unstructured.Unstructured{Object: make(map[string]interface{})} + desiredMergedStatus := gwapiv1.PolicyStatus{} + + // aggStatus == nil, means simulate not setting status at all within the policy. + if test.aggStatus != nil { + aggPolicy.Object["status"] = PolicyStatusToUnstructured(*test.aggStatus) + desiredMergedStatus.Ancestors = append(desiredMergedStatus.Ancestors, test.aggStatus.Ancestors...) + } + + // newStatus == nil, means simulate not setting status at all within the policy. + if test.newStatus != nil { + newPolicy.Object["status"] = PolicyStatusToUnstructured(*test.newStatus) + desiredMergedStatus.Ancestors = append(desiredMergedStatus.Ancestors, test.newStatus.Ancestors...) + } + + mergeAncestorsForExtensionServerPolicies(&aggPolicy, &newPolicy) + + // The product object will always have an existing `status`, even if with 0 ancestors. + newAggPolicy := ExtServerPolicyStatusAsPolicyStatus(&aggPolicy) + require.Len(t, newAggPolicy.Ancestors, len(desiredMergedStatus.Ancestors)) + for i := range newAggPolicy.Ancestors { + require.Equal(t, desiredMergedStatus.Ancestors[i].AncestorRef.Name, newAggPolicy.Ancestors[i].AncestorRef.Name) + } + } +} + +// Appends status ancestors from newPolicy into aggregatedPolicy's list of ancestors. +func mergeAncestorsForExtensionServerPolicies(aggregatedPolicy, newPolicy *unstructured.Unstructured) { + aggStatus := ExtServerPolicyStatusAsPolicyStatus(aggregatedPolicy) + newStatus := ExtServerPolicyStatusAsPolicyStatus(newPolicy) + aggStatus.Ancestors = append(aggStatus.Ancestors, newStatus.Ancestors...) + aggregatedPolicy.Object["status"] = PolicyStatusToUnstructured(aggStatus) +} diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index 17d6fe6ea9..0908614d65 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -8,7 +8,6 @@ package runner import ( "context" "crypto/tls" - "encoding/json" "fmt" "os" "path" @@ -27,6 +26,7 @@ import ( "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" + gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/internal/crypto" @@ -34,6 +34,7 @@ import ( extension "github.com/envoyproxy/gateway/internal/extension/types" "github.com/envoyproxy/gateway/internal/gatewayapi" "github.com/envoyproxy/gateway/internal/gatewayapi/resource" + "github.com/envoyproxy/gateway/internal/gatewayapi/status" "github.com/envoyproxy/gateway/internal/infrastructure/host" "github.com/envoyproxy/gateway/internal/message" "github.com/envoyproxy/gateway/internal/utils" @@ -72,6 +73,42 @@ type Runner struct { done sync.WaitGroup } +type aggregatedPolicyStatus struct { + status *gwapiv1.PolicyStatus + generation int64 +} + +// TODO: zhaohuabing - truncate the parents to the max allowed(32) +func mergeRouteStatus(aggregated, incoming *gwapiv1.RouteStatus) *gwapiv1.RouteStatus { + if incoming != nil { + if aggregated != nil { + aggregated.Parents = append(aggregated.Parents, incoming.Parents...) + } else { + return incoming + } + } + return aggregated +} + +func mergePolicyStatus(aggregated aggregatedPolicyStatus, incoming *gwapiv1.PolicyStatus, generation int64) aggregatedPolicyStatus { + if incoming == nil { + return aggregated + } + + if aggregated.status == nil { + aggregated.status = incoming + aggregated.generation = generation + return aggregated + } + + aggregated.status.Ancestors = append(aggregated.status.Ancestors, incoming.Ancestors...) + if generation > aggregated.generation { + aggregated.generation = generation + } + + return aggregated +} + func New(cfg *Config) *Runner { return &Runner{ Config: *cfg, @@ -180,6 +217,34 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re var backendTLSPolicyStatusCount, clientTrafficPolicyStatusCount, backendTrafficPolicyStatusCount int var securityPolicyStatusCount, envoyExtensionPolicyStatusCount, backendStatusCount, extensionServerPolicyStatusCount int + // `aggregatedStatuses` aggregates status result of resources from all + // parents/ancestors, and then stores the status once for every resource. + aggregatedStatuses := struct { + HTTPRoutes map[types.NamespacedName]*gwapiv1.RouteStatus + GRPCRoutes map[types.NamespacedName]*gwapiv1.RouteStatus + TLSRoutes map[types.NamespacedName]*gwapiv1a2.RouteStatus + TCPRoutes map[types.NamespacedName]*gwapiv1a2.RouteStatus + UDPRoutes map[types.NamespacedName]*gwapiv1a2.RouteStatus + BackendTLSPolicies map[types.NamespacedName]aggregatedPolicyStatus + ClientTrafficPolicies map[types.NamespacedName]aggregatedPolicyStatus + BackendTrafficPolicies map[types.NamespacedName]aggregatedPolicyStatus + SecurityPolicies map[types.NamespacedName]aggregatedPolicyStatus + EnvoyExtensionPolicies map[types.NamespacedName]aggregatedPolicyStatus + ExtensionServerPolicies map[message.NamespacedNameAndGVK]aggregatedPolicyStatus + }{ + HTTPRoutes: make(map[types.NamespacedName]*gwapiv1.RouteStatus), + GRPCRoutes: make(map[types.NamespacedName]*gwapiv1.RouteStatus), + TLSRoutes: make(map[types.NamespacedName]*gwapiv1a2.RouteStatus), + TCPRoutes: make(map[types.NamespacedName]*gwapiv1a2.RouteStatus), + UDPRoutes: make(map[types.NamespacedName]*gwapiv1a2.RouteStatus), + BackendTLSPolicies: make(map[types.NamespacedName]aggregatedPolicyStatus), + ClientTrafficPolicies: make(map[types.NamespacedName]aggregatedPolicyStatus), + BackendTrafficPolicies: make(map[types.NamespacedName]aggregatedPolicyStatus), + SecurityPolicies: make(map[types.NamespacedName]aggregatedPolicyStatus), + EnvoyExtensionPolicies: make(map[types.NamespacedName]aggregatedPolicyStatus), + ExtensionServerPolicies: make(map[message.NamespacedNameAndGVK]aggregatedPolicyStatus), + } + span.AddEvent("translate", trace.WithAttributes(attribute.Int("resources.count", len(*val)))) for _, resources := range *val { // Translate and publish IRs. @@ -267,6 +332,7 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re r.ProviderResources.GatewayClassStatuses.Store(key, &result.GatewayClass.Status) } + // Resources which can only belong to 1 GatewayClass (at most) get their statuses stored right away. for _, gateway := range result.Gateways { key := utils.NamespacedName(gateway) r.ProviderResources.GatewayStatuses.Store(key, &gateway.Status) @@ -281,119 +347,171 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re delete(keysToDelete.ListenerSetStatus, key) r.keyCache.ListenerSetStatus[key] = true } + + // Backend statuses have no parents, so they are not aggregated. + for _, backend := range result.Backends { + key := utils.NamespacedName(backend) + if len(backend.Status.Conditions) > 0 { + r.ProviderResources.BackendStatuses.Store(key, &backend.Status) + backendStatusCount++ + } + delete(keysToDelete.BackendStatus, key) + r.keyCache.BackendStatus[key] = true + } + + // Resources which can belong to multiple GatewayClasses get their statuses aggregated, + // then stored once after iterating over all GatewayClasses. for _, httpRoute := range result.HTTPRoutes { - key := utils.NamespacedName(httpRoute) - r.ProviderResources.HTTPRouteStatuses.Store(key, &httpRoute.Status) - httpRouteStatusCount++ - delete(keysToDelete.HTTPRouteStatus, key) - r.keyCache.HTTPRouteStatus[key] = true + if len(httpRoute.Status.Parents) != 0 { + key := utils.NamespacedName(httpRoute) + aggregatedStatuses.HTTPRoutes[key] = mergeRouteStatus(aggregatedStatuses.HTTPRoutes[key], &httpRoute.Status.RouteStatus) + } } for _, grpcRoute := range result.GRPCRoutes { - key := utils.NamespacedName(grpcRoute) - r.ProviderResources.GRPCRouteStatuses.Store(key, &grpcRoute.Status) - grpcRouteStatusCount++ - delete(keysToDelete.GRPCRouteStatus, key) - r.keyCache.GRPCRouteStatus[key] = true + if len(grpcRoute.Status.Parents) != 0 { + key := utils.NamespacedName(grpcRoute) + aggregatedStatuses.GRPCRoutes[key] = mergeRouteStatus(aggregatedStatuses.GRPCRoutes[key], &grpcRoute.Status.RouteStatus) + } } for _, tlsRoute := range result.TLSRoutes { - key := utils.NamespacedName(tlsRoute) - r.ProviderResources.TLSRouteStatuses.Store(key, &tlsRoute.Status) - tlsRouteStatusCount++ - delete(keysToDelete.TLSRouteStatus, key) - r.keyCache.TLSRouteStatus[key] = true + if len(tlsRoute.Status.Parents) != 0 { + key := utils.NamespacedName(tlsRoute) + aggregatedStatuses.TLSRoutes[key] = mergeRouteStatus(aggregatedStatuses.TLSRoutes[key], &tlsRoute.Status.RouteStatus) + } } for _, tcpRoute := range result.TCPRoutes { - key := utils.NamespacedName(tcpRoute) - r.ProviderResources.TCPRouteStatuses.Store(key, &tcpRoute.Status) - tcpRouteStatusCount++ - delete(keysToDelete.TCPRouteStatus, key) - r.keyCache.TCPRouteStatus[key] = true + if len(tcpRoute.Status.Parents) != 0 { + key := utils.NamespacedName(tcpRoute) + aggregatedStatuses.TCPRoutes[key] = mergeRouteStatus(aggregatedStatuses.TCPRoutes[key], &tcpRoute.Status.RouteStatus) + } } for _, udpRoute := range result.UDPRoutes { - key := utils.NamespacedName(udpRoute) - r.ProviderResources.UDPRouteStatuses.Store(key, &udpRoute.Status) - udpRouteStatusCount++ - delete(keysToDelete.UDPRouteStatus, key) - r.keyCache.UDPRouteStatus[key] = true + if len(udpRoute.Status.Parents) != 0 { + key := utils.NamespacedName(udpRoute) + aggregatedStatuses.UDPRoutes[key] = mergeRouteStatus(aggregatedStatuses.UDPRoutes[key], &udpRoute.Status.RouteStatus) + } } - - // Skip updating status for policies with empty status - // They may have been skipped in this translation because - // their target is not found (not relevant) - for _, backendTLSPolicy := range result.BackendTLSPolicies { - key := utils.NamespacedName(backendTLSPolicy) - if len(backendTLSPolicy.Status.Ancestors) > 0 { - r.ProviderResources.BackendTLSPolicyStatuses.Store(key, &backendTLSPolicy.Status) - backendTLSPolicyStatusCount++ + if len(backendTLSPolicy.Status.Ancestors) != 0 { + key := utils.NamespacedName(backendTLSPolicy) + aggregatedStatuses.BackendTLSPolicies[key] = mergePolicyStatus(aggregatedStatuses.BackendTLSPolicies[key], &backendTLSPolicy.Status, backendTLSPolicy.Generation) } - delete(keysToDelete.BackendTLSPolicyStatus, key) - r.keyCache.BackendTLSPolicyStatus[key] = true } - for _, clientTrafficPolicy := range result.ClientTrafficPolicies { - key := utils.NamespacedName(clientTrafficPolicy) - if len(clientTrafficPolicy.Status.Ancestors) > 0 { - r.ProviderResources.ClientTrafficPolicyStatuses.Store(key, &clientTrafficPolicy.Status) - clientTrafficPolicyStatusCount++ + if len(clientTrafficPolicy.Status.Ancestors) != 0 { + key := utils.NamespacedName(clientTrafficPolicy) + aggregatedStatuses.ClientTrafficPolicies[key] = mergePolicyStatus(aggregatedStatuses.ClientTrafficPolicies[key], &clientTrafficPolicy.Status, clientTrafficPolicy.Generation) } - delete(keysToDelete.ClientTrafficPolicyStatus, key) - r.keyCache.ClientTrafficPolicyStatus[key] = true } for _, backendTrafficPolicy := range result.BackendTrafficPolicies { - key := utils.NamespacedName(backendTrafficPolicy) - if len(backendTrafficPolicy.Status.Ancestors) > 0 { - r.ProviderResources.BackendTrafficPolicyStatuses.Store(key, &backendTrafficPolicy.Status) - backendTrafficPolicyStatusCount++ + if len(backendTrafficPolicy.Status.Ancestors) != 0 { + key := utils.NamespacedName(backendTrafficPolicy) + aggregatedStatuses.BackendTrafficPolicies[key] = mergePolicyStatus(aggregatedStatuses.BackendTrafficPolicies[key], &backendTrafficPolicy.Status, backendTrafficPolicy.Generation) } - delete(keysToDelete.BackendTrafficPolicyStatus, key) - r.keyCache.BackendTrafficPolicyStatus[key] = true } for _, securityPolicy := range result.SecurityPolicies { - key := utils.NamespacedName(securityPolicy) - if len(securityPolicy.Status.Ancestors) > 0 { - r.ProviderResources.SecurityPolicyStatuses.Store(key, &securityPolicy.Status) - securityPolicyStatusCount++ + if len(securityPolicy.Status.Ancestors) != 0 { + key := utils.NamespacedName(securityPolicy) + aggregatedStatuses.SecurityPolicies[key] = mergePolicyStatus(aggregatedStatuses.SecurityPolicies[key], &securityPolicy.Status, securityPolicy.Generation) } - delete(keysToDelete.SecurityPolicyStatus, key) - r.keyCache.SecurityPolicyStatus[key] = true } for _, envoyExtensionPolicy := range result.EnvoyExtensionPolicies { - key := utils.NamespacedName(envoyExtensionPolicy) - if len(envoyExtensionPolicy.Status.Ancestors) > 0 { - r.ProviderResources.EnvoyExtensionPolicyStatuses.Store(key, &envoyExtensionPolicy.Status) - envoyExtensionPolicyStatusCount++ + if len(envoyExtensionPolicy.Status.Ancestors) != 0 { + key := utils.NamespacedName(envoyExtensionPolicy) + aggregatedStatuses.EnvoyExtensionPolicies[key] = mergePolicyStatus(aggregatedStatuses.EnvoyExtensionPolicies[key], &envoyExtensionPolicy.Status, envoyExtensionPolicy.Generation) } - delete(keysToDelete.EnvoyExtensionPolicyStatus, key) - r.keyCache.EnvoyExtensionPolicyStatus[key] = true - } - for _, backend := range result.Backends { - key := utils.NamespacedName(backend) - if len(backend.Status.Conditions) > 0 { - r.ProviderResources.BackendStatuses.Store(key, &backend.Status) - backendStatusCount++ - } - delete(keysToDelete.BackendStatus, key) - r.keyCache.BackendStatus[key] = true } for _, extServerPolicy := range result.ExtensionServerPolicies { - key := message.NamespacedNameAndGVK{ - NamespacedName: utils.NamespacedName(&extServerPolicy), - GroupVersionKind: extServerPolicy.GroupVersionKind(), - } - if statusObj, hasStatus := extServerPolicy.Object["status"]; hasStatus && statusObj != nil { - if statusMap, ok := statusObj.(map[string]any); ok && len(statusMap) > 0 { - policyStatus := unstructuredToPolicyStatus(statusMap) - r.ProviderResources.ExtensionPolicyStatuses.Store(key, &policyStatus) - extensionServerPolicyStatusCount++ + policyStatus := gatewayapi.ExtServerPolicyStatusAsPolicyStatus(&extServerPolicy) + if len(policyStatus.Ancestors) != 0 { + key := message.NamespacedNameAndGVK{ + NamespacedName: utils.NamespacedName(&extServerPolicy), + GroupVersionKind: extServerPolicy.GroupVersionKind(), } + aggregatedStatuses.ExtensionServerPolicies[key] = mergePolicyStatus(aggregatedStatuses.ExtensionServerPolicies[key], &policyStatus, extServerPolicy.GetGeneration()) } - delete(keysToDelete.ExtensionServerPolicyStatus, key) - r.keyCache.ExtensionServerPolicyStatus[key] = true } statusUpdateSpan.End() } + // Store the stauses of all objects atomically with the aggregated status. + for key, routeStatus := range aggregatedStatuses.HTTPRoutes { + s := gwapiv1.HTTPRouteStatus{RouteStatus: *routeStatus} + r.ProviderResources.HTTPRouteStatuses.Store(key, &s) + httpRouteStatusCount++ + delete(keysToDelete.HTTPRouteStatus, key) + r.keyCache.HTTPRouteStatus[key] = true + } + for key, routeStatus := range aggregatedStatuses.GRPCRoutes { + s := gwapiv1.GRPCRouteStatus{RouteStatus: *routeStatus} + r.ProviderResources.GRPCRouteStatuses.Store(key, &s) + grpcRouteStatusCount++ + delete(keysToDelete.GRPCRouteStatus, key) + r.keyCache.GRPCRouteStatus[key] = true + } + for key, routeStatus := range aggregatedStatuses.TLSRoutes { + s := gwapiv1.TLSRouteStatus{RouteStatus: *routeStatus} + r.ProviderResources.TLSRouteStatuses.Store(key, &s) + tlsRouteStatusCount++ + delete(keysToDelete.TLSRouteStatus, key) + r.keyCache.TLSRouteStatus[key] = true + } + for key, routeStatus := range aggregatedStatuses.TCPRoutes { + s := gwapiv1a2.TCPRouteStatus{RouteStatus: *routeStatus} + r.ProviderResources.TCPRouteStatuses.Store(key, &s) + tcpRouteStatusCount++ + delete(keysToDelete.TCPRouteStatus, key) + r.keyCache.TCPRouteStatus[key] = true + } + for key, routeStatus := range aggregatedStatuses.UDPRoutes { + s := gwapiv1a2.UDPRouteStatus{RouteStatus: *routeStatus} + r.ProviderResources.UDPRouteStatuses.Store(key, &s) + udpRouteStatusCount++ + delete(keysToDelete.UDPRouteStatus, key) + r.keyCache.UDPRouteStatus[key] = true + } + for key, entry := range aggregatedStatuses.BackendTLSPolicies { + status.TruncatePolicyAncestors(entry.status, r.EnvoyGateway.Gateway.ControllerName, entry.generation) + r.ProviderResources.BackendTLSPolicyStatuses.Store(key, entry.status) + backendTLSPolicyStatusCount++ + delete(keysToDelete.BackendTLSPolicyStatus, key) + r.keyCache.BackendTLSPolicyStatus[key] = true + } + for key, entry := range aggregatedStatuses.ClientTrafficPolicies { + status.TruncatePolicyAncestors(entry.status, r.EnvoyGateway.Gateway.ControllerName, entry.generation) + r.ProviderResources.ClientTrafficPolicyStatuses.Store(key, entry.status) + clientTrafficPolicyStatusCount++ + delete(keysToDelete.ClientTrafficPolicyStatus, key) + r.keyCache.ClientTrafficPolicyStatus[key] = true + } + for key, entry := range aggregatedStatuses.BackendTrafficPolicies { + status.TruncatePolicyAncestors(entry.status, r.EnvoyGateway.Gateway.ControllerName, entry.generation) + r.ProviderResources.BackendTrafficPolicyStatuses.Store(key, entry.status) + backendTrafficPolicyStatusCount++ + delete(keysToDelete.BackendTrafficPolicyStatus, key) + r.keyCache.BackendTrafficPolicyStatus[key] = true + } + for key, entry := range aggregatedStatuses.SecurityPolicies { + status.TruncatePolicyAncestors(entry.status, r.EnvoyGateway.Gateway.ControllerName, entry.generation) + r.ProviderResources.SecurityPolicyStatuses.Store(key, entry.status) + securityPolicyStatusCount++ + delete(keysToDelete.SecurityPolicyStatus, key) + r.keyCache.SecurityPolicyStatus[key] = true + } + for key, entry := range aggregatedStatuses.EnvoyExtensionPolicies { + status.TruncatePolicyAncestors(entry.status, r.EnvoyGateway.Gateway.ControllerName, entry.generation) + r.ProviderResources.EnvoyExtensionPolicyStatuses.Store(key, entry.status) + envoyExtensionPolicyStatusCount++ + delete(keysToDelete.EnvoyExtensionPolicyStatus, key) + r.keyCache.EnvoyExtensionPolicyStatus[key] = true + } + for key, entry := range aggregatedStatuses.ExtensionServerPolicies { + status.TruncatePolicyAncestors(entry.status, r.EnvoyGateway.Gateway.ControllerName, entry.generation) + r.ProviderResources.ExtensionPolicyStatuses.Store(key, entry.status) + extensionServerPolicyStatusCount++ + delete(keysToDelete.ExtensionServerPolicyStatus, key) + r.keyCache.ExtensionServerPolicyStatus[key] = true + } // Publish aggregated metrics message.PublishMetric(message.Metadata{Runner: r.Name(), Message: message.InfraIRMessageName}, infraIRCount) message.PublishMetric(message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, xdsIRCount) @@ -470,16 +588,6 @@ func (r *Runner) loadTLSConfig(ctx context.Context) (*tls.Config, []byte, error) } } -func unstructuredToPolicyStatus(policyStatus map[string]any) gwapiv1.PolicyStatus { - var ret gwapiv1.PolicyStatus - // No need to check the json marshal/unmarshal error, the policyStatus was - // created via a typed object so the marshalling/unmarshalling will always - // work - d, _ := json.Marshal(policyStatus) - _ = json.Unmarshal(d, &ret) - return ret -} - // deleteAllIRKeys deletes all XdsIR and InfraIR using tracked keys func (r *Runner) deleteAllKeys() { // Delete IR keys diff --git a/internal/gatewayapi/runner/runner_test.go b/internal/gatewayapi/runner/runner_test.go index 674654d4c0..81133ab7ad 100644 --- a/internal/gatewayapi/runner/runner_test.go +++ b/internal/gatewayapi/runner/runner_test.go @@ -240,6 +240,117 @@ func TestDeleteAllKeys(t *testing.T) { require.Empty(t, r.keyCache.EnvoyExtensionPolicyStatus) } +func TestMergePolicyStatus(t *testing.T) { + controllerName := "example.com/gateway" + + t.Run("nil incoming keeps existing entry", func(t *testing.T) { + existing := aggregatedPolicyStatus{ + status: &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{Name: gwapiv1.ObjectName("gw-a")}, + ControllerName: gwapiv1a2.GatewayController(controllerName), + }, + }, + }, + generation: 2, + } + + got := mergePolicyStatus(existing, nil, 10) + require.Equal(t, existing, got) + }) + + t.Run("nil existing takes incoming", func(t *testing.T) { + incoming := &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{Name: gwapiv1.ObjectName("gw-a")}, + ControllerName: gwapiv1a2.GatewayController(controllerName), + }, + }, + } + + got := mergePolicyStatus(aggregatedPolicyStatus{}, incoming, 7) + require.Same(t, incoming, got.status) + require.Equal(t, int64(7), got.generation) + }) + + t.Run("appends ancestors and tracks max generation", func(t *testing.T) { + first := &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{Name: gwapiv1.ObjectName("gw-a")}, + ControllerName: gwapiv1a2.GatewayController(controllerName), + }, + }, + } + second := &gwapiv1.PolicyStatus{ + Ancestors: []gwapiv1.PolicyAncestorStatus{ + { + AncestorRef: gwapiv1.ParentReference{Name: gwapiv1.ObjectName("gw-b")}, + ControllerName: gwapiv1a2.GatewayController(controllerName), + }, + }, + } + + entry := mergePolicyStatus(aggregatedPolicyStatus{}, first, 3) + entry = mergePolicyStatus(entry, second, 9) + + require.Len(t, entry.status.Ancestors, 2) + require.Equal(t, gwapiv1.ObjectName("gw-a"), entry.status.Ancestors[0].AncestorRef.Name) + require.Equal(t, gwapiv1.ObjectName("gw-b"), entry.status.Ancestors[1].AncestorRef.Name) + require.Equal(t, int64(9), entry.generation) + + entry = mergePolicyStatus(entry, &gwapiv1.PolicyStatus{}, 4) + require.Equal(t, int64(9), entry.generation) + }) +} + +func TestMergeRouteStatus(t *testing.T) { + t.Run("nil incoming keeps existing status", func(t *testing.T) { + existing := &gwapiv1.RouteStatus{ + Parents: []gwapiv1.RouteParentStatus{ + {ParentRef: gwapiv1.ParentReference{Name: gwapiv1.ObjectName("gw-a")}}, + }, + } + + got := mergeRouteStatus(existing, nil) + require.Same(t, existing, got) + require.Len(t, got.Parents, 1) + }) + + t.Run("nil existing takes incoming", func(t *testing.T) { + incoming := &gwapiv1.RouteStatus{ + Parents: []gwapiv1.RouteParentStatus{ + {ParentRef: gwapiv1.ParentReference{Name: gwapiv1.ObjectName("gw-a")}}, + }, + } + + got := mergeRouteStatus(nil, incoming) + require.Same(t, incoming, got) + require.Len(t, got.Parents, 1) + }) + + t.Run("appends parents", func(t *testing.T) { + existing := &gwapiv1.RouteStatus{ + Parents: []gwapiv1.RouteParentStatus{ + {ParentRef: gwapiv1.ParentReference{Name: gwapiv1.ObjectName("gw-a")}}, + }, + } + other := &gwapiv1.RouteStatus{ + Parents: []gwapiv1.RouteParentStatus{ + {ParentRef: gwapiv1.ParentReference{Name: gwapiv1.ObjectName("gw-b")}}, + }, + } + + got := mergeRouteStatus(existing, other) + require.Same(t, existing, got) + require.Len(t, got.Parents, 2) + require.Equal(t, gwapiv1.ObjectName("gw-a"), got.Parents[0].ParentRef.Name) + require.Equal(t, gwapiv1.ObjectName("gw-b"), got.Parents[1].ParentRef.Name) + }) +} + func TestLoadTLSConfig_HostMode(t *testing.T) { // Create temporary directory structure for certs using t.TempDir() configHome := t.TempDir() diff --git a/release-notes/current.yaml b/release-notes/current.yaml index 03b595c1d4..cab57da5ab 100644 --- a/release-notes/current.yaml +++ b/release-notes/current.yaml @@ -36,6 +36,7 @@ bug fixes: | Fixed standalone mode emitting non-actionable error logs for missing secrets and unsupported ratelimit deletion on every startup. Fixed local object reference resolution from parent policy in merged BackendTrafficPolicies. Fixed xPolicy resources being processed from all namespaces when NamespaceSelector watch mode is configured in the Kubernetes provider. + Fixed route and policy status aggregation across multiple GatewayClasses managed by the same controller, so resources preserve status from all relevant parents and ancestors instead of being overwritten by the last processed GatewayClass. # Enhancements that improve performance. performance improvements: | diff --git a/test/e2e/testdata/httproute-status-multiple-gc.yaml b/test/e2e/testdata/httproute-status-multiple-gc.yaml new file mode 100644 index 0000000000..545e1ba837 --- /dev/null +++ b/test/e2e/testdata/httproute-status-multiple-gc.yaml @@ -0,0 +1,102 @@ +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: internet-gateway + namespace: gateway-conformance-infra +spec: + gatewayClassName: internet + listeners: + - name: http + port: 80 + protocol: HTTP + allowedRoutes: + namespaces: + from: Same +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: private-gateway + namespace: gateway-conformance-infra +spec: + gatewayClassName: private + listeners: + - name: http + port: 80 + protocol: HTTP + allowedRoutes: + namespaces: + from: Same +--- +apiVersion: v1 +kind: Service +metadata: + name: multiple-gc-backend + namespace: gateway-conformance-infra +spec: + selector: + app: multiple-gc-backend + ports: + - protocol: TCP + port: 8080 + targetPort: 3000 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: multiple-gc-backend + namespace: gateway-conformance-infra + labels: + app: multiple-gc-backend +spec: + replicas: 2 + selector: + matchLabels: + app: multiple-gc-backend + template: + metadata: + labels: + app: multiple-gc-backend + spec: + containers: + - name: multiple-gc-backend + image: gcr.io/k8s-staging-ingressconformance/echoserver:v20221109-7ee2f3e + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + resources: + requests: + cpu: 10m +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: multiple-gc-route + namespace: gateway-conformance-infra +spec: + parentRefs: + - name: internet-gateway + sectionName: http + - name: private-gateway + sectionName: http + rules: + - matches: + - path: + type: PathPrefix + value: /internet + backendRefs: + - name: multiple-gc-backend + port: 8080 + - matches: + - path: + type: PathPrefix + value: /private + backendRefs: + - name: multiple-gc-backend + port: 8080 diff --git a/test/e2e/testdata/policy-status-multiple-gc.yaml b/test/e2e/testdata/policy-status-multiple-gc.yaml new file mode 100644 index 0000000000..712e39ba80 --- /dev/null +++ b/test/e2e/testdata/policy-status-multiple-gc.yaml @@ -0,0 +1,52 @@ +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: internet-gateway + namespace: gateway-conformance-infra +spec: + gatewayClassName: internet + listeners: + - name: http + port: 80 + protocol: HTTP + allowedRoutes: + namespaces: + from: Same +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: private-gateway + namespace: gateway-conformance-infra +spec: + gatewayClassName: private + listeners: + - name: http + port: 80 + protocol: HTTP + allowedRoutes: + namespaces: + from: Same +--- +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: BackendTrafficPolicy +metadata: + name: multiple-gc-btp + namespace: gateway-conformance-infra +spec: + targetRefs: + - group: gateway.networking.k8s.io + kind: Gateway + name: internet-gateway + sectionName: http + - group: gateway.networking.k8s.io + kind: Gateway + name: private-gateway + sectionName: http + responseOverride: + - match: + statusCodes: + - type: Value + value: 418 + response: + statusCode: 500 diff --git a/test/e2e/tests/multiple_gc.go b/test/e2e/tests/multiple_gc.go index 8d3a6c0cd5..a7bb1ad16e 100644 --- a/test/e2e/tests/multiple_gc.go +++ b/test/e2e/tests/multiple_gc.go @@ -10,13 +10,21 @@ package tests import ( + "context" "testing" + "time" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" "sigs.k8s.io/gateway-api/conformance/utils/http" "sigs.k8s.io/gateway-api/conformance/utils/kubernetes" "sigs.k8s.io/gateway-api/conformance/utils/suite" + + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + gatewayapi "github.com/envoyproxy/gateway/internal/gatewayapi" ) var ( @@ -26,7 +34,7 @@ var ( func init() { MultipleGCTests = make(map[string][]suite.ConformanceTest) - InternetGCTests = append(InternetGCTests, InternetGCTest) + InternetGCTests = append(InternetGCTests, InternetGCTest, HTTPRouteStatusAggregatesAcrossGatewayClassesTest, PolicyStatusAggregatesAcrossGatewayClassesTest) PrivateGCTests = append(PrivateGCTests, PrivateGCTest) MultipleGCTests["internet"] = InternetGCTests MultipleGCTests["private"] = PrivateGCTests @@ -83,3 +91,102 @@ var PrivateGCTest = suite.ConformanceTest{ }) }, } + +var HTTPRouteStatusAggregatesAcrossGatewayClassesTest = suite.ConformanceTest{ + ShortName: "HTTPRouteStatusAggregatesAcrossGatewayClasses", + Description: "HTTPRoute status should aggregate parents across multiple GatewayClasses managed by the same controller", + Manifests: []string{"testdata/httproute-status-multiple-gc.yaml"}, + Test: func(t *testing.T, suite *suite.ConformanceTestSuite) { + t.Run("httproute status aggregates across gateway classes", func(t *testing.T) { + ns := "gateway-conformance-infra" + routeNN := types.NamespacedName{Name: "multiple-gc-route", Namespace: ns} + internetGatewayNN := types.NamespacedName{Name: "internet-gateway", Namespace: ns} + privateGatewayNN := types.NamespacedName{Name: "private-gateway", Namespace: ns} + + _, err := kubernetes.WaitForGatewayAddress(t, suite.Client, suite.TimeoutConfig, kubernetes.NewGatewayRef(internetGatewayNN)) + if err != nil { + t.Fatalf("failed to get %s Gateway address: %v", internetGatewayNN.Name, err) + } + + _, err = kubernetes.WaitForGatewayAddress(t, suite.Client, suite.TimeoutConfig, kubernetes.NewGatewayRef(privateGatewayNN)) + if err != nil { + t.Fatalf("failed to get %s Gateway address: %v", privateGatewayNN.Name, err) + } + + parents := []gwapiv1.RouteParentStatus{ + createGatewayParent(suite.ControllerName, internetGatewayNN.Name, "http"), + createGatewayParent(suite.ControllerName, privateGatewayNN.Name, "http"), + } + + kubernetes.RouteMustHaveParents(t, suite.Client, suite.TimeoutConfig, routeNN, parents, false, &gwapiv1.HTTPRoute{}) + }) + }, +} + +var PolicyStatusAggregatesAcrossGatewayClassesTest = suite.ConformanceTest{ + ShortName: "PolicyStatusAggregatesAcrossGatewayClasses", + Description: "BackendTrafficPolicy status should aggregate ancestors across multiple GatewayClasses managed by the same controller", + Manifests: []string{"testdata/policy-status-multiple-gc.yaml"}, + Test: func(t *testing.T, suite *suite.ConformanceTestSuite) { + t.Run("backendtrafficpolicy status aggregates across gateway classes", func(t *testing.T) { + ns := "gateway-conformance-infra" + policyNN := types.NamespacedName{Name: "multiple-gc-btp", Namespace: ns} + internetGatewayNN := types.NamespacedName{Name: "internet-gateway", Namespace: ns} + privateGatewayNN := types.NamespacedName{Name: "private-gateway", Namespace: ns} + + _, err := kubernetes.WaitForGatewayAddress(t, suite.Client, suite.TimeoutConfig, kubernetes.NewGatewayRef(internetGatewayNN)) + if err != nil { + t.Fatalf("failed to get %s Gateway address: %v", internetGatewayNN.Name, err) + } + + _, err = kubernetes.WaitForGatewayAddress(t, suite.Client, suite.TimeoutConfig, kubernetes.NewGatewayRef(privateGatewayNN)) + if err != nil { + t.Fatalf("failed to get %s Gateway address: %v", privateGatewayNN.Name, err) + } + + internetAncestorRef := createGatewayPolicyAncestorRef(internetGatewayNN.Name, "http") + privateAncestorRef := createGatewayPolicyAncestorRef(privateGatewayNN.Name, "http") + + waitErr := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute, true, func(ctx context.Context) (bool, error) { + policy := &egv1a1.BackendTrafficPolicy{} + if err := suite.Client.Get(ctx, policyNN, policy); err != nil { + return false, err + } + + return policyAcceptedByAncestor(policy.Status.Ancestors, suite.ControllerName, internetAncestorRef) && + policyAcceptedByAncestor(policy.Status.Ancestors, suite.ControllerName, privateAncestorRef), nil + }) + + require.NoErrorf(t, waitErr, "error waiting for BackendTrafficPolicy status ancestors to aggregate across gateway classes") + }) + }, +} + +func createGatewayPolicyAncestorRef(gatewayName, sectionName string) gwapiv1.ParentReference { + return gwapiv1.ParentReference{ + Group: gatewayapi.GroupPtr(gwapiv1.GroupVersion.Group), + Kind: gatewayapi.KindPtr("Gateway"), + Name: gwapiv1.ObjectName(gatewayName), + Namespace: gatewayapi.NamespacePtr("gateway-conformance-infra"), + SectionName: gatewayapi.SectionNamePtr(sectionName), + } +} + +func createGatewayParent(controllerName, gatewayName, sectionName string) gwapiv1.RouteParentStatus { + return gwapiv1.RouteParentStatus{ + ParentRef: createGatewayPolicyAncestorRef(gatewayName, sectionName), + ControllerName: gwapiv1.GatewayController(controllerName), + Conditions: []metav1.Condition{ + { + Type: string(gwapiv1.RouteConditionAccepted), + Status: metav1.ConditionTrue, + Reason: string(gwapiv1.RouteReasonAccepted), + }, + { + Type: string(gwapiv1.RouteConditionResolvedRefs), + Status: metav1.ConditionTrue, + Reason: string(gwapiv1.RouteReasonResolvedRefs), + }, + }, + } +}