From 3337be7c7d5f959301171a243f4c0c0d49360503 Mon Sep 17 00:00:00 2001 From: Alex Zhang Date: Wed, 2 Jun 2021 11:24:18 +0800 Subject: [PATCH] feat: subset changes in controllers (#507) --- docs/en/latest/references/apisix_upstream.md | 3 + pkg/ingress/apisix_upstream.go | 89 ++++----- pkg/ingress/controller.go | 2 + pkg/ingress/endpoint.go | 39 ++-- pkg/ingress/pod.go | 43 ++++- pkg/ingress/pod_test.go | 53 +++++- pkg/kube/translation/apisix_route.go | 4 +- pkg/kube/translation/ingress.go | 4 +- pkg/kube/translation/translator.go | 44 +++-- pkg/kube/translation/translator_test.go | 6 +- pkg/kube/translation/util.go | 4 +- pkg/types/apisix/v1/types.go | 17 +- pkg/types/pod.go | 21 ++- pkg/types/pod_test.go | 4 +- .../deploy/crd/v1beta1/ApisixUpstream.yaml | 1 - test/e2e/features/subset.go | 172 ++++++++++++++++++ test/e2e/ingress/resourcepushing.go | 2 +- test/e2e/scaffold/httpbin.go | 4 +- test/e2e/scaffold/k8s.go | 7 + 19 files changed, 412 insertions(+), 107 deletions(-) create mode 100644 test/e2e/features/subset.go diff --git a/docs/en/latest/references/apisix_upstream.md b/docs/en/latest/references/apisix_upstream.md index 84ab1feaf6..0d23a66135 100644 --- a/docs/en/latest/references/apisix_upstream.md +++ b/docs/en/latest/references/apisix_upstream.md @@ -66,3 +66,6 @@ title: ApisixUpstream Reference | portLevelSettings.scheme | string | same as `scheme` but takes higher precedence. | | portLevelSettings.loadbalancer | object | same as `loadbalancer` but takes higher precedence. | | portLevelSettings.healthCheck | object | same as `healthCheck` but takes higher precedence. | +| subsets | array | service subset list, use pod labels to organize service endpoints to different groups. | +| subsets[].name | string | the subset name. | +| subsets[].labels | object | the subset label map. | diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go index 4ba0162783..df38f7a534 100644 --- a/pkg/ingress/apisix_upstream.go +++ b/pkg/ingress/apisix_upstream.go @@ -129,58 +129,65 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er return err } + var subsets []configv1.ApisixUpstreamSubset + subsets = append(subsets, configv1.ApisixUpstreamSubset{}) + if len(au.Spec.Subsets) > 0 { + subsets = append(subsets, au.Spec.Subsets...) + } clusterName := c.controller.cfg.APISIX.DefaultClusterName for _, port := range svc.Spec.Ports { - upsName := apisixv1.ComposeUpstreamName(namespace, name, port.Port) - // TODO: multiple cluster - ups, err := c.controller.apisix.Cluster(clusterName).Upstream().Get(ctx, upsName) - if err != nil { - if err == apisixcache.ErrNotFound { - continue - } - log.Errorf("failed to get upstream %s: %s", upsName, err) - c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err) - c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse) - return err - } - var newUps *apisixv1.Upstream - if ev.Type != types.EventDelete { - cfg, ok := portLevelSettings[port.Port] - if !ok { - cfg = &au.Spec.ApisixUpstreamConfig - } - // FIXME Same ApisixUpstreamConfig might be translated multiple times. - newUps, err = c.controller.translator.TranslateUpstreamConfig(cfg) + for _, subset := range subsets { + upsName := apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port) + // TODO: multiple cluster + ups, err := c.controller.apisix.Cluster(clusterName).Upstream().Get(ctx, upsName) if err != nil { - log.Errorw("found malformed ApisixUpstream", - zap.Any("object", au), - zap.Error(err), - ) + if err == apisixcache.ErrNotFound { + continue + } + log.Errorf("failed to get upstream %s: %s", upsName, err) c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err) c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse) return err } - } else { - newUps = apisixv1.NewDefaultUpstream() - } + var newUps *apisixv1.Upstream + if ev.Type != types.EventDelete { + cfg, ok := portLevelSettings[port.Port] + if !ok { + cfg = &au.Spec.ApisixUpstreamConfig + } + // FIXME Same ApisixUpstreamConfig might be translated multiple times. + newUps, err = c.controller.translator.TranslateUpstreamConfig(cfg) + if err != nil { + log.Errorw("found malformed ApisixUpstream", + zap.Any("object", au), + zap.Error(err), + ) + c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err) + c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse) + return err + } + } else { + newUps = apisixv1.NewDefaultUpstream() + } - newUps.Metadata = ups.Metadata - newUps.Nodes = ups.Nodes - log.Debugw("updating upstream since ApisixUpstream changed", - zap.String("event", ev.Type.String()), - zap.Any("upstream", newUps), - zap.Any("ApisixUpstream", au), - ) - if _, err := c.controller.apisix.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil { - log.Errorw("failed to update upstream", - zap.Error(err), + newUps.Metadata = ups.Metadata + newUps.Nodes = ups.Nodes + log.Debugw("updating upstream since ApisixUpstream changed", + zap.String("event", ev.Type.String()), zap.Any("upstream", newUps), zap.Any("ApisixUpstream", au), - zap.String("cluster", clusterName), ) - c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err) - c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse) - return err + if _, err := c.controller.apisix.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil { + log.Errorw("failed to update upstream", + zap.Error(err), + zap.Any("upstream", newUps), + zap.Any("ApisixUpstream", au), + zap.String("cluster", clusterName), + ) + c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err) + c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse) + return err + } } } c.controller.recorderEvent(au, corev1.EventTypeNormal, _resourceSynced, nil) diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go index 04ecfe12d9..f9f905f084 100644 --- a/pkg/ingress/controller.go +++ b/pkg/ingress/controller.go @@ -196,6 +196,8 @@ func (c *Controller) initWhenStartLeading() { c.apisixClusterConfigLister = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister() c.translator = translation.NewTranslator(&translation.TranslatorOptions{ + PodCache: c.podCache, + PodLister: c.podLister, EndpointsLister: c.epLister, ServiceLister: c.svcLister, ApisixUpstreamLister: c.apisixUpstreamLister, diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go index cfcea7fca5..46c00cba67 100644 --- a/pkg/ingress/endpoint.go +++ b/pkg/ingress/endpoint.go @@ -26,6 +26,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/apisix" apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" + configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" @@ -96,6 +97,18 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error { log.Errorf("failed to get service %s/%s: %s", ep.Namespace, ep.Name, err) return err } + var subsets []configv1.ApisixUpstreamSubset + subsets = append(subsets, configv1.ApisixUpstreamSubset{}) + au, err := c.controller.apisixUpstreamLister.ApisixUpstreams(ep.Namespace).Get(ep.Name) + if err != nil { + if !k8serrors.IsNotFound(err) { + log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace, ep.Name, err) + return err + } + } else if len(au.Spec.Subsets) > 0 { + subsets = append(subsets, au.Spec.Subsets...) + } + portMap := make(map[string]int32) for _, port := range svc.Spec.Ports { portMap[port.Name] = port.Port @@ -109,18 +122,20 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error { log.Errorf("port %s in endpoints %s/%s but not in service", port.Name, ep.Namespace, ep.Name) continue } - nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, svcPort) - if err != nil { - log.Errorw("failed to translate upstream nodes", - zap.Error(err), - zap.Any("endpoints", ep), - zap.Int32("port", svcPort), - ) - } - name := apisixv1.ComposeUpstreamName(ep.Namespace, ep.Name, svcPort) - for _, cluster := range clusters { - if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil { - return err + for _, subset := range subsets { + nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, svcPort, subset.Labels) + if err != nil { + log.Errorw("failed to translate upstream nodes", + zap.Error(err), + zap.Any("endpoints", ep), + zap.Int32("port", svcPort), + ) + } + name := apisixv1.ComposeUpstreamName(ep.Namespace, ep.Name, subset.Name, svcPort) + for _, cluster := range clusters { + if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil { + return err + } } } } diff --git a/pkg/ingress/pod.go b/pkg/ingress/pod.go index b98c29d9bc..624ad01fd4 100644 --- a/pkg/ingress/pod.go +++ b/pkg/ingress/pod.go @@ -22,6 +22,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/types" ) type podController struct { @@ -35,6 +36,7 @@ func (c *Controller) newPodController() *podController { ctl.controller.podInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: ctl.onAdd, + UpdateFunc: ctl.onUpdate, DeleteFunc: ctl.onDelete, }, ) @@ -67,10 +69,43 @@ func (c *podController) onAdd(obj interface{}) { ) pod := obj.(*corev1.Pod) if err := c.controller.podCache.Add(pod); err != nil { - log.Errorw("failed to add pod to cache", - zap.Error(err), - zap.Any("pod", pod), - ) + if err == types.ErrPodNoAssignedIP { + log.Debugw("pod no assigned ip, postpone the adding in subsequent update event", + zap.Any("pod", pod), + ) + } else { + log.Errorw("failed to add pod to cache", + zap.Error(err), + zap.Any("pod", pod), + ) + } + } +} + +func (c *podController) onUpdate(_, cur interface{}) { + pod := cur.(*corev1.Pod) + + if !c.controller.namespaceWatching(pod.Namespace + "/" + pod.Name) { + return + } + log.Debugw("pod update event arrived", + zap.Any("final state", pod), + ) + if pod.DeletionTimestamp != nil { + if err := c.controller.podCache.Delete(pod); err != nil { + log.Errorw("failed to delete pod from cache", + zap.Error(err), + zap.Any("pod", pod), + ) + } + } + if pod.Status.PodIP != "" { + if err := c.controller.podCache.Add(pod); err != nil { + log.Errorw("failed to add pod to cache", + zap.Error(err), + zap.Any("pod", pod), + ) + } } } diff --git a/pkg/ingress/pod_test.go b/pkg/ingress/pod_test.go index e58085abb2..289c4e7af6 100644 --- a/pkg/ingress/pod_test.go +++ b/pkg/ingress/pod_test.go @@ -16,13 +16,13 @@ package ingress import ( "testing" + "time" "github.com/stretchr/testify/assert" - - "github.com/apache/apisix-ingress-controller/pkg/types" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/apache/apisix-ingress-controller/pkg/types" ) func TestPodOnAdd(t *testing.T) { @@ -109,3 +109,50 @@ func TestPodOnDelete(t *testing.T) { assert.Equal(t, name, "abc") assert.Nil(t, err) } + +func TestPodOnUpdate(t *testing.T) { + ctl := &podController{ + controller: &Controller{ + watchingNamespace: map[string]struct{}{ + "default": {}, + }, + podCache: types.NewPodCache(), + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "nginx", + DeletionTimestamp: &metav1.Time{ + Time: time.Now(), + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + PodIP: "10.0.5.12", + }, + } + assert.Nil(t, ctl.controller.podCache.Add(pod), "adding pod") + + ctl.onUpdate(nil, pod) + name, err := ctl.controller.podCache.GetNameByIP("10.0.5.12") + assert.Equal(t, name, "nginx") + assert.Equal(t, err, nil) + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "public", + Name: "abc", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + PodIP: "10.0.5.13", + }, + } + assert.Nil(t, ctl.controller.podCache.Add(pod2), "adding pod") + ctl.onUpdate(nil, pod2) + name, err = ctl.controller.podCache.GetNameByIP("10.0.5.13") + assert.Equal(t, name, "abc") + assert.Nil(t, err) +} diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go index 2360d72eef..9285bb6c75 100644 --- a/pkg/kube/translation/apisix_route.go +++ b/pkg/kube/translation/apisix_route.go @@ -54,7 +54,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) (*TranslateConte } } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, p.Backend.ServiceName, int32(p.Backend.ServicePort)) + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, p.Backend.ServiceName, "", int32(p.Backend.ServicePort)) route := apisixv1.NewDefaultRoute() route.Name = r.Host + p.Path route.ID = id.GenID(route.Name) @@ -150,7 +150,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, svcPort) + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) diff --git a/pkg/kube/translation/ingress.go b/pkg/kube/translation/ingress.go index 5169b68cb2..3ac43bd4b1 100644 --- a/pkg/kube/translation/ingress.go +++ b/pkg/kube/translation/ingress.go @@ -173,7 +173,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n if err != nil { return nil, err } - ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, svcPort) + ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", svcPort) ups.ID = id.GenID(ups.Name) return ups, nil } @@ -264,7 +264,7 @@ func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcNa if err != nil { return nil, err } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, portNumber) + ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber) ups.ID = id.GenID(ups.Name) return ups, nil } diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go index aed32515af..5f833f52ad 100644 --- a/pkg/kube/translation/translator.go +++ b/pkg/kube/translation/translator.go @@ -45,8 +45,9 @@ func (te *translateError) Error() string { // Translator translates Apisix* CRD resources to the description in APISIX. type Translator interface { // TranslateUpstreamNodes translate Endpoints resources to APISIX Upstream nodes - // according to the give port. - TranslateUpstreamNodes(*corev1.Endpoints, int32) (apisixv1.UpstreamNodes, error) + // according to the give port. Extra labels can be passed to filter the ultimate + // upstream nodes. + TranslateUpstreamNodes(*corev1.Endpoints, int32, types.Labels) (apisixv1.UpstreamNodes, error) // TranslateUpstreamConfig translates ApisixUpstreamConfig (part of ApisixUpstream) // to APISIX Upstream, it doesn't fill the the Upstream metadata and nodes. TranslateUpstreamConfig(*configv1.ApisixUpstreamConfig) (*apisixv1.Upstream, error) @@ -126,39 +127,40 @@ func (t *translator) TranslateUpstream(namespace, name, subset string, port int3 reason: err.Error(), } } - nodes, err := t.TranslateUpstreamNodes(endpoints, port) - if err != nil { - return nil, err - } - ups := apisixv1.NewDefaultUpstream() au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name) + ups := apisixv1.NewDefaultUpstream() if err != nil { if k8serrors.IsNotFound(err) { // If subset in ApisixRoute is not empty but the ApisixUpstream resouce not found, // just set an empty node list. if subset != "" { ups.Nodes = apisixv1.UpstreamNodes{} - } else { - ups.Nodes = nodes + return ups, nil + } + } else { + return nil, &translateError{ + field: "ApisixUpstream", + reason: err.Error(), } - return ups, nil - } - return nil, &translateError{ - field: "ApisixUpstream", - reason: err.Error(), } } - - // Filter nodes by subset. + var labels types.Labels if subset != "" { - var labels types.Labels for _, ss := range au.Spec.Subsets { if ss.Name == subset { labels = ss.Labels break } } - nodes = t.filterNodesByLabels(nodes, labels, au.Namespace) + } + // Filter nodes by subset. + nodes, err := t.TranslateUpstreamNodes(endpoints, port, labels) + if err != nil { + return nil, err + } + if au == nil { + ups.Nodes = nodes + return ups, nil } upsCfg := &au.Spec.ApisixUpstreamConfig @@ -176,7 +178,7 @@ func (t *translator) TranslateUpstream(namespace, name, subset string, port int3 return ups, nil } -func (t *translator) TranslateUpstreamNodes(endpoints *corev1.Endpoints, port int32) (apisixv1.UpstreamNodes, error) { +func (t *translator) TranslateUpstreamNodes(endpoints *corev1.Endpoints, port int32, labels types.Labels) (apisixv1.UpstreamNodes, error) { svc, err := t.ServiceLister.Services(endpoints.Namespace).Get(endpoints.Name) if err != nil { return nil, &translateError{ @@ -220,6 +222,10 @@ func (t *translator) TranslateUpstreamNodes(endpoints *corev1.Endpoints, port in } } } + if labels != nil { + nodes = t.filterNodesByLabels(nodes, labels, endpoints.Namespace) + return nodes, nil + } return nodes, nil } diff --git a/pkg/kube/translation/translator_test.go b/pkg/kube/translation/translator_test.go index e056130af8..c2df7bdb16 100644 --- a/pkg/kube/translation/translator_test.go +++ b/pkg/kube/translation/translator_test.go @@ -175,14 +175,14 @@ func TestTranslateUpstreamNodes(t *testing.T) { }} <-processCh - nodes, err := tr.TranslateUpstreamNodes(endpoints, 10080) + nodes, err := tr.TranslateUpstreamNodes(endpoints, 10080, nil) assert.Nil(t, nodes) assert.Equal(t, err, &translateError{ field: "service.spec.ports", reason: "port not defined", }) - nodes, err = tr.TranslateUpstreamNodes(endpoints, 80) + nodes, err = tr.TranslateUpstreamNodes(endpoints, 80, nil) assert.Nil(t, err) assert.Equal(t, nodes, apisixv1.UpstreamNodes{ { @@ -197,7 +197,7 @@ func TestTranslateUpstreamNodes(t *testing.T) { }, }) - nodes, err = tr.TranslateUpstreamNodes(endpoints, 443) + nodes, err = tr.TranslateUpstreamNodes(endpoints, 443, nil) assert.Nil(t, err) assert.Equal(t, nodes, apisixv1.UpstreamNodes{ { diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go index 3584205d04..0dff70debb 100644 --- a/pkg/kube/translation/util.go +++ b/pkg/kube/translation/util.go @@ -124,7 +124,7 @@ func (t *translator) translateUpstream(namespace, svcName, subset, svcResolveGra }, } } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, svcPort) + ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, subset, svcPort) ups.ID = id.GenID(ups.Name) return ups, nil } @@ -134,7 +134,7 @@ func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels ty return nodes } - var filteredNodes apisixv1.UpstreamNodes + filteredNodes := make(apisixv1.UpstreamNodes, 0) for _, node := range nodes { podName, err := t.PodCache.GetNameByIP(node.Host) if err != nil { diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go index 467872ee31..3c5ff473ec 100644 --- a/pkg/types/apisix/v1/types.go +++ b/pkg/types/apisix/v1/types.go @@ -387,19 +387,28 @@ func NewDefaultConsumer() *Consumer { } } -// ComposeUpstreamName uses namespace, name and port info to compose +// ComposeUpstreamName uses namespace, name, subset (optional) and port info to compose // the upstream name. -func ComposeUpstreamName(namespace, name string, port int32) string { +func ComposeUpstreamName(namespace, name, subset string, port int32) string { pstr := strconv.Itoa(int(port)) // FIXME Use sync.Pool to reuse this buffer if the upstream // name composing code path is hot. - p := make([]byte, 0, len(namespace)+len(name)+len(pstr)+2) - buf := bytes.NewBuffer(p) + var p []byte + if subset == "" { + p = make([]byte, 0, len(namespace)+len(name)+len(pstr)+2) + } else { + p = make([]byte, 0, len(namespace)+len(name)+len(subset)+len(pstr)+3) + } + buf := bytes.NewBuffer(p) buf.WriteString(namespace) buf.WriteByte('_') buf.WriteString(name) buf.WriteByte('_') + if subset != "" { + buf.WriteString(subset) + buf.WriteByte('_') + } buf.WriteString(pstr) return buf.String() diff --git a/pkg/types/pod.go b/pkg/types/pod.go index f529920d78..50ed8aeea3 100644 --- a/pkg/types/pod.go +++ b/pkg/types/pod.go @@ -22,9 +22,9 @@ import ( ) var ( - // ErrPodNotRunning represents that PodCache operation is failed due to the + // ErrPodNoAssignedIP represents that PodCache operation is failed due to the // target Pod is not in Running phase. - ErrPodNotRunning = errors.New("pod not running") + ErrPodNoAssignedIP = errors.New("pod not running") // ErrPodNotFound represents that the target pod not found from the PodCache. ErrPodNotFound = errors.New("pod not found") ) @@ -43,6 +43,7 @@ type PodCache interface { type podCache struct { sync.RWMutex + nameByIP map[string]string } @@ -54,22 +55,24 @@ func NewPodCache() PodCache { } func (p *podCache) Add(pod *corev1.Pod) error { - if pod.Status.Phase != corev1.PodRunning { - return ErrPodNotRunning + ip := pod.Status.PodIP + if len(ip) == 0 { + return ErrPodNoAssignedIP } p.Lock() defer p.Unlock() - p.nameByIP[pod.Status.PodIP] = pod.Name + p.nameByIP[ip] = pod.Name return nil } func (p *podCache) Delete(pod *corev1.Pod) error { + ip := pod.Status.PodIP + if len(ip) == 0 { + return ErrPodNoAssignedIP + } p.Lock() defer p.Unlock() - if _, ok := p.nameByIP[pod.Status.PodIP]; !ok { - return ErrPodNotFound - } - delete(p.nameByIP, pod.Status.PodIP) + delete(p.nameByIP, ip) return nil } diff --git a/pkg/types/pod_test.go b/pkg/types/pod_test.go index 844f01fcdb..15930a9ca1 100644 --- a/pkg/types/pod_test.go +++ b/pkg/types/pod_test.go @@ -35,8 +35,8 @@ func TestPodCacheBadCases(t *testing.T) { Phase: corev1.PodPending, }, } - assert.Equal(t, pc.Add(pod1), ErrPodNotRunning, "adding pod") - assert.Equal(t, pc.Delete(pod1), ErrPodNotFound, "deleting pod") + assert.Equal(t, pc.Add(pod1), ErrPodNoAssignedIP, "adding pod") + assert.Equal(t, pc.Delete(pod1), ErrPodNoAssignedIP, "deleting pod") } func TestPodCache(t *testing.T) { diff --git a/samples/deploy/crd/v1beta1/ApisixUpstream.yaml b/samples/deploy/crd/v1beta1/ApisixUpstream.yaml index 6e3825433d..aebb4e11a5 100644 --- a/samples/deploy/crd/v1beta1/ApisixUpstream.yaml +++ b/samples/deploy/crd/v1beta1/ApisixUpstream.yaml @@ -32,7 +32,6 @@ spec: kind: ApisixUpstream shortNames: - au - preserveUnknownFields: false subresources: status: {} validation: diff --git a/test/e2e/features/subset.go b/test/e2e/features/subset.go new file mode 100644 index 0000000000..64231799a9 --- /dev/null +++ b/test/e2e/features/subset.go @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package features + +import ( + "fmt" + "net/http" + "time" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" + "github.com/onsi/ginkgo" + "github.com/stretchr/testify/assert" +) + +var _ = ginkgo.Describe("service subset", func() { + opts := &scaffold.Options{ + Name: "default", + Kubeconfig: scaffold.GetKubeconfig(), + APISIXConfigPath: "testdata/apisix-gw-config.yaml", + IngressAPISIXReplicas: 1, + HTTPBinServicePort: 80, + APISIXRouteVersion: "apisix.apache.org/v2alpha1", + } + s := scaffold.NewScaffold(opts) + ginkgo.It("subset not found", func() { + assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(2), "scaling number of httpbin instances") + assert.Nil(ginkgo.GinkgoT(), s.WaitAllHTTPBINPodsAvailable(), "waiting for all httpbin pods ready") + backendSvc, backendSvcPort := s.DefaultHTTPBackend() + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2alpha1 +kind: ApisixRoute +metadata: + name: httpbin-route +spec: + http: + - name: rule1 + match: + hosts: + - httpbin.com + paths: + - /ip + backend: + serviceName: %s + servicePort: %d + subset: not_exist +`, backendSvc, backendSvcPort[0]) + err := s.CreateResourceFromString(ar) + assert.Nil(ginkgo.GinkgoT(), err, "creating ApisixRoute") + + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes") + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams") + + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err, "listing upstreams") + assert.Len(ginkgo.GinkgoT(), ups, 1) + assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 0, "upstreams nodes not expect") + + s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusServiceUnavailable).Body().Raw() + }) + + ginkgo.It("subset with bad labels", func() { + backendSvc, backendSvcPort := s.DefaultHTTPBackend() + au := fmt.Sprintf(` +apiVersion: apisix.apache.org/v1 +kind: ApisixUpstream +metadata: + name: %s +spec: + subsets: + - name: aa + labels: + aa: bb + cc: dd +`, backendSvc) + err := s.CreateResourceFromString(au) + assert.Nil(ginkgo.GinkgoT(), err, "create ApisixUpstream") + time.Sleep(1 * time.Second) + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2alpha1 +kind: ApisixRoute +metadata: + name: httpbin-route +spec: + http: + - name: rule1 + match: + hosts: + - httpbin.com + paths: + - /ip + backend: + serviceName: %s + servicePort: %d + subset: aa +`, backendSvc, backendSvcPort[0]) + err = s.CreateResourceFromString(ar) + assert.Nil(ginkgo.GinkgoT(), err, "creating ApisixRoute") + time.Sleep(3 * time.Second) + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes") + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams") + + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err, "listing upstreams") + assert.Len(ginkgo.GinkgoT(), ups, 1) + assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 0, "upstreams nodes not expect") + + s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusServiceUnavailable).Body().Raw() + }) + + ginkgo.It("subset with good labels (all)", func() { + assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(2), "scaling number of httpbin instances") + assert.Nil(ginkgo.GinkgoT(), s.WaitAllHTTPBINPodsAvailable(), "waiting for all httpbin pods ready") + + backendSvc, backendSvcPort := s.DefaultHTTPBackend() + au := fmt.Sprintf(` +apiVersion: apisix.apache.org/v1 +kind: ApisixUpstream +metadata: + name: %s +spec: + subsets: + - name: all + labels: + app: httpbin-deployment-e2e-test +`, backendSvc) + err := s.CreateResourceFromString(au) + assert.Nil(ginkgo.GinkgoT(), err, "create ApisixUpstream") + time.Sleep(1 * time.Second) + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2alpha1 +kind: ApisixRoute +metadata: + name: httpbin-route +spec: + http: + - name: rule1 + match: + hosts: + - httpbin.com + paths: + - /ip + backend: + serviceName: %s + servicePort: %d + subset: all +`, backendSvc, backendSvcPort[0]) + err = s.CreateResourceFromString(ar) + assert.Nil(ginkgo.GinkgoT(), err, "creating ApisixRoute") + + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes") + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams") + + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err, "listing upstreams") + assert.Len(ginkgo.GinkgoT(), ups, 1) + assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2, "upstreams nodes not expect") + + s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw() + }) +}) diff --git a/test/e2e/ingress/resourcepushing.go b/test/e2e/ingress/resourcepushing.go index fa0675c266..6f432bb193 100644 --- a/test/e2e/ingress/resourcepushing.go +++ b/test/e2e/ingress/resourcepushing.go @@ -61,7 +61,7 @@ spec: err = s.EnsureNumApisixUpstreamsCreated(1) assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams") assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(2), "scaling number of httpbin instances") - assert.Nil(ginkgo.GinkgoT(), s.WaitAllHTTPBINPoddsAvailable(), "waiting for all httpbin pods ready") + assert.Nil(ginkgo.GinkgoT(), s.WaitAllHTTPBINPodsAvailable(), "waiting for all httpbin pods ready") // TODO When ingress controller can feedback the lifecycle of CRDs to the // status field, we can poll it rather than sleeping. time.Sleep(10 * time.Second) diff --git a/test/e2e/scaffold/httpbin.go b/test/e2e/scaffold/httpbin.go index d9fd861942..0850a9c850 100644 --- a/test/e2e/scaffold/httpbin.go +++ b/test/e2e/scaffold/httpbin.go @@ -124,8 +124,8 @@ func (s *Scaffold) DeleteHTTPBINService() error { return nil } -// WaitAllHTTPBINPods waits until all httpbin pods ready. -func (s *Scaffold) WaitAllHTTPBINPoddsAvailable() error { +// WaitAllHTTPBINPodsAvailable waits until all httpbin pods ready. +func (s *Scaffold) WaitAllHTTPBINPodsAvailable() error { opts := metav1.ListOptions{ LabelSelector: "app=httpbin-deployment-e2e-test", } diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go index 4f4a912591..1ca1a99056 100644 --- a/test/e2e/scaffold/k8s.go +++ b/test/e2e/scaffold/k8s.go @@ -106,6 +106,13 @@ func (s *Scaffold) GetServiceByName(name string) (*corev1.Service, error) { return k8s.GetServiceE(s.t, s.kubectlOptions, name) } +// ListPodsByLabels lists all pods which matching the label selector. +func (s *Scaffold) ListPodsByLabels(labels string) ([]corev1.Pod, error) { + return k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{ + LabelSelector: labels, + }) +} + // CreateResourceFromStringWithNamespace creates resource from a loaded yaml string // and sets its namespace to the specified one. func (s *Scaffold) CreateResourceFromStringWithNamespace(yaml, namespace string) error {