From b2cb840d680b4c805bf410028b5d1d8d1c6f715d Mon Sep 17 00:00:00 2001 From: Lorenz Bauer Date: Tue, 12 Sep 2023 17:23:14 +0100 Subject: [PATCH] egressgw: use Resource[CiliumEndpoint] Replace the custom watcher with Resource[CiliumEndpoint]. This also allows getting rid of the custom retry queue, since Resource will by default retry an event if Done is called with an error. Signed-off-by: Lorenz Bauer --- daemon/cmd/daemon.go | 6 - daemon/k8s/resources.go | 1 + pkg/egressgateway/helpers_test.go | 14 +- pkg/egressgateway/manager.go | 143 +++++-------------- pkg/egressgateway/manager_privileged_test.go | 25 ++-- pkg/k8s/watchers/cilium_endpoint.go | 7 - pkg/k8s/watchers/watcher.go | 8 -- pkg/k8s/watchers/watcher_test.go | 7 - 8 files changed, 60 insertions(+), 151 deletions(-) diff --git a/daemon/cmd/daemon.go b/daemon/cmd/daemon.go index e55a378bc46d6..acc62580c63d0 100644 --- a/daemon/cmd/daemon.go +++ b/daemon/cmd/daemon.go @@ -631,11 +631,6 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams d.cgroupManager = manager.NewCgroupManager() - var egressGatewayWatcher watchers.EgressGatewayManager - if d.egressGatewayManager != nil { - egressGatewayWatcher = d.egressGatewayManager - } - d.k8sWatcher = watchers.NewK8sWatcher( params.Clientset, d.endpointManager, @@ -646,7 +641,6 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams d.datapath, d.redirectPolicyManager, d.bgpSpeaker, - egressGatewayWatcher, d.l7Proxy, option.Config, d.ipcache, diff --git a/daemon/k8s/resources.go b/daemon/k8s/resources.go index c97579e2928a9..6fb710ad34ebf 100644 --- a/daemon/k8s/resources.go +++ b/daemon/k8s/resources.go @@ -64,6 +64,7 @@ var ( k8s.CiliumClusterwideNetworkPolicyResource, k8s.CiliumCIDRGroupResource, k8s.CiliumNodeResource, + k8s.CiliumSlimEndpointResource, ), ) ) diff --git a/pkg/egressgateway/helpers_test.go b/pkg/egressgateway/helpers_test.go index afe9098f723e6..8a5a3aafd7b55 100644 --- a/pkg/egressgateway/helpers_test.go +++ b/pkg/egressgateway/helpers_test.go @@ -12,6 +12,7 @@ import ( v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" "github.com/cilium/cilium/pkg/k8s/resource" slimv1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1" + k8sTypes "github.com/cilium/cilium/pkg/k8s/types" "github.com/cilium/cilium/pkg/policy/api" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -48,8 +49,10 @@ func (fr fakeResource[T]) Observe(ctx context.Context, next func(event resource. } func (fr fakeResource[T]) Events(ctx context.Context, opts ...resource.EventsOpt) <-chan resource.Event[T] { - if opts != nil { - panic("opts not supported") + if len(opts) > 1 { + // Ideally we'd only ignore resource.WithRateLimit here, but that + // isn't possible. + panic("more than one option is not supported") } return fr } @@ -157,3 +160,10 @@ func newCEGP(params *policyParams) (*v2.CiliumEgressGatewayPolicy, *PolicyConfig return cegp, policy } + +func addEndpoint(tb testing.TB, endpoints fakeResource[*k8sTypes.CiliumEndpoint], ep *k8sTypes.CiliumEndpoint) { + endpoints.process(tb, resource.Event[*k8sTypes.CiliumEndpoint]{ + Kind: resource.Upsert, + Object: ep, + }) +} diff --git a/pkg/egressgateway/manager.go b/pkg/egressgateway/manager.go index 5f72c9be4d743..ac84f8537241c 100644 --- a/pkg/egressgateway/manager.go +++ b/pkg/egressgateway/manager.go @@ -28,7 +28,6 @@ import ( "github.com/cilium/cilium/pkg/hive/cell" "github.com/cilium/cilium/pkg/identity" identityCache "github.com/cilium/cilium/pkg/identity/cache" - "github.com/cilium/cilium/pkg/k8s" cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" "github.com/cilium/cilium/pkg/k8s/resource" k8sTypes "github.com/cilium/cilium/pkg/k8s/types" @@ -116,6 +115,9 @@ type Manager struct { // nodesResource allows reading node CRD from k8s. ciliumNodes resource.Resource[*cilium_api_v2.CiliumNode] + // endpoints allows reading endpoint CRD from k8s. + endpoints resource.Resource[*k8sTypes.CiliumEndpoint] + // policyConfigs stores policy configs indexed by policyID policyConfigs map[policyID]*PolicyConfig @@ -126,21 +128,6 @@ type Manager struct { // epDataStore stores endpointId to endpoint metadata mapping epDataStore map[endpointID]*endpointMetadata - // pendingEndpointEvents stores the k8s CiliumEndpoint add/update events - // which still need to be processed by the manager, either because we - // just received the event, or because the processing failed due to the - // manager being unable to resolve the endpoint identity to a set of - // labels - pendingEndpointEvents map[endpointID]*k8sTypes.CiliumEndpoint - - // pendingEndpointEventsLock protects the access to the - // pendingEndpointEvents map - pendingEndpointEventsLock lock.RWMutex - - // endpointEventsQueue is a workqueue of CiliumEndpoint IDs that need to - // be processed by the manager - endpointEventsQueue workqueue.RateLimitingInterface - // identityAllocator is used to fetch identity labels for endpoint updates identityAllocator identityCache.IdentityAllocator @@ -176,11 +163,11 @@ type Params struct { Config Config DaemonConfig *option.DaemonConfig - CacheStatus k8s.CacheStatus IdentityAllocator identityCache.IdentityAllocator PolicyMap egressmap.PolicyMap Policies resource.Resource[*Policy] Nodes resource.Resource[*cilium_api_v2.CiliumNode] + Endpoints resource.Resource[*k8sTypes.CiliumEndpoint] Lifecycle hive.Lifecycle } @@ -238,26 +225,18 @@ func NewEgressGatewayManager(p Params) (out struct { } func newEgressGatewayManager(p Params) (*Manager, error) { - // here we try to mimic the same exponential backoff retry logic used by - // the identity allocator, where the minimum retry timeout is set to 20 - // milliseconds and the max number of attempts is 16 (so 20ms * 2^16 == - // ~20 minutes) - rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond*20, time.Minute*20) - endpointEventRetryQueue := workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{}) - manager := &Manager{ nodeDataStore: make(map[string]nodeTypes.Node), policyConfigs: make(map[policyID]*PolicyConfig), policyConfigsBySourceIP: make(map[string][]*PolicyConfig), epDataStore: make(map[endpointID]*endpointMetadata), - pendingEndpointEvents: make(map[endpointID]*k8sTypes.CiliumEndpoint), - endpointEventsQueue: endpointEventRetryQueue, identityAllocator: p.IdentityAllocator, installRoutes: p.Config.InstallEgressGatewayRoutes, reconciliationTriggerInterval: p.Config.EgressGatewayReconciliationTriggerInterval, policyMap: p.PolicyMap, policies: p.Policies, ciliumNodes: p.Nodes, + endpoints: p.Endpoints, } t, err := trigger.NewTrigger(trigger.Parameters{ @@ -288,8 +267,8 @@ func newEgressGatewayManager(p Params) (*Manager, error) { return fmt.Errorf("egress gateway needs kernel 5.2 or newer") } - go manager.processEvents(ctx, p.CacheStatus) - manager.processCiliumEndpoints(ctx, &wg) + go manager.processEvents(ctx) + return nil }, OnStop: func(hc hive.HookContext) error { @@ -337,10 +316,10 @@ func (manager *Manager) getIdentityLabels(securityIdentity uint32) (labels.Label // processEvents spawns a goroutine that waits for the agent to // sync with k8s and then runs the first reconciliation. -func (manager *Manager) processEvents(ctx context.Context, cacheStatus k8s.CacheStatus) { - var globalSync, policySync, nodeSync bool +func (manager *Manager) processEvents(ctx context.Context) { + var policySync, nodeSync, endpointSync bool maybeTriggerReconcile := func() { - if !globalSync || !policySync || !nodeSync { + if !policySync || !nodeSync || !endpointSync { return } @@ -356,18 +335,21 @@ func (manager *Manager) processEvents(ctx context.Context, cacheStatus k8s.Cache manager.reconciliationTrigger.TriggerWithReason("k8s sync done") } + // here we try to mimic the same exponential backoff retry logic used by + // the identity allocator, where the minimum retry timeout is set to 20 + // milliseconds and the max number of attempts is 16 (so 20ms * 2^16 == + // ~20 minutes) + endpointsRateLimit := workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond*20, time.Minute*20) + policyEvents := manager.policies.Events(ctx) nodeEvents := manager.ciliumNodes.Events(ctx) + endpointEvents := manager.endpoints.Events(ctx, resource.WithRateLimiter(endpointsRateLimit)) + for { select { case <-ctx.Done(): return - case <-cacheStatus: - globalSync = true - maybeTriggerReconcile() - cacheStatus = nil - case event := <-policyEvents: if event.Kind == resource.Sync { policySync = true @@ -385,6 +367,15 @@ func (manager *Manager) processEvents(ctx context.Context, cacheStatus k8s.Cache } else { manager.handleNodeEvent(event) } + + case event := <-endpointEvents: + if event.Kind == resource.Sync { + endpointSync = true + maybeTriggerReconcile() + event.Done(nil) + } else { + manager.handleEndpointEvent(event) + } } } } @@ -400,58 +391,6 @@ func (manager *Manager) handlePolicyEvent(event resource.Event[*Policy]) { } } -// processCiliumEndpoints spawns a goroutine that: -// - consumes the endpoint IDs returned by the endpointEventsQueue workqueue -// - processes the CiliumEndpoints stored in pendingEndpointEvents for these -// endpoint IDs -// - in case the endpoint ID -> labels resolution fails, it adds back the -// event to the workqueue so that it can be retried with an exponential -// backoff -func (manager *Manager) processCiliumEndpoints(ctx context.Context, wg *sync.WaitGroup) { - wg.Add(1) - - go func() { - defer wg.Done() - - retryQueue := manager.endpointEventsQueue - go func() { - <-ctx.Done() - retryQueue.ShutDown() - }() - - for { - item, shutdown := retryQueue.Get() - if shutdown { - break - } - endpointID := item.(types.NamespacedName) - - manager.pendingEndpointEventsLock.RLock() - ep, ok := manager.pendingEndpointEvents[endpointID] - manager.pendingEndpointEventsLock.RUnlock() - - var err error - if ok { - err = manager.addEndpoint(ep) - } else { - manager.deleteEndpoint(endpointID) - } - - if err != nil { - // if the endpoint event is still pending it means the manager - // failed to resolve the endpoint ID to a set of labels, so add back - // the item to the queue - manager.endpointEventsQueue.AddRateLimited(endpointID) - } else { - // otherwise just remove it - manager.endpointEventsQueue.Forget(endpointID) - } - - manager.endpointEventsQueue.Done(endpointID) - } - }() -} - // Event handlers // onAddEgressPolicy parses the given policy config, and updates internal state @@ -560,32 +499,20 @@ func (manager *Manager) deleteEndpoint(id types.NamespacedName) { manager.reconciliationTrigger.TriggerWithReason("endpoint deleted") } -// OnUpdateEndpoint is the event handler for endpoint additions and updates. -func (manager *Manager) OnUpdateEndpoint(endpoint *k8sTypes.CiliumEndpoint) { - id := types.NamespacedName{ - Name: endpoint.GetName(), - Namespace: endpoint.GetNamespace(), - } - - manager.pendingEndpointEventsLock.Lock() - manager.pendingEndpointEvents[id] = endpoint - manager.pendingEndpointEventsLock.Unlock() - - manager.endpointEventsQueue.Add(id) -} +func (manager *Manager) handleEndpointEvent(event resource.Event[*k8sTypes.CiliumEndpoint]) { + endpoint := event.Object -// OnDeleteEndpoint is the event handler for endpoint deletions. -func (manager *Manager) OnDeleteEndpoint(endpoint *k8sTypes.CiliumEndpoint) { id := types.NamespacedName{ Name: endpoint.GetName(), Namespace: endpoint.GetNamespace(), } - manager.pendingEndpointEventsLock.Lock() - delete(manager.pendingEndpointEvents, id) - manager.pendingEndpointEventsLock.Unlock() - - manager.endpointEventsQueue.Add(id) + if event.Kind == resource.Upsert { + event.Done(manager.addEndpoint(endpoint)) + } else { + manager.deleteEndpoint(id) + event.Done(nil) + } } // handleNodeEvent takes care of node upserts and removals. diff --git a/pkg/egressgateway/manager_privileged_test.go b/pkg/egressgateway/manager_privileged_test.go index 71ffda1bc6a5e..3e602abcbf2f0 100644 --- a/pkg/egressgateway/manager_privileged_test.go +++ b/pkg/egressgateway/manager_privileged_test.go @@ -20,7 +20,6 @@ import ( "github.com/cilium/cilium/pkg/hive" "github.com/cilium/cilium/pkg/hive/hivetest" "github.com/cilium/cilium/pkg/identity" - "github.com/cilium/cilium/pkg/k8s" cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" "github.com/cilium/cilium/pkg/k8s/resource" slimv1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1" @@ -105,10 +104,10 @@ type parsedEgressRule struct { // Hook up gocheck into the "go test" runner. type EgressGatewayTestSuite struct { - manager *Manager - policies fakeResource[*Policy] - nodes fakeResource[*cilium_api_v2.CiliumNode] - cacheStatus k8s.CacheStatus + manager *Manager + policies fakeResource[*Policy] + nodes fakeResource[*cilium_api_v2.CiliumNode] + endpoints fakeResource[*k8sTypes.CiliumEndpoint] } var _ = Suite(&EgressGatewayTestSuite{}) @@ -128,9 +127,9 @@ func (k *EgressGatewayTestSuite) SetUpSuite(c *C) { } func (k *EgressGatewayTestSuite) SetUpTest(c *C) { - k.cacheStatus = make(k8s.CacheStatus) k.policies = make(fakeResource[*Policy]) k.nodes = make(fakeResource[*cilium_api_v2.CiliumNode]) + k.endpoints = make(fakeResource[*k8sTypes.CiliumEndpoint]) lc := hivetest.Lifecycle(c) policyMap := egressmap.CreatePrivatePolicyMap(lc, egressmap.DefaultPolicyConfig) @@ -140,11 +139,11 @@ func (k *EgressGatewayTestSuite) SetUpTest(c *C) { Lifecycle: lc, Config: Config{true, 1 * time.Millisecond}, DaemonConfig: &option.DaemonConfig{}, - CacheStatus: k.cacheStatus, IdentityAllocator: identityAllocator, PolicyMap: policyMap, Policies: k.policies, Nodes: k.nodes, + Endpoints: k.endpoints, }) c.Assert(err, IsNil) c.Assert(k.manager, NotNil) @@ -229,9 +228,9 @@ func (k *EgressGatewayTestSuite) TestEgressGatewayManager(c *C) { egressGatewayManager := k.manager assertIPRules(c, []ipRule{}) - close(k.cacheStatus) k.policies.sync(c) k.nodes.sync(c) + k.endpoints.sync(c) reconciliationEventsCount := egressGatewayManager.reconciliationEventsCount.Load() @@ -266,7 +265,7 @@ func (k *EgressGatewayTestSuite) TestEgressGatewayManager(c *C) { // Add a new endpoint & ID which matches policy-1 ep1, id1 := newEndpointAndIdentity("ep-1", ep1IP, ep1Labels) - egressGatewayManager.OnUpdateEndpoint(&ep1) + addEndpoint(c, k.endpoints, &ep1) reconciliationEventsCount = waitForReconciliationRun(c, egressGatewayManager, reconciliationEventsCount) assertEgressRules(c, policyMap, []egressRule{ @@ -278,7 +277,7 @@ func (k *EgressGatewayTestSuite) TestEgressGatewayManager(c *C) { // Update the endpoint labels in order for it to not be a match id1 = updateEndpointAndIdentity(&ep1, id1, map[string]string{}) - egressGatewayManager.OnUpdateEndpoint(&ep1) + addEndpoint(c, k.endpoints, &ep1) reconciliationEventsCount = waitForReconciliationRun(c, egressGatewayManager, reconciliationEventsCount) assertEgressRules(c, policyMap, []egressRule{}) @@ -286,7 +285,7 @@ func (k *EgressGatewayTestSuite) TestEgressGatewayManager(c *C) { // Restore the old endpoint lables in order for it to be a match id1 = updateEndpointAndIdentity(&ep1, id1, ep1Labels) - egressGatewayManager.OnUpdateEndpoint(&ep1) + addEndpoint(c, k.endpoints, &ep1) reconciliationEventsCount = waitForReconciliationRun(c, egressGatewayManager, reconciliationEventsCount) assertEgressRules(c, policyMap, []egressRule{ @@ -333,7 +332,7 @@ func (k *EgressGatewayTestSuite) TestEgressGatewayManager(c *C) { // Add a new endpoint and ID which matches policy-2 ep2, _ := newEndpointAndIdentity("ep-2", ep2IP, ep2Labels) - egressGatewayManager.OnUpdateEndpoint(&ep2) + addEndpoint(c, k.endpoints, &ep2) reconciliationEventsCount = waitForReconciliationRun(c, egressGatewayManager, reconciliationEventsCount) assertEgressRules(c, policyMap, []egressRule{ @@ -543,7 +542,7 @@ func (k *EgressGatewayTestSuite) TestEgressGatewayManager(c *C) { // Update the endpoint labels in order for it to not be a match _ = updateEndpointAndIdentity(&ep1, id1, map[string]string{}) - egressGatewayManager.OnUpdateEndpoint(&ep1) + addEndpoint(c, k.endpoints, &ep1) waitForReconciliationRun(c, egressGatewayManager, reconciliationEventsCount) assertEgressRules(c, policyMap, []egressRule{ diff --git a/pkg/k8s/watchers/cilium_endpoint.go b/pkg/k8s/watchers/cilium_endpoint.go index 3721b1b227bd6..945fd34c116cb 100644 --- a/pkg/k8s/watchers/cilium_endpoint.go +++ b/pkg/k8s/watchers/cilium_endpoint.go @@ -221,10 +221,6 @@ func (k *K8sWatcher) endpointUpdated(oldEndpoint, endpoint *types.CiliumEndpoint } } } - - if k.egressGatewayManager != nil { - k.egressGatewayManager.OnUpdateEndpoint(endpoint) - } } func (k *K8sWatcher) endpointDeleted(endpoint *types.CiliumEndpoint) { @@ -249,9 +245,6 @@ func (k *K8sWatcher) endpointDeleted(endpoint *types.CiliumEndpoint) { k.policyManager.TriggerPolicyUpdates(true, "Named ports deleted") } } - if k.egressGatewayManager != nil { - k.egressGatewayManager.OnDeleteEndpoint(endpoint) - } } // CreateCiliumEndpointLocalPodIndexFunc returns an IndexFunc that indexes only local diff --git a/pkg/k8s/watchers/watcher.go b/pkg/k8s/watchers/watcher.go index 3bcf6ba898304..bb90c85704132 100644 --- a/pkg/k8s/watchers/watcher.go +++ b/pkg/k8s/watchers/watcher.go @@ -37,7 +37,6 @@ import ( k8smetrics "github.com/cilium/cilium/pkg/k8s/metrics" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" "github.com/cilium/cilium/pkg/k8s/synced" - k8sTypes "github.com/cilium/cilium/pkg/k8s/types" "github.com/cilium/cilium/pkg/k8s/utils" "github.com/cilium/cilium/pkg/k8s/watchers/resources" "github.com/cilium/cilium/pkg/k8s/watchers/subscriber" @@ -177,10 +176,6 @@ type bgpSpeakerManager interface { OnUpdateEndpoints(eps *k8s.Endpoints) error } -type EgressGatewayManager interface { - OnUpdateEndpoint(endpoint *k8sTypes.CiliumEndpoint) - OnDeleteEndpoint(endpoint *k8sTypes.CiliumEndpoint) -} type envoyConfigManager interface { UpsertEnvoyResources(context.Context, envoy.Resources, envoy.PortAllocator) error @@ -248,7 +243,6 @@ type K8sWatcher struct { svcManager svcManager redirectPolicyManager redirectPolicyManager bgpSpeakerManager bgpSpeakerManager - egressGatewayManager EgressGatewayManager ipcache ipcacheManager envoyConfigManager envoyConfigManager cgroupManager cgroupManager @@ -303,7 +297,6 @@ func NewK8sWatcher( datapath datapath.Datapath, redirectPolicyManager redirectPolicyManager, bgpSpeakerManager bgpSpeakerManager, - egressGatewayManager EgressGatewayManager, envoyConfigManager envoyConfigManager, cfg WatcherConfiguration, ipcache ipcacheManager, @@ -327,7 +320,6 @@ func NewK8sWatcher( datapath: datapath, redirectPolicyManager: redirectPolicyManager, bgpSpeakerManager: bgpSpeakerManager, - egressGatewayManager: egressGatewayManager, cgroupManager: cgroupManager, NodeChain: subscriber.NewNodeChain(), CiliumNodeChain: subscriber.NewCiliumNodeChain(), diff --git a/pkg/k8s/watchers/watcher_test.go b/pkg/k8s/watchers/watcher_test.go index d2844164f1be3..ececc1aa9a48a 100644 --- a/pkg/k8s/watchers/watcher_test.go +++ b/pkg/k8s/watchers/watcher_test.go @@ -196,7 +196,6 @@ func (s *K8sWatcherSuite) TestUpdateToServiceEndpointsGH9525(c *C) { nil, nil, nil, - nil, &fakeWatcherConfiguration{}, testipcache.NewMockIPCache(), nil, @@ -521,7 +520,6 @@ func (s *K8sWatcherSuite) Test_addK8sSVCs_ClusterIP(c *C) { nil, nil, nil, - nil, &fakeWatcherConfiguration{}, testipcache.NewMockIPCache(), nil, @@ -675,7 +673,6 @@ func (s *K8sWatcherSuite) TestChangeSVCPort(c *C) { nil, nil, nil, - nil, &fakeWatcherConfiguration{}, testipcache.NewMockIPCache(), nil, @@ -1158,7 +1155,6 @@ func (s *K8sWatcherSuite) Test_addK8sSVCs_NodePort(c *C) { nil, nil, nil, - nil, &fakeWatcherConfiguration{}, testipcache.NewMockIPCache(), nil, @@ -1475,7 +1471,6 @@ func (s *K8sWatcherSuite) Test_addK8sSVCs_GH9576_1(c *C) { nil, nil, nil, - nil, &fakeWatcherConfiguration{}, testipcache.NewMockIPCache(), nil, @@ -1785,7 +1780,6 @@ func (s *K8sWatcherSuite) Test_addK8sSVCs_GH9576_2(c *C) { nil, nil, nil, - nil, &fakeWatcherConfiguration{}, testipcache.NewMockIPCache(), nil, @@ -2709,7 +2703,6 @@ func (s *K8sWatcherSuite) Test_addK8sSVCs_ExternalIPs(c *C) { nil, nil, nil, - nil, &fakeWatcherConfiguration{}, testipcache.NewMockIPCache(), nil,