Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func GetServerCommand(asyncErrHandler func(string, error)) *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) error {
runnerErrors := &message.RunnerErrors{}
defer runnerErrors.Close()
go message.HandleSubscription(message.Metadata{Runner: "runner-errors", Message: message.RunnerErrorsMessageName},
go message.HandleSubscription(
logging.NewLogger(cmd.OutOrStdout(), egv1a1.DefaultEnvoyGatewayLogging()),
message.Metadata{Runner: "runner-errors", Message: message.RunnerErrorsMessageName},
runnerErrors.Subscribe(cmd.Context()),
func(update message.Update[string, message.WatchableError], _ chan error) {
if asyncErrHandler != nil {
Expand Down
4 changes: 3 additions & 1 deletion internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func (r *Runner) startWasmCache(ctx context.Context) {
}

func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *resource.ControllerResourcesContext]) {
message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.ProviderResourcesMessageName}, sub,
message.HandleSubscription(
r.Logger,
message.Metadata{Runner: r.Name(), Message: message.ProviderResourcesMessageName}, sub,
func(update message.Update[string, *resource.ControllerResourcesContext], errChan chan error) {
parentCtx := context.Background()
if update.Value != nil && update.Value.Context != nil {
Expand Down
4 changes: 3 additions & 1 deletion internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ func (r *Runner) translateFromSubscription(ctx context.Context, c <-chan watchab
// rateLimitConfigsCache is a cache of the rate limit config, which is keyed by the xdsIR key.
rateLimitConfigsCache := map[string][]cachetype.Resource{}

message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, c,
message.HandleSubscription(
r.Logger,
message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, c,
func(update message.Update[string, *message.XdsIRWithContext], errChan chan error) {
parentCtx := ctx
if update.Value != nil && update.Value.Context != nil {
Expand Down
4 changes: 3 additions & 1 deletion internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func (r *Runner) Start(ctx context.Context) (err error) {

func (r *Runner) updateProxyInfraFromSubscription(ctx context.Context, sub <-chan watchable.Snapshot[string, *ir.Infra]) {
// Subscribe to resources
message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.InfraIRMessageName}, sub,
message.HandleSubscription(
r.Logger,
message.Metadata{Runner: r.Name(), Message: message.InfraIRMessageName}, sub,
func(update message.Update[string, *ir.Infra], errChan chan error) {
r.Logger.Info("received an update", "key", update.Key, "delete", update.Delete)
val := update.Value
Expand Down
23 changes: 10 additions & 13 deletions internal/message/watchutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,17 @@ package message

import (
"fmt"
"os"
"runtime/debug"
"time"

"github.com/telepresenceio/watchable"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/metrics"
)

type Update[K comparable, V any] watchable.Update[K, V]

// TODO: Remove the global logger and localize the scope of the logger.
var logger = logging.DefaultLogger(os.Stdout, egv1a1.LogLevelInfo).WithName("watchable")

type Metadata struct {
Runner string
Message MessageName
Expand All @@ -47,14 +42,16 @@ func (m Metadata) LabelValues() []metrics.LabelValue {
// handleWithCrashRecovery calls the provided handle function and gracefully recovers from any panics
// that might occur when the handle function is called.
func handleWithCrashRecovery[K comparable, V any](
l logging.Logger,
handle func(updateFunc Update[K, V], errChans chan error),
update Update[K, V],
meta Metadata,
errChans chan error,
) {
logger := l.WithValues("runner", meta.Runner)
defer func() {
if r := recover(); r != nil {
logger.WithValues("runner", meta.Runner).Error(fmt.Errorf("%+v", r), "observed a panic",
logger.Error(fmt.Errorf("%+v", r), "observed a panic",
"stackTrace", string(debug.Stack()))
watchableSubscribeTotal.WithFailure(metrics.ReasonError, meta.LabelValues()...).Increment()
panicCounter.WithFailure(metrics.ReasonError, meta.LabelValues()...).Increment()
Expand All @@ -74,7 +71,7 @@ func handleWithCrashRecovery[K comparable, V any](
// This is better than simply iterating over snapshot.Updates because
// it handles the case where the watchable.Map already contains
// entries before .Subscribe is called.
func HandleSubscription[K comparable, V any](
func HandleSubscription[K comparable, V any](l logging.Logger,
meta Metadata,
subscription <-chan watchable.Snapshot[K, V],
handle func(updateFunc Update[K, V], errChans chan error),
Expand All @@ -83,15 +80,15 @@ func HandleSubscription[K comparable, V any](
errChans := make(chan error, 10)
go func() {
for err := range errChans {
logger.WithValues("runner", meta.Runner).Error(err, "observed an error")
l.Error(err, "observed an error")
watchableSubscribeTotal.WithFailure(metrics.ReasonError, meta.LabelValues()...).Increment()
}
}()
defer close(errChans)

if snapshot, ok := <-subscription; ok {
for k, v := range snapshot.State {
handleWithCrashRecovery(handle, Update[K, V]{
handleWithCrashRecovery(l, handle, Update[K, V]{
Key: k,
Value: v,
}, meta, errChans)
Expand All @@ -100,16 +97,16 @@ func HandleSubscription[K comparable, V any](
for snapshot := range subscription {
watchableDepth.With(meta.LabelValues()...).Record(float64(len(subscription)))

for _, update := range coalesceUpdates(meta.Runner, snapshot.Updates) {
handleWithCrashRecovery(handle, Update[K, V](update), meta, errChans)
for _, update := range coalesceUpdates(l, snapshot.Updates) {
handleWithCrashRecovery(l, handle, Update[K, V](update), meta, errChans)
}
}
}

// coalesceUpdates merges multiple updates for the same key into a single update,
// preserving the latest state for each key.
// This helps reduce redundant processing and ensures that only the most recent update per key is handled.
func coalesceUpdates[K comparable, V any](runner string, updates []watchable.Update[K, V]) []watchable.Update[K, V] {
func coalesceUpdates[K comparable, V any](logger logging.Logger, updates []watchable.Update[K, V]) []watchable.Update[K, V] {
if len(updates) <= 1 {
return updates
}
Expand All @@ -129,7 +126,7 @@ func coalesceUpdates[K comparable, V any](runner string, updates []watchable.Upd

result := updates[write+1:]
if len(result) != len(updates) {
logger.WithValues("runner", runner).Info(
logger.Info(
"coalesced updates",
"count", len(result),
"before", len(updates),
Expand Down
8 changes: 6 additions & 2 deletions internal/message/watchutil_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
package message

import (
"os"
"testing"

"github.com/stretchr/testify/require"
"github.com/telepresenceio/watchable"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/logging"
)

func TestCoalesceUpdates(t *testing.T) {
t.Parallel()

logger := logging.NewLogger(os.Stdout, egv1a1.DefaultEnvoyGatewayLogging())
tests := []struct {
name string
input []watchable.Update[string, int]
Expand Down Expand Up @@ -61,7 +65,7 @@ func TestCoalesceUpdates(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

actual := coalesceUpdates("test-runner", tc.input)
actual := coalesceUpdates(logger, tc.input)
require.Equal(t, tc.expected, actual)
})
}
Expand Down
27 changes: 17 additions & 10 deletions internal/message/watchutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/gatewayapi/resource"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
)

Expand All @@ -25,6 +27,7 @@ func TestHandleSubscriptionAlreadyClosed(t *testing.T) {

var calls int
message.HandleSubscription(
logging.NewLogger(t.Output(), egv1a1.DefaultEnvoyGatewayLogging()),
message.Metadata{Runner: "demo", Message: "demo"},
ch,
func(_ message.Update[string, any], _ chan error) { calls++ },
Expand All @@ -50,6 +53,7 @@ func TestPanicInSubscriptionHandler(t *testing.T) {

numCalls := 0
message.HandleSubscription(
logging.NewLogger(t.Output(), egv1a1.DefaultEnvoyGatewayLogging()),
message.Metadata{Runner: "demo", Message: "demo"},
m.Subscribe(context.Background()),
func(update message.Update[string, any], _ chan error) {
Expand Down Expand Up @@ -78,6 +82,7 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) {
var storeCalls int
var deleteCalls int
message.HandleSubscription(
logging.NewLogger(t.Output(), egv1a1.DefaultEnvoyGatewayLogging()),
message.Metadata{Runner: "demo", Message: "demo"},
m.Subscribe(context.Background()),
func(update message.Update[string, any], _ chan error) {
Expand Down Expand Up @@ -246,7 +251,7 @@ func TestControllerResourceUpdate(t *testing.T) {
m := &message.ProviderResources{}

snapshotC := m.GatewayAPIResources.Subscribe(ctx)
endCtx, end := context.WithCancel(ctx)
endCtx, cancel := context.WithCancel(ctx)
m.GatewayAPIResources.Store("start", &resource.ControllerResourcesContext{
Resources: &resource.ControllerResources{},
Context: ctx,
Expand All @@ -268,15 +273,17 @@ func TestControllerResourceUpdate(t *testing.T) {
}()

updates := 0
message.HandleSubscription(message.Metadata{Runner: "demo", Message: "demo"}, snapshotC, func(u message.Update[string, *resource.ControllerResourcesContext], _ chan error) {
end()
if u.Key == "test" {
updates += 1
}
if u.Key == "end" {
m.GatewayAPIResources.Close()
}
})
message.HandleSubscription(
logging.NewLogger(t.Output(), egv1a1.DefaultEnvoyGatewayLogging()),
message.Metadata{Runner: "demo", Message: "demo"}, snapshotC, func(u message.Update[string, *resource.ControllerResourcesContext], _ chan error) {
cancel()
if u.Key == "test" {
updates += 1
}
if u.Key == "end" {
m.GatewayAPIResources.Close()
}
})
if tc.updates > 1 {
require.LessOrEqual(t, updates, tc.updates) // Updates can be coalesced
} else {
Expand Down
39 changes: 23 additions & 16 deletions internal/provider/kubernetes/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context, extensionManagerEnabled bool) {
// GatewayClass object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GatewayClassStatusMessageName},
r.subscriptions.gatewayClassStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], _ chan error) {
Expand Down Expand Up @@ -64,7 +64,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// Gateway object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GatewayStatusMessageName},
r.subscriptions.gatewayStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.GatewayStatus], errChan chan error) {
Expand All @@ -89,7 +89,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// HTTPRoute object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.HTTPRouteStatusMessageName},
r.subscriptions.httpRouteStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.HTTPRouteStatus], errChan chan error) {
Expand Down Expand Up @@ -131,7 +131,9 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// GRPCRoute object status updater
go func() {
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GRPCRouteStatusMessageName}, r.subscriptions.grpcRouteStatuses,
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.GRPCRouteStatusMessageName},
r.subscriptions.grpcRouteStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.GRPCRouteStatus], errChan chan error) {
// skip delete updates.
if update.Delete {
Expand Down Expand Up @@ -171,7 +173,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// TLSRoute object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.TLSRouteStatusMessageName},
r.subscriptions.tlsRouteStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1a2.TLSRouteStatus], errChan chan error) {
Expand Down Expand Up @@ -213,7 +215,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// TCPRoute object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.TCPRouteStatusMessageName},
r.subscriptions.tcpRouteStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1a2.TCPRouteStatus], errChan chan error) {
Expand Down Expand Up @@ -255,7 +257,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// UDPRoute object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.UDPRouteStatusMessageName},
r.subscriptions.udpRouteStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1a2.UDPRouteStatus], errChan chan error) {
Expand Down Expand Up @@ -297,7 +299,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// XListenerSet object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.XListenerSetStatusMessageName},
r.subscriptions.xListenerSetStatuses,
func(update message.Update[types.NamespacedName, *gwapixv1a1.ListenerSetStatus], errChan chan error) {
Expand Down Expand Up @@ -334,7 +336,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// EnvoyPatchPolicy object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.EnvoyPatchPolicyStatusMessageName},
r.subscriptions.envoyPatchPolicyStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) {
Expand Down Expand Up @@ -372,7 +374,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// ClientTrafficPolicy object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.ClientTrafficPolicyStatusMessageName},
r.subscriptions.clientTrafficPolicyStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) {
Expand Down Expand Up @@ -410,7 +412,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// BackendTrafficPolicy object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.BackendTrafficPolicyStatusMessageName},
r.subscriptions.backendTrafficPolicyStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) {
Expand Down Expand Up @@ -448,7 +450,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// SecurityPolicy object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.SecurityPolicyStatusMessageName},
r.subscriptions.securityPolicyStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) {
Expand Down Expand Up @@ -486,7 +488,12 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// BackendTLSPolicy object status updater
go func() {
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.BackendTLSPolicyStatusMessageName}, r.subscriptions.backendTLSPolicyStatuses,
message.HandleSubscription(r.log,
message.Metadata{
Runner: string(egv1a1.LogComponentProviderRunner),
Message: message.BackendTLSPolicyStatusMessageName,
},
r.subscriptions.backendTLSPolicyStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) {
// skip delete updates.
if update.Delete {
Expand Down Expand Up @@ -522,7 +529,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// EnvoyExtensionPolicy object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.EnvoyExtensionPolicyStatusMessageName},
r.subscriptions.envoyExtensionPolicyStatuses,
func(update message.Update[types.NamespacedName, *gwapiv1.PolicyStatus], errChan chan error) {
Expand Down Expand Up @@ -560,7 +567,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context

// Backend object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.BackendStatusMessageName},
r.subscriptions.backendStatuses,
func(update message.Update[types.NamespacedName, *egv1a1.BackendStatus], errChan chan error) {
Expand Down Expand Up @@ -599,7 +606,7 @@ func (r *gatewayAPIReconciler) updateStatusFromSubscriptions(ctx context.Context
if extensionManagerEnabled {
// ExtensionServerPolicy object status updater
go func() {
message.HandleSubscription(
message.HandleSubscription(r.log,
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: message.ExtensionServerPoliciesStatusMessageName},
r.subscriptions.extensionPolicyStatuses,
func(update message.Update[message.NamespacedNameAndGVK, *gwapiv1.PolicyStatus], errChan chan error) {
Expand Down
Loading