diff --git a/internal/provider/file/file.go b/internal/provider/file/file.go index 15777137bb..94bbf2b37c 100644 --- a/internal/provider/file/file.go +++ b/internal/provider/file/file.go @@ -13,6 +13,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -25,33 +26,45 @@ import ( "github.com/envoyproxy/gateway/internal/envoygateway/config" "github.com/envoyproxy/gateway/internal/filewatcher" "github.com/envoyproxy/gateway/internal/message" + "github.com/envoyproxy/gateway/internal/provider/kubernetes" "github.com/envoyproxy/gateway/internal/utils/path" ) type Provider struct { - paths []string - logger logr.Logger - watcher filewatcher.FileWatcher - resourcesStore *resourcesStore - extensionManagerEnabled bool + paths []string + logger logr.Logger + watcher filewatcher.FileWatcher + resources *message.ProviderResources + reconciler *kubernetes.OfflineGatewayAPIReconciler + store *resourcesStore + status *StatusHandler // ready indicates whether the provider can start watching filesystem events. ready atomic.Bool } -func New(svr *config.Server, resources *message.ProviderResources) (*Provider, error) { +func New(ctx context.Context, svr *config.Server, resources *message.ProviderResources) (*Provider, error) { logger := svr.Logger.Logger paths := sets.New[string]() if svr.EnvoyGateway.Provider.Custom.Resource.File != nil { paths.Insert(svr.EnvoyGateway.Provider.Custom.Resource.File.Paths...) } + // Create gateway-api offline reconciler. + statusHandler := NewStatusHandler(logger) + reconciler, err := kubernetes.NewOfflineGatewayAPIController(ctx, svr, statusHandler.Writer(), resources) + if err != nil { + return nil, fmt.Errorf("failed to create offline gateway-api controller") + } + return &Provider{ - paths: paths.UnsortedList(), - logger: logger, - watcher: filewatcher.NewWatcher(), - resourcesStore: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, resources, logger), - extensionManagerEnabled: svr.EnvoyGateway.ExtensionManager != nil, + paths: paths.UnsortedList(), + logger: logger, + watcher: filewatcher.NewWatcher(), + resources: resources, + reconciler: reconciler, + store: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, reconciler.Client, resources, logger), + status: statusHandler, }, nil } @@ -73,13 +86,18 @@ func (p *Provider) Start(ctx context.Context) error { } go p.startHealthProbeServer(ctx, readyzChecker) - // Subscribe resources status. - p.subscribeAndUpdateStatus(ctx) + // Offline controller should be started before initial resources load. + // Nor we may lose some messages from controller. + wg := new(sync.WaitGroup) + wg.Add(2) + go p.startReconciling(ctx, wg) + go p.status.Start(ctx, wg) + wg.Wait() initDirs, initFiles := path.ListDirsAndFiles(p.paths) - // Initially load resources from paths on host. - if err := p.resourcesStore.LoadAndStore(initFiles.UnsortedList(), initDirs.UnsortedList()); err != nil { - return fmt.Errorf("failed to load resources into store: %w", err) + // Initially load resources. + if err := p.store.ReloadAll(ctx, initFiles.UnsortedList(), initDirs.UnsortedList()); err != nil { + p.logger.Error(err, "failed to reload resources initially") } // Add paths to the watcher, and aggregate all path channels into one. @@ -150,18 +168,31 @@ func (p *Provider) Start(ctx context.Context) error { } p.logger.Info("file changed", "op", event.Op, "name", event.Name, "dir", filepath.Dir(event.Name)) - switch event.Op { - case fsnotify.Create, fsnotify.Write, fsnotify.Remove: - // Since we do not watch any events in the subdirectories, any events involving files - // modifications in current directory will trigger the event handling. - goto handle - default: - // do nothing - continue + handle: + if err := p.store.ReloadAll(ctx, curFiles.UnsortedList(), curDirs.UnsortedList()); err != nil { + p.logger.Error(err, "error when reload resources", "op", event.Op, "name", event.Name) } + } + } +} - handle: - p.resourcesStore.HandleEvent(curFiles.UnsortedList(), curDirs.UnsortedList()) +// startReconciling starts reconcile on offline controller when receiving signal from resources store. +func (p *Provider) startReconciling(ctx context.Context, ready *sync.WaitGroup) { + p.logger.Info("start reconciling") + defer p.logger.Info("stop reconciling") + ready.Done() + + for { + select { + case rid := <-p.store.reconcile: + p.logger.Info("start reconcile", "id", rid, "time", time.Now()) + if err := p.reconciler.Reconcile(ctx); err != nil { + p.logger.Error(err, "failed to reconcile", "id", rid) + } + p.logger.Info("reconcile finished", "id", rid, "time", time.Now()) + + case <-ctx.Done(): + return } } } diff --git a/internal/provider/file/file_test.go b/internal/provider/file/file_test.go index 57909ea696..89db93f763 100644 --- a/internal/provider/file/file_test.go +++ b/internal/provider/file/file_test.go @@ -6,6 +6,7 @@ package file import ( + "bytes" "context" "html/template" "io" @@ -18,6 +19,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/yaml" egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" @@ -36,16 +38,32 @@ type resourcesParam struct { GatewayName string GatewayListenerPort string HTTPRouteName string + HTTPRouteHostname string BackendName string + EndpointPort string } -func newDefaultResourcesParam() *resourcesParam { +func newResourcesParam1() *resourcesParam { return &resourcesParam{ - GatewayClassName: "eg", - GatewayName: "eg", - GatewayListenerPort: "8888", - HTTPRouteName: "backend", - BackendName: "backend", + GatewayClassName: "eg-1", + GatewayName: "eg-1", + GatewayListenerPort: "8801", + HTTPRouteName: "backend-1", + HTTPRouteHostname: "www.test1.com", + BackendName: "backend-1", + EndpointPort: "3001", + } +} + +func newResourcesParam2() *resourcesParam { + return &resourcesParam{ + GatewayClassName: "eg-2", + GatewayName: "eg-2", + GatewayListenerPort: "8802", + HTTPRouteName: "backend-2", + HTTPRouteHostname: "www.test2.com", + BackendName: "backend-2", + EndpointPort: "3002", } } @@ -70,22 +88,24 @@ func newFileProviderConfig(paths []string) (*config.Server, error) { } func TestFileProvider(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + watchFileBase, _ := os.MkdirTemp(os.TempDir(), "test-files-*") watchFilePath := filepath.Join(watchFileBase, "test.yaml") watchDirPath, _ := os.MkdirTemp(os.TempDir(), "test-dir-*") // Prepare the watched test file. - writeResourcesFile(t, "testdata/resources.tmpl", watchFilePath, newDefaultResourcesParam()) + writeResourcesFile(t, watchFilePath, newResourcesParam1()) require.FileExists(t, watchFilePath) require.DirExists(t, watchDirPath) cfg, err := newFileProviderConfig([]string{watchFilePath, watchDirPath}) require.NoError(t, err) pResources := new(message.ProviderResources) - fp, err := New(cfg, pResources) + fp, err := New(ctx, cfg, pResources) require.NoError(t, err) // Start file provider. go func() { - if err := fp.Start(context.Background()); err != nil { + if err := fp.Start(ctx); err != nil { t.Errorf("failed to start file provider: %v", err) } }() @@ -93,104 +113,148 @@ func TestFileProvider(t *testing.T) { // Wait for file provider to be ready. waitFileProviderReady(t) - require.Equal(t, "gateway.envoyproxy.io/gatewayclass-controller", fp.resourcesStore.name) + require.Equal(t, "gateway.envoyproxy.io/gatewayclass-controller", fp.store.name) t.Run("initial resource load", func(t *testing.T) { - require.NotZero(t, pResources.GatewayAPIResources.Len()) - resources := pResources.GetResourcesByGatewayClass("eg") + // Wait for the first reconcile to kick in. + require.Eventually(t, func() bool { + return pResources.GatewayAPIResources.Len() > 0 + }, resourcesUpdateTimeout, resourcesUpdateTick) + resources := pResources.GetResourcesByGatewayClass("eg-1") require.NotNil(t, resources) want := &resource.Resources{} - mustUnmarshal(t, "testdata/resources.all.yaml", want) - - opts := []cmp.Option{ - cmpopts.IgnoreFields(resource.Resources{}, "serviceMap"), - cmpopts.EquateEmpty(), - } - require.Empty(t, cmp.Diff(want, resources, opts...)) + mustUnmarshal(t, "testdata/resources.1.yaml", want) + cmpResources(t, want, resources) }) t.Run("rename the watched file then rename it back", func(t *testing.T) { - // Rename it + // Rename it first, the watched file is losed. renameFilePath := filepath.Join(watchFileBase, "foobar.yaml") err := os.Rename(watchFilePath, renameFilePath) require.NoError(t, err) require.Eventually(t, func() bool { - return pResources.GetResourcesByGatewayClass("eg") == nil + return pResources.GetResourcesByGatewayClass("eg-1") == nil }, resourcesUpdateTimeout, resourcesUpdateTick) - // Rename it back + // Rename it back, the watched file is resumed. err = os.Rename(renameFilePath, watchFilePath) require.NoError(t, err) require.Eventually(t, func() bool { - return pResources.GetResourcesByGatewayClass("eg") != nil + return pResources.GetResourcesByGatewayClass("eg-1") != nil }, resourcesUpdateTimeout, resourcesUpdateTick) - resources := pResources.GetResourcesByGatewayClass("eg") + resources := pResources.GetResourcesByGatewayClass("eg-1") want := &resource.Resources{} - mustUnmarshal(t, "testdata/resources.all.yaml", want) - - opts := []cmp.Option{ - cmpopts.IgnoreFields(resource.Resources{}, "serviceMap"), - cmpopts.EquateEmpty(), - } - require.Empty(t, cmp.Diff(want, resources, opts...)) + mustUnmarshal(t, "testdata/resources.1.yaml", want) + cmpResources(t, want, resources) }) t.Run("remove the watched file", func(t *testing.T) { - err := os.Remove(watchFilePath) - require.NoError(t, err) + require.NoError(t, os.Remove(watchFilePath)) require.Eventually(t, func() bool { - return pResources.GetResourcesByGatewayClass("eg") == nil + return len(pResources.GetResources()) == 0 }, resourcesUpdateTimeout, resourcesUpdateTick) }) - t.Run("add a file in watched dir", func(t *testing.T) { - // Write a new file under watched directory. + t.Run("add a new file in watched dir", func(t *testing.T) { + // Write a new file under empty watched directory. newFilePath := filepath.Join(watchDirPath, "test.yaml") - writeResourcesFile(t, "testdata/resources.tmpl", newFilePath, newDefaultResourcesParam()) + writeResourcesFile(t, newFilePath, newResourcesParam1()) require.Eventually(t, func() bool { - return pResources.GetResourcesByGatewayClass("eg") != nil + return pResources.GetResourcesByGatewayClass("eg-1") != nil }, resourcesUpdateTimeout, resourcesUpdateTick) - resources := pResources.GetResourcesByGatewayClass("eg") + resources := pResources.GetResourcesByGatewayClass("eg-1") want := &resource.Resources{} - mustUnmarshal(t, "testdata/resources.all.yaml", want) + mustUnmarshal(t, "testdata/resources.1.yaml", want) + cmpResources(t, want, resources) + }) - opts := []cmp.Option{ - cmpopts.IgnoreFields(resource.Resources{}, "serviceMap"), - cmpopts.EquateEmpty(), - } - require.Empty(t, cmp.Diff(want, resources, opts...)) + t.Run("rename the file then rename it back in watched dir", func(t *testing.T) { + // Rename it first. + srcFilePath := filepath.Join(watchDirPath, "test.yaml") + dstFilePath := filepath.Join(watchDirPath, "foobar.yaml") + err := os.Rename(srcFilePath, dstFilePath) + require.NoError(t, err) + require.Eventually(t, func() bool { + return pResources.GetResourcesByGatewayClass("eg-1") != nil + }, resourcesUpdateTimeout, resourcesUpdateTick) + + // Rename it back. + err = os.Rename(dstFilePath, srcFilePath) + require.NoError(t, err) + require.Eventually(t, func() bool { + return pResources.GetResourcesByGatewayClass("eg-1") != nil + }, resourcesUpdateTimeout, resourcesUpdateTick) + + resources := pResources.GetResourcesByGatewayClass("eg-1") + want := &resource.Resources{} + mustUnmarshal(t, "testdata/resources.1.yaml", want) + cmpResources(t, want, resources) }) - t.Run("remove a file in watched dir", func(t *testing.T) { + t.Run("update file content in watched dir", func(t *testing.T) { + // Rewrite the file under watched directory. newFilePath := filepath.Join(watchDirPath, "test.yaml") - err := os.Remove(newFilePath) - require.NoError(t, err) + writeResourcesFile(t, newFilePath, newResourcesParam2()) + + require.Eventually(t, func() bool { + return pResources.GetResourcesByGatewayClass("eg-1") == nil && + pResources.GetResourcesByGatewayClass("eg-2") != nil + }, resourcesUpdateTimeout, resourcesUpdateTick) + }) + + t.Run("add another file with new gatewayclass in watched dir", func(t *testing.T) { + // The test.yaml was changed by previous case, safe to use resources param 1 here. + newFilePath := filepath.Join(watchDirPath, "another.yaml") + writeResourcesFile(t, newFilePath, newResourcesParam1()) + require.Eventually(t, func() bool { - return pResources.GetResourcesByGatewayClass("eg") == nil + return pResources.GetResourcesByGatewayClass("eg-1") != nil && + pResources.GetResourcesByGatewayClass("eg-2") != nil + }, resourcesUpdateTimeout, resourcesUpdateTick) + + resources1 := pResources.GetResourcesByGatewayClass("eg-1") + want1 := &resource.Resources{} + mustUnmarshal(t, "testdata/resources.1.yaml", want1) + cmpResources(t, want1, resources1) + + resources2 := pResources.GetResourcesByGatewayClass("eg-2") + want2 := &resource.Resources{} + mustUnmarshal(t, "testdata/resources.2.yaml", want2) + cmpResources(t, want2, resources2) + }) + + t.Run("remove all files in watched dir", func(t *testing.T) { + fp1 := filepath.Join(watchDirPath, "test.yaml") + fp2 := filepath.Join(watchDirPath, "another.yaml") + require.NoError(t, os.Remove(fp1)) + require.NoError(t, os.Remove(fp2)) + require.Eventually(t, func() bool { + return len(pResources.GetResources()) == 0 }, resourcesUpdateTimeout, resourcesUpdateTick) }) t.Cleanup(func() { + cancel() _ = os.RemoveAll(watchFileBase) _ = os.RemoveAll(watchDirPath) }) } -func writeResourcesFile(t *testing.T, tmpl, dst string, params *resourcesParam) { - dstFile, err := os.Create(dst) - require.NoError(t, err) +func writeResourcesFile(t *testing.T, dst string, params *resourcesParam) { + var buf bytes.Buffer // Write parameters into target file. - tmplFile, err := template.ParseFiles(tmpl) + tmplFile, err := template.ParseFiles("testdata/resources.tmpl") require.NoError(t, err) - err = tmplFile.Execute(dstFile, params) + err = tmplFile.Execute(&buf, params) require.NoError(t, err) - require.NoError(t, dstFile.Close()) + // Write file in an atomic way, prevent unnecessary reconcile. + require.NoError(t, os.WriteFile(dst, buf.Bytes(), 0o600)) } func waitFileProviderReady(t *testing.T) { @@ -223,3 +287,12 @@ func mustUnmarshal(t *testing.T, path string, out interface{}) { require.NoError(t, err) require.NoError(t, yaml.UnmarshalStrict(content, out, yaml.DisallowUnknownFields)) } + +func cmpResources(t *testing.T, x, y interface{}) { + opts := []cmp.Option{ + cmpopts.IgnoreFields(resource.Resources{}, "serviceMap"), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + cmpopts.EquateEmpty(), + } + require.Empty(t, cmp.Diff(x, y, opts...)) +} diff --git a/internal/provider/file/status.go b/internal/provider/file/status.go index 83d86086bd..0e50c5fbb2 100644 --- a/internal/provider/file/status.go +++ b/internal/provider/file/status.go @@ -8,287 +8,99 @@ package file import ( "context" "fmt" + "sync" - "k8s.io/apimachinery/pkg/types" - gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" - gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/yaml" - egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" - "github.com/envoyproxy/gateway/internal/gatewayapi/resource" - "github.com/envoyproxy/gateway/internal/gatewayapi/status" - "github.com/envoyproxy/gateway/internal/message" + "github.com/envoyproxy/gateway/internal/provider/kubernetes" ) -func (p *Provider) subscribeAndUpdateStatus(ctx context.Context) { - // TODO: trigger gatewayclass status update in file-provider - // GatewayClass object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gatewayclass-status"}, - p.resourcesStore.resources.GatewayClassStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindGateway) - }, - ) - p.logger.Info("gatewayClass status subscriber shutting down") - }() - - // Gateway object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gateway-status"}, - p.resourcesStore.resources.GatewayStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1.GatewayStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - // Update Gateway conditions, ignore addresses - gtw := new(gwapiv1.Gateway) - gtw.Status = *update.Value - status.UpdateGatewayStatusAccepted(gtw) - - p.logStatus(gtw.Status, resource.KindGateway) - }, - ) - p.logger.Info("gateway status subscriber shutting down") - }() - - // HTTPRoute object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "httproute-status"}, - p.resourcesStore.resources.HTTPRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1.HTTPRouteStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindHTTPRoute) - }, - ) - p.logger.Info("httpRoute status subscriber shutting down") - }() - - // GRPCRoute object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "grpcroute-status"}, - p.resourcesStore.resources.GRPCRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1.GRPCRouteStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindGRPCRoute) - }, - ) - p.logger.Info("grpcRoute status subscriber shutting down") - }() - - // TLSRoute object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "tlsroute-status"}, - p.resourcesStore.resources.TLSRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.TLSRouteStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindTLSRoute) - }, - ) - p.logger.Info("tlsRoute status subscriber shutting down") - }() - - // TCPRoute object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "tcproute-status"}, - p.resourcesStore.resources.TCPRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.TCPRouteStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindTCPRoute) - }, - ) - p.logger.Info("tcpRoute status subscriber shutting down") - }() - - // UDPRoute object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "udproute-status"}, - p.resourcesStore.resources.UDPRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.UDPRouteStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindUDPRoute) - }, - ) - p.logger.Info("udpRoute status subscriber shutting down") - }() - - // EnvoyPatchPolicy object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "envoypatchpolicy-status"}, - p.resourcesStore.resources.EnvoyPatchPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.PolicyStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindEnvoyPatchPolicy) - }, - ) - p.logger.Info("envoyPatchPolicy status subscriber shutting down") - }() - - // ClientTrafficPolicy object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "clienttrafficpolicy-status"}, - p.resourcesStore.resources.ClientTrafficPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.PolicyStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindClientTrafficPolicy) - }, - ) - p.logger.Info("clientTrafficPolicy status subscriber shutting down") - }() - - // BackendTrafficPolicy object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "backendtrafficpolicy-status"}, - p.resourcesStore.resources.BackendTrafficPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.PolicyStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } - - p.logStatus(*update.Value, resource.KindBackendTrafficPolicy) - }, - ) - p.logger.Info("backendTrafficPolicy status subscriber shutting down") - }() +type StatusHandler struct { + logger logr.Logger + updateChannel chan kubernetes.Update + wg *sync.WaitGroup +} - // SecurityPolicy object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "securitypolicy-status"}, - p.resourcesStore.resources.SecurityPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.PolicyStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } +func NewStatusHandler(log logr.Logger) *StatusHandler { + u := &StatusHandler{ + logger: log, + updateChannel: make(chan kubernetes.Update, 1000), + wg: new(sync.WaitGroup), + } - p.logStatus(*update.Value, resource.KindSecurityPolicy) - }, - ) - p.logger.Info("securityPolicy status subscriber shutting down") - }() + u.wg.Add(1) - // BackendTLSPolicy object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "backendtlspolicy-status"}, - p.resourcesStore.resources.BackendTLSPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.PolicyStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } + return u +} - p.logStatus(*update.Value, resource.KindBackendTLSPolicy) - }, - ) - p.logger.Info("backendTLSPolicy status subscriber shutting down") - }() +// Start runs the goroutine to perform status writes. +func (u *StatusHandler) Start(ctx context.Context, ready *sync.WaitGroup) { + u.logger.Info("started status update handler") + defer u.logger.Info("stopped status update handler") + + // Enable Updaters to start sending updates to this handler. + u.wg.Done() + ready.Done() + + for { + select { + case <-ctx.Done(): + return + case update := <-u.updateChannel: + u.logger.Info("received a status update", "namespace", update.NamespacedName.Namespace, + "name", update.NamespacedName.Name) + + u.logStatus(update) + } + } +} - // EnvoyExtensionPolicy object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "envoyextensionpolicy-status"}, - p.resourcesStore.resources.EnvoyExtensionPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.PolicyStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } +func (u *StatusHandler) logStatus(update kubernetes.Update) { + obj := update.Resource + newObj := update.Mutator.Mutate(obj) + log := u.logger.WithValues("key", update.NamespacedName.String()) - p.logStatus(*update.Value, resource.KindEnvoyExtensionPolicy) - }, - ) - p.logger.Info("envoyExtensionPolicy status subscriber shutting down") - }() + // Log the resource status. + raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newObj) + if err != nil { + log.Error(err, "failed to convert object") + return + } - // Backend object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "backend-status"}, - p.resourcesStore.resources.BackendStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *egv1a1.BackendStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } + rawStatus, ok := raw["status"] + if !ok { + log.Error(fmt.Errorf("no status field"), "failed to log status") + return + } - p.logStatus(*update.Value, resource.KindBackend) - }, - ) - p.logger.Info("backend status subscriber shutting down") - }() + byteStatus, err := yaml.Marshal(rawStatus) + if err != nil { + log.Error(err, "failed to marshal object") + return + } - if p.extensionManagerEnabled { - // ExtensionServerPolicy object status updater - go func() { - message.HandleSubscription( - message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "extensionserverpolicies-status"}, - p.resourcesStore.resources.ExtensionPolicyStatuses.Subscribe(ctx), - func(update message.Update[message.NamespacedNameAndGVK, *gwapiv1a2.PolicyStatus], errChan chan error) { - // skip delete updates. - if update.Delete { - return - } + log.Info(fmt.Sprintf("Got new status for %s\n%s", kubernetes.KindOf(obj), string(byteStatus))) +} - p.logStatus(*update.Value, "ExtensionServerPolicy") - }, - ) - p.logger.Info("extensionServerPolicies status subscriber shutting down") - }() +// Writer retrieves the interface that should be used to write to the StatusHandler. +func (u *StatusHandler) Writer() kubernetes.Updater { + return &StatusWriter{ + updateChannel: u.updateChannel, + wg: u.wg, } } -func (p *Provider) logStatus(obj interface{}, statusType string) { - if status, err := yaml.Marshal(obj); err == nil { - p.logger.Info(fmt.Sprintf("Got new status for %s \n%s", statusType, string(status))) - } else { - p.logger.Error(err, "failed to log status", "type", statusType) - } +// StatusWriter takes status updates and sends these to the StatusHandler via a channel. +type StatusWriter struct { + updateChannel chan<- kubernetes.Update + wg *sync.WaitGroup +} + +// Send sends the given Update off to the update channel for writing by the StatusHandler. +func (u *StatusWriter) Send(update kubernetes.Update) { + // Wait until updater is ready + u.wg.Wait() + u.updateChannel <- update } diff --git a/internal/provider/file/store.go b/internal/provider/file/store.go index 448f1807cf..cf70c78b28 100644 --- a/internal/provider/file/store.go +++ b/internal/provider/file/store.go @@ -6,68 +6,349 @@ package file import ( + "context" + "crypto/rand" + "errors" + "fmt" + "math" + "math/big" + "reflect" + "sort" + "time" + "github.com/go-logr/logr" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/envoyproxy/gateway/internal/gatewayapi/resource" "github.com/envoyproxy/gateway/internal/message" ) +const ( + GatewayDeletionOrder = 3 +) + type resourcesStore struct { name string + keys sets.Set[storeKey] + client client.Client resources *message.ProviderResources + reconcile chan int64 logger logr.Logger } -func newResourcesStore(name string, resources *message.ProviderResources, logger logr.Logger) *resourcesStore { +type storeKey struct { + schema.GroupVersionKind + types.NamespacedName + + // deletionOrder is used to determine the order in which a resource is deleted. + // The larger the value, the earlier it is deleted. + deletionOrder int +} + +func (s storeKey) String() string { + return fmt.Sprintf("%s/%s/%d", + s.GroupVersionKind.String(), s.NamespacedName.String(), s.deletionOrder) +} + +func newResourcesStore(name string, client client.Client, resources *message.ProviderResources, logger logr.Logger) *resourcesStore { return &resourcesStore{ name: name, + keys: sets.New[storeKey](), + client: client, resources: resources, + reconcile: make(chan int64), logger: logger, } } -// HandleEvent simply removes all the resources and triggers a resources reload from files -// and directories despite of the event type. -// TODO: Enhance this method by respecting the event type, and add support for multiple GatewayClass. -func (r *resourcesStore) HandleEvent(files, dirs []string) { - r.logger.Info("reload all resources") - - r.resources.GatewayAPIResources.Delete(r.name) - if err := r.LoadAndStore(files, dirs); err != nil { - r.logger.Error(err, "failed to load and store resources") +func newStoreKey(obj client.Object) storeKey { + return storeKey{ + GroupVersionKind: obj.GetObjectKind().GroupVersionKind(), + NamespacedName: client.ObjectKeyFromObject(obj), } } -// LoadAndStore loads and stores all resources from files and directories. -func (r *resourcesStore) LoadAndStore(files, dirs []string) error { +// ReloadAll loads and stores all resources from all given files and directories. +func (r *resourcesStore) ReloadAll(ctx context.Context, files, dirs []string) error { + // TODO(sh2): add arbitrary number of resources support for load function. resources, err := loadFromFilesAndDirs(files, dirs) if err != nil { return err } - // TODO(sh2): For now, we assume that one file only contains one GatewayClass and all its other - // related resources, like Gateway, HTTPRoute, etc. If we managed to extend Resources structure, - // we also need to process all the resources and its relationship, like what is done in - // Kubernetes provider. However, this will cause us to maintain two places of the same logic - // in each provider. The ideal case is two different providers share the same resources process logic. - // - // - This issue is tracked by https://github.com/envoyproxy/gateway/issues/3213 - - // We cannot make sure by the time the Write event was triggered, whether the GatewayClass exist, - // so here we just simply Store the first gatewayapi.Resources that has GatewayClass. - gwcResources := make(resource.ControllerResources, 0, 1) + var errList error + currentKeys := sets.New[storeKey]() for _, res := range resources { - if res.GatewayClass != nil { - gwcResources = append(gwcResources, res) + collectKeys, err := r.storeResources(ctx, res) + if err != nil { + errList = errors.Join(errList, err) + } + currentKeys = currentKeys.Union(collectKeys) + } + + // If no resources were created or updated, stop reconciling. + if errList != nil && len(currentKeys) == 0 { + return errList + } + + // Remove the resources that no longer exist. + rn := 0 + deletedKeys := r.keys.Difference(currentKeys) + for _, k := range deletionOrderKeyList(deletedKeys) { + delObj := makeUnstructuredObjectFromKey(k) + if err := r.client.Delete(ctx, delObj); err != nil { + errList = errors.Join(errList, err) + // Insert back if the object is not be removed. + currentKeys.Insert(k) + } else if k.deletionOrder <= GatewayDeletionOrder { + // Reconcile once if gateway got deleted, this may be able to + // remove the finalizer on gatewayclass. + r.reconcile <- generateReconcileID() + rn++ } } - if len(gwcResources) == 0 { - return nil + + r.keys = currentKeys + r.reconcile <- generateReconcileID() + rn++ + + r.logger.Info("reload resources finished", + "reload_resources_num", len(r.keys), "reconcile_times", rn, "time", time.Now()) + return errList +} + +// storeResources stores resources via offline gateway-api client. +// For file provider, all gateway-api resources will be stored except: +// - Service +// - ServiceImport +// - EndpointSlices +// Becasues these resources has no effects on the host infra layer. +func (r *resourcesStore) storeResources(ctx context.Context, re *resource.Resources) (sets.Set[storeKey], error) { + if re == nil { + return nil, nil + } + + var ( + errs error + collectKeys = sets.New[storeKey]() + ) + + if err := r.stroeObjectWithKeys(ctx, re.EnvoyProxyForGatewayClass, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + + if err := r.stroeObjectWithKeys(ctx, re.GatewayClass, collectKeys); err != nil { + errs = errors.Join(errs, err) } - r.resources.GatewayAPIResources.Store(r.name, &gwcResources) - r.logger.Info("loaded and stored resources successfully") + for _, obj := range re.EnvoyProxiesForGateways { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.Gateways { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.HTTPRoutes { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.GRPCRoutes { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.TLSRoutes { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.TCPRoutes { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.UDPRoutes { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.ReferenceGrants { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.Namespaces { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.Secrets { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.ConfigMaps { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.EnvoyPatchPolicies { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.ClientTrafficPolicies { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.BackendTrafficPolicies { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.SecurityPolicies { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.BackendTLSPolicies { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.EnvoyExtensionPolicies { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.Backends { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + for _, obj := range re.HTTPRouteFilters { + if err := r.stroeObjectWithKeys(ctx, obj, collectKeys); err != nil { + errs = errors.Join(errs, err) + } + } + + return collectKeys, errs +} + +// stroeObjectWithKeys stores object while collecting its key. +func (r *resourcesStore) stroeObjectWithKeys(ctx context.Context, obj client.Object, keys sets.Set[storeKey]) error { + key, err := r.storeObject(ctx, obj) + if err != nil && key != nil { + return fmt.Errorf("failed to store %s %s: %w", key.Kind, key.NamespacedName.String(), err) + } else if err != nil { + return fmt.Errorf("failed to store object: %w", err) + } + + if key != nil { + keys.Insert(*key) + } return nil } + +// storeObject will do create for non-exist object and update for existing object. +func (r *resourcesStore) storeObject(ctx context.Context, obj client.Object) (*storeKey, error) { + if obj == nil || reflect.ValueOf(obj).IsNil() { + return nil, nil + } + + var ( + err error + key = newStoreKey(obj) + oldObj = makeUnstructuredObjectFromKey(key) + ) + + if err = r.client.Get(ctx, key.NamespacedName, oldObj); err == nil { + return &key, r.client.Patch(ctx, obj, client.Merge) + } + if kerrors.IsNotFound(err) { + return &key, r.client.Create(ctx, obj) + } + + return nil, err +} + +func makeUnstructuredObjectFromKey(key storeKey) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetGroupVersionKind(key.GroupVersionKind) + obj.SetNamespace(key.Namespace) + obj.SetName(key.Name) + return obj +} + +// deletionOrderKeyList returns a list sorted in descending order by deletionOrder in its key. +func deletionOrderKeyList(keys sets.Set[storeKey]) []storeKey { + out := keys.UnsortedList() + for i, k := range out { + switch k.Kind { + case resource.KindNamespace, resource.KindReferenceGrant, + resource.KindConfigMap, resource.KindSecret: + out[i].deletionOrder = GatewayDeletionOrder - 3 + + case resource.KindEnvoyProxy: + out[i].deletionOrder = GatewayDeletionOrder - 2 + + case resource.KindGatewayClass: + out[i].deletionOrder = GatewayDeletionOrder - 1 + + case resource.KindGateway: + out[i].deletionOrder = GatewayDeletionOrder + + case resource.KindHTTPRoute, resource.KindGRPCRoute, + resource.KindTLSRoute, resource.KindTCPRoute, resource.KindUDPRoute, + resource.KindSecurityPolicy, resource.KindClientTrafficPolicy, resource.KindBackendTrafficPolicy, + resource.KindEnvoyPatchPolicy, resource.KindEnvoyExtensionPolicy, resource.KindBackendTLSPolicy: + out[i].deletionOrder = GatewayDeletionOrder + 1 + + case resource.KindBackend, resource.KindHTTPRouteFilter: + out[i].deletionOrder = GatewayDeletionOrder + 2 + + default: + out[i].deletionOrder = GatewayDeletionOrder + 3 + } + } + + sort.Slice(out, func(i, j int) bool { + return out[i].deletionOrder > out[j].deletionOrder + }) + return out +} + +func generateReconcileID() int64 { + n, _ := rand.Int(rand.Reader, big.NewInt(math.MaxInt64)) + return n.Int64() +} diff --git a/internal/provider/file/testdata/resources.all.yaml b/internal/provider/file/testdata/resources.1.yaml similarity index 83% rename from internal/provider/file/testdata/resources.all.yaml rename to internal/provider/file/testdata/resources.1.yaml index 989ae8025a..fef92caaa7 100644 --- a/internal/provider/file/testdata/resources.all.yaml +++ b/internal/provider/file/testdata/resources.1.yaml @@ -3,21 +3,23 @@ backends: apiVersion: gateway.envoyproxy.io/v1alpha1 metadata: creationTimestamp: null - name: backend + name: backend-1 namespace: envoy-gateway-system spec: type: Endpoints endpoints: - ip: address: 0.0.0.0 - port: 3000 + port: 3001 status: {} gatewayClass: kind: GatewayClass apiVersion: gateway.networking.k8s.io/v1 metadata: creationTimestamp: null - name: eg + name: eg-1 + finalizers: + - gateway-exists-finalizer.gateway.networking.k8s.io spec: controllerName: gateway.envoyproxy.io/gatewayclass-controller status: {} @@ -26,16 +28,16 @@ gateways: apiVersion: gateway.networking.k8s.io/v1 metadata: creationTimestamp: null - name: eg + name: eg-1 namespace: envoy-gateway-system spec: - gatewayClassName: eg + gatewayClassName: eg-1 listeners: - allowedRoutes: namespaces: from: Same name: http - port: 8888 + port: 8801 protocol: HTTP status: {} httpRoutes: @@ -43,20 +45,20 @@ httpRoutes: apiVersion: gateway.networking.k8s.io/v1 metadata: creationTimestamp: null - name: backend + name: backend-1 namespace: envoy-gateway-system spec: hostnames: - - www.example.com + - www.test1.com parentRefs: - group: gateway.networking.k8s.io kind: Gateway - name: eg + name: eg-1 rules: - backendRefs: - group: gateway.envoyproxy.io kind: Backend - name: backend + name: backend-1 weight: 1 matches: - path: diff --git a/internal/provider/file/testdata/resources.2.yaml b/internal/provider/file/testdata/resources.2.yaml new file mode 100644 index 0000000000..938a7845b4 --- /dev/null +++ b/internal/provider/file/testdata/resources.2.yaml @@ -0,0 +1,76 @@ +backends: +- kind: Backend + apiVersion: gateway.envoyproxy.io/v1alpha1 + metadata: + creationTimestamp: null + name: backend-2 + namespace: envoy-gateway-system + spec: + type: Endpoints + endpoints: + - ip: + address: 0.0.0.0 + port: 3002 + status: {} +gatewayClass: + kind: GatewayClass + apiVersion: gateway.networking.k8s.io/v1 + metadata: + creationTimestamp: null + name: eg-2 + finalizers: + - gateway-exists-finalizer.gateway.networking.k8s.io + spec: + controllerName: gateway.envoyproxy.io/gatewayclass-controller + status: {} +gateways: +- kind: Gateway + apiVersion: gateway.networking.k8s.io/v1 + metadata: + creationTimestamp: null + name: eg-2 + namespace: envoy-gateway-system + spec: + gatewayClassName: eg-2 + listeners: + - allowedRoutes: + namespaces: + from: Same + name: http + port: 8802 + protocol: HTTP + status: {} +httpRoutes: +- kind: HTTPRoute + apiVersion: gateway.networking.k8s.io/v1 + metadata: + creationTimestamp: null + name: backend-2 + namespace: envoy-gateway-system + spec: + hostnames: + - www.test2.com + parentRefs: + - group: gateway.networking.k8s.io + kind: Gateway + name: eg-2 + rules: + - backendRefs: + - group: gateway.envoyproxy.io + kind: Backend + name: backend-2 + weight: 1 + matches: + - path: + type: PathPrefix + value: / + status: + parents: null +namespaces: +- kind: Namespace + apiVersion: v1 + metadata: + creationTimestamp: null + name: envoy-gateway-system + spec: {} + status: {} diff --git a/internal/provider/file/testdata/resources.tmpl b/internal/provider/file/testdata/resources.tmpl index f34bf1e0c3..d472be52b0 100644 --- a/internal/provider/file/testdata/resources.tmpl +++ b/internal/provider/file/testdata/resources.tmpl @@ -24,7 +24,7 @@ spec: parentRefs: - name: {{.GatewayName}} hostnames: - - "www.example.com" + - {{.HTTPRouteHostname}} rules: - backendRefs: - group: "gateway.envoyproxy.io" @@ -43,4 +43,4 @@ spec: endpoints: - ip: address: 0.0.0.0 - port: 3000 + port: {{.EndpointPort}} diff --git a/internal/provider/kubernetes/controller_offline.go b/internal/provider/kubernetes/controller_offline.go new file mode 100644 index 0000000000..e6d96bbf9f --- /dev/null +++ b/internal/provider/kubernetes/controller_offline.go @@ -0,0 +1,131 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package kubernetes + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" + gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gwapiv1a3 "sigs.k8s.io/gateway-api/apis/v1alpha3" + gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1" + + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/internal/envoygateway" + "github.com/envoyproxy/gateway/internal/envoygateway/config" + "github.com/envoyproxy/gateway/internal/message" +) + +// OfflineGatewayAPIReconciler can be used for non-kuberetes provider. +// It can let other providers to have the same reconcile logic without rely on apiserver. +type OfflineGatewayAPIReconciler struct { + gatewayAPIReconciler + + Client client.Client +} + +func NewOfflineGatewayAPIController( + ctx context.Context, cfg *config.Server, su Updater, resources *message.ProviderResources, +) (*OfflineGatewayAPIReconciler, error) { + if cfg == nil || resources == nil { + return nil, fmt.Errorf("missing config or resources that offline controller requires") + } + + // Check provider type. + if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes { + return nil, fmt.Errorf("offline controller cannot work with kubernetes provider") + } + + // Gather additional resources to watch from registered extensions. + var ( + extGVKs []schema.GroupVersionKind + extServerPoliciesGVKs []schema.GroupVersionKind + ) + + if cfg.EnvoyGateway.ExtensionManager != nil { + for _, rsrc := range cfg.EnvoyGateway.ExtensionManager.Resources { + gvk := schema.GroupVersionKind(rsrc) + extGVKs = append(extGVKs, gvk) + } + for _, rsrc := range cfg.EnvoyGateway.ExtensionManager.PolicyResources { + gvk := schema.GroupVersionKind(rsrc) + extServerPoliciesGVKs = append(extServerPoliciesGVKs, gvk) + } + } + + cli := newOfflineGatewayAPIClient() + r := gatewayAPIReconciler{ + client: cli, + log: cfg.Logger, + classController: gwapiv1.GatewayController(cfg.EnvoyGateway.Gateway.ControllerName), + namespace: cfg.ControllerNamespace, + statusUpdater: su, + resources: resources, + extGVKs: extGVKs, + store: newProviderStore(), + envoyGateway: cfg.EnvoyGateway, + mergeGateways: sets.New[string](), + extServerPolicies: extServerPoliciesGVKs, + } + + r.log.Info("created offline gatewayapi controller") + if su != nil { + r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.ExtensionManager != nil) + } + + return &OfflineGatewayAPIReconciler{ + gatewayAPIReconciler: r, + Client: cli, + }, nil +} + +// Reconcile calls reconcile method in gateway-api controller, this method +// should be called manually. +func (r *OfflineGatewayAPIReconciler) Reconcile(ctx context.Context) error { + _, err := r.gatewayAPIReconciler.Reconcile(ctx, reconcile.Request{}) + return err +} + +// newOfflineGatewayAPIClient returns a offline client with gateway-api schemas and indexes. +func newOfflineGatewayAPIClient() client.Client { + return fake.NewClientBuilder(). + WithScheme(envoygateway.GetScheme()). + WithIndex(&gwapiv1.Gateway{}, classGatewayIndex, gatewayIndexFunc). + WithIndex(&gwapiv1.Gateway{}, secretGatewayIndex, secretGatewayIndexFunc). + WithIndex(&gwapiv1.HTTPRoute{}, gatewayHTTPRouteIndex, gatewayHTTPRouteIndexFunc). + WithIndex(&gwapiv1.HTTPRoute{}, backendHTTPRouteIndex, backendHTTPRouteIndexFunc). + WithIndex(&gwapiv1.HTTPRoute{}, httpRouteFilterHTTPRouteIndex, httpRouteFilterHTTPRouteIndexFunc). + WithIndex(&gwapiv1.GRPCRoute{}, gatewayGRPCRouteIndex, gatewayGRPCRouteIndexFunc). + WithIndex(&gwapiv1.GRPCRoute{}, backendGRPCRouteIndex, backendGRPCRouteIndexFunc). + WithIndex(&gwapiv1a2.TCPRoute{}, gatewayTCPRouteIndex, gatewayTCPRouteIndexFunc). + WithIndex(&gwapiv1a2.TCPRoute{}, backendTCPRouteIndex, backendTCPRouteIndexFunc). + WithIndex(&gwapiv1a2.UDPRoute{}, gatewayUDPRouteIndex, gatewayUDPRouteIndexFunc). + WithIndex(&gwapiv1a2.UDPRoute{}, backendUDPRouteIndex, backendUDPRouteIndexFunc). + WithIndex(&gwapiv1a2.TLSRoute{}, gatewayTLSRouteIndex, gatewayTLSRouteIndexFunc). + WithIndex(&gwapiv1a2.TLSRoute{}, backendTLSRouteIndex, backendTLSRouteIndexFunc). + WithIndex(&egv1a1.EnvoyProxy{}, backendEnvoyProxyTelemetryIndex, backendEnvoyProxyTelemetryIndexFunc). + WithIndex(&egv1a1.EnvoyProxy{}, secretEnvoyProxyIndex, secretEnvoyProxyIndexFunc). + WithIndex(&egv1a1.BackendTrafficPolicy{}, configMapBtpIndex, configMapBtpIndexFunc). + WithIndex(&egv1a1.ClientTrafficPolicy{}, configMapCtpIndex, configMapCtpIndexFunc). + WithIndex(&egv1a1.ClientTrafficPolicy{}, secretCtpIndex, secretCtpIndexFunc). + WithIndex(&egv1a1.SecurityPolicy{}, secretSecurityPolicyIndex, secretSecurityPolicyIndexFunc). + WithIndex(&egv1a1.SecurityPolicy{}, backendSecurityPolicyIndex, backendSecurityPolicyIndexFunc). + WithIndex(&egv1a1.SecurityPolicy{}, configMapSecurityPolicyIndex, configMapSecurityPolicyIndexFunc). + WithIndex(&egv1a1.EnvoyExtensionPolicy{}, backendEnvoyExtensionPolicyIndex, backendEnvoyExtensionPolicyIndexFunc). + WithIndex(&egv1a1.EnvoyExtensionPolicy{}, secretEnvoyExtensionPolicyIndex, secretEnvoyExtensionPolicyIndexFunc). + WithIndex(&gwapiv1a3.BackendTLSPolicy{}, configMapBtlsIndex, configMapBtlsIndexFunc). + WithIndex(&gwapiv1a3.BackendTLSPolicy{}, secretBtlsIndex, secretBtlsIndexFunc). + WithIndex(&egv1a1.HTTPRouteFilter{}, configMapHTTPRouteFilterIndex, configMapRouteFilterIndexFunc). + WithIndex(&egv1a1.HTTPRouteFilter{}, secretHTTPRouteFilterIndex, secretRouteFilterIndexFunc). + WithIndex(&gwapiv1b1.ReferenceGrant{}, targetRefGrantRouteIndex, getReferenceGrantIndexerFunc). + Build() +} diff --git a/internal/provider/kubernetes/controller_offline_test.go b/internal/provider/kubernetes/controller_offline_test.go new file mode 100644 index 0000000000..a8b3296a26 --- /dev/null +++ b/internal/provider/kubernetes/controller_offline_test.go @@ -0,0 +1,49 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package kubernetes + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/require" + + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/internal/envoygateway/config" + "github.com/envoyproxy/gateway/internal/message" +) + +func TestNewOfflineGatewayAPIController(t *testing.T) { + t.Run("offline controller requires config and resources", func(t *testing.T) { + _, err := NewOfflineGatewayAPIController(context.Background(), nil, nil, nil) + require.Error(t, err) + }) + + t.Run("offline controller does not support k8s provider type", func(t *testing.T) { + cfg, err := config.New(os.Stdout) + require.NoError(t, err) + + cfg.EnvoyGateway.Provider = &egv1a1.EnvoyGatewayProvider{ + Type: egv1a1.ProviderTypeKubernetes, + } + pResources := new(message.ProviderResources) + _, err = NewOfflineGatewayAPIController(context.Background(), cfg, nil, pResources) + require.Error(t, err) + }) + + t.Run("offline controller creation success", func(t *testing.T) { + cfg, err := config.New(os.Stdout) + require.NoError(t, err) + + cfg.EnvoyGateway.Provider = &egv1a1.EnvoyGatewayProvider{ + Type: egv1a1.ProviderTypeCustom, + } + pResources := new(message.ProviderResources) + _, err = NewOfflineGatewayAPIController(context.Background(), cfg, nil, pResources) + require.NoError(t, err) + }) +} diff --git a/internal/provider/kubernetes/controller_test.go b/internal/provider/kubernetes/controller_test.go index b2d6cdaf79..4ac85754f1 100644 --- a/internal/provider/kubernetes/controller_test.go +++ b/internal/provider/kubernetes/controller_test.go @@ -434,7 +434,7 @@ func TestProcessEnvoyExtensionPolicyObjectRefs(t *testing.T) { r.client = fakeclient.NewClientBuilder(). WithScheme(envoygateway.GetScheme()). WithObjects(objs...). - WithIndex(&gwapiv1b1.ReferenceGrant{}, targetRefGrantRouteIndex, getReferenceGrantIndexerFunc()). + WithIndex(&gwapiv1b1.ReferenceGrant{}, targetRefGrantRouteIndex, getReferenceGrantIndexerFunc). Build() resourceTree := resource.NewResources() diff --git a/internal/provider/kubernetes/indexers.go b/internal/provider/kubernetes/indexers.go index cd43fc34ba..5114ce5882 100644 --- a/internal/provider/kubernetes/indexers.go +++ b/internal/provider/kubernetes/indexers.go @@ -57,21 +57,19 @@ const ( ) func addReferenceGrantIndexers(ctx context.Context, mgr manager.Manager) error { - if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1b1.ReferenceGrant{}, targetRefGrantRouteIndex, getReferenceGrantIndexerFunc()); err != nil { + if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1b1.ReferenceGrant{}, targetRefGrantRouteIndex, getReferenceGrantIndexerFunc); err != nil { return err } return nil } -func getReferenceGrantIndexerFunc() func(rawObj client.Object) []string { - return func(rawObj client.Object) []string { - refGrant := rawObj.(*gwapiv1b1.ReferenceGrant) - var referredServices []string - for _, target := range refGrant.Spec.To { - referredServices = append(referredServices, string(target.Kind)) - } - return referredServices +func getReferenceGrantIndexerFunc(rawObj client.Object) []string { + refGrant := rawObj.(*gwapiv1b1.ReferenceGrant) + var referredServices []string + for _, target := range refGrant.Spec.To { + referredServices = append(referredServices, string(target.Kind)) } + return referredServices } // addHTTPRouteIndexers adds indexing on HTTPRoute. @@ -176,23 +174,6 @@ func httpRouteFilterHTTPRouteIndexFunc(rawObj client.Object) []string { return refs } -func secretEnvoyProxyIndexFunc(rawObj client.Object) []string { - ep := rawObj.(*egv1a1.EnvoyProxy) - var secretReferences []string - if ep.Spec.BackendTLS != nil { - if ep.Spec.BackendTLS.ClientCertificateRef != nil { - if *ep.Spec.BackendTLS.ClientCertificateRef.Kind == resource.KindSecret { - secretReferences = append(secretReferences, - types.NamespacedName{ - Namespace: gatewayapi.NamespaceDerefOr(ep.Spec.BackendTLS.ClientCertificateRef.Namespace, ep.Namespace), - Name: string(ep.Spec.BackendTLS.ClientCertificateRef.Name), - }.String()) - } - } - } - return secretReferences -} - func addEnvoyProxyIndexers(ctx context.Context, mgr manager.Manager) error { if err := mgr.GetFieldIndexer().IndexField(ctx, &egv1a1.EnvoyProxy{}, backendEnvoyProxyTelemetryIndex, backendEnvoyProxyTelemetryIndexFunc); err != nil { return err @@ -216,6 +197,23 @@ func backendEnvoyProxyTelemetryIndexFunc(rawObj client.Object) []string { return refs.UnsortedList() } +func secretEnvoyProxyIndexFunc(rawObj client.Object) []string { + ep := rawObj.(*egv1a1.EnvoyProxy) + var secretReferences []string + if ep.Spec.BackendTLS != nil { + if ep.Spec.BackendTLS.ClientCertificateRef != nil { + if *ep.Spec.BackendTLS.ClientCertificateRef.Kind == resource.KindSecret { + secretReferences = append(secretReferences, + types.NamespacedName{ + Namespace: gatewayapi.NamespaceDerefOr(ep.Spec.BackendTLS.ClientCertificateRef.Namespace, ep.Namespace), + Name: string(ep.Spec.BackendTLS.ClientCertificateRef.Name), + }.String()) + } + } + } + return secretReferences +} + func accessLogRefs(ep *egv1a1.EnvoyProxy) []string { var refs []string @@ -353,23 +351,7 @@ func backendGRPCRouteIndexFunc(rawObj client.Object) []string { // referenced in TLSRoute objects via `.spec.rules.backendRefs`. This helps in // querying for TLSRoutes that are affected by a particular Service CRUD. func addTLSRouteIndexers(ctx context.Context, mgr manager.Manager) error { - if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1a2.TLSRoute{}, gatewayTLSRouteIndex, func(rawObj client.Object) []string { - tlsRoute := rawObj.(*gwapiv1a2.TLSRoute) - var gateways []string - for _, parent := range tlsRoute.Spec.ParentRefs { - if string(*parent.Kind) == resource.KindGateway { - // If an explicit Gateway namespace is not provided, use the TLSRoute namespace to - // lookup the provided Gateway Name. - gateways = append(gateways, - types.NamespacedName{ - Namespace: gatewayapi.NamespaceDerefOr(parent.Namespace, tlsRoute.Namespace), - Name: string(parent.Name), - }.String(), - ) - } - } - return gateways - }); err != nil { + if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1a2.TLSRoute{}, gatewayTLSRouteIndex, gatewayTLSRouteIndexFunc); err != nil { return err } @@ -379,6 +361,24 @@ func addTLSRouteIndexers(ctx context.Context, mgr manager.Manager) error { return nil } +func gatewayTLSRouteIndexFunc(rawObj client.Object) []string { + tlsRoute := rawObj.(*gwapiv1a2.TLSRoute) + var gateways []string + for _, parent := range tlsRoute.Spec.ParentRefs { + if string(*parent.Kind) == resource.KindGateway { + // If an explicit Gateway namespace is not provided, use the TLSRoute namespace to + // lookup the provided Gateway Name. + gateways = append(gateways, + types.NamespacedName{ + Namespace: gatewayapi.NamespaceDerefOr(parent.Namespace, tlsRoute.Namespace), + Name: string(parent.Name), + }.String(), + ) + } + } + return gateways +} + func backendTLSRouteIndexFunc(rawObj client.Object) []string { tlsroute := rawObj.(*gwapiv1a2.TLSRoute) var backendRefs []string @@ -403,23 +403,7 @@ func backendTLSRouteIndexFunc(rawObj client.Object) []string { // referenced in TCPRoute objects via `.spec.rules.backendRefs`. This helps in // querying for TCPRoutes that are affected by a particular Service CRUD. func addTCPRouteIndexers(ctx context.Context, mgr manager.Manager) error { - if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1a2.TCPRoute{}, gatewayTCPRouteIndex, func(rawObj client.Object) []string { - tcpRoute := rawObj.(*gwapiv1a2.TCPRoute) - var gateways []string - for _, parent := range tcpRoute.Spec.ParentRefs { - if string(*parent.Kind) == resource.KindGateway { - // If an explicit Gateway namespace is not provided, use the TCPRoute namespace to - // lookup the provided Gateway Name. - gateways = append(gateways, - types.NamespacedName{ - Namespace: gatewayapi.NamespaceDerefOr(parent.Namespace, tcpRoute.Namespace), - Name: string(parent.Name), - }.String(), - ) - } - } - return gateways - }); err != nil { + if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1a2.TCPRoute{}, gatewayTCPRouteIndex, gatewayTCPRouteIndexFunc); err != nil { return err } @@ -429,6 +413,24 @@ func addTCPRouteIndexers(ctx context.Context, mgr manager.Manager) error { return nil } +func gatewayTCPRouteIndexFunc(rawObj client.Object) []string { + tcpRoute := rawObj.(*gwapiv1a2.TCPRoute) + var gateways []string + for _, parent := range tcpRoute.Spec.ParentRefs { + if string(*parent.Kind) == resource.KindGateway { + // If an explicit Gateway namespace is not provided, use the TCPRoute namespace to + // lookup the provided Gateway Name. + gateways = append(gateways, + types.NamespacedName{ + Namespace: gatewayapi.NamespaceDerefOr(parent.Namespace, tcpRoute.Namespace), + Name: string(parent.Name), + }.String(), + ) + } + } + return gateways +} + func backendTCPRouteIndexFunc(rawObj client.Object) []string { tcpRoute := rawObj.(*gwapiv1a2.TCPRoute) var backendRefs []string @@ -455,23 +457,7 @@ func backendTCPRouteIndexFunc(rawObj client.Object) []string { // - For Service objects that are referenced in UDPRoute objects via `.spec.rules.backendRefs`. This helps in // querying for UDPRoutes that are affected by a particular Service CRUD. func addUDPRouteIndexers(ctx context.Context, mgr manager.Manager) error { - if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1a2.UDPRoute{}, gatewayUDPRouteIndex, func(rawObj client.Object) []string { - udpRoute := rawObj.(*gwapiv1a2.UDPRoute) - var gateways []string - for _, parent := range udpRoute.Spec.ParentRefs { - if string(*parent.Kind) == resource.KindGateway { - // If an explicit Gateway namespace is not provided, use the UDPRoute namespace to - // lookup the provided Gateway Name. - gateways = append(gateways, - types.NamespacedName{ - Namespace: gatewayapi.NamespaceDerefOr(parent.Namespace, udpRoute.Namespace), - Name: string(parent.Name), - }.String(), - ) - } - } - return gateways - }); err != nil { + if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1a2.UDPRoute{}, gatewayUDPRouteIndex, gatewayUDPRouteIndexFunc); err != nil { return err } @@ -481,6 +467,24 @@ func addUDPRouteIndexers(ctx context.Context, mgr manager.Manager) error { return nil } +func gatewayUDPRouteIndexFunc(rawObj client.Object) []string { + udpRoute := rawObj.(*gwapiv1a2.UDPRoute) + var gateways []string + for _, parent := range udpRoute.Spec.ParentRefs { + if string(*parent.Kind) == resource.KindGateway { + // If an explicit Gateway namespace is not provided, use the UDPRoute namespace to + // lookup the provided Gateway Name. + gateways = append(gateways, + types.NamespacedName{ + Namespace: gatewayapi.NamespaceDerefOr(parent.Namespace, udpRoute.Namespace), + Name: string(parent.Name), + }.String(), + ) + } + } + return gateways +} + func backendUDPRouteIndexFunc(rawObj client.Object) []string { udproute := rawObj.(*gwapiv1a2.UDPRoute) var backendRefs []string @@ -509,10 +513,7 @@ func addGatewayIndexers(ctx context.Context, mgr manager.Manager) error { return err } - if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1.Gateway{}, classGatewayIndex, func(rawObj client.Object) []string { - gateway := rawObj.(*gwapiv1.Gateway) - return []string{string(gateway.Spec.GatewayClassName)} - }); err != nil { + if err := mgr.GetFieldIndexer().IndexField(ctx, &gwapiv1.Gateway{}, classGatewayIndex, gatewayIndexFunc); err != nil { return err } return nil @@ -541,6 +542,11 @@ func secretGatewayIndexFunc(rawObj client.Object) []string { return secretReferences } +func gatewayIndexFunc(rawObj client.Object) []string { + gateway := rawObj.(*gwapiv1.Gateway) + return []string{string(gateway.Spec.GatewayClassName)} +} + // addSecurityPolicyIndexers adds indexing on SecurityPolicy. // - For Secret objects that are referenced in SecurityPolicy objects via // `.spec.OIDC.clientSecret` and `.spec.basicAuth.users`. This helps in diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index 1bafe23668..c8b0472953 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -78,7 +78,7 @@ func (u *UpdateHandler) apply(update Update) { var ( startTime = time.Now() obj = update.Resource - objKind = kindOf(obj) + objKind = KindOf(obj) ) defer func() { @@ -297,7 +297,7 @@ func isStatusEqual(objA, objB interface{}) bool { return false } -// kindOf returns the known kind string for the given Kubernetes object, +// KindOf returns the known kind string for the given Kubernetes object, // returns Unknown for the unsupported object. // // Supported objects: @@ -316,7 +316,7 @@ func isStatusEqual(objA, objB interface{}) bool { // BackendTLSPolicy // EnvoyExtensionPolicy // Unstructured (for Extension Policies) -func kindOf(obj interface{}) string { +func KindOf(obj interface{}) string { var kind string switch o := obj.(type) { case *gwapiv1.GatewayClass: diff --git a/internal/provider/runner/runner.go b/internal/provider/runner/runner.go index a475e3204a..128184197a 100644 --- a/internal/provider/runner/runner.go +++ b/internal/provider/runner/runner.go @@ -53,7 +53,7 @@ func (r *Runner) Start(ctx context.Context) (err error) { } case egv1a1.ProviderTypeCustom: - p, err = r.createCustomResourceProvider() + p, err = r.createCustomResourceProvider(ctx) if err != nil { return fmt.Errorf("failed to create custom provider: %w", err) } @@ -87,10 +87,10 @@ func (r *Runner) createKubernetesProvider(ctx context.Context) (*kubernetes.Prov return p, err } -func (r *Runner) createCustomResourceProvider() (p provider.Provider, err error) { +func (r *Runner) createCustomResourceProvider(ctx context.Context) (p provider.Provider, err error) { switch r.EnvoyGateway.Provider.Custom.Resource.Type { case egv1a1.ResourceProviderTypeFile: - p, err = file.New(&r.Server, r.ProviderResources) + p, err = file.New(ctx, &r.Server, r.ProviderResources) if err != nil { return nil, fmt.Errorf("failed to create provider %s: %w", egv1a1.ProviderTypeCustom, err) }