Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re
r.Logger.Error(err, "unable to validate infra ir, skipped sending it")
errChan <- err
} else {
message.HandleStore(message.Metadata{
r.InfraIR.Store(key, val)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.InfraIRMessageName,
},
key, val, &r.InfraIR.Map)
})
newIRKeys = append(newIRKeys, key)
}
}
Expand All @@ -209,67 +209,67 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
errChan <- err
} else {
message.HandleStore(message.Metadata{
r.XdsIR.Store(key, val)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.XDSIRMessageName,
},
key, val, &r.XdsIR.Map)
})
}
}

// Update Status
for _, gateway := range result.Gateways {
key := utils.NamespacedName(gateway)
message.HandleStore(message.Metadata{
r.ProviderResources.GatewayStatuses.Store(key, &gateway.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.GatewayStatusMessageName,
},
key, &gateway.Status, &r.ProviderResources.GatewayStatuses)
})
delete(statusesToDelete.GatewayStatusKeys, key)
}
for _, httpRoute := range result.HTTPRoutes {
key := utils.NamespacedName(httpRoute)
message.HandleStore(message.Metadata{
r.ProviderResources.HTTPRouteStatuses.Store(key, &httpRoute.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.HTTPRouteStatusMessageName,
},
key, &httpRoute.Status, &r.ProviderResources.HTTPRouteStatuses)
})
delete(statusesToDelete.HTTPRouteStatusKeys, key)
}
for _, grpcRoute := range result.GRPCRoutes {
key := utils.NamespacedName(grpcRoute)
message.HandleStore(message.Metadata{
r.ProviderResources.GRPCRouteStatuses.Store(key, &grpcRoute.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.GRPCRouteStatusMessageName,
},
key, &grpcRoute.Status, &r.ProviderResources.GRPCRouteStatuses)
})
delete(statusesToDelete.GRPCRouteStatusKeys, key)
}
for _, tlsRoute := range result.TLSRoutes {
key := utils.NamespacedName(tlsRoute)
message.HandleStore(message.Metadata{
r.ProviderResources.TLSRouteStatuses.Store(key, &tlsRoute.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.TLSRouteStatusMessageName,
},
key, &tlsRoute.Status, &r.ProviderResources.TLSRouteStatuses)
})
delete(statusesToDelete.TLSRouteStatusKeys, key)
}
for _, tcpRoute := range result.TCPRoutes {
key := utils.NamespacedName(tcpRoute)
message.HandleStore(message.Metadata{
r.ProviderResources.TCPRouteStatuses.Store(key, &tcpRoute.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.TCPRouteStatusMessageName,
},
key, &tcpRoute.Status, &r.ProviderResources.TCPRouteStatuses)
})
delete(statusesToDelete.TCPRouteStatusKeys, key)
}
for _, udpRoute := range result.UDPRoutes {
key := utils.NamespacedName(udpRoute)
message.HandleStore(message.Metadata{
r.ProviderResources.UDPRouteStatuses.Store(key, &udpRoute.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.UDPRouteStatusMessageName,
},
key, &udpRoute.Status, &r.ProviderResources.UDPRouteStatuses)
})
delete(statusesToDelete.UDPRouteStatusKeys, key)
}

Expand All @@ -280,67 +280,67 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re
for _, backendTLSPolicy := range result.BackendTLSPolicies {
key := utils.NamespacedName(backendTLSPolicy)
if !(reflect.ValueOf(backendTLSPolicy.Status).IsZero()) {
message.HandleStore(message.Metadata{
r.ProviderResources.BackendTLSPolicyStatuses.Store(key, &backendTLSPolicy.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.BackendTLSPolicyStatusMessageName,
},
key, &backendTLSPolicy.Status, &r.ProviderResources.BackendTLSPolicyStatuses)
})
}
delete(statusesToDelete.BackendTLSPolicyStatusKeys, key)
}

for _, clientTrafficPolicy := range result.ClientTrafficPolicies {
key := utils.NamespacedName(clientTrafficPolicy)
if !(reflect.ValueOf(clientTrafficPolicy.Status).IsZero()) {
message.HandleStore(message.Metadata{
r.ProviderResources.ClientTrafficPolicyStatuses.Store(key, &clientTrafficPolicy.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.ClientTrafficPolicyStatusMessageName,
},
key, &clientTrafficPolicy.Status, &r.ProviderResources.ClientTrafficPolicyStatuses)
})
}
delete(statusesToDelete.ClientTrafficPolicyStatusKeys, key)
}
for _, backendTrafficPolicy := range result.BackendTrafficPolicies {
key := utils.NamespacedName(backendTrafficPolicy)
if !(reflect.ValueOf(backendTrafficPolicy.Status).IsZero()) {
message.HandleStore(message.Metadata{
r.ProviderResources.BackendTrafficPolicyStatuses.Store(key, &backendTrafficPolicy.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.BackendTrafficPolicyStatusMessageName,
},
key, &backendTrafficPolicy.Status, &r.ProviderResources.BackendTrafficPolicyStatuses)
})
}
delete(statusesToDelete.BackendTrafficPolicyStatusKeys, key)
}
for _, securityPolicy := range result.SecurityPolicies {
key := utils.NamespacedName(securityPolicy)
if !(reflect.ValueOf(securityPolicy.Status).IsZero()) {
message.HandleStore(message.Metadata{
r.ProviderResources.SecurityPolicyStatuses.Store(key, &securityPolicy.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.SecurityPolicyStatusMessageName,
},
key, &securityPolicy.Status, &r.ProviderResources.SecurityPolicyStatuses)
})
}
delete(statusesToDelete.SecurityPolicyStatusKeys, key)
}
for _, envoyExtensionPolicy := range result.EnvoyExtensionPolicies {
key := utils.NamespacedName(envoyExtensionPolicy)
if !(reflect.ValueOf(envoyExtensionPolicy.Status).IsZero()) {
message.HandleStore(message.Metadata{
r.ProviderResources.EnvoyExtensionPolicyStatuses.Store(key, &envoyExtensionPolicy.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.EnvoyExtensionPolicyStatusMessageName,
},
key, &envoyExtensionPolicy.Status, &r.ProviderResources.EnvoyExtensionPolicyStatuses)
})
}
delete(statusesToDelete.EnvoyExtensionPolicyStatusKeys, key)
}
for _, backend := range result.Backends {
key := utils.NamespacedName(backend)
if !(reflect.ValueOf(backend.Status).IsZero()) {
message.HandleStore(message.Metadata{
r.ProviderResources.BackendStatuses.Store(key, &backend.Status)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.BackendStatusMessageName,
},
key, &backend.Status, &r.ProviderResources.BackendStatuses)
})
}
delete(statusesToDelete.BackendStatusKeys, key)
}
Expand All @@ -351,11 +351,11 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re
}
if !(reflect.ValueOf(extServerPolicy.Object["status"]).IsZero()) {
policyStatus := unstructuredToPolicyStatus(extServerPolicy.Object["status"].(map[string]any))
message.HandleStore(message.Metadata{
r.ProviderResources.ExtensionPolicyStatuses.Store(key, &policyStatus)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.ExtensionServerPoliciesStatusMessageName,
},
key, &policyStatus, &r.ProviderResources.ExtensionPolicyStatuses)
})
}
delete(statusesToDelete.ExtensionServerPolicyStatusKeys, key)
}
Expand Down
9 changes: 4 additions & 5 deletions internal/message/watchutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type Metadata struct {
Message MessageName
}

func PublishMetric(meta Metadata) {
watchablePublishTotal.WithSuccess(meta.LabelValues()...).Increment()
}

func (m Metadata) LabelValues() []metrics.LabelValue {
labels := make([]metrics.LabelValue, 0, 2)
if m.Runner != "" {
Expand Down Expand Up @@ -100,8 +104,3 @@ func HandleSubscription[K comparable, V any](
}
}
}

func HandleStore[K comparable, V any](meta Metadata, key K, value V, publish *watchable.Map[K, V]) {
publish.Store(key, value)
watchablePublishTotal.WithSuccess(meta.LabelValues()...).Increment()
}
33 changes: 0 additions & 33 deletions internal/message/watchutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,39 +91,6 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) {
assert.Equal(t, 1, deleteCalls)
}

func TestHandleStore(t *testing.T) {
var m watchable.Map[string, any]
message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "foo", "bar", &m)

endCtx, end := context.WithCancel(context.Background())
go func() {
<-endCtx.Done()
message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "baz", "qux", &m)
m.Delete("qux") // no-op
message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "foo", "bar", &m) // no-op
m.Delete("baz")
time.Sleep(100 * time.Millisecond)
m.Close()
}()

var storeCalls int
var deleteCalls int
message.HandleSubscription[string, any](
message.Metadata{Runner: "demo", Message: "demo"},
m.Subscribe(context.Background()),
func(update message.Update[string, any], errChans chan error) {
end()
if update.Delete {
deleteCalls++
} else {
storeCalls++
}
},
)
assert.Equal(t, 2, storeCalls)
assert.Equal(t, 1, deleteCalls)
}

func TestXdsIRUpdates(t *testing.T) {
tests := []struct {
desc string
Expand Down
18 changes: 9 additions & 9 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques
false,
string(gwapiv1.GatewayClassReasonInvalidParameters),
msg)
message.HandleStore(message.Metadata{
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)
message.PublishMetric(message.Metadata{
Runner: string(egv1a1.LogComponentProviderRunner),
Message: message.GatewayClassStatusMessageName,
},
utils.NamespacedName(gc), &gc.Status, &r.resources.GatewayClassStatuses)
})
failToProcessGCParamsRef = true
}
}
Expand All @@ -322,11 +322,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques
false,
string(gwapiv1.GatewayClassReasonAccepted),
fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err))
message.HandleStore(message.Metadata{
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)
message.PublishMetric(message.Metadata{
Runner: string(egv1a1.LogComponentProviderRunner),
Message: message.GatewayClassStatusMessageName,
},
utils.NamespacedName(gc), &gc.Status, &r.resources.GatewayClassStatuses)
})
failToProcessGCParamsRef = true
}

Expand Down Expand Up @@ -512,11 +512,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques
// The Store is triggered even when there are no Gateways associated to the
// GatewayClass. This would happen in case the last Gateway is removed and the
// Store will be required to trigger a cleanup of envoy infra resources.
message.HandleStore(message.Metadata{
r.resources.GatewayAPIResources.Store(string(r.classController), &gwcResources)
message.PublishMetric(message.Metadata{
Runner: string(egv1a1.LogComponentProviderRunner),
Message: message.ProviderResourcesMessageName,
},
string(r.classController), &gwcResources, &r.resources.GatewayAPIResources)
})

r.log.Info("reconciled gateways successfully")
return reconcile.Result{}, nil
Expand Down
6 changes: 3 additions & 3 deletions internal/xds/translator/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *ir

// Publish
if err == nil {
message.HandleStore(message.Metadata{
r.Xds.Store(key, result)
message.PublishMetric(message.Metadata{
Runner: r.Name(),
Message: message.XDSMessageName,
},
key, result, &r.Xds.Map)
})
} else {
r.Logger.Error(err, "skipped publishing xds resources")
}
Expand Down
Loading