diff --git a/pkg/ingress/apisix_pluginconfig.go b/pkg/ingress/apisix_pluginconfig.go index a1e4db2c34..3faf885bcf 100644 --- a/pkg/ingress/apisix_pluginconfig.go +++ b/pkg/ingress/apisix_pluginconfig.go @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/apache/apisix-ingress-controller/pkg/config" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" "github.com/apache/apisix-ingress-controller/pkg/kube" "github.com/apache/apisix-ingress-controller/pkg/kube/translation" "github.com/apache/apisix-ingress-controller/pkg/log" @@ -167,14 +168,14 @@ func (c *apisixPluginConfigController) sync(ctx context.Context, ev *types.Event zap.Any("pluginConfigs", tctx.PluginConfigs), ) - m := &manifest{ - pluginConfigs: tctx.PluginConfigs, + m := &utils.Manifest{ + PluginConfigs: tctx.PluginConfigs, } var ( - added *manifest - updated *manifest - deleted *manifest + added *utils.Manifest + updated *utils.Manifest + deleted *utils.Manifest ) if ev.Type == types.EventDelete { @@ -199,10 +200,10 @@ func (c *apisixPluginConfigController) sync(ctx context.Context, ev *types.Event return err } - om := &manifest{ - pluginConfigs: oldCtx.PluginConfigs, + om := &utils.Manifest{ + PluginConfigs: oldCtx.PluginConfigs, } - added, updated, deleted = m.diff(om) + added, updated, deleted = m.Diff(om) } return c.controller.syncManifests(ctx, added, updated, deleted) diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go index aece96874d..c2ecee339b 100644 --- a/pkg/ingress/apisix_route.go +++ b/pkg/ingress/apisix_route.go @@ -26,6 +26,7 @@ import ( "k8s.io/client-go/util/workqueue" apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" "github.com/apache/apisix-ingress-controller/pkg/kube" v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2" "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3" @@ -189,17 +190,17 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error zap.Any("pluginConfigs", tctx.PluginConfigs), ) - m := &manifest{ - routes: tctx.Routes, - upstreams: tctx.Upstreams, - streamRoutes: tctx.StreamRoutes, - pluginConfigs: tctx.PluginConfigs, + m := &utils.Manifest{ + Routes: tctx.Routes, + Upstreams: tctx.Upstreams, + StreamRoutes: tctx.StreamRoutes, + PluginConfigs: tctx.PluginConfigs, } var ( - added *manifest - updated *manifest - deleted *manifest + added *utils.Manifest + updated *utils.Manifest + deleted *utils.Manifest ) if ev.Type == types.EventDelete { @@ -226,13 +227,13 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error return err } - om := &manifest{ - routes: oldCtx.Routes, - upstreams: oldCtx.Upstreams, - streamRoutes: oldCtx.StreamRoutes, - pluginConfigs: oldCtx.PluginConfigs, + om := &utils.Manifest{ + Routes: oldCtx.Routes, + Upstreams: oldCtx.Upstreams, + StreamRoutes: oldCtx.StreamRoutes, + PluginConfigs: oldCtx.PluginConfigs, } - added, updated, deleted = m.diff(om) + added, updated, deleted = m.Diff(om) } return c.controller.syncManifests(ctx, added, updated, deleted) diff --git a/pkg/ingress/compare.go b/pkg/ingress/compare.go index 6ab081e46d..3818d2e1d4 100644 --- a/pkg/ingress/compare.go +++ b/pkg/ingress/compare.go @@ -20,7 +20,6 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/apache/apisix-ingress-controller/pkg/api/validation" "github.com/apache/apisix-ingress-controller/pkg/log" ) @@ -46,24 +45,9 @@ func (c *Controller) CompareResources(ctx context.Context) error { consumerMapA6 = make(map[string]string) pluginConfigMapA6 = make(map[string]string) ) - // watchingNamespaces and watchingLabels are empty means to monitor all namespaces. - if !validation.HasValueInSyncMap(c.watchingNamespaces) && len(c.watchingLabels) == 0 { - opts := v1.ListOptions{} - // list all namespaces - nsList, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts) - if err != nil { - log.Error(err.Error()) - ctx.Done() - } else { - wns := new(sync.Map) - for _, v := range nsList.Items { - wns.Store(v.Name, struct{}{}) - } - c.watchingNamespaces = wns - } - } - c.watchingNamespaces.Range(func(key, value interface{}) bool { + namespaces := c.namespaceProvider.WatchingNamespaces() + for _, key := range namespaces { log.Debugf("start to watch namespace: %s", key) wg.Add(1) go func(ns string) { @@ -139,9 +123,8 @@ func (c *Controller) CompareResources(ctx context.Context) error { } } } - }(key.(string)) - return true - }) + }(key) + } wg.Wait() // 2.get all cache routes diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go index f9c834ce6f..80c36e342b 100644 --- a/pkg/ingress/controller.go +++ b/pkg/ingress/controller.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "os" - "strings" "sync" "time" @@ -35,13 +34,14 @@ import ( "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" - gatewaylistersv1alpha2 "sigs.k8s.io/gateway-api/pkg/client/listers/gateway/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/pkg/api" - "github.com/apache/apisix-ingress-controller/pkg/api/validation" "github.com/apache/apisix-ingress-controller/pkg/apisix" apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" "github.com/apache/apisix-ingress-controller/pkg/config" + "github.com/apache/apisix-ingress-controller/pkg/ingress/gateway" + "github.com/apache/apisix-ingress-controller/pkg/ingress/namespace" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" "github.com/apache/apisix-ingress-controller/pkg/kube" configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3" apisixscheme "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme" @@ -68,18 +68,15 @@ const ( // Controller is the ingress apisix controller object. type Controller struct { - name string - namespace string - cfg *config.Config - wg sync.WaitGroup - watchingNamespaces *sync.Map - watchingLabels types.Labels - apisix apisix.APISIX - podCache types.PodCache - translator translation.Translator - apiServer *api.Server - MetricsCollector metrics.Collector - kubeClient *kube.KubeClient + name string + namespace string + cfg *config.Config + apisix apisix.APISIX + podCache types.PodCache + translator translation.Translator + apiServer *api.Server + MetricsCollector metrics.Collector + kubeClient *kube.KubeClient // recorder event recorder record.EventRecorder // this map enrolls which ApisixTls objects refer to a Kubernetes @@ -93,8 +90,6 @@ type Controller struct { leaderContextCancelFunc context.CancelFunc // common informers and listers - namespaceInformer cache.SharedIndexInformer - namespaceLister listerscorev1.NamespaceLister podInformer cache.SharedIndexInformer podLister listerscorev1.PodLister epInformer cache.SharedIndexInformer @@ -117,20 +112,16 @@ type Controller struct { apisixConsumerLister kube.ApisixConsumerLister apisixPluginConfigInformer cache.SharedIndexInformer apisixPluginConfigLister kube.ApisixPluginConfigLister - gatewayInformer cache.SharedIndexInformer - gatewayLister gatewaylistersv1alpha2.GatewayLister - gatewayHttpRouteInformer cache.SharedIndexInformer - gatewayHttpRouteLister gatewaylistersv1alpha2.HTTPRouteLister // resource controllers - namespaceController *namespaceController - podController *podController - endpointsController *endpointsController - endpointSliceController *endpointSliceController - ingressController *ingressController - secretController *secretController - gatewayController *gatewayController - gatewayHTTPRouteController *gatewayHTTPRouteController + podController *podController + endpointsController *endpointsController + endpointSliceController *endpointSliceController + ingressController *ingressController + secretController *secretController + + namespaceProvider namespace.WatchingProvider + gatewayProvider *gateway.Provider apisixUpstreamController *apisixUpstreamController apisixRouteController *apisixRouteController @@ -162,39 +153,21 @@ func NewController(cfg *config.Config) (*Controller, error) { return nil, err } - var ( - watchingNamespace = new(sync.Map) - watchingLabels = make(map[string]string) - ) - if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll { - for _, ns := range cfg.Kubernetes.AppNamespaces { - watchingNamespace.Store(ns, struct{}{}) - } - } - - // support namespace label-selector - for _, labels := range cfg.Kubernetes.NamespaceSelector { - labelSlice := strings.Split(labels, "=") - watchingLabels[labelSlice[0]] = labelSlice[1] - } - // recorder utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme)) eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.Client.CoreV1().Events("")}) c := &Controller{ - name: podName, - namespace: podNamespace, - cfg: cfg, - apiServer: apiSrv, - apisix: client, - MetricsCollector: metrics.NewPrometheusCollector(), - kubeClient: kubeClient, - watchingNamespaces: watchingNamespace, - watchingLabels: watchingLabels, - secretSSLMap: new(sync.Map), - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}), + name: podName, + namespace: podNamespace, + cfg: cfg, + apiServer: apiSrv, + apisix: client, + MetricsCollector: metrics.NewPrometheusCollector(), + kubeClient: kubeClient, + secretSSLMap: new(sync.Map), + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}), podCache: types.NewPodCache(), } @@ -213,9 +186,7 @@ func (c *Controller) initWhenStartLeading() { kubeFactory := c.kubeClient.NewSharedIndexInformerFactory() apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory() - gatewayFactory := c.kubeClient.NewGatewaySharedIndexInformerFactory() - c.namespaceLister = kubeFactory.Core().V1().Namespaces().Lister() c.podLister = kubeFactory.Core().V1().Pods().Lister() c.epLister, c.epInformer = kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices) c.svcLister = kubeFactory.Core().V1().Services().Lister() @@ -266,12 +237,6 @@ func (c *Controller) initWhenStartLeading() { ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer() } - c.gatewayLister = gatewayFactory.Gateway().V1alpha2().Gateways().Lister() - c.gatewayInformer = gatewayFactory.Gateway().V1alpha2().Gateways().Informer() - - c.gatewayHttpRouteLister = gatewayFactory.Gateway().V1alpha2().HTTPRoutes().Lister() - c.gatewayHttpRouteInformer = gatewayFactory.Gateway().V1alpha2().HTTPRoutes().Informer() - switch c.cfg.Kubernetes.ApisixRouteVersion { case config.ApisixRouteV2beta2: apisixRouteInformer = apisixFactory.Apisix().V2beta2().ApisixRoutes().Informer() @@ -319,7 +284,6 @@ func (c *Controller) initWhenStartLeading() { panic(fmt.Errorf("unsupported ApisixPluginConfig version %v", c.cfg.Kubernetes.ApisixPluginConfigVersion)) } - c.namespaceInformer = kubeFactory.Core().V1().Namespaces().Informer() c.podInformer = kubeFactory.Core().V1().Pods().Informer() c.svcInformer = kubeFactory.Core().V1().Services().Informer() c.ingressInformer = ingressInformer @@ -336,7 +300,6 @@ func (c *Controller) initWhenStartLeading() { } else { c.endpointsController = c.newEndpointsController() } - c.namespaceController = c.newNamespaceController() c.podController = c.newPodController() c.apisixUpstreamController = c.newApisixUpstreamController() c.ingressController = c.newIngressController() @@ -346,8 +309,10 @@ func (c *Controller) initWhenStartLeading() { c.secretController = c.newSecretController() c.apisixConsumerController = c.newApisixConsumerController() c.apisixPluginConfigController = c.newApisixPluginConfigController() - c.gatewayController = c.newGatewayController() - c.gatewayHTTPRouteController = c.newGatewayHTTPRouteController() +} + +func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted *utils.Manifest) error { + return utils.SyncManifests(ctx, c.apisix, c.cfg.APISIX.DefaultClusterName, added, updated, deleted) } // recorderEvent recorder events for resources @@ -366,14 +331,6 @@ func (c *Controller) recorderEventS(object runtime.Object, eventtype, reason str c.recorder.Event(object, eventtype, reason, msg) } -func (c *Controller) goAttach(handler func()) { - c.wg.Add(1) - go func() { - defer c.wg.Done() - handler() - }() -} - // Eventf implements the resourcelock.EventRecorder interface. func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string, message string, _ ...interface{}) { log.Infow(reason, zap.String("message", message), zap.String("event_type", eventType)) @@ -501,63 +458,75 @@ func (c *Controller) run(ctx context.Context) { c.initWhenStartLeading() - // list namespaces and init watchingNamespaces - if err := c.initWatchingNamespacesByLabels(ctx); err != nil { + c.namespaceProvider, err = namespace.NewWatchingProvider(ctx, c.kubeClient, c.cfg) + if err != nil { + ctx.Done() + return + } + + c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{ + Cfg: c.cfg, + APISIX: c.apisix, + APISIXClusterName: c.cfg.APISIX.DefaultClusterName, + KubeTranslator: c.translator, + RestConfig: nil, + KubeClient: c.kubeClient.Client, + MetricsCollector: c.MetricsCollector, + NamespaceProvider: c.namespaceProvider, + }) + if err != nil { ctx.Done() return } + // compare resources of k8s with objects of APISIX if err = c.CompareResources(ctx); err != nil { ctx.Done() return } - c.goAttach(func() { + e := utils.ParallelExecutor{} + + e.Add(func() { c.checkClusterHealth(ctx, cancelFunc) }) - c.goAttach(func() { - c.namespaceInformer.Run(ctx.Done()) - }) - c.goAttach(func() { + e.Add(func() { c.podInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.epInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.svcInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.ingressInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.apisixRouteInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.apisixUpstreamInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.apisixClusterConfigInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.secretInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.apisixTlsInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.apisixConsumerInformer.Run(ctx.Done()) }) - c.goAttach(func() { + e.Add(func() { c.apisixPluginConfigInformer.Run(ctx.Done()) }) - c.goAttach(func() { - c.namespaceController.run(ctx) - }) - c.goAttach(func() { + e.Add(func() { c.podController.run(ctx) }) - c.goAttach(func() { + e.Add(func() { if c.cfg.Kubernetes.WatchEndpointSlices { c.endpointSliceController.run(ctx) } else { @@ -565,46 +534,38 @@ func (c *Controller) run(ctx context.Context) { } }) - if c.cfg.Kubernetes.EnableGatewayAPI { - c.goAttach(func() { - c.gatewayInformer.Run(ctx.Done()) - }) - - c.goAttach(func() { - c.gatewayHttpRouteInformer.Run(ctx.Done()) - }) - - c.goAttach(func() { - c.gatewayController.run(ctx) - }) + e.Add(func() { + c.namespaceProvider.Run(ctx) + }) - c.goAttach(func() { - c.gatewayHTTPRouteController.run(ctx) + if c.cfg.Kubernetes.EnableGatewayAPI { + e.Add(func() { + c.gatewayProvider.Run(ctx) }) } - c.goAttach(func() { + e.Add(func() { c.apisixUpstreamController.run(ctx) }) - c.goAttach(func() { + e.Add(func() { c.ingressController.run(ctx) }) - c.goAttach(func() { + e.Add(func() { c.apisixRouteController.run(ctx) }) - c.goAttach(func() { + e.Add(func() { c.apisixClusterConfigController.run(ctx) }) - c.goAttach(func() { + e.Add(func() { c.apisixTlsController.run(ctx) }) - c.goAttach(func() { + e.Add(func() { c.secretController.run(ctx) }) - c.goAttach(func() { + e.Add(func() { c.apisixConsumerController.run(ctx) }) - c.goAttach(func() { + e.Add(func() { c.apisixPluginConfigController.run(ctx) }) @@ -616,25 +577,21 @@ func (c *Controller) run(ctx context.Context) { ) <-ctx.Done() - c.wg.Wait() + e.Wait() + + for _, execErr := range e.Errors() { + log.Error(execErr.Error()) + } + if len(e.Errors()) > 0 { + log.Error("Start failed, abort...") + cancelFunc() + } } // isWatchingNamespace accepts a resource key, getting the namespace part // and checking whether the namespace is being watched. func (c *Controller) isWatchingNamespace(key string) (ok bool) { - if !validation.HasValueInSyncMap(c.watchingNamespaces) { - ok = true - return - } - ns, _, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - // Ignore resource with invalid key. - ok = false - log.Warnf("resource %s was ignored since: %s", key, err) - return - } - _, ok = c.watchingNamespaces.Load(ns) - return + return c.namespaceProvider.IsWatchingNamespace(key) } func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types.EventType) error { @@ -739,8 +696,8 @@ func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, clust zap.String("cluster", cluster.String()), ) - updated := &manifest{ - upstreams: []*apisixv1.Upstream{upstream}, + updated := &utils.Manifest{ + Upstreams: []*apisixv1.Upstream{upstream}, } return c.syncManifests(ctx, nil, updated, nil) } diff --git a/pkg/ingress/gateway.go b/pkg/ingress/gateway/gateway.go similarity index 64% rename from pkg/ingress/gateway.go rename to pkg/ingress/gateway/gateway.go index 3e4ef6a594..58bea6bbba 100644 --- a/pkg/ingress/gateway.go +++ b/pkg/ingress/gateway/gateway.go @@ -12,30 +12,33 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package ingress +package gateway import ( "context" "time" "go.uber.org/zap" + apiv1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/types" ) type gatewayController struct { - controller *Controller + controller *Provider workqueue workqueue.RateLimitingInterface workers int } -func (c *Controller) newGatewayController() *gatewayController { +func newGatewayController(c *Provider) *gatewayController { ctl := &gatewayController{ controller: c, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Gateway"), @@ -108,6 +111,10 @@ func (c *gatewayController) sync(ctx context.Context, ev *types.Event) error { return nil } gateway = ev.Tombstone.(*gatewayv1alpha2.Gateway) + //} else { + //if c.controller.HasGatewayClass(string(gateway.Spec.GatewayClassName)) { + // // TODO: Translate listeners + //} } // TODO The current implementation does not fully support the definition of Gateway. @@ -115,7 +122,7 @@ func (c *gatewayController) sync(ctx context.Context, ev *types.Event) error { // At present, we choose to directly update `GatewayStatus.Addresses` // to indicate that we have picked the Gateway resource. - c.controller.recordStatus(gateway, string(gatewayv1alpha2.ListenerReasonReady), nil, metav1.ConditionTrue, gateway.Generation) + c.recordStatus(gateway, string(gatewayv1alpha2.ListenerReasonReady), metav1.ConditionTrue, gateway.Generation) return nil } @@ -148,7 +155,7 @@ func (c *gatewayController) onAdd(obj interface{}) { log.Errorf("found gateway resource with bad meta namespace key: %s", err) return } - if !c.controller.isWatchingNamespace(key) { + if !c.controller.NamespaceProvider.IsWatchingNamespace(key) { return } log.Debugw("gateway add event arrived", @@ -162,3 +169,67 @@ func (c *gatewayController) onAdd(obj interface{}) { } func (c *gatewayController) onUpdate(oldObj, newObj interface{}) {} func (c *gatewayController) OnDelete(obj interface{}) {} + +// recordStatus record resources status +func (c *gatewayController) recordStatus(v *gatewayv1alpha2.Gateway, reason string, status metav1.ConditionStatus, generation int64) { + v = v.DeepCopy() + + gatewayCondition := metav1.Condition{ + Type: string(gatewayv1alpha2.ListenerConditionReady), + Reason: reason, + Status: status, + Message: "Gateway's status has been successfully updated", + ObservedGeneration: generation, + } + + if v.Status.Conditions == nil { + conditions := make([]metav1.Condition, 0) + v.Status.Conditions = conditions + } else { + meta.SetStatusCondition(&v.Status.Conditions, gatewayCondition) + } + + lbips, err := utils.IngressLBStatusIPs(c.controller.Cfg.IngressPublishService, c.controller.Cfg.IngressStatusAddress, c.controller.KubeClient) + if err != nil { + log.Errorw("failed to get APISIX gateway external IPs", + zap.Error(err), + ) + } + + v.Status.Addresses = convLBIPToGatewayAddr(lbips) + if _, errRecord := c.controller.gatewayClient.GatewayV1alpha2().Gateways(v.Namespace).UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil { + log.Errorw("failed to record status change for Gateway resource", + zap.Error(errRecord), + zap.String("name", v.Name), + zap.String("namespace", v.Namespace), + ) + } +} + +// convLBIPToGatewayAddr convert LoadBalancerIngress to GatewayAddress format +func convLBIPToGatewayAddr(lbips []apiv1.LoadBalancerIngress) []gatewayv1alpha2.GatewayAddress { + var gas []gatewayv1alpha2.GatewayAddress + + // In the definition, there is also an address type called NamedAddress, + // which we currently do not implement + HostnameAddressType := gatewayv1alpha2.HostnameAddressType + IPAddressType := gatewayv1alpha2.IPAddressType + + for _, lbip := range lbips { + if v := lbip.Hostname; v != "" { + gas = append(gas, gatewayv1alpha2.GatewayAddress{ + Type: &HostnameAddressType, + Value: v, + }) + } + + if v := lbip.IP; v != "" { + gas = append(gas, gatewayv1alpha2.GatewayAddress{ + Type: &IPAddressType, + Value: v, + }) + } + } + + return gas +} diff --git a/pkg/ingress/gateway/gateway_class.go b/pkg/ingress/gateway/gateway_class.go new file mode 100644 index 0000000000..00d2a4fb4f --- /dev/null +++ b/pkg/ingress/gateway/gateway_class.go @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package gateway + +import ( + "context" + "fmt" + "time" + + "go.uber.org/zap" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/gateway-api/apis/v1alpha2" + + "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/types" +) + +const ( + GatewayClassName = "apisix-ingress-controller" +) + +type gatewayClassController struct { + controller *Provider + workqueue workqueue.RateLimitingInterface + workers int +} + +func newGatewayClassController(c *Provider) (*gatewayClassController, error) { + ctrl := &gatewayClassController{ + controller: c, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "GatewayClass"), + workers: 1, + } + + err := ctrl.init() + if err != nil { + return nil, err + } + + // TODO: change to event channel + ctrl.controller.gatewayClassInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: ctrl.onAdd, + UpdateFunc: ctrl.onUpdate, + DeleteFunc: ctrl.onDelete, + }) + return ctrl, nil +} + +func (c *gatewayClassController) init() error { + classes, err := c.controller.gatewayClassLister.List(labels.Everything()) + if err != nil { + return err + } + + for _, gatewayClass := range classes { + if gatewayClass.Spec.ControllerName == GatewayClassName { + err := c.markAsUpdated(gatewayClass) + if err != nil { + return err + } + } + } + + return nil +} + +func (c *gatewayClassController) markAsUpdated(gatewayClass *v1alpha2.GatewayClass) error { + gc := gatewayClass.DeepCopy() + + condition := metav1.Condition{ + Type: string(v1alpha2.GatewayClassConditionStatusAccepted), + Status: metav1.ConditionTrue, + Reason: "Updated", + Message: fmt.Sprintf("Updated by apisix-ingress-controller, sync at %v", time.Now()), + LastTransitionTime: metav1.Now(), + } + + var newConditions []metav1.Condition + for _, cond := range gc.Status.Conditions { + if cond.Type == condition.Type { + if cond.Status == condition.Status { + // Update message to record last sync time, don't change LastTransitionTime + cond.Message = condition.Message + } else { + newConditions = append(newConditions, condition) + } + } + + if cond.Type != condition.Type { + newConditions = append(newConditions, cond) + } + } + + gc.Status.Conditions = newConditions + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := c.controller.gatewayClient.GatewayV1alpha2().GatewayClasses().UpdateStatus(ctx, gc, metav1.UpdateOptions{}) + if err != nil { + log.Errorw("failed to update GatewayClass status", + zap.Error(err), + zap.String("name", gatewayClass.Name), + ) + return err + } + + c.controller.AddGatewayClass(gatewayClass.Name) + + return nil +} + +func (c *gatewayClassController) run(ctx context.Context) { + log.Info("gateway HTTPRoute controller started") + defer log.Info("gateway HTTPRoute controller exited") + defer c.workqueue.ShutDown() + + if !cache.WaitForCacheSync(ctx.Done(), c.controller.gatewayClassInformer.HasSynced) { + log.Error("sync Gateway HTTPRoute cache failed") + return + } + + for i := 0; i < c.workers; i++ { + go c.runWorker(ctx) + } + <-ctx.Done() +} + +func (c *gatewayClassController) runWorker(ctx context.Context) { + for { + obj, quit := c.workqueue.Get() + if quit { + return + } + err := c.sync(ctx, obj.(*types.Event)) + c.workqueue.Done(obj) + c.handleSyncErr(obj, err) + } +} + +func (c *gatewayClassController) sync(ctx context.Context, ev *types.Event) error { + if ev.Type == types.EventAdd { + key := ev.Object.(string) + gatewayClass, err := c.controller.gatewayClassLister.Get(key) + if err != nil { + return err + } + + if gatewayClass.Spec.ControllerName == GatewayClassName { + return c.markAsUpdated(gatewayClass) + } + } else if ev.Type == types.EventDelete { + key := ev.Object.(string) + c.controller.RemoveGatewayClass(key) + } + + return nil +} + +func (c *gatewayClassController) handleSyncErr(obj interface{}, err error) { + if err == nil { + c.workqueue.Forget(obj) + c.controller.MetricsCollector.IncrSyncOperation("gateway_class", "success") + return + } + event := obj.(*types.Event) + if k8serrors.IsNotFound(err) && event.Type != types.EventDelete { + log.Infow("sync gateway HTTPRoute but not found, ignore", + zap.String("event_type", event.Type.String()), + zap.String("HTTPRoute ", event.Object.(string)), + ) + c.workqueue.Forget(event) + return + } + log.Warnw("sync gateway HTTPRoute failed, will retry", + zap.Any("object", obj), + zap.Error(err), + ) + c.workqueue.AddRateLimited(obj) + c.controller.MetricsCollector.IncrSyncOperation("gateway_class", "failure") +} + +func (c *gatewayClassController) onAdd(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorf("found gateway HTTPRoute resource with bad meta namespace key: %s", err) + return + } + if !c.controller.NamespaceProvider.IsWatchingNamespace(key) { + return + } + log.Debugw("gateway HTTPRoute add event arrived", + zap.Any("object", obj), + ) + + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: key, + }) +} + +func (c *gatewayClassController) onUpdate(oldObj, newObj interface{}) { + // Ignore update event since ControllerName is immutable +} + +func (c *gatewayClassController) onDelete(obj interface{}) { + gatewayClass := obj.(*v1alpha2.GatewayClass) + c.workqueue.Add(&types.Event{ + Type: types.EventDelete, + Object: gatewayClass.Name, + Tombstone: gatewayClass, + }) +} diff --git a/pkg/ingress/gateway_httproute.go b/pkg/ingress/gateway/gateway_httproute.go similarity index 87% rename from pkg/ingress/gateway_httproute.go rename to pkg/ingress/gateway/gateway_httproute.go index acb9c5e35a..a710924001 100644 --- a/pkg/ingress/gateway_httproute.go +++ b/pkg/ingress/gateway/gateway_httproute.go @@ -12,7 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package ingress +package gateway import ( "context" @@ -24,25 +24,26 @@ import ( "k8s.io/client-go/util/workqueue" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" "github.com/apache/apisix-ingress-controller/pkg/kube/translation" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/types" ) type gatewayHTTPRouteController struct { - controller *Controller + controller *Provider workqueue workqueue.RateLimitingInterface workers int } -func (c *Controller) newGatewayHTTPRouteController() *gatewayHTTPRouteController { +func newGatewayHTTPRouteController(c *Provider) *gatewayHTTPRouteController { ctrl := &gatewayHTTPRouteController{ controller: c, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "GatewayHTTPRoute"), workers: 1, } - ctrl.controller.gatewayHttpRouteInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + ctrl.controller.gatewayHTTPRouteInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ctrl.onAdd, UpdateFunc: ctrl.onUpdate, DeleteFunc: ctrl.OnDelete, @@ -55,7 +56,7 @@ func (c *gatewayHTTPRouteController) run(ctx context.Context) { defer log.Info("gateway HTTPRoute controller exited") defer c.workqueue.ShutDown() - if !cache.WaitForCacheSync(ctx.Done(), c.controller.gatewayHttpRouteInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), c.controller.gatewayHTTPRouteInformer.HasSynced) { log.Error("sync Gateway HTTPRoute cache failed") return } @@ -91,7 +92,7 @@ func (c *gatewayHTTPRouteController) sync(ctx context.Context, ev *types.Event) log.Debugw("sync HTTPRoute", zap.String("key", key)) - httpRoute, err := c.controller.gatewayHttpRouteLister.HTTPRoutes(namespace).Get(name) + httpRoute, err := c.controller.gatewayHTTPRouteLister.HTTPRoutes(namespace).Get(name) if err != nil { if !k8serrors.IsNotFound(err) { log.Errorw("failed to get Gateway HTTPRoute", @@ -134,15 +135,15 @@ func (c *gatewayHTTPRouteController) sync(ctx context.Context, ev *types.Event) zap.Any("routes", tctx.Routes), zap.Any("upstreams", tctx.Upstreams), ) - m := &manifest{ - routes: tctx.Routes, - upstreams: tctx.Upstreams, + m := &utils.Manifest{ + Routes: tctx.Routes, + Upstreams: tctx.Upstreams, } var ( - added *manifest - updated *manifest - deleted *manifest + added *utils.Manifest + updated *utils.Manifest + deleted *utils.Manifest ) if ev.Type == types.EventDelete { @@ -163,14 +164,14 @@ func (c *gatewayHTTPRouteController) sync(ctx context.Context, ev *types.Event) return err } - om := &manifest{ - routes: oldCtx.Routes, - upstreams: oldCtx.Upstreams, + om := &utils.Manifest{ + Routes: oldCtx.Routes, + Upstreams: oldCtx.Upstreams, } - added, updated, deleted = m.diff(om) + added, updated, deleted = m.Diff(om) } - return c.controller.syncManifests(ctx, added, updated, deleted) + return utils.SyncManifests(ctx, c.controller.APISIX, c.controller.APISIXClusterName, added, updated, deleted) } func (c *gatewayHTTPRouteController) handleSyncErr(obj interface{}, err error) { @@ -202,7 +203,7 @@ func (c *gatewayHTTPRouteController) onAdd(obj interface{}) { log.Errorf("found gateway HTTPRoute resource with bad meta namespace key: %s", err) return } - if !c.controller.isWatchingNamespace(key) { + if !c.controller.NamespaceProvider.IsWatchingNamespace(key) { return } log.Debugw("gateway HTTPRoute add event arrived", diff --git a/pkg/ingress/gateway/provider.go b/pkg/ingress/gateway/provider.go new file mode 100644 index 0000000000..0e87a82c21 --- /dev/null +++ b/pkg/ingress/gateway/provider.go @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package gateway + +import ( + "context" + "sync" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + gatewayclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/gateway/versioned" + gatewayexternalversions "sigs.k8s.io/gateway-api/pkg/client/informers/gateway/externalversions" + gatewaylistersv1alpha2 "sigs.k8s.io/gateway-api/pkg/client/listers/gateway/apis/v1alpha2" + + "github.com/apache/apisix-ingress-controller/pkg/apisix" + "github.com/apache/apisix-ingress-controller/pkg/config" + gatewaytranslation "github.com/apache/apisix-ingress-controller/pkg/ingress/gateway/translation" + "github.com/apache/apisix-ingress-controller/pkg/ingress/namespace" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" + "github.com/apache/apisix-ingress-controller/pkg/kube" + "github.com/apache/apisix-ingress-controller/pkg/kube/translation" + "github.com/apache/apisix-ingress-controller/pkg/metrics" +) + +const ( + ProviderName = "GatewayAPI" +) + +type Provider struct { + name string + + gatewayNamesLock sync.RWMutex + gatewayNames map[string]struct{} + + *ProviderOptions + gatewayClient gatewayclientset.Interface + + translator gatewaytranslation.Translator + + gatewayController *gatewayController + gatewayInformer cache.SharedIndexInformer + gatewayLister gatewaylistersv1alpha2.GatewayLister + + gatewayClassController *gatewayClassController + gatewayClassInformer cache.SharedIndexInformer + gatewayClassLister gatewaylistersv1alpha2.GatewayClassLister + + gatewayHTTPRouteController *gatewayHTTPRouteController + gatewayHTTPRouteInformer cache.SharedIndexInformer + gatewayHTTPRouteLister gatewaylistersv1alpha2.HTTPRouteLister +} + +type ProviderOptions struct { + Cfg *config.Config + APISIX apisix.APISIX + APISIXClusterName string + KubeTranslator translation.Translator + RestConfig *rest.Config + KubeClient kubernetes.Interface + MetricsCollector metrics.Collector + NamespaceProvider namespace.WatchingProvider +} + +func NewGatewayProvider(opts *ProviderOptions) (*Provider, error) { + var err error + if opts.RestConfig == nil { + restConfig, err := kube.BuildRestConfig(opts.Cfg.Kubernetes.Kubeconfig, "") + if err != nil { + return nil, err + } + + opts.RestConfig = restConfig + } + gatewayKubeClient, err := gatewayclientset.NewForConfig(opts.RestConfig) + if err != nil { + return nil, err + } + + p := &Provider{ + name: ProviderName, + + ProviderOptions: opts, + gatewayClient: gatewayKubeClient, + + translator: gatewaytranslation.NewTranslator(&gatewaytranslation.TranslatorOptions{ + KubeTranslator: opts.KubeTranslator, + }), + } + + gatewayFactory := gatewayexternalversions.NewSharedInformerFactory(p.gatewayClient, p.Cfg.Kubernetes.ResyncInterval.Duration) + + p.gatewayLister = gatewayFactory.Gateway().V1alpha2().Gateways().Lister() + p.gatewayInformer = gatewayFactory.Gateway().V1alpha2().Gateways().Informer() + + p.gatewayClassLister = gatewayFactory.Gateway().V1alpha2().GatewayClasses().Lister() + p.gatewayClassInformer = gatewayFactory.Gateway().V1alpha2().GatewayClasses().Informer() + + p.gatewayHTTPRouteLister = gatewayFactory.Gateway().V1alpha2().HTTPRoutes().Lister() + p.gatewayHTTPRouteInformer = gatewayFactory.Gateway().V1alpha2().HTTPRoutes().Informer() + + p.gatewayController = newGatewayController(p) + + p.gatewayClassController, err = newGatewayClassController(p) + if err != nil { + return nil, err + } + + p.gatewayHTTPRouteController = newGatewayHTTPRouteController(p) + + return p, nil +} + +func (p *Provider) Run(ctx context.Context) { + e := utils.ParallelExecutor{} + + e.Add(func() { + p.gatewayInformer.Run(ctx.Done()) + }) + + e.Add(func() { + p.gatewayClassInformer.Run(ctx.Done()) + }) + + e.Add(func() { + p.gatewayHTTPRouteInformer.Run(ctx.Done()) + }) + + e.Add(func() { + p.gatewayController.run(ctx) + }) + + e.Add(func() { + p.gatewayClassController.run(ctx) + }) + + e.Add(func() { + p.gatewayHTTPRouteController.run(ctx) + }) + + e.Wait() +} + +func (p *Provider) AddGatewayClass(name string) { + p.gatewayNamesLock.Lock() + defer p.gatewayNamesLock.Unlock() + + p.gatewayNames[name] = struct{}{} + +} +func (p *Provider) RemoveGatewayClass(name string) { + p.gatewayNamesLock.Lock() + defer p.gatewayNamesLock.Unlock() + + delete(p.gatewayNames, name) +} + +func (p *Provider) HasGatewayClass(name string) bool { + p.gatewayNamesLock.RLock() + defer p.gatewayNamesLock.RUnlock() + + _, ok := p.gatewayNames[name] + return ok +} diff --git a/pkg/kube/translation/gateway_httproute.go b/pkg/ingress/gateway/translation/gateway_httproute.go similarity index 92% rename from pkg/kube/translation/gateway_httproute.go rename to pkg/ingress/gateway/translation/gateway_httproute.go index 31a176d430..3868bafb3c 100644 --- a/pkg/kube/translation/gateway_httproute.go +++ b/pkg/ingress/gateway/translation/gateway_httproute.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. // -package translation +package gateway_translation import ( "fmt" @@ -26,12 +26,14 @@ import ( gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/pkg/id" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" + "github.com/apache/apisix-ingress-controller/pkg/kube/translation" "github.com/apache/apisix-ingress-controller/pkg/log" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) -func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha2.HTTPRoute) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() +func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha2.HTTPRoute) (*translation.TranslateContext, error) { + ctx := translation.DefaultEmptyTranslateContext() var hosts []string for _, hostname := range httpRoute.Spec.Hostnames { @@ -82,7 +84,7 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha continue } - ups, err := t.TranslateUpstream(ns, string(backend.Name), "", int32(*backend.Port)) + ups, err := t.KubeTranslator.TranslateUpstream(ns, string(backend.Name), "", int32(*backend.Port)) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j)) } @@ -90,12 +92,12 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha // APISIX limits max length of label value // https://github.com/apache/apisix/blob/5b95b85faea3094d5e466ee2d39a52f1f805abbb/apisix/schema_def.lua#L85 - ups.Labels["meta_namespace"] = truncate(ns, 64) - ups.Labels["meta_backend"] = truncate(string(backend.Name), 64) + ups.Labels["meta_namespace"] = utils.TruncateString(ns, 64) + ups.Labels["meta_backend"] = utils.TruncateString(string(backend.Name), 64) ups.Labels["meta_port"] = fmt.Sprintf("%v", int32(*backend.Port)) ups.ID = id.GenID(name) - ctx.addUpstream(ups) + ctx.AddUpstream(ups) ruleUpstreams = append(ruleUpstreams, ups) if backend.Weight == nil { @@ -154,7 +156,7 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha } } - ctx.addRoute(route) + ctx.AddRoute(route) } //TODO: Support filters diff --git a/pkg/kube/translation/gateway_httproute_test.go b/pkg/ingress/gateway/translation/gateway_httproute_test.go similarity index 97% rename from pkg/kube/translation/gateway_httproute_test.go rename to pkg/ingress/gateway/translation/gateway_httproute_test.go index 9b8d98f233..4f26c45456 100644 --- a/pkg/kube/translation/gateway_httproute_test.go +++ b/pkg/ingress/gateway/translation/gateway_httproute_test.go @@ -12,7 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package translation +package gateway_translation import ( "context" @@ -30,6 +30,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/kube" fakeapisix "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/fake" apisixinformers "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions" + "github.com/apache/apisix-ingress-controller/pkg/kube/translation" v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -102,9 +103,11 @@ func mockHTTPRouteTranslator(t *testing.T) (*translator, <-chan struct{}) { tr := &translator{ &TranslatorOptions{ - EndpointLister: epLister, - ServiceLister: svcLister, - ApisixUpstreamLister: apisixInformersFactory.Apisix().V2beta3().ApisixUpstreams().Lister(), + KubeTranslator: translation.NewTranslator(&translation.TranslatorOptions{ + EndpointLister: epLister, + ServiceLister: svcLister, + ApisixUpstreamLister: apisixInformersFactory.Apisix().V2beta3().ApisixUpstreams().Lister(), + }), }, } diff --git a/pkg/ingress/gateway/translation/translator.go b/pkg/ingress/gateway/translation/translator.go new file mode 100644 index 0000000000..3a8a5fc3f6 --- /dev/null +++ b/pkg/ingress/gateway/translation/translator.go @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package gateway_translation + +import ( + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + "github.com/apache/apisix-ingress-controller/pkg/kube/translation" +) + +type TranslatorOptions struct { + KubeTranslator translation.Translator +} + +type translator struct { + *TranslatorOptions +} + +type Translator interface { + // TranslateGatewayHTTPRouteV1Alpha2 translates Gateway API HTTPRoute to APISIX resources + TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha2.HTTPRoute) (*translation.TranslateContext, error) +} + +// NewTranslator initializes a APISIX CRD resources Translator. +func NewTranslator(opts *TranslatorOptions) Translator { + return &translator{ + TranslatorOptions: opts, + } +} diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go index b99c800c9d..8e5c0d1ebb 100644 --- a/pkg/ingress/ingress.go +++ b/pkg/ingress/ingress.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" "github.com/apache/apisix-ingress-controller/pkg/kube" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/types" @@ -143,17 +144,17 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error { zap.Any("pluginConfigs", tctx.PluginConfigs), ) - m := &manifest{ - ssl: tctx.SSL, - routes: tctx.Routes, - upstreams: tctx.Upstreams, - pluginConfigs: tctx.PluginConfigs, + m := &utils.Manifest{ + SSLs: tctx.SSL, + Routes: tctx.Routes, + Upstreams: tctx.Upstreams, + PluginConfigs: tctx.PluginConfigs, } var ( - added *manifest - updated *manifest - deleted *manifest + added *utils.Manifest + updated *utils.Manifest + deleted *utils.Manifest ) if ev.Type == types.EventDelete { @@ -170,13 +171,13 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error { ) return err } - om := &manifest{ - routes: oldCtx.Routes, - upstreams: oldCtx.Upstreams, - ssl: oldCtx.SSL, - pluginConfigs: oldCtx.PluginConfigs, + om := &utils.Manifest{ + Routes: oldCtx.Routes, + Upstreams: oldCtx.Upstreams, + SSLs: oldCtx.SSL, + PluginConfigs: oldCtx.PluginConfigs, } - added, updated, deleted = m.diff(om) + added, updated, deleted = m.Diff(om) } if err := c.controller.syncManifests(ctx, added, updated, deleted); err != nil { log.Errorw("failed to sync ingress artifacts", diff --git a/pkg/ingress/namespace.go b/pkg/ingress/namespace/namespace.go similarity index 84% rename from pkg/ingress/namespace.go rename to pkg/ingress/namespace/namespace.go index 1e4a566a41..a00acbec23 100644 --- a/pkg/ingress/namespace.go +++ b/pkg/ingress/namespace/namespace.go @@ -12,7 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package ingress +package namespace import ( "context" @@ -22,7 +22,6 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -30,13 +29,21 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/types" ) +// FIXME: Controller should be the Core Part, +// Provider should act as "EventHandler", register there functions to Controller +type EventHandler interface { + OnAdd() + OnUpdate() + OnDelete() +} + type namespaceController struct { - controller *Controller + controller *watchingProvider workqueue workqueue.RateLimitingInterface workers int } -func (c *Controller) newNamespaceController() *namespaceController { +func newNamespaceController(c *watchingProvider) *namespaceController { ctl := &namespaceController{ controller: c, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Namespace"), @@ -52,25 +59,6 @@ func (c *Controller) newNamespaceController() *namespaceController { return ctl } -func (c *Controller) initWatchingNamespacesByLabels(ctx context.Context) error { - labelSelector := metav1.LabelSelector{MatchLabels: c.watchingLabels} - opts := metav1.ListOptions{ - LabelSelector: labels.Set(labelSelector.MatchLabels).String(), - } - namespaces, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts) - if err != nil { - return err - } - var nss []string - - for _, ns := range namespaces.Items { - nss = append(nss, ns.Name) - c.watchingNamespaces.Store(ns.Name, struct{}{}) - } - log.Infow("label selector watching namespaces", zap.Strings("namespaces", nss)) - return nil -} - func (c *namespaceController) run(ctx context.Context) { if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.namespaceInformer.HasSynced); !ok { log.Error("namespace informers sync failed") @@ -99,7 +87,7 @@ func (c *namespaceController) runWorker(ctx context.Context) { func (c *namespaceController) sync(ctx context.Context, ev *types.Event) error { if ev.Type != types.EventDelete { // check the labels of specify namespace - namespace, err := c.controller.kubeClient.Client.CoreV1().Namespaces().Get(ctx, ev.Object.(string), metav1.GetOptions{}) + namespace, err := c.controller.kube.Client.CoreV1().Namespaces().Get(ctx, ev.Object.(string), metav1.GetOptions{}) if err != nil { return err } else { diff --git a/pkg/ingress/namespace/provider.go b/pkg/ingress/namespace/provider.go new file mode 100644 index 0000000000..e8f442dd19 --- /dev/null +++ b/pkg/ingress/namespace/provider.go @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package namespace + +import ( + "context" + "strings" + "sync" + + "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + listerscorev1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + + "github.com/apache/apisix-ingress-controller/pkg/api/validation" + "github.com/apache/apisix-ingress-controller/pkg/config" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" + "github.com/apache/apisix-ingress-controller/pkg/kube" + "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/types" +) + +type WatchingProvider interface { + Run(ctx context.Context) + IsWatchingNamespace(key string) bool + WatchingNamespaces() []string +} + +func NewWatchingProvider(ctx context.Context, kube *kube.KubeClient, cfg *config.Config) (WatchingProvider, error) { + var ( + watchingNamespaces = new(sync.Map) + watchingLabels = make(map[string]string) + ) + if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll { + for _, ns := range cfg.Kubernetes.AppNamespaces { + watchingNamespaces.Store(ns, struct{}{}) + } + } + // support namespace label-selector + for _, selector := range cfg.Kubernetes.NamespaceSelector { + labelSlice := strings.Split(selector, "=") + watchingLabels[labelSlice[0]] = labelSlice[1] + } + + // watchingNamespaces and watchingLabels are empty means to monitor all namespaces. + if !validation.HasValueInSyncMap(watchingNamespaces) && len(watchingLabels) == 0 { + opts := metav1.ListOptions{} + // list all namespaces + nsList, err := kube.Client.CoreV1().Namespaces().List(ctx, opts) + if err != nil { + log.Error(err.Error()) + ctx.Done() + } else { + wns := new(sync.Map) + for _, v := range nsList.Items { + wns.Store(v.Name, struct{}{}) + } + watchingNamespaces = wns + } + } + + c := &watchingProvider{ + kube: kube, + cfg: cfg, + + watchingNamespaces: watchingNamespaces, + watchingLabels: watchingLabels, + } + + kubeFactory := kube.NewSharedIndexInformerFactory() + c.namespaceInformer = kubeFactory.Core().V1().Namespaces().Informer() + c.namespaceLister = kubeFactory.Core().V1().Namespaces().Lister() + + c.controller = newNamespaceController(c) + + err := c.initWatchingNamespacesByLabels(ctx) + if err != nil { + return nil, err + } + return c, nil +} + +type watchingProvider struct { + kube *kube.KubeClient + cfg *config.Config + + watchingNamespaces *sync.Map + watchingLabels types.Labels + + namespaceInformer cache.SharedIndexInformer + namespaceLister listerscorev1.NamespaceLister + + controller *namespaceController +} + +func (c *watchingProvider) initWatchingNamespacesByLabels(ctx context.Context) error { + labelSelector := metav1.LabelSelector{MatchLabels: c.watchingLabels} + opts := metav1.ListOptions{ + LabelSelector: labels.Set(labelSelector.MatchLabels).String(), + } + namespaces, err := c.kube.Client.CoreV1().Namespaces().List(ctx, opts) + if err != nil { + return err + } + var nss []string + + for _, ns := range namespaces.Items { + nss = append(nss, ns.Name) + c.watchingNamespaces.Store(ns.Name, struct{}{}) + } + log.Infow("label selector watching namespaces", zap.Strings("namespaces", nss)) + return nil +} + +func (c *watchingProvider) Run(ctx context.Context) { + e := utils.ParallelExecutor{} + + e.Add(func() { + c.namespaceInformer.Run(ctx.Done()) + }) + + e.Add(func() { + c.controller.run(ctx) + }) + + e.Wait() +} + +func (c *watchingProvider) WatchingNamespaces() []string { + var keys []string + c.watchingNamespaces.Range(func(key, _ interface{}) bool { + keys = append(keys, key.(string)) + return true + }) + return keys +} + +// IsWatchingNamespace accepts a resource key, getting the namespace part +// and checking whether the namespace is being watched. +func (c *watchingProvider) IsWatchingNamespace(key string) (ok bool) { + if !validation.HasValueInSyncMap(c.watchingNamespaces) { + ok = true + return + } + ns, _, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + // Ignore resource with invalid key. + ok = false + log.Warnf("resource %s was ignored since: %s", key, err) + return + } + _, ok = c.watchingNamespaces.Load(ns) + return +} diff --git a/pkg/ingress/namespace/provider_mock.go b/pkg/ingress/namespace/provider_mock.go new file mode 100644 index 0000000000..e211d400ff --- /dev/null +++ b/pkg/ingress/namespace/provider_mock.go @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package namespace + +import ( + "context" + + "k8s.io/client-go/tools/cache" +) + +func NewMockWatchingProvider(namespaces []string) WatchingProvider { + return &mockWatchingProvider{ + namespaces: namespaces, + } +} + +type mockWatchingProvider struct { + namespaces []string +} + +func (c *mockWatchingProvider) Run(ctx context.Context) { +} + +func (c *mockWatchingProvider) WatchingNamespaces() []string { + return c.namespaces +} + +func (c *mockWatchingProvider) IsWatchingNamespace(key string) (ok bool) { + ns, _, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return false + } + + for _, namespace := range c.namespaces { + if namespace == ns { + return true + } + } + return false +} diff --git a/pkg/ingress/pod_test.go b/pkg/ingress/pod_test.go index b75886b346..4235703ccd 100644 --- a/pkg/ingress/pod_test.go +++ b/pkg/ingress/pod_test.go @@ -15,7 +15,6 @@ package ingress import ( - "sync" "testing" "time" @@ -23,18 +22,17 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/apache/apisix-ingress-controller/pkg/ingress/namespace" "github.com/apache/apisix-ingress-controller/pkg/metrics" "github.com/apache/apisix-ingress-controller/pkg/types" ) func TestPodOnAdd(t *testing.T) { - watchingNamespace := new(sync.Map) - watchingNamespace.Store("default", struct{}{}) ctl := &podController{ controller: &Controller{ - watchingNamespaces: watchingNamespace, - podCache: types.NewPodCache(), - MetricsCollector: metrics.NewPrometheusCollector(), + namespaceProvider: namespace.NewMockWatchingProvider([]string{"default"}), + podCache: types.NewPodCache(), + MetricsCollector: metrics.NewPrometheusCollector(), }, } @@ -70,13 +68,11 @@ func TestPodOnAdd(t *testing.T) { } func TestPodOnDelete(t *testing.T) { - watchingNamespace := new(sync.Map) - watchingNamespace.Store("default", struct{}{}) ctl := &podController{ controller: &Controller{ - watchingNamespaces: watchingNamespace, - podCache: types.NewPodCache(), - MetricsCollector: metrics.NewPrometheusCollector(), + namespaceProvider: namespace.NewMockWatchingProvider([]string{"default"}), + podCache: types.NewPodCache(), + MetricsCollector: metrics.NewPrometheusCollector(), }, } @@ -115,13 +111,11 @@ func TestPodOnDelete(t *testing.T) { } func TestPodOnUpdate(t *testing.T) { - watchingNamespace := new(sync.Map) - watchingNamespace.Store("default", struct{}{}) ctl := &podController{ controller: &Controller{ - watchingNamespaces: watchingNamespace, - podCache: types.NewPodCache(), - MetricsCollector: metrics.NewPrometheusCollector(), + namespaceProvider: namespace.NewMockWatchingProvider([]string{"default"}), + podCache: types.NewPodCache(), + MetricsCollector: metrics.NewPrometheusCollector(), }, } diff --git a/pkg/ingress/status.go b/pkg/ingress/status.go index 604227a00e..4a1a4975ea 100644 --- a/pkg/ingress/status.go +++ b/pkg/ingress/status.go @@ -17,9 +17,6 @@ package ingress import ( "context" - "fmt" - "net" - "time" "go.uber.org/zap" apiv1 "k8s.io/api/core/v1" @@ -29,9 +26,8 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" - gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "github.com/apache/apisix-ingress-controller/pkg/ingress/utils" configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2" configv2beta2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta2" configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3" @@ -39,9 +35,8 @@ import ( ) const ( - _conditionType = "ResourcesAvailable" - _commonSuccessMessage = "Sync Successfully" - _gatewayLBNotReadyMessage = "The LoadBalancer used by the APISIX gateway is not yet ready" + _conditionType = "ResourcesAvailable" + _commonSuccessMessage = "Sync Successfully" ) // verifyGeneration verify generation to decide whether to update status @@ -301,148 +296,13 @@ func (c *Controller) recordStatus(at interface{}, reason string, err error, stat zap.String("namespace", v.Namespace), ) } - case *gatewayv1alpha2.Gateway: - gatewayCondition := metav1.Condition{ - Type: string(gatewayv1alpha2.ListenerConditionReady), - Reason: reason, - Status: status, - Message: "Gateway's status has been successfully updated", - ObservedGeneration: generation, - } - - gatewayKubeClient := c.kubeClient.GatewayClient - - if v.Status.Conditions == nil { - conditions := make([]metav1.Condition, 0) - v.Status.Conditions = conditions - } else { - meta.SetStatusCondition(&v.Status.Conditions, gatewayCondition) - } - - lbips, err := c.ingressLBStatusIPs() - if err != nil { - log.Errorw("failed to get APISIX gateway external IPs", - zap.Error(err), - ) - } - - v.Status.Addresses = convLBIPToGatewayAddr(lbips) - if _, errRecord := gatewayKubeClient.GatewayV1alpha2().Gateways(v.Namespace).UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil { - log.Errorw("failed to record status change for Gateway resource", - zap.Error(errRecord), - zap.String("name", v.Name), - zap.String("namespace", v.Namespace), - ) - } default: // This should not be executed log.Errorf("unsupported resource record: %s", v) } } -// ingressPublishAddresses get addressed used to expose Ingress -func (c *Controller) ingressPublishAddresses() ([]string, error) { - ingressPublishService := c.cfg.IngressPublishService - ingressStatusAddress := c.cfg.IngressStatusAddress - addrs := []string{} - - // if ingressStatusAddress is specified, it will be used first - if len(ingressStatusAddress) > 0 { - addrs = append(addrs, ingressStatusAddress...) - return addrs, nil - } - - namespace, name, err := cache.SplitMetaNamespaceKey(ingressPublishService) - if err != nil { - log.Errorf("invalid ingressPublishService %s: %s", ingressPublishService, err) - return nil, err - } - - kubeClient := c.kubeClient.Client - svc, err := kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - - switch svc.Spec.Type { - case apiv1.ServiceTypeLoadBalancer: - if len(svc.Status.LoadBalancer.Ingress) < 1 { - return addrs, fmt.Errorf("%s", _gatewayLBNotReadyMessage) - } - - for _, ip := range svc.Status.LoadBalancer.Ingress { - if ip.IP == "" { - // typically AWS load-balancers - addrs = append(addrs, ip.Hostname) - } else { - addrs = append(addrs, ip.IP) - } - } - - addrs = append(addrs, svc.Spec.ExternalIPs...) - return addrs, nil - default: - return addrs, nil - } - -} - // ingressLBStatusIPs organizes the available addresses func (c *Controller) ingressLBStatusIPs() ([]apiv1.LoadBalancerIngress, error) { - lbips := []apiv1.LoadBalancerIngress{} - var ips []string - - for { - var err error - ips, err = c.ingressPublishAddresses() - if err != nil { - if err.Error() == _gatewayLBNotReadyMessage { - log.Warnf("%s. Provided service: %s", _gatewayLBNotReadyMessage, c.cfg.IngressPublishService) - time.Sleep(time.Second) - continue - } - - return nil, err - } - break - } - - for _, ip := range ips { - if net.ParseIP(ip) == nil { - lbips = append(lbips, apiv1.LoadBalancerIngress{Hostname: ip}) - } else { - lbips = append(lbips, apiv1.LoadBalancerIngress{IP: ip}) - } - - } - - return lbips, nil -} - -// convLBIPToGatewayAddr convert LoadBalancerIngress to GatewayAddress format -func convLBIPToGatewayAddr(lbips []apiv1.LoadBalancerIngress) []gatewayv1alpha2.GatewayAddress { - gas := []gatewayv1alpha2.GatewayAddress{} - - // In the definition, there is also an address type called NamedAddress, - // which we currently do not implement - HostnameAddressType := gatewayv1alpha2.HostnameAddressType - IPAddressType := gatewayv1alpha2.IPAddressType - - for _, lbip := range lbips { - if v := lbip.Hostname; v != "" { - gas = append(gas, gatewayv1alpha2.GatewayAddress{ - Type: &HostnameAddressType, - Value: v, - }) - } - - if v := lbip.IP; v != "" { - gas = append(gas, gatewayv1alpha2.GatewayAddress{ - Type: &IPAddressType, - Value: v, - }) - } - } - - return gas + return utils.IngressLBStatusIPs(c.cfg.IngressPublishService, c.cfg.IngressStatusAddress, c.kubeClient.Client) } diff --git a/pkg/ingress/utils/executor.go b/pkg/ingress/utils/executor.go new file mode 100644 index 0000000000..d4e2646168 --- /dev/null +++ b/pkg/ingress/utils/executor.go @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package utils + +import "sync" + +type ParallelExecutor struct { + wg sync.WaitGroup + + errorsLock sync.Mutex + errors []error +} + +func (exec *ParallelExecutor) Add(handler func()) { + exec.wg.Add(1) + go func() { + defer exec.wg.Done() + handler() + }() +} + +func (exec *ParallelExecutor) AddE(handler func() error) { + exec.wg.Add(1) + go func() { + defer exec.wg.Done() + err := handler() + if err != nil { + exec.errorsLock.Lock() + defer exec.errorsLock.Unlock() + exec.errors = append(exec.errors, err) + } + }() +} + +func (exec *ParallelExecutor) Wait() { + exec.wg.Wait() +} + +func (exec *ParallelExecutor) Errors() []error { + return exec.errors +} diff --git a/pkg/ingress/utils/ingress_status.go b/pkg/ingress/utils/ingress_status.go new file mode 100644 index 0000000000..2adb3905b9 --- /dev/null +++ b/pkg/ingress/utils/ingress_status.go @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package utils + +import ( + "context" + "fmt" + "net" + "time" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + "github.com/apache/apisix-ingress-controller/pkg/log" +) + +const ( + _gatewayLBNotReadyMessage = "The LoadBalancer used by the APISIX gateway is not yet ready" +) + +// IngressPublishAddresses get addressed used to expose Ingress +func IngressPublishAddresses(ingressPublishService string, ingressStatusAddress []string, kubeClient kubernetes.Interface) ([]string, error) { + addrs := []string{} + + // if ingressStatusAddress is specified, it will be used first + if len(ingressStatusAddress) > 0 { + addrs = append(addrs, ingressStatusAddress...) + return addrs, nil + } + + namespace, name, err := cache.SplitMetaNamespaceKey(ingressPublishService) + if err != nil { + log.Errorf("invalid ingressPublishService %s: %s", ingressPublishService, err) + return nil, err + } + + svc, err := kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + switch svc.Spec.Type { + case apiv1.ServiceTypeLoadBalancer: + if len(svc.Status.LoadBalancer.Ingress) < 1 { + return addrs, fmt.Errorf("%s", _gatewayLBNotReadyMessage) + } + + for _, ip := range svc.Status.LoadBalancer.Ingress { + if ip.IP == "" { + // typically AWS load-balancers + addrs = append(addrs, ip.Hostname) + } else { + addrs = append(addrs, ip.IP) + } + } + + addrs = append(addrs, svc.Spec.ExternalIPs...) + return addrs, nil + default: + return addrs, nil + } + +} + +// IngressLBStatusIPs organizes the available addresses +func IngressLBStatusIPs(ingressPublishService string, ingressStatusAddress []string, kubeClient kubernetes.Interface) ([]apiv1.LoadBalancerIngress, error) { + lbips := []apiv1.LoadBalancerIngress{} + var ips []string + + for { + var err error + ips, err = IngressPublishAddresses(ingressPublishService, ingressStatusAddress, kubeClient) + if err != nil { + if err.Error() == _gatewayLBNotReadyMessage { + log.Warnf("%s. Provided service: %s", _gatewayLBNotReadyMessage, ingressPublishService) + time.Sleep(time.Second) + continue + } + + return nil, err + } + break + } + + for _, ip := range ips { + if net.ParseIP(ip) == nil { + lbips = append(lbips, apiv1.LoadBalancerIngress{Hostname: ip}) + } else { + lbips = append(lbips, apiv1.LoadBalancerIngress{IP: ip}) + } + + } + + return lbips, nil +} diff --git a/pkg/ingress/manifest.go b/pkg/ingress/utils/manifest.go similarity index 63% rename from pkg/ingress/manifest.go rename to pkg/ingress/utils/manifest.go index bf4edfa6cb..841248ead7 100644 --- a/pkg/ingress/manifest.go +++ b/pkg/ingress/utils/manifest.go @@ -12,7 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package ingress +package utils import ( "context" @@ -21,12 +21,13 @@ import ( "github.com/hashicorp/go-multierror" "go.uber.org/zap" + "github.com/apache/apisix-ingress-controller/pkg/apisix" "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" "github.com/apache/apisix-ingress-controller/pkg/log" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) -func diffSSL(olds, news []*apisixv1.Ssl) (added, updated, deleted []*apisixv1.Ssl) { +func DiffSSL(olds, news []*apisixv1.Ssl) (added, updated, deleted []*apisixv1.Ssl) { if olds == nil { return news, nil, nil } @@ -58,7 +59,7 @@ func diffSSL(olds, news []*apisixv1.Ssl) (added, updated, deleted []*apisixv1.Ss return } -func diffRoutes(olds, news []*apisixv1.Route) (added, updated, deleted []*apisixv1.Route) { +func DiffRoutes(olds, news []*apisixv1.Route) (added, updated, deleted []*apisixv1.Route) { if olds == nil { return news, nil, nil } @@ -90,7 +91,7 @@ func diffRoutes(olds, news []*apisixv1.Route) (added, updated, deleted []*apisix return } -func diffUpstreams(olds, news []*apisixv1.Upstream) (added, updated, deleted []*apisixv1.Upstream) { +func DiffUpstreams(olds, news []*apisixv1.Upstream) (added, updated, deleted []*apisixv1.Upstream) { oldMap := make(map[string]*apisixv1.Upstream, len(olds)) newMap := make(map[string]*apisixv1.Upstream, len(news)) for _, u := range olds { @@ -115,7 +116,7 @@ func diffUpstreams(olds, news []*apisixv1.Upstream) (added, updated, deleted []* return } -func diffStreamRoutes(olds, news []*apisixv1.StreamRoute) (added, updated, deleted []*apisixv1.StreamRoute) { +func DiffStreamRoutes(olds, news []*apisixv1.StreamRoute) (added, updated, deleted []*apisixv1.StreamRoute) { oldMap := make(map[string]*apisixv1.StreamRoute, len(olds)) newMap := make(map[string]*apisixv1.StreamRoute, len(news)) for _, sr := range olds { @@ -140,7 +141,7 @@ func diffStreamRoutes(olds, news []*apisixv1.StreamRoute) (added, updated, delet return } -func diffPluginConfigs(olds, news []*apisixv1.PluginConfig) (added, updated, deleted []*apisixv1.PluginConfig) { +func DiffPluginConfigs(olds, news []*apisixv1.PluginConfig) (added, updated, deleted []*apisixv1.PluginConfig) { oldMap := make(map[string]*apisixv1.PluginConfig, len(olds)) newMap := make(map[string]*apisixv1.PluginConfig, len(news)) for _, sr := range olds { @@ -165,73 +166,72 @@ func diffPluginConfigs(olds, news []*apisixv1.PluginConfig) (added, updated, del return } -type manifest struct { - routes []*apisixv1.Route - upstreams []*apisixv1.Upstream - streamRoutes []*apisixv1.StreamRoute - ssl []*apisixv1.Ssl - pluginConfigs []*apisixv1.PluginConfig +type Manifest struct { + Routes []*apisixv1.Route + Upstreams []*apisixv1.Upstream + StreamRoutes []*apisixv1.StreamRoute + SSLs []*apisixv1.Ssl + PluginConfigs []*apisixv1.PluginConfig } -func (m *manifest) diff(om *manifest) (added, updated, deleted *manifest) { - sa, su, sd := diffSSL(om.ssl, m.ssl) - ar, ur, dr := diffRoutes(om.routes, m.routes) - au, uu, du := diffUpstreams(om.upstreams, m.upstreams) - asr, usr, dsr := diffStreamRoutes(om.streamRoutes, m.streamRoutes) - apc, upc, dpc := diffPluginConfigs(om.pluginConfigs, m.pluginConfigs) +func (m *Manifest) Diff(om *Manifest) (added, updated, deleted *Manifest) { + sa, su, sd := DiffSSL(om.SSLs, m.SSLs) + ar, ur, dr := DiffRoutes(om.Routes, m.Routes) + au, uu, du := DiffUpstreams(om.Upstreams, m.Upstreams) + asr, usr, dsr := DiffStreamRoutes(om.StreamRoutes, m.StreamRoutes) + apc, upc, dpc := DiffPluginConfigs(om.PluginConfigs, m.PluginConfigs) if ar != nil || au != nil || asr != nil || sa != nil || apc != nil { - added = &manifest{ - routes: ar, - upstreams: au, - streamRoutes: asr, - ssl: sa, - pluginConfigs: apc, + added = &Manifest{ + Routes: ar, + Upstreams: au, + StreamRoutes: asr, + SSLs: sa, + PluginConfigs: apc, } } if ur != nil || uu != nil || usr != nil || su != nil || upc != nil { - updated = &manifest{ - routes: ur, - upstreams: uu, - streamRoutes: usr, - ssl: su, - pluginConfigs: upc, + updated = &Manifest{ + Routes: ur, + Upstreams: uu, + StreamRoutes: usr, + SSLs: su, + PluginConfigs: upc, } } if dr != nil || du != nil || dsr != nil || sd != nil || dpc != nil { - deleted = &manifest{ - routes: dr, - upstreams: du, - streamRoutes: dsr, - ssl: sd, - pluginConfigs: dpc, + deleted = &Manifest{ + Routes: dr, + Upstreams: du, + StreamRoutes: dsr, + SSLs: sd, + PluginConfigs: dpc, } } return } -func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted *manifest) error { +func SyncManifests(ctx context.Context, apisix apisix.APISIX, clusterName string, added, updated, deleted *Manifest) error { var merr *multierror.Error - clusterName := c.cfg.APISIX.DefaultClusterName if deleted != nil { - for _, ssl := range deleted.ssl { - if err := c.apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil { + for _, ssl := range deleted.SSLs { + if err := apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil { merr = multierror.Append(merr, err) } } - for _, r := range deleted.routes { - if err := c.apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil { + for _, r := range deleted.Routes { + if err := apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil { merr = multierror.Append(merr, err) } } - for _, sr := range deleted.streamRoutes { - if err := c.apisix.Cluster(clusterName).StreamRoute().Delete(ctx, sr); err != nil { + for _, sr := range deleted.StreamRoutes { + if err := apisix.Cluster(clusterName).StreamRoute().Delete(ctx, sr); err != nil { merr = multierror.Append(merr, err) } } - for _, u := range deleted.upstreams { - if err := c.apisix.Cluster(clusterName).Upstream().Delete(ctx, u); err != nil { + for _, u := range deleted.Upstreams { + if err := apisix.Cluster(clusterName).Upstream().Delete(ctx, u); err != nil { // Upstream might be referenced by other routes. if err != cache.ErrStillInUse { merr = multierror.Append(merr, err) @@ -243,8 +243,8 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted } } } - for _, pc := range deleted.pluginConfigs { - if err := c.apisix.Cluster(clusterName).PluginConfig().Delete(ctx, pc); err != nil { + for _, pc := range deleted.PluginConfigs { + if err := apisix.Cluster(clusterName).PluginConfig().Delete(ctx, pc); err != nil { // pluginConfig might be referenced by other routes. if err != cache.ErrStillInUse { merr = multierror.Append(merr, err) @@ -259,55 +259,55 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted } if added != nil { // Should create upstreams firstly due to the dependencies. - for _, ssl := range added.ssl { - if _, err := c.apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil { + for _, ssl := range added.SSLs { + if _, err := apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil { merr = multierror.Append(merr, err) } } - for _, u := range added.upstreams { - if _, err := c.apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil { + for _, u := range added.Upstreams { + if _, err := apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil { merr = multierror.Append(merr, err) } } - for _, pc := range added.pluginConfigs { - if _, err := c.apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc); err != nil { + for _, pc := range added.PluginConfigs { + if _, err := apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc); err != nil { merr = multierror.Append(merr, err) } } - for _, r := range added.routes { - if _, err := c.apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil { + for _, r := range added.Routes { + if _, err := apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil { merr = multierror.Append(merr, err) } } - for _, sr := range added.streamRoutes { - if _, err := c.apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil { + for _, sr := range added.StreamRoutes { + if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil { merr = multierror.Append(merr, err) } } } if updated != nil { - for _, ssl := range updated.ssl { - if _, err := c.apisix.Cluster(clusterName).SSL().Update(ctx, ssl); err != nil { + for _, ssl := range updated.SSLs { + if _, err := apisix.Cluster(clusterName).SSL().Update(ctx, ssl); err != nil { merr = multierror.Append(merr, err) } } - for _, r := range updated.upstreams { - if _, err := c.apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil { + for _, r := range updated.Upstreams { + if _, err := apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil { merr = multierror.Append(merr, err) } } - for _, pc := range updated.pluginConfigs { - if _, err := c.apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc); err != nil { + for _, pc := range updated.PluginConfigs { + if _, err := apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc); err != nil { merr = multierror.Append(merr, err) } } - for _, r := range updated.routes { - if _, err := c.apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil { + for _, r := range updated.Routes { + if _, err := apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil { merr = multierror.Append(merr, err) } } - for _, sr := range updated.streamRoutes { - if _, err := c.apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil { + for _, sr := range updated.StreamRoutes { + if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil { merr = multierror.Append(merr, err) } } diff --git a/pkg/ingress/manifest_test.go b/pkg/ingress/utils/manifest_test.go similarity index 80% rename from pkg/ingress/manifest_test.go rename to pkg/ingress/utils/manifest_test.go index 9bad334921..0085af8efe 100644 --- a/pkg/ingress/manifest_test.go +++ b/pkg/ingress/utils/manifest_test.go @@ -12,7 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package ingress +package utils import ( "testing" @@ -36,7 +36,7 @@ func TestDiffRoutes(t *testing.T) { Methods: []string{"POST"}, }, } - added, updated, deleted := diffRoutes(nil, news) + added, updated, deleted := DiffRoutes(nil, news) assert.Nil(t, updated) assert.Nil(t, deleted) assert.Len(t, added, 2) @@ -57,7 +57,7 @@ func TestDiffRoutes(t *testing.T) { Methods: []string{"POST", "PUT"}, }, } - added, updated, deleted = diffRoutes(olds, nil) + added, updated, deleted = DiffRoutes(olds, nil) assert.Nil(t, updated) assert.Nil(t, added) assert.Len(t, deleted, 2) @@ -65,7 +65,7 @@ func TestDiffRoutes(t *testing.T) { assert.Equal(t, "3", deleted[1].ID) assert.Equal(t, []string{"POST", "PUT"}, deleted[1].Methods) - added, updated, deleted = diffRoutes(olds, news) + added, updated, deleted = DiffRoutes(olds, news) assert.Len(t, added, 1) assert.Equal(t, "1", added[0].ID) assert.Len(t, updated, 1) @@ -85,7 +85,7 @@ func TestDiffStreamRoutes(t *testing.T) { ServerPort: 8080, }, } - added, updated, deleted := diffStreamRoutes(nil, news) + added, updated, deleted := DiffStreamRoutes(nil, news) assert.Nil(t, updated) assert.Nil(t, deleted) assert.Len(t, added, 2) @@ -102,7 +102,7 @@ func TestDiffStreamRoutes(t *testing.T) { ServerPort: 8081, }, } - added, updated, deleted = diffStreamRoutes(olds, nil) + added, updated, deleted = DiffStreamRoutes(olds, nil) assert.Nil(t, updated) assert.Nil(t, added) assert.Len(t, deleted, 2) @@ -110,7 +110,7 @@ func TestDiffStreamRoutes(t *testing.T) { assert.Equal(t, "3", deleted[1].ID) assert.Equal(t, int32(8081), deleted[1].ServerPort) - added, updated, deleted = diffStreamRoutes(olds, news) + added, updated, deleted = DiffStreamRoutes(olds, news) assert.Len(t, added, 1) assert.Equal(t, "1", added[0].ID) assert.Len(t, updated, 1) @@ -135,7 +135,7 @@ func TestDiffUpstreams(t *testing.T) { Retries: &retries, }, } - added, updated, deleted := diffUpstreams(nil, news) + added, updated, deleted := DiffUpstreams(nil, news) assert.Nil(t, updated) assert.Nil(t, deleted) assert.Len(t, added, 2) @@ -160,7 +160,7 @@ func TestDiffUpstreams(t *testing.T) { }, }, } - added, updated, deleted = diffUpstreams(olds, nil) + added, updated, deleted = DiffUpstreams(olds, nil) assert.Nil(t, updated) assert.Nil(t, added) assert.Len(t, deleted, 2) @@ -169,7 +169,7 @@ func TestDiffUpstreams(t *testing.T) { assert.Equal(t, 5, *deleted[1].Retries) assert.Equal(t, 10, deleted[1].Timeout.Connect) - added, updated, deleted = diffUpstreams(olds, news) + added, updated, deleted = DiffUpstreams(olds, news) assert.Len(t, added, 1) assert.Equal(t, "1", added[0].ID) assert.Len(t, updated, 1) @@ -196,7 +196,7 @@ func TestDiffPluginConfigs(t *testing.T) { }, }, } - added, updated, deleted := diffPluginConfigs(nil, news) + added, updated, deleted := DiffPluginConfigs(nil, news) assert.Nil(t, updated) assert.Nil(t, deleted) assert.Len(t, added, 2) @@ -225,7 +225,7 @@ func TestDiffPluginConfigs(t *testing.T) { }, }, } - added, updated, deleted = diffPluginConfigs(olds, nil) + added, updated, deleted = DiffPluginConfigs(olds, nil) assert.Nil(t, updated) assert.Nil(t, added) assert.Len(t, deleted, 2) @@ -233,7 +233,7 @@ func TestDiffPluginConfigs(t *testing.T) { assert.Equal(t, "3", deleted[1].ID) assert.Equal(t, olds[1].Plugins, deleted[1].Plugins) - added, updated, deleted = diffPluginConfigs(olds, news) + added, updated, deleted = DiffPluginConfigs(olds, news) assert.Len(t, added, 1) assert.Equal(t, "1", added[0].ID) assert.Len(t, updated, 1) @@ -245,8 +245,8 @@ func TestDiffPluginConfigs(t *testing.T) { func TestManifestDiff(t *testing.T) { retries := 2 - m := &manifest{ - routes: []*apisixv1.Route{ + m := &Manifest{ + Routes: []*apisixv1.Route{ { Metadata: apisixv1.Metadata{ ID: "1", @@ -259,7 +259,7 @@ func TestManifestDiff(t *testing.T) { Methods: []string{"GET"}, }, }, - upstreams: []*apisixv1.Upstream{ + Upstreams: []*apisixv1.Upstream{ { Metadata: apisixv1.Metadata{ ID: "4", @@ -267,7 +267,7 @@ func TestManifestDiff(t *testing.T) { Retries: &retries, }, }, - pluginConfigs: []*apisixv1.PluginConfig{ + PluginConfigs: []*apisixv1.PluginConfig{ { Metadata: apisixv1.Metadata{ ID: "5", @@ -284,8 +284,8 @@ func TestManifestDiff(t *testing.T) { }, }, } - om := &manifest{ - routes: []*apisixv1.Route{ + om := &Manifest{ + Routes: []*apisixv1.Route{ { Metadata: apisixv1.Metadata{ ID: "2", @@ -300,22 +300,22 @@ func TestManifestDiff(t *testing.T) { }, } - added, updated, deleted := m.diff(om) - assert.Len(t, added.routes, 1) - assert.Equal(t, "1", added.routes[0].ID) - assert.Len(t, added.upstreams, 1) - assert.Equal(t, "4", added.upstreams[0].ID) - assert.Len(t, added.pluginConfigs, 1) - assert.Equal(t, "5", added.pluginConfigs[0].ID) + added, updated, deleted := m.Diff(om) + assert.Len(t, added.Routes, 1) + assert.Equal(t, "1", added.Routes[0].ID) + assert.Len(t, added.Upstreams, 1) + assert.Equal(t, "4", added.Upstreams[0].ID) + assert.Len(t, added.PluginConfigs, 1) + assert.Equal(t, "5", added.PluginConfigs[0].ID) - assert.Len(t, updated.routes, 1) - assert.Equal(t, "3", updated.routes[0].ID) - assert.Equal(t, []string{"GET"}, updated.routes[0].Methods) - assert.Nil(t, updated.upstreams) - assert.Nil(t, updated.pluginConfigs) + assert.Len(t, updated.Routes, 1) + assert.Equal(t, "3", updated.Routes[0].ID) + assert.Equal(t, []string{"GET"}, updated.Routes[0].Methods) + assert.Nil(t, updated.Upstreams) + assert.Nil(t, updated.PluginConfigs) - assert.Len(t, deleted.routes, 1) - assert.Equal(t, "2", deleted.routes[0].ID) - assert.Nil(t, updated.upstreams) - assert.Nil(t, updated.pluginConfigs) + assert.Len(t, deleted.Routes, 1) + assert.Equal(t, "2", deleted.Routes[0].ID) + assert.Nil(t, updated.Upstreams) + assert.Nil(t, updated.PluginConfigs) } diff --git a/pkg/ingress/utils/string.go b/pkg/ingress/utils/string.go new file mode 100644 index 0000000000..0e8c883d5b --- /dev/null +++ b/pkg/ingress/utils/string.go @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package utils + +func TruncateString(s string, max int) string { + if max > len(s) || max < 0 { + return s + } + return s[:max] +} diff --git a/pkg/kube/translation/apisix_pluginconfig.go b/pkg/kube/translation/apisix_pluginconfig.go index 14ce0467e4..4bf35510e2 100644 --- a/pkg/kube/translation/apisix_pluginconfig.go +++ b/pkg/kube/translation/apisix_pluginconfig.go @@ -25,7 +25,7 @@ import ( ) func (t *translator) TranslatePluginConfigV2beta3(config *configv2beta3.ApisixPluginConfig) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() pluginMap := make(apisixv1.Plugins) if len(config.Spec.Plugins) > 0 { for _, plugin := range config.Spec.Plugins { @@ -51,21 +51,21 @@ func (t *translator) TranslatePluginConfigV2beta3(config *configv2beta3.ApisixPl pc.Name = apisixv1.ComposePluginConfigName(config.Namespace, config.Name) pc.ID = id.GenID(pc.Name) pc.Plugins = pluginMap - ctx.addPluginConfig(pc) + ctx.AddPluginConfig(pc) return ctx, nil } func (t *translator) TranslatePluginConfigV2beta3NotStrictly(config *configv2beta3.ApisixPluginConfig) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() pc := apisixv1.NewDefaultPluginConfig() pc.Name = apisixv1.ComposePluginConfigName(config.Namespace, config.Name) pc.ID = id.GenID(pc.Name) - ctx.addPluginConfig(pc) + ctx.AddPluginConfig(pc) return ctx, nil } func (t *translator) TranslatePluginConfigV2(config *configv2.ApisixPluginConfig) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() pluginMap := make(apisixv1.Plugins) if len(config.Spec.Plugins) > 0 { for _, plugin := range config.Spec.Plugins { @@ -91,15 +91,15 @@ func (t *translator) TranslatePluginConfigV2(config *configv2.ApisixPluginConfig pc.Name = apisixv1.ComposePluginConfigName(config.Namespace, config.Name) pc.ID = id.GenID(pc.Name) pc.Plugins = pluginMap - ctx.addPluginConfig(pc) + ctx.AddPluginConfig(pc) return ctx, nil } func (t *translator) TranslatePluginConfigV2NotStrictly(config *configv2.ApisixPluginConfig) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() pc := apisixv1.NewDefaultPluginConfig() pc.Name = apisixv1.ComposePluginConfigName(config.Namespace, config.Name) pc.ID = id.GenID(pc.Name) - ctx.addPluginConfig(pc) + ctx.AddPluginConfig(pc) return ctx, nil } diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go index 5dafd8426c..7bdfbedc96 100644 --- a/pkg/kube/translation/apisix_route.go +++ b/pkg/kube/translation/apisix_route.go @@ -30,7 +30,7 @@ import ( ) func (t *translator) TranslateRouteV2beta2(ar *configv2beta2.ApisixRoute) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() if err := t.translateHTTPRouteV2beta2(ctx, ar); err != nil { return nil, err @@ -42,7 +42,7 @@ func (t *translator) TranslateRouteV2beta2(ar *configv2beta2.ApisixRoute) (*Tran } func (t *translator) TranslateRouteV2beta2NotStrictly(ar *configv2beta2.ApisixRoute) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() if err := t.translateHTTPRouteV2beta2NotStrictly(ctx, ar); err != nil { return nil, err @@ -54,7 +54,7 @@ func (t *translator) TranslateRouteV2beta2NotStrictly(ar *configv2beta2.ApisixRo } func (t *translator) TranslateRouteV2beta3(ar *configv2beta3.ApisixRoute) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() if err := t.translateHTTPRouteV2beta3(ctx, ar); err != nil { return nil, err @@ -66,7 +66,7 @@ func (t *translator) TranslateRouteV2beta3(ar *configv2beta3.ApisixRoute) (*Tran } func (t *translator) TranslateRouteV2beta3NotStrictly(ar *configv2beta3.ApisixRoute) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() if err := t.translateHTTPRouteV2beta3NotStrictly(ctx, ar); err != nil { return nil, err @@ -78,7 +78,7 @@ func (t *translator) TranslateRouteV2beta3NotStrictly(ar *configv2beta3.ApisixRo } func (t *translator) TranslateRouteV2(ar *configv2.ApisixRoute) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() if err := t.translateHTTPRouteV2(ctx, ar); err != nil { return nil, err @@ -90,7 +90,7 @@ func (t *translator) TranslateRouteV2(ar *configv2.ApisixRoute) (*TranslateConte } func (t *translator) TranslateRouteV2NotStrictly(ar *configv2.ApisixRoute) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() if err := t.translateHTTPRouteV2NotStrictly(ctx, ar); err != nil { return nil, err @@ -200,13 +200,13 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *TranslateContext, ar *config } route.Plugins["traffic-split"] = plugin } - ctx.addRoute(route) - if !ctx.checkUpstreamExist(upstreamName) { + ctx.AddRoute(route) + if !ctx.CheckUpstreamExist(upstreamName) { ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) if err != nil { return err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } } return nil @@ -336,13 +336,13 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *TranslateContext, ar *config } route.Plugins["traffic-split"] = plugin } - ctx.addRoute(route) - if !ctx.checkUpstreamExist(upstreamName) { + ctx.AddRoute(route) + if !ctx.CheckUpstreamExist(upstreamName) { ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) if err != nil { return err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } } return nil @@ -472,13 +472,13 @@ func (t *translator) translateHTTPRouteV2(ctx *TranslateContext, ar *configv2.Ap } route.Plugins["traffic-split"] = plugin } - ctx.addRoute(route) - if !ctx.checkUpstreamExist(upstreamName) { + ctx.AddRoute(route) + if !ctx.CheckUpstreamExist(upstreamName) { ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) if err != nil { return err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } } return nil @@ -595,13 +595,13 @@ func (t *translator) translateHTTPRouteV2beta2NotStrictly(ctx *TranslateContext, route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) - ctx.addRoute(route) - if !ctx.checkUpstreamExist(upstreamName) { + ctx.AddRoute(route) + if !ctx.CheckUpstreamExist(upstreamName) { ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) if err != nil { return err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } } return nil @@ -654,13 +654,13 @@ func (t *translator) translateHTTPRouteV2beta3NotStrictly(ctx *TranslateContext, route.PluginConfigId = id.GenID(apisixv1.ComposePluginConfigName(ar.Namespace, part.PluginConfigName)) } - ctx.addRoute(route) - if !ctx.checkUpstreamExist(upstreamName) { + ctx.AddRoute(route) + if !ctx.CheckUpstreamExist(upstreamName) { ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) if err != nil { return err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } } return nil @@ -713,13 +713,13 @@ func (t *translator) translateHTTPRouteV2NotStrictly(ctx *TranslateContext, ar * route.PluginConfigId = id.GenID(apisixv1.ComposePluginConfigName(ar.Namespace, part.PluginConfigName)) } - ctx.addRoute(route) - if !ctx.checkUpstreamExist(upstreamName) { + ctx.AddRoute(route) + if !ctx.CheckUpstreamExist(upstreamName) { ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) if err != nil { return err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } } return nil @@ -751,9 +751,9 @@ func (t *translator) translateStreamRouteV2beta2(ctx *TranslateContext, ar *conf return err } sr.UpstreamId = ups.ID - ctx.addStreamRoute(sr) - if !ctx.checkUpstreamExist(ups.Name) { - ctx.addUpstream(ups) + ctx.AddStreamRoute(sr) + if !ctx.CheckUpstreamExist(ups.Name) { + ctx.AddUpstream(ups) } } @@ -786,9 +786,9 @@ func (t *translator) translateStreamRouteV2beta3(ctx *TranslateContext, ar *conf return err } sr.UpstreamId = ups.ID - ctx.addStreamRoute(sr) - if !ctx.checkUpstreamExist(ups.Name) { - ctx.addUpstream(ups) + ctx.AddStreamRoute(sr) + if !ctx.CheckUpstreamExist(ups.Name) { + ctx.AddUpstream(ups) } } @@ -821,9 +821,9 @@ func (t *translator) translateStreamRouteV2(ctx *TranslateContext, ar *configv2. return err } sr.UpstreamId = ups.ID - ctx.addStreamRoute(sr) - if !ctx.checkUpstreamExist(ups.Name) { - ctx.addUpstream(ups) + ctx.AddStreamRoute(sr) + if !ctx.CheckUpstreamExist(ups.Name) { + ctx.AddUpstream(ups) } } @@ -843,9 +843,9 @@ func (t *translator) translateStreamRouteNotStrictlyV2beta2(ctx *TranslateContex return err } sr.UpstreamId = ups.ID - ctx.addStreamRoute(sr) - if !ctx.checkUpstreamExist(ups.Name) { - ctx.addUpstream(ups) + ctx.AddStreamRoute(sr) + if !ctx.CheckUpstreamExist(ups.Name) { + ctx.AddUpstream(ups) } } return nil @@ -864,9 +864,9 @@ func (t *translator) translateStreamRouteNotStrictlyV2beta3(ctx *TranslateContex return err } sr.UpstreamId = ups.ID - ctx.addStreamRoute(sr) - if !ctx.checkUpstreamExist(ups.Name) { - ctx.addUpstream(ups) + ctx.AddStreamRoute(sr) + if !ctx.CheckUpstreamExist(ups.Name) { + ctx.AddUpstream(ups) } } return nil @@ -885,9 +885,9 @@ func (t *translator) translateStreamRouteNotStrictlyV2(ctx *TranslateContext, ar return err } sr.UpstreamId = ups.ID - ctx.addStreamRoute(sr) - if !ctx.checkUpstreamExist(ups.Name) { - ctx.addUpstream(ups) + ctx.AddStreamRoute(sr) + if !ctx.CheckUpstreamExist(ups.Name) { + ctx.AddUpstream(ups) } } return nil diff --git a/pkg/kube/translation/context.go b/pkg/kube/translation/context.go index dc67ef74ba..10aafb62ef 100644 --- a/pkg/kube/translation/context.go +++ b/pkg/kube/translation/context.go @@ -26,25 +26,25 @@ type TranslateContext struct { PluginConfigs []*apisix.PluginConfig } -func defaultEmptyTranslateContext() *TranslateContext { +func DefaultEmptyTranslateContext() *TranslateContext { return &TranslateContext{ upstreamMap: make(map[string]struct{}), } } -func (tc *TranslateContext) addRoute(r *apisix.Route) { +func (tc *TranslateContext) AddRoute(r *apisix.Route) { tc.Routes = append(tc.Routes, r) } -func (tc *TranslateContext) addSSL(ssl *apisix.Ssl) { +func (tc *TranslateContext) AddSSL(ssl *apisix.Ssl) { tc.SSL = append(tc.SSL, ssl) } -func (tc *TranslateContext) addStreamRoute(sr *apisix.StreamRoute) { +func (tc *TranslateContext) AddStreamRoute(sr *apisix.StreamRoute) { tc.StreamRoutes = append(tc.StreamRoutes, sr) } -func (tc *TranslateContext) addUpstream(u *apisix.Upstream) { +func (tc *TranslateContext) AddUpstream(u *apisix.Upstream) { if _, ok := tc.upstreamMap[u.Name]; ok { return } @@ -52,11 +52,11 @@ func (tc *TranslateContext) addUpstream(u *apisix.Upstream) { tc.Upstreams = append(tc.Upstreams, u) } -func (tc *TranslateContext) checkUpstreamExist(name string) (ok bool) { +func (tc *TranslateContext) CheckUpstreamExist(name string) (ok bool) { _, ok = tc.upstreamMap[name] return } -func (tc *TranslateContext) addPluginConfig(pc *apisix.PluginConfig) { +func (tc *TranslateContext) AddPluginConfig(pc *apisix.PluginConfig) { tc.PluginConfigs = append(tc.PluginConfigs, pc) } diff --git a/pkg/kube/translation/context_test.go b/pkg/kube/translation/context_test.go index 78cabc5f9c..045fa8deaf 100644 --- a/pkg/kube/translation/context_test.go +++ b/pkg/kube/translation/context_test.go @@ -23,7 +23,7 @@ import ( ) func TestTranslateContext(t *testing.T) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() r1 := &apisix.Route{ Metadata: apisix.Metadata{ @@ -65,14 +65,14 @@ func TestTranslateContext(t *testing.T) { Name: "aaa", }, } - ctx.addRoute(r1) - ctx.addRoute(r2) - ctx.addStreamRoute(sr1) - ctx.addStreamRoute(sr2) - ctx.addUpstream(u1) - ctx.addUpstream(u2) - ctx.addPluginConfig(pc1) - ctx.addPluginConfig(pc2) + ctx.AddRoute(r1) + ctx.AddRoute(r2) + ctx.AddStreamRoute(sr1) + ctx.AddStreamRoute(sr2) + ctx.AddUpstream(u1) + ctx.AddUpstream(u2) + ctx.AddPluginConfig(pc1) + ctx.AddPluginConfig(pc2) assert.Len(t, ctx.Routes, 2) assert.Len(t, ctx.StreamRoutes, 2) @@ -87,6 +87,6 @@ func TestTranslateContext(t *testing.T) { assert.Equal(t, pc1, ctx.PluginConfigs[0]) assert.Equal(t, pc2, ctx.PluginConfigs[1]) - assert.Equal(t, true, ctx.checkUpstreamExist("aaa")) - assert.Equal(t, false, ctx.checkUpstreamExist("bbb")) + assert.Equal(t, true, ctx.CheckUpstreamExist("aaa")) + assert.Equal(t, false, ctx.CheckUpstreamExist("bbb")) } diff --git a/pkg/kube/translation/ingress.go b/pkg/kube/translation/ingress.go index 3724981b0e..39a4e45253 100644 --- a/pkg/kube/translation/ingress.go +++ b/pkg/kube/translation/ingress.go @@ -40,7 +40,7 @@ const ( ) func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() plugins := t.translateAnnotations(ing.Annotations) annoExtractor := annotations.NewExtractor(ing.Annotations) useRegex := annoExtractor.GetBoolAnnotation(annotations.AnnotationsPrefix + "use-regex") @@ -72,7 +72,7 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*TranslateCo ) return nil, err } - ctx.addSSL(ssl) + ctx.AddSSL(ssl) } for _, rule := range ing.Spec.Rules { for _, pathRule := range rule.HTTP.Paths { @@ -90,7 +90,7 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*TranslateCo ) return nil, err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } uris := []string{pathRule.Path} var nginxVars []kubev2.ApisixRouteHTTPMatchExpr @@ -143,21 +143,21 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*TranslateCo pluginConfig.Name = composeIngressPluginName(ing.Namespace, pathRule.Backend.Service.Name) pluginConfig.ID = id.GenID(route.Name) pluginConfig.Plugins = *(plugins.DeepCopy()) - ctx.addPluginConfig(pluginConfig) + ctx.AddPluginConfig(pluginConfig) route.PluginConfigId = pluginConfig.ID } if ups != nil { route.UpstreamId = ups.ID } - ctx.addRoute(route) + ctx.AddRoute(route) } } return ctx, nil } func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() plugins := t.translateAnnotations(ing.Annotations) annoExtractor := annotations.NewExtractor(ing.Annotations) useRegex := annoExtractor.GetBoolAnnotation(annotations.AnnotationsPrefix + "use-regex") @@ -189,7 +189,7 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*T ) return nil, err } - ctx.addSSL(ssl) + ctx.AddSSL(ssl) } for _, rule := range ing.Spec.Rules { for _, pathRule := range rule.HTTP.Paths { @@ -207,7 +207,7 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*T ) return nil, err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } uris := []string{pathRule.Path} var nginxVars []kubev2.ApisixRouteHTTPMatchExpr @@ -260,14 +260,14 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*T pluginConfig.Name = composeIngressPluginName(ing.Namespace, pathRule.Backend.ServiceName) pluginConfig.ID = id.GenID(route.Name) pluginConfig.Plugins = *(plugins.DeepCopy()) - ctx.addPluginConfig(pluginConfig) + ctx.AddPluginConfig(pluginConfig) route.PluginConfigId = pluginConfig.ID } if ups != nil { route.UpstreamId = ups.ID } - ctx.addRoute(route) + ctx.AddRoute(route) } } return ctx, nil @@ -305,7 +305,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n } func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.Ingress) (*TranslateContext, error) { - ctx := defaultEmptyTranslateContext() + ctx := DefaultEmptyTranslateContext() plugins := t.translateAnnotations(ing.Annotations) annoExtractor := annotations.NewExtractor(ing.Annotations) useRegex := annoExtractor.GetBoolAnnotation(annotations.AnnotationsPrefix + "use-regex") @@ -327,7 +327,7 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In ) return nil, err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) } uris := []string{pathRule.Path} var nginxVars []kubev2.ApisixRouteHTTPMatchExpr @@ -380,14 +380,14 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In pluginConfig.Name = composeIngressPluginName(ing.Namespace, pathRule.Backend.ServiceName) pluginConfig.ID = id.GenID(route.Name) pluginConfig.Plugins = *(plugins.DeepCopy()) - ctx.addPluginConfig(pluginConfig) + ctx.AddPluginConfig(pluginConfig) route.PluginConfigId = pluginConfig.ID } if ups != nil { route.UpstreamId = ups.ID } - ctx.addRoute(route) + ctx.AddRoute(route) } } return ctx, nil diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go index 0340a2d5c2..f9236ae8c4 100644 --- a/pkg/kube/translation/plugin.go +++ b/pkg/kube/translation/plugin.go @@ -53,7 +53,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ns strin if err != nil { return nil, err } - ctx.addUpstream(ups) + ctx.AddUpstream(ups) weight := _defaultWeight if backend.Weight != nil { diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go index 7764144989..59b3b1bd80 100644 --- a/pkg/kube/translation/translator.go +++ b/pkg/kube/translation/translator.go @@ -21,7 +21,6 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" listerscorev1 "k8s.io/client-go/listers/core/v1" - gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/pkg/kube" configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2" @@ -117,8 +116,6 @@ type Translator interface { // ExtractKeyPair extracts certificate and private key pair from secret // Supports APISIX style ("cert" and "key") and Kube style ("tls.crt" and "tls.key) ExtractKeyPair(s *corev1.Secret, hasPrivateKey bool) ([]byte, []byte, error) - // TranslateGatewayHTTPRouteV1Alpha2 translates Gateway API HTTPRoute to APISIX resources - TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha2.HTTPRoute) (*TranslateContext, error) } // TranslatorOptions contains options to help Translator diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go index cab6d405c2..2ce379b8df 100644 --- a/pkg/kube/translation/util.go +++ b/pkg/kube/translation/util.go @@ -261,10 +261,3 @@ func validateRemoteAddrs(remoteAddrs []string) error { } return nil } - -func truncate(s string, max int) string { - if max > len(s) || max < 0 { - return s - } - return s[:max] -}