Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 5 additions & 32 deletions internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type Config struct {

type Runner struct {
Config
xdsIRReady bool
}

func New(cfg *Config) *Runner {
Expand Down Expand Up @@ -54,15 +53,10 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Receive subscribed resource notifications
select {
case <-gatewayClassesCh:
r.waitUntilGCAndGatewaysInitialized()
case <-gatewaysCh:
r.waitUntilGCAndGatewaysInitialized()
case <-httpRoutesCh:
r.waitUntilAllGAPIInitialized()
case <-servicesCh:
r.waitUntilAllGAPIInitialized()
case <-namespacesCh:
r.waitUntilGCAndGatewaysInitialized()
}
r.Logger.Info("received a notification")
// Load all resources required for translation
Expand Down Expand Up @@ -104,15 +98,11 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
r.InfraIR.Store(key, val)
}
}
// Wait until all HTTPRoutes have been reconciled , else the translation
// result will be incomplete, and might cause churn in the data plane.
if r.xdsIRReady {
for key, val := range result.XdsIR {
if err := val.Validate(); err != nil {
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
} else {
r.XdsIR.Store(key, val)
}
for key, val := range result.XdsIR {
if err := val.Validate(); err != nil {
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
} else {
r.XdsIR.Store(key, val)
}
}

Expand All @@ -129,20 +119,3 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
}
r.Logger.Info("shutting down")
}

// waitUntilGCAndGatewaysInitialized waits until gateway classes and
// gateways have been initialized during startup
func (r *Runner) waitUntilGCAndGatewaysInitialized() {
r.ProviderResources.GatewayClassesInitialized.Wait()
r.ProviderResources.GatewaysInitialized.Wait()
}

// waitUntilAllGAPIInitialized waits until gateway classes,
// gateways and httproutes have been initialized during startup
func (r *Runner) waitUntilAllGAPIInitialized() {
r.waitUntilGCAndGatewaysInitialized()
r.ProviderResources.HTTPRoutesInitialized.Wait()
// Now that the httproute resources have been initialized,
// allow the runner to publish the translated xdsIR.
r.xdsIRReady = true
}
6 changes: 0 additions & 6 deletions internal/message/types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package message

import (
"sync"

"github.com/telepresenceio/watchable"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -22,10 +20,6 @@ type ProviderResources struct {

GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1b1.Gateway]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1b1.HTTPRoute]

GatewayClassesInitialized sync.WaitGroup
GatewaysInitialized sync.WaitGroup
HTTPRoutesInitialized sync.WaitGroup
}

func (p *ProviderResources) DeleteGatewayClasses() {
Expand Down
8 changes: 1 addition & 7 deletions internal/provider/kubernetes/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package kubernetes
import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -40,15 +39,13 @@ type gatewayReconciler struct {
statusUpdater status.Updater
log logr.Logger

initializeOnce sync.Once
resources *message.ProviderResources
resources *message.ProviderResources
}

// newGatewayController creates a gateway controller. The controller will watch for
// Gateway objects across all namespaces and reconcile those that match the configured
// gatewayclass controller name.
func newGatewayController(mgr manager.Manager, cfg *config.Server, su status.Updater, resources *message.ProviderResources) error {
resources.GatewaysInitialized.Add(1)
r := &gatewayReconciler{
client: mgr.GetClient(),
classController: gwapiv1b1.GatewayController(cfg.EnvoyGateway.Gateway.ControllerName),
Expand Down Expand Up @@ -143,9 +140,6 @@ func (r *gatewayReconciler) enqueueRequestForOwningGatewayClass() handler.EventH
func (r *gatewayReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
r.log.Info("reconciling gateway", "namespace", request.Namespace, "name", request.Name)

// Once we've processed `allGateways`, record that we've fully initialized.
defer r.initializeOnce.Do(r.resources.GatewaysInitialized.Done)

allClasses := &gwapiv1b1.GatewayClassList{}
if err := r.client.List(ctx, allClasses); err != nil {
return reconcile.Result{}, fmt.Errorf("error listing gatewayclasses")
Expand Down
8 changes: 1 addition & 7 deletions internal/provider/kubernetes/gatewayclass.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package kubernetes
import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -31,15 +30,13 @@ type gatewayClassReconciler struct {
statusUpdater status.Updater
log logr.Logger

initializeOnce sync.Once
resources *message.ProviderResources
resources *message.ProviderResources
}

// newGatewayClassController creates the gatewayclass controller. The controller
// will be pre-configured to watch for cluster-scoped GatewayClass objects with
// a controller field that matches name.
func newGatewayClassController(mgr manager.Manager, cfg *config.Server, su status.Updater, resources *message.ProviderResources) error {
resources.GatewayClassesInitialized.Add(1)
r := &gatewayClassReconciler{
client: mgr.GetClient(),
controller: gwapiv1b1.GatewayController(cfg.EnvoyGateway.Gateway.ControllerName),
Expand Down Expand Up @@ -91,9 +88,6 @@ func (r *gatewayClassReconciler) hasMatchingController(obj client.Object) bool {
func (r *gatewayClassReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
r.log.WithName(request.Name).Info("reconciling gatewayclass")

// Once we've iterated over all listed classes, mark that we've fully initialized.
defer r.initializeOnce.Do(r.resources.GatewayClassesInitialized.Done)

var gatewayClasses gwapiv1b1.GatewayClassList
if err := r.client.List(ctx, &gatewayClasses); err != nil {
return reconcile.Result{}, fmt.Errorf("error listing gatewayclasses: %v", err)
Expand Down
7 changes: 1 addition & 6 deletions internal/provider/kubernetes/httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package kubernetes
import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -38,14 +37,12 @@ type httpRouteReconciler struct {
statusUpdater status.Updater
classController gwapiv1b1.GatewayController

initializeOnce sync.Once
resources *message.ProviderResources
resources *message.ProviderResources
}

// newHTTPRouteController creates the httproute controller from mgr. The controller will be pre-configured
// to watch for HTTPRoute objects across all namespaces.
func newHTTPRouteController(mgr manager.Manager, cfg *config.Server, su status.Updater, resources *message.ProviderResources) error {
resources.HTTPRoutesInitialized.Add(1)
r := &httpRouteReconciler{
client: mgr.GetClient(),
log: cfg.Logger,
Expand Down Expand Up @@ -182,8 +179,6 @@ func (r *httpRouteReconciler) Reconcile(ctx context.Context, request reconcile.R

log.Info("reconciling httproute")

defer r.initializeOnce.Do(r.resources.HTTPRoutesInitialized.Done)

// Fetch all HTTPRoutes from the cache.
routeList := &gwapiv1b1.HTTPRouteList{}
if err := r.client.List(ctx, routeList); err != nil {
Expand Down