diff --git a/internal/cmd/server.go b/internal/cmd/server.go index dad54df941..9be5707e56 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -54,7 +54,9 @@ func GetServerCommand(asyncErrHandler func(string, error)) *cobra.Command { RunE: func(cmd *cobra.Command, _ []string) error { runnerErrors := &message.RunnerErrors{} defer runnerErrors.Close() - go message.HandleSubscription(message.Metadata{Runner: "runner-errors", Message: message.RunnerErrorsMessageName}, + go message.HandleSubscription( + logging.NewLogger(cmd.OutOrStdout(), egv1a1.DefaultEnvoyGatewayLogging()), + message.Metadata{Runner: "runner-errors", Message: message.RunnerErrorsMessageName}, runnerErrors.Subscribe(cmd.Context()), func(update message.Update[string, message.WatchableError], _ chan error) { if asyncErrHandler != nil { diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index ad1e195f8d..77dcf79481 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -138,7 +138,9 @@ func (r *Runner) startWasmCache(ctx context.Context) { } func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *resource.ControllerResourcesContext]) { - message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.ProviderResourcesMessageName}, sub, + message.HandleSubscription( + r.Logger, + message.Metadata{Runner: r.Name(), Message: message.ProviderResourcesMessageName}, sub, func(update message.Update[string, *resource.ControllerResourcesContext], errChan chan error) { parentCtx := context.Background() if update.Value != nil && update.Value.Context != nil { diff --git a/internal/globalratelimit/runner/runner.go b/internal/globalratelimit/runner/runner.go index 3a0efad2f3..d684b59044 100644 --- a/internal/globalratelimit/runner/runner.go +++ b/internal/globalratelimit/runner/runner.go @@ -141,7 +141,9 @@ func (r *Runner) translateFromSubscription(ctx context.Context, c <-chan watchab // rateLimitConfigsCache is a cache of the rate limit config, which is keyed by the xdsIR key. rateLimitConfigsCache := map[string][]cachetype.Resource{} - message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, c, + message.HandleSubscription( + r.Logger, + message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, c, func(update message.Update[string, *message.XdsIRWithContext], errChan chan error) { parentCtx := ctx if update.Value != nil && update.Value.Context != nil { diff --git a/internal/infrastructure/runner/runner.go b/internal/infrastructure/runner/runner.go index 68437b4b22..fa557b682b 100644 --- a/internal/infrastructure/runner/runner.go +++ b/internal/infrastructure/runner/runner.go @@ -105,7 +105,9 @@ func (r *Runner) Start(ctx context.Context) (err error) { func (r *Runner) updateProxyInfraFromSubscription(ctx context.Context, sub <-chan watchable.Snapshot[string, *ir.Infra]) { // Subscribe to resources - message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.InfraIRMessageName}, sub, + message.HandleSubscription( + r.Logger, + message.Metadata{Runner: r.Name(), Message: message.InfraIRMessageName}, sub, func(update message.Update[string, *ir.Infra], errChan chan error) { r.Logger.Info("received an update", "key", update.Key, "delete", update.Delete) val := update.Value diff --git a/internal/message/watchutil.go b/internal/message/watchutil.go index 83ef831f67..ac0d6990b6 100644 --- a/internal/message/watchutil.go +++ b/internal/message/watchutil.go @@ -7,22 +7,17 @@ package message import ( "fmt" - "os" "runtime/debug" "time" "github.com/telepresenceio/watchable" - egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/internal/logging" "github.com/envoyproxy/gateway/internal/metrics" ) type Update[K comparable, V any] watchable.Update[K, V] -// TODO: Remove the global logger and localize the scope of the logger. -var logger = logging.DefaultLogger(os.Stdout, egv1a1.LogLevelInfo).WithName("watchable") - type Metadata struct { Runner string Message MessageName @@ -47,14 +42,16 @@ func (m Metadata) LabelValues() []metrics.LabelValue { // handleWithCrashRecovery calls the provided handle function and gracefully recovers from any panics // that might occur when the handle function is called. func handleWithCrashRecovery[K comparable, V any]( + l logging.Logger, handle func(updateFunc Update[K, V], errChans chan error), update Update[K, V], meta Metadata, errChans chan error, ) { + logger := l.WithValues("runner", meta.Runner) defer func() { if r := recover(); r != nil { - logger.WithValues("runner", meta.Runner).Error(fmt.Errorf("%+v", r), "observed a panic", + logger.Error(fmt.Errorf("%+v", r), "observed a panic", "stackTrace", string(debug.Stack())) watchableSubscribeTotal.WithFailure(metrics.ReasonError, meta.LabelValues()...).Increment() panicCounter.WithFailure(metrics.ReasonError, meta.LabelValues()...).Increment() @@ -74,7 +71,7 @@ func handleWithCrashRecovery[K comparable, V any]( // This is better than simply iterating over snapshot.Updates because // it handles the case where the watchable.Map already contains // entries before .Subscribe is called. -func HandleSubscription[K comparable, V any]( +func HandleSubscription[K comparable, V any](l logging.Logger, meta Metadata, subscription <-chan watchable.Snapshot[K, V], handle func(updateFunc Update[K, V], errChans chan error), @@ -83,7 +80,7 @@ func HandleSubscription[K comparable, V any]( errChans := make(chan error, 10) go func() { for err := range errChans { - logger.WithValues("runner", meta.Runner).Error(err, "observed an error") + l.Error(err, "observed an error") watchableSubscribeTotal.WithFailure(metrics.ReasonError, meta.LabelValues()...).Increment() } }() @@ -91,7 +88,7 @@ func HandleSubscription[K comparable, V any]( if snapshot, ok := <-subscription; ok { for k, v := range snapshot.State { - handleWithCrashRecovery(handle, Update[K, V]{ + handleWithCrashRecovery(l, handle, Update[K, V]{ Key: k, Value: v, }, meta, errChans) @@ -100,8 +97,8 @@ func HandleSubscription[K comparable, V any]( for snapshot := range subscription { watchableDepth.With(meta.LabelValues()...).Record(float64(len(subscription))) - for _, update := range coalesceUpdates(meta.Runner, snapshot.Updates) { - handleWithCrashRecovery(handle, Update[K, V](update), meta, errChans) + for _, update := range coalesceUpdates(l, snapshot.Updates) { + handleWithCrashRecovery(l, handle, Update[K, V](update), meta, errChans) } } } @@ -109,7 +106,7 @@ func HandleSubscription[K comparable, V any]( // coalesceUpdates merges multiple updates for the same key into a single update, // preserving the latest state for each key. // This helps reduce redundant processing and ensures that only the most recent update per key is handled. -func coalesceUpdates[K comparable, V any](runner string, updates []watchable.Update[K, V]) []watchable.Update[K, V] { +func coalesceUpdates[K comparable, V any](logger logging.Logger, updates []watchable.Update[K, V]) []watchable.Update[K, V] { if len(updates) <= 1 { return updates } @@ -129,7 +126,7 @@ func coalesceUpdates[K comparable, V any](runner string, updates []watchable.Upd result := updates[write+1:] if len(result) != len(updates) { - logger.WithValues("runner", runner).Info( + logger.Info( "coalesced updates", "count", len(result), "before", len(updates), diff --git a/internal/message/watchutil_internal_test.go b/internal/message/watchutil_internal_test.go index 8bf56ab2e2..445457c1a5 100644 --- a/internal/message/watchutil_internal_test.go +++ b/internal/message/watchutil_internal_test.go @@ -6,15 +6,19 @@ package message import ( + "os" "testing" "github.com/stretchr/testify/require" "github.com/telepresenceio/watchable" + + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/internal/logging" ) func TestCoalesceUpdates(t *testing.T) { t.Parallel() - + logger := logging.NewLogger(os.Stdout, egv1a1.DefaultEnvoyGatewayLogging()) tests := []struct { name string input []watchable.Update[string, int] @@ -61,7 +65,7 @@ func TestCoalesceUpdates(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - actual := coalesceUpdates("test-runner", tc.input) + actual := coalesceUpdates(logger, tc.input) require.Equal(t, tc.expected, actual) }) } diff --git a/internal/message/watchutil_test.go b/internal/message/watchutil_test.go index f08f987c9d..2c4aa107b3 100644 --- a/internal/message/watchutil_test.go +++ b/internal/message/watchutil_test.go @@ -15,7 +15,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/internal/gatewayapi/resource" + "github.com/envoyproxy/gateway/internal/logging" "github.com/envoyproxy/gateway/internal/message" ) @@ -25,6 +27,7 @@ func TestHandleSubscriptionAlreadyClosed(t *testing.T) { var calls int message.HandleSubscription( + logging.NewLogger(t.Output(), egv1a1.DefaultEnvoyGatewayLogging()), message.Metadata{Runner: "demo", Message: "demo"}, ch, func(_ message.Update[string, any], _ chan error) { calls++ }, @@ -50,6 +53,7 @@ func TestPanicInSubscriptionHandler(t *testing.T) { numCalls := 0 message.HandleSubscription( + logging.NewLogger(t.Output(), egv1a1.DefaultEnvoyGatewayLogging()), message.Metadata{Runner: "demo", Message: "demo"}, m.Subscribe(context.Background()), func(update message.Update[string, any], _ chan error) { @@ -78,6 +82,7 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) { var storeCalls int var deleteCalls int message.HandleSubscription( + logging.NewLogger(t.Output(), egv1a1.DefaultEnvoyGatewayLogging()), message.Metadata{Runner: "demo", Message: "demo"}, m.Subscribe(context.Background()), func(update message.Update[string, any], _ chan error) { @@ -246,7 +251,7 @@ func TestControllerResourceUpdate(t *testing.T) { m := &message.ProviderResources{} snapshotC := m.GatewayAPIResources.Subscribe(ctx) - endCtx, end := context.WithCancel(ctx) + endCtx, cancel := context.WithCancel(ctx) m.GatewayAPIResources.Store("start", &resource.ControllerResourcesContext{ Resources: &resource.ControllerResources{}, Context: ctx, @@ -268,15 +273,17 @@ func TestControllerResourceUpdate(t *testing.T) { }() updates := 0 - message.HandleSubscription(message.Metadata{Runner: "demo", Message: "demo"}, snapshotC, func(u message.Update[string, *resource.ControllerResourcesContext], _ chan error) { - end() - if u.Key == "test" { - updates += 1 - } - if u.Key == "end" { - m.GatewayAPIResources.Close() - } - }) + message.HandleSubscription( + logging.NewLogger(t.Output(), egv1a1.DefaultEnvoyGatewayLogging()), + message.Metadata{Runner: "demo", Message: "demo"}, snapshotC, func(u message.Update[string, *resource.ControllerResourcesContext], _ chan error) { + cancel() + if u.Key == "test" { + updates += 1 + } + if u.Key == "end" { + m.GatewayAPIResources.Close() + } + }) if tc.updates > 1 { require.LessOrEqual(t, updates, tc.updates) // Updates can be coalesced } else { diff --git a/internal/provider/kubernetes/status.go b/internal/provider/kubernetes/status.go index 457d9408ac..31603e7700 100644 --- a/internal/provider/kubernetes/status.go +++ b/internal/provider/kubernetes/status.go @@ -29,7 +29,7 @@ import ( func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context, extensionManagerEnabled bool) { // GatewayClass object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GatewayClassStatusMessageName}, r.subscriptions.gatewayClassStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], _ chan error) { @@ -64,7 +64,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // Gateway object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GatewayStatusMessageName}, r.subscriptions.gatewayStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.GatewayStatus], errChan chan error) { @@ -89,7 +89,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // HTTPRoute object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.HTTPRouteStatusMessageName}, r.subscriptions.httpRouteStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.HTTPRouteStatus], errChan chan error) { @@ -131,7 +131,9 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // GRPCRoute object status updater go func() { - message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GRPCRouteStatusMessageName}, r.subscriptions.grpcRouteStatuses, + message.HandleSubscription(r.log, + message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GRPCRouteStatusMessageName}, + r.subscriptions.grpcRouteStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.GRPCRouteStatus], errChan chan error) { // skip delete updates. if update.Delete { @@ -171,7 +173,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // TLSRoute object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.TLSRouteStatusMessageName}, r.subscriptions.tlsRouteStatuses, func(update message.Update[types.NamespacedName, *gwapiv1a2.TLSRouteStatus], errChan chan error) { @@ -213,7 +215,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // TCPRoute object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.TCPRouteStatusMessageName}, r.subscriptions.tcpRouteStatuses, func(update message.Update[types.NamespacedName, *gwapiv1a2.TCPRouteStatus], errChan chan error) { @@ -255,7 +257,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // UDPRoute object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.UDPRouteStatusMessageName}, r.subscriptions.udpRouteStatuses, func(update message.Update[types.NamespacedName, *gwapiv1a2.UDPRouteStatus], errChan chan error) { @@ -297,7 +299,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // XListenerSet object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.XListenerSetStatusMessageName}, r.subscriptions.xListenerSetStatuses, func(update message.Update[types.NamespacedName, *gwapixv1a1.ListenerSetStatus], errChan chan error) { @@ -334,7 +336,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // EnvoyPatchPolicy object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.EnvoyPatchPolicyStatusMessageName}, r.subscriptions.envoyPatchPolicyStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) { @@ -372,7 +374,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // ClientTrafficPolicy object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.ClientTrafficPolicyStatusMessageName}, r.subscriptions.clientTrafficPolicyStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) { @@ -410,7 +412,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // BackendTrafficPolicy object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.BackendTrafficPolicyStatusMessageName}, r.subscriptions.backendTrafficPolicyStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) { @@ -448,7 +450,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // SecurityPolicy object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.SecurityPolicyStatusMessageName}, r.subscriptions.securityPolicyStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) { @@ -486,7 +488,12 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // BackendTLSPolicy object status updater go func() { - message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.BackendTLSPolicyStatusMessageName}, r.subscriptions.backendTLSPolicyStatuses, + message.HandleSubscription(r.log, + message.Metadata{ + Runner: string(egv1a1.LogComponentProviderRunner), + Message: message.BackendTLSPolicyStatusMessageName, + }, + r.subscriptions.backendTLSPolicyStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) { // skip delete updates. if update.Delete { @@ -522,7 +529,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // EnvoyExtensionPolicy object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.EnvoyExtensionPolicyStatusMessageName}, r.subscriptions.envoyExtensionPolicyStatuses, func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) { @@ -560,7 +567,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context // Backend object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.BackendStatusMessageName}, r.subscriptions.backendStatuses, func(update message.Update[types.NamespacedName, *egv1a1.BackendStatus], errChan chan error) { @@ -599,7 +606,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context if extensionManagerEnabled { // ExtensionServerPolicy object status updater go func() { - message.HandleSubscription( + message.HandleSubscription(r.log, message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.ExtensionServerPoliciesStatusMessageName}, r.subscriptions.extensionPolicyStatuses, func(update message.Update[message.NamespacedNameAndGVK, *gwapiv1.PolicyStatus], errChan chan error) { diff --git a/internal/xds/runner/runner.go b/internal/xds/runner/runner.go index 4b5c635b84..4481c3590c 100644 --- a/internal/xds/runner/runner.go +++ b/internal/xds/runner/runner.go @@ -259,7 +259,9 @@ func registerServer(srv serverv3.Server, g *grpc.Server) { func (r *Runner) translateFromSubscription(sub <-chan watchable.Snapshot[string, *message.XdsIRWithContext]) { // Subscribe to resources - message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, sub, + message.HandleSubscription( + r.Logger, + message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, sub, func(update message.Update[string, *message.XdsIRWithContext], errChan chan error) { parentCtx := context.Background() if update.Value != nil && update.Value.Context != nil { diff --git a/release-notes/current.yaml b/release-notes/current.yaml index 792dc71653..79231ac0a6 100644 --- a/release-notes/current.yaml +++ b/release-notes/current.yaml @@ -11,6 +11,7 @@ new features: | bug fixes: | Allowed single-label backend hostnames when running with the Host infrastructure, enabling Docker Compose service names for telemetry backends. + Fixed an issue that message package didn't adopt logging level. # Enhancements that improve performance. performance improvements: |