diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index 2eccf8832d..37e7519a6a 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -194,11 +194,11 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re r.Logger.Error(err, "unable to validate infra ir, skipped sending it") errChan <- err } else { - message.HandleStore(message.Metadata{ + r.InfraIR.Store(key, val) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.InfraIRMessageName, - }, - key, val, &r.InfraIR.Map) + }) newIRKeys = append(newIRKeys, key) } } @@ -209,67 +209,67 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re r.Logger.Error(err, "unable to validate xds ir, skipped sending it") errChan <- err } else { - message.HandleStore(message.Metadata{ + r.XdsIR.Store(key, val) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.XDSIRMessageName, - }, - key, val, &r.XdsIR.Map) + }) } } // Update Status for _, gateway := range result.Gateways { key := utils.NamespacedName(gateway) - message.HandleStore(message.Metadata{ + r.ProviderResources.GatewayStatuses.Store(key, &gateway.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.GatewayStatusMessageName, - }, - key, &gateway.Status, &r.ProviderResources.GatewayStatuses) + }) delete(statusesToDelete.GatewayStatusKeys, key) } for _, httpRoute := range result.HTTPRoutes { key := utils.NamespacedName(httpRoute) - message.HandleStore(message.Metadata{ + r.ProviderResources.HTTPRouteStatuses.Store(key, &httpRoute.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.HTTPRouteStatusMessageName, - }, - key, &httpRoute.Status, &r.ProviderResources.HTTPRouteStatuses) + }) delete(statusesToDelete.HTTPRouteStatusKeys, key) } for _, grpcRoute := range result.GRPCRoutes { key := utils.NamespacedName(grpcRoute) - message.HandleStore(message.Metadata{ + r.ProviderResources.GRPCRouteStatuses.Store(key, &grpcRoute.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.GRPCRouteStatusMessageName, - }, - key, &grpcRoute.Status, &r.ProviderResources.GRPCRouteStatuses) + }) delete(statusesToDelete.GRPCRouteStatusKeys, key) } for _, tlsRoute := range result.TLSRoutes { key := utils.NamespacedName(tlsRoute) - message.HandleStore(message.Metadata{ + r.ProviderResources.TLSRouteStatuses.Store(key, &tlsRoute.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.TLSRouteStatusMessageName, - }, - key, &tlsRoute.Status, &r.ProviderResources.TLSRouteStatuses) + }) delete(statusesToDelete.TLSRouteStatusKeys, key) } for _, tcpRoute := range result.TCPRoutes { key := utils.NamespacedName(tcpRoute) - message.HandleStore(message.Metadata{ + r.ProviderResources.TCPRouteStatuses.Store(key, &tcpRoute.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.TCPRouteStatusMessageName, - }, - key, &tcpRoute.Status, &r.ProviderResources.TCPRouteStatuses) + }) delete(statusesToDelete.TCPRouteStatusKeys, key) } for _, udpRoute := range result.UDPRoutes { key := utils.NamespacedName(udpRoute) - message.HandleStore(message.Metadata{ + r.ProviderResources.UDPRouteStatuses.Store(key, &udpRoute.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.UDPRouteStatusMessageName, - }, - key, &udpRoute.Status, &r.ProviderResources.UDPRouteStatuses) + }) delete(statusesToDelete.UDPRouteStatusKeys, key) } @@ -280,11 +280,11 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re for _, backendTLSPolicy := range result.BackendTLSPolicies { key := utils.NamespacedName(backendTLSPolicy) if !(reflect.ValueOf(backendTLSPolicy.Status).IsZero()) { - message.HandleStore(message.Metadata{ + r.ProviderResources.BackendTLSPolicyStatuses.Store(key, &backendTLSPolicy.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.BackendTLSPolicyStatusMessageName, - }, - key, &backendTLSPolicy.Status, &r.ProviderResources.BackendTLSPolicyStatuses) + }) } delete(statusesToDelete.BackendTLSPolicyStatusKeys, key) } @@ -292,55 +292,55 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re for _, clientTrafficPolicy := range result.ClientTrafficPolicies { key := utils.NamespacedName(clientTrafficPolicy) if !(reflect.ValueOf(clientTrafficPolicy.Status).IsZero()) { - message.HandleStore(message.Metadata{ + r.ProviderResources.ClientTrafficPolicyStatuses.Store(key, &clientTrafficPolicy.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.ClientTrafficPolicyStatusMessageName, - }, - key, &clientTrafficPolicy.Status, &r.ProviderResources.ClientTrafficPolicyStatuses) + }) } delete(statusesToDelete.ClientTrafficPolicyStatusKeys, key) } for _, backendTrafficPolicy := range result.BackendTrafficPolicies { key := utils.NamespacedName(backendTrafficPolicy) if !(reflect.ValueOf(backendTrafficPolicy.Status).IsZero()) { - message.HandleStore(message.Metadata{ + r.ProviderResources.BackendTrafficPolicyStatuses.Store(key, &backendTrafficPolicy.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.BackendTrafficPolicyStatusMessageName, - }, - key, &backendTrafficPolicy.Status, &r.ProviderResources.BackendTrafficPolicyStatuses) + }) } delete(statusesToDelete.BackendTrafficPolicyStatusKeys, key) } for _, securityPolicy := range result.SecurityPolicies { key := utils.NamespacedName(securityPolicy) if !(reflect.ValueOf(securityPolicy.Status).IsZero()) { - message.HandleStore(message.Metadata{ + r.ProviderResources.SecurityPolicyStatuses.Store(key, &securityPolicy.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.SecurityPolicyStatusMessageName, - }, - key, &securityPolicy.Status, &r.ProviderResources.SecurityPolicyStatuses) + }) } delete(statusesToDelete.SecurityPolicyStatusKeys, key) } for _, envoyExtensionPolicy := range result.EnvoyExtensionPolicies { key := utils.NamespacedName(envoyExtensionPolicy) if !(reflect.ValueOf(envoyExtensionPolicy.Status).IsZero()) { - message.HandleStore(message.Metadata{ + r.ProviderResources.EnvoyExtensionPolicyStatuses.Store(key, &envoyExtensionPolicy.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.EnvoyExtensionPolicyStatusMessageName, - }, - key, &envoyExtensionPolicy.Status, &r.ProviderResources.EnvoyExtensionPolicyStatuses) + }) } delete(statusesToDelete.EnvoyExtensionPolicyStatusKeys, key) } for _, backend := range result.Backends { key := utils.NamespacedName(backend) if !(reflect.ValueOf(backend.Status).IsZero()) { - message.HandleStore(message.Metadata{ + r.ProviderResources.BackendStatuses.Store(key, &backend.Status) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.BackendStatusMessageName, - }, - key, &backend.Status, &r.ProviderResources.BackendStatuses) + }) } delete(statusesToDelete.BackendStatusKeys, key) } @@ -351,11 +351,11 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re } if !(reflect.ValueOf(extServerPolicy.Object["status"]).IsZero()) { policyStatus := unstructuredToPolicyStatus(extServerPolicy.Object["status"].(map[string]any)) - message.HandleStore(message.Metadata{ + r.ProviderResources.ExtensionPolicyStatuses.Store(key, &policyStatus) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.ExtensionServerPoliciesStatusMessageName, - }, - key, &policyStatus, &r.ProviderResources.ExtensionPolicyStatuses) + }) } delete(statusesToDelete.ExtensionServerPolicyStatusKeys, key) } diff --git a/internal/message/watchutil.go b/internal/message/watchutil.go index 32a9f61d3a..645bd7eb0b 100644 --- a/internal/message/watchutil.go +++ b/internal/message/watchutil.go @@ -28,6 +28,10 @@ type Metadata struct { Message MessageName } +func PublishMetric(meta Metadata) { + watchablePublishTotal.WithSuccess(meta.LabelValues()...).Increment() +} + func (m Metadata) LabelValues() []metrics.LabelValue { labels := make([]metrics.LabelValue, 0, 2) if m.Runner != "" { @@ -100,8 +104,3 @@ func HandleSubscription[K comparable, V any]( } } } - -func HandleStore[K comparable, V any](meta Metadata, key K, value V, publish *watchable.Map[K, V]) { - publish.Store(key, value) - watchablePublishTotal.WithSuccess(meta.LabelValues()...).Increment() -} diff --git a/internal/message/watchutil_test.go b/internal/message/watchutil_test.go index 109c023f43..6e6472d14f 100644 --- a/internal/message/watchutil_test.go +++ b/internal/message/watchutil_test.go @@ -91,39 +91,6 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) { assert.Equal(t, 1, deleteCalls) } -func TestHandleStore(t *testing.T) { - var m watchable.Map[string, any] - message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "foo", "bar", &m) - - endCtx, end := context.WithCancel(context.Background()) - go func() { - <-endCtx.Done() - message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "baz", "qux", &m) - m.Delete("qux") // no-op - message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "foo", "bar", &m) // no-op - m.Delete("baz") - time.Sleep(100 * time.Millisecond) - m.Close() - }() - - var storeCalls int - var deleteCalls int - message.HandleSubscription[string, any]( - message.Metadata{Runner: "demo", Message: "demo"}, - m.Subscribe(context.Background()), - func(update message.Update[string, any], errChans chan error) { - end() - if update.Delete { - deleteCalls++ - } else { - storeCalls++ - } - }, - ) - assert.Equal(t, 2, storeCalls) - assert.Equal(t, 1, deleteCalls) -} - func TestXdsIRUpdates(t *testing.T) { tests := []struct { desc string diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index bf6c1295ae..7db2b157c6 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -300,11 +300,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques false, string(gwapiv1.GatewayClassReasonInvalidParameters), msg) - message.HandleStore(message.Metadata{ + r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status) + message.PublishMetric(message.Metadata{ Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GatewayClassStatusMessageName, - }, - utils.NamespacedName(gc), &gc.Status, &r.resources.GatewayClassStatuses) + }) failToProcessGCParamsRef = true } } @@ -322,11 +322,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques false, string(gwapiv1.GatewayClassReasonAccepted), fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err)) - message.HandleStore(message.Metadata{ + r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status) + message.PublishMetric(message.Metadata{ Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GatewayClassStatusMessageName, - }, - utils.NamespacedName(gc), &gc.Status, &r.resources.GatewayClassStatuses) + }) failToProcessGCParamsRef = true } @@ -512,11 +512,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // The Store is triggered even when there are no Gateways associated to the // GatewayClass. This would happen in case the last Gateway is removed and the // Store will be required to trigger a cleanup of envoy infra resources. - message.HandleStore(message.Metadata{ + r.resources.GatewayAPIResources.Store(string(r.classController), &gwcResources) + message.PublishMetric(message.Metadata{ Runner: string(egv1a1.LogComponentProviderRunner), Message: message.ProviderResourcesMessageName, - }, - string(r.classController), &gwcResources, &r.resources.GatewayAPIResources) + }) r.log.Info("reconciled gateways successfully") return reconcile.Result{}, nil diff --git a/internal/xds/translator/runner/runner.go b/internal/xds/translator/runner/runner.go index c3e33c1969..7f9bb2b5de 100644 --- a/internal/xds/translator/runner/runner.go +++ b/internal/xds/translator/runner/runner.go @@ -131,11 +131,11 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *ir // Publish if err == nil { - message.HandleStore(message.Metadata{ + r.Xds.Store(key, result) + message.PublishMetric(message.Metadata{ Runner: r.Name(), Message: message.XDSMessageName, - }, - key, result, &r.Xds.Map) + }) } else { r.Logger.Error(err, "skipped publishing xds resources") }