Skip to content

Commit

Permalink
Merge pull request #2677 from g-gaston/avoid-extra-calls-for-not-foun…
Browse files Browse the repository at this point in the history
…d-resource-0.16

[release-0.16] 🐛 Avoid extra calls for not found resource
  • Loading branch information
k8s-ci-robot authored Feb 8, 2024
2 parents 67d355d + 91a3132 commit f8acd6e
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 35 deletions.
62 changes: 47 additions & 15 deletions pkg/client/apiutil/restmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"sync"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -179,23 +180,28 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
Group: metav1.APIGroup{Name: groupName},
VersionedResources: make(map[string][]metav1.APIResource),
}
if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}

// Update information for group resources about versioned resources.
// The number of API calls is equal to the number of versions: /apis/<group>/<version>.
groupVersionResources, err := m.fetchGroupVersionResources(groupName, versions...)
// If we encounter a missing API version (NotFound error), we will remove the group from
// the m.apiGroups and m.knownGroups caches.
// If this happens, in the next call the group will be added back to apiGroups
// and only the existing versions will be loaded in knownGroups.
groupVersionResources, err := m.fetchGroupVersionResourcesLocked(groupName, versions...)
if err != nil {
return fmt.Errorf("failed to get API group resources: %w", err)
}
for version, resources := range groupVersionResources {
groupResources.VersionedResources[version.Version] = resources.APIResources

if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}

// Update information for group resources about the API group by adding new versions.
// Ignore the versions that are already registered.
for _, version := range versions {
for groupVersion, resources := range groupVersionResources {
version := groupVersion.Version

groupResources.VersionedResources[version] = resources.APIResources
found := false
for _, v := range groupResources.Group.Versions {
if v.Version == version {
Expand All @@ -216,12 +222,7 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
m.knownGroups[groupName] = groupResources

// Finally, update the group with received information and regenerate the mapper.
updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups))
for _, agr := range m.knownGroups {
updatedGroupResources = append(updatedGroupResources, agr)
}

m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources)
m.refreshMapper()
return nil
}

Expand Down Expand Up @@ -267,18 +268,31 @@ func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error)
return nil, fmt.Errorf("failed to find API group %q", groupName)
}

// fetchGroupVersionResources fetches the resources for the specified group and its versions.
func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
// fetchGroupVersionResourcesLocked fetches the resources for the specified group and its versions.
// This method might modify the cache so it needs to be called under the lock.
func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)

for _, version := range versions {
groupVersion := schema.GroupVersion{Group: groupName, Version: version}

apiResourceList, err := m.client.ServerResourcesForGroupVersion(groupVersion.String())
if apierrors.IsNotFound(err) && m.isGroupVersionCached(groupVersion) {
// If the version is not found, we remove the group from the cache
// so it gets refreshed on the next call.
delete(m.apiGroups, groupName)
delete(m.knownGroups, groupName)
// It's important to refresh the mapper after invalidating the cache, since returning an error
// aborts the call and leaves the underlying mapper unchanged. If not refreshed, the next call
// will still return a match for the NotFound version.
m.refreshMapper()
}

if err != nil {
failedGroups[groupVersion] = err
}

if apiResourceList != nil {
// even in case of error, some fallback might have been returned.
groupVersionResources[groupVersion] = apiResourceList
Expand All @@ -292,3 +306,21 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string

return groupVersionResources, nil
}

// isGroupVersionCached checks if a version for a group is cached in the known groups cache.
func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool {
if cachedGroup, ok := m.knownGroups[gv.Group]; ok {
_, cached := cachedGroup.VersionedResources[gv.Version]
return cached
}

return false
}

func (m *mapper) refreshMapper() {
updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups))
for _, agr := range m.knownGroups {
updatedGroupResources = append(updatedGroupResources, agr)
}
m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources)
}
213 changes: 193 additions & 20 deletions pkg/client/apiutil/restmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package apiutil_test

import (
"context"
"fmt"
"net/http"
"testing"

"k8s.io/apimachinery/pkg/api/meta"

_ "github.com/onsi/ginkgo/v2"
gmg "github.com/onsi/gomega"

"github.com/onsi/gomega/format"
gomegatypes "github.com/onsi/gomega/types"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -493,23 +496,7 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register another CRD in runtime - "riders.crew.example.com".

crd := &apiextensionsv1.CustomResourceDefinition{}
err = c.Get(context.TODO(), types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Name = "riders.crew.example.com"
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: "Rider",
Plural: "riders",
}
newCRD.ResourceVersion = ""

// Create the new CRD.
g.Expect(c.Create(context.TODO(), newCRD)).To(gmg.Succeed())
createNewCRD(context.TODO(), g, c, "crew.example.com", "Rider", "riders")

// Wait a bit until the CRD is registered.
g.Eventually(func() error {
Expand All @@ -528,4 +515,190 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("rider"))
})

t.Run("LazyRESTMapper should invalidate the group cache if a version is not found", func(t *testing.T) {
g := gmg.NewWithT(t)
ctx := context.Background()

httpClient, err := rest.HTTPClientFor(restCfg)
g.Expect(err).NotTo(gmg.HaveOccurred())

crt := newCountingRoundTripper(httpClient.Transport)
httpClient.Transport = crt

lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, httpClient)
g.Expect(err).NotTo(gmg.HaveOccurred())

s := scheme.Scheme
err = apiextensionsv1.AddToScheme(s)
g.Expect(err).NotTo(gmg.HaveOccurred())

c, err := client.New(restCfg, client.Options{Scheme: s})
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register a new CRD in a new group to avoid collisions when deleting versions - "taxis.inventory.example.com".
group := "inventory.example.com"
kind := "Taxi"
plural := "taxis"
crdName := plural + "." + group
// Create a CRD with two versions: v1alpha1 and v1 where both are served and
// v1 is the storage version so we can easily remove v1alpha1 later.
crd := newCRD(ctx, g, c, group, kind, plural)
v1alpha1 := crd.Spec.Versions[0]
v1alpha1.Name = "v1alpha1"
v1alpha1.Storage = false
v1alpha1.Served = true
v1 := crd.Spec.Versions[0]
v1.Name = "v1"
v1.Storage = true
v1.Served = true
crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1alpha1, v1}
g.Expect(c.Create(ctx, crd)).To(gmg.Succeed())
t.Cleanup(func() {
g.Expect(c.Delete(ctx, crd)).To(gmg.Succeed())
})

// Wait until the CRD is registered.
discHTTP, err := rest.HTTPClientFor(restCfg)
g.Expect(err).NotTo(gmg.HaveOccurred())
discClient, err := discovery.NewDiscoveryClientForConfigAndClient(restCfg, discHTTP)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Eventually(func(g gmg.Gomega) {
_, err = discClient.ServerResourcesForGroupVersion(group + "/v1")
g.Expect(err).NotTo(gmg.HaveOccurred())
}).Should(gmg.Succeed(), "v1 should be available")

// There are no requests before any call
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// Since we don't specify what version we expect, restmapper will fetch them all and search there.
// To fetch a list of available versions
// #1: GET https://host/api
// #2: GET https://host/apis
// Then, for all available versions:
// #3: GET https://host/apis/inventory.example.com/v1alpha1
// #4: GET https://host/apis/inventory.example.com/v1
// This should fill the cache for apiGroups and versions.
mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal(kind))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))
crt.Reset() // We reset the counter to check how many additional requests are made later.

// At this point v1alpha1 should be cached
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// We update the CRD to only have v1 version.
g.Expect(c.Get(ctx, types.NamespacedName{Name: crdName}, crd)).To(gmg.Succeed())
for _, version := range crd.Spec.Versions {
if version.Name == "v1" {
v1 = version
break
}
}
crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1}
g.Expect(c.Update(ctx, crd)).To(gmg.Succeed())

// We wait until v1alpha1 is not available anymore.
g.Eventually(func(g gmg.Gomega) {
_, err = discClient.ServerResourcesForGroupVersion(group + "/v1alpha1")
g.Expect(apierrors.IsNotFound(err)).To(gmg.BeTrue(), "v1alpha1 should not be available anymore")
}).Should(gmg.Succeed())

// Although v1alpha1 is not available anymore, the cache is not invalidated yet so it should return a mapping.
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// We request Limo, which is not in the mapper because it doesn't exist.
// This will trigger a reload of the lazy mapper cache.
// Reloading the cache will read v2 again and since it's not available anymore, it should invalidate the cache.
// #1: GET https://host/apis/inventory.example.com/v1alpha1
// #2: GET https://host/apis/inventory.example.com/v1
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: "Limo"})
g.Expect(err).To(beNoMatchError())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(2))
crt.Reset()

// Now we request v1alpha1 again and it should return an error since the cache was invalidated.
// #1: GET https://host/apis/inventory.example.com/v1alpha1
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).To(beNoMatchError())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(1))
crt.Reset()

// Since we don't specify what version we expect, restmapper will fetch them all and search there.
// To fetch a list of available versions
// #1: GET https://host/api
// #2: GET https://host/apis
// Then, for all available versions:
// #3: GET https://host/apis/inventory.example.com/v1
mapping, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal(kind))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(3))
})
}

// createNewCRD creates a new CRD with the given group, kind, and plural and returns it.
func createNewCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition {
newCRD := newCRD(ctx, g, c, group, kind, plural)
g.Expect(c.Create(ctx, newCRD)).To(gmg.Succeed())

return newCRD
}

// newCRD returns a new CRD with the given group, kind, and plural.
func newCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition {
crd := &apiextensionsv1.CustomResourceDefinition{}
err := c.Get(ctx, types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Spec.Group = group
newCRD.Name = plural + "." + group
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: kind,
Plural: plural,
}
newCRD.ResourceVersion = ""

return newCRD
}

func beNoMatchError() gomegatypes.GomegaMatcher {
return &errorMatcher{
checkFunc: meta.IsNoMatchError,
message: "NoMatch",
}
}

type errorMatcher struct {
checkFunc func(error) bool
message string
}

func (e *errorMatcher) Match(actual interface{}) (success bool, err error) {
if actual == nil {
return false, nil
}

actualErr, actualOk := actual.(error)
if !actualOk {
return false, fmt.Errorf("expected an error-type. got:\n%s", format.Object(actual, 1))
}

return e.checkFunc(actualErr), nil
}

func (e *errorMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, fmt.Sprintf("to be %s error", e.message))
}

func (e *errorMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, fmt.Sprintf("not to be %s error", e.message))
}

0 comments on commit f8acd6e

Please sign in to comment.