From 4cac789b4b0f3b1d32e5d7d8ff08bf99ed21f4fa Mon Sep 17 00:00:00 2001 From: Amine Hilaly Date: Thu, 22 Dec 2022 03:57:28 +0000 Subject: [PATCH 1/4] Add support for resource-specific resync periods and default drift remediation period This commit introduces the ability to specify resource-specific resync periods in the drift remediation configuration, as well as a default drift remediation period in the controller configuration. The resync period for each reconciler is determined by trying to retrieve it from the following sources, in this order: 1. A resource-specific period specified in the drift remediation configuration. 2. A resource-specific requeue on success period specified by the resource manager factory. 3. The default drift remediation period specified in the controller configuration. 4. The default resync period defined in the ACK runtime package. This allows users to customize the drift remediation behavior for different resources as needed, while still providing a fallback option for resources that do not have a specific period specified. Signed-off-by: Amine Hilaly --- pkg/config/config.go | 133 +++++++++++++++++++------ pkg/config/config_test.go | 61 ++++++++++++ pkg/runtime/reconciler.go | 97 ++++++++++++------ pkg/runtime/reconciler_test.go | 1 + pkg/runtime/service_controller_test.go | 1 + 5 files changed, 234 insertions(+), 59 deletions(-) create mode 100644 pkg/config/config_test.go diff --git a/pkg/config/config.go b/pkg/config/config.go index 08a84034..62ddb1bc 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,6 +17,9 @@ import ( "errors" "fmt" "net/url" + "strconv" + "strings" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -32,20 +35,22 @@ import ( ) const ( - flagEnableLeaderElection = "enable-leader-election" - flagMetricAddr = "metrics-addr" - flagEnableDevLogging = "enable-development-logging" - flagAWSRegion = "aws-region" - flagAWSEndpointURL = "aws-endpoint-url" - flagAWSIdentityEndpointURL = "aws-identity-endpoint-url" - flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls" - flagLogLevel = "log-level" - flagResourceTags = "resource-tags" - flagWatchNamespace = "watch-namespace" - flagEnableWebhookServer = "enable-webhook-server" - flagWebhookServerAddr = "webhook-server-addr" - flagDeletionPolicy = "deletion-policy" - envVarAWSRegion = "AWS_REGION" + flagEnableLeaderElection = "enable-leader-election" + flagMetricAddr = "metrics-addr" + flagEnableDevLogging = "enable-development-logging" + flagAWSRegion = "aws-region" + flagAWSEndpointURL = "aws-endpoint-url" + flagAWSIdentityEndpointURL = "aws-identity-endpoint-url" + flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls" + flagLogLevel = "log-level" + flagResourceTags = "resource-tags" + flagWatchNamespace = "watch-namespace" + flagEnableWebhookServer = "enable-webhook-server" + flagWebhookServerAddr = "webhook-server-addr" + flagDeletionPolicy = "deletion-policy" + flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds" + flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds" + envVarAWSRegion = "AWS_REGION" ) var ( @@ -63,20 +68,22 @@ var ( // Config contains configuration options for ACK service controllers type Config struct { - MetricsAddr string - EnableLeaderElection bool - EnableDevelopmentLogging bool - AccountID string - Region string - IdentityEndpointURL string - EndpointURL string - AllowUnsafeEndpointURL bool - LogLevel string - ResourceTags []string - WatchNamespace string - EnableWebhookServer bool - WebhookServerAddr string - DeletionPolicy ackv1alpha1.DeletionPolicy + MetricsAddr string + EnableLeaderElection bool + EnableDevelopmentLogging bool + AccountID string + Region string + IdentityEndpointURL string + EndpointURL string + AllowUnsafeEndpointURL bool + LogLevel string + ResourceTags []string + WatchNamespace string + EnableWebhookServer bool + WebhookServerAddr string + DeletionPolicy ackv1alpha1.DeletionPolicy + ReconcileDefaultResyncSeconds int + ReconcileResourceResyncSeconds []string } // BindFlags defines CLI/runtime configuration options @@ -152,6 +159,19 @@ func (cfg *Config) BindFlags() { &cfg.DeletionPolicy, flagDeletionPolicy, "The default deletion policy for all resources managed by the controller", ) + flag.IntVar( + &cfg.ReconcileDefaultResyncSeconds, flagReconcileDefaultResyncSeconds, + 60, + "The default duration, in seconds, to wait before resyncing desired state of custom resources. "+ + "This value is used if no resource-specific override has been specified. Default is 60 seconds.", + ) + flag.StringArrayVar( + &cfg.ReconcileResourceResyncSeconds, flagReconcileResourceResyncSeconds, + []string{}, + "A Key/Value list of strings representing the reconcile resync configuration for each resource. This"+ + " configuration maps resource kinds to drift remediation periods in seconds. If provided, "+ + " resource-specific resync periods take precedence over the default period.", + ) } // SetupLogger initializes the logger used in the service controller @@ -233,6 +253,16 @@ func (cfg *Config) Validate() error { if cfg.DeletionPolicy == "" { cfg.DeletionPolicy = ackv1alpha1.DeletionPolicyDelete } + + if cfg.ReconcileDefaultResyncSeconds < 0 { + return fmt.Errorf("invalid value for flag '%s': resync seconds default must be greater than 0", flagReconcileDefaultResyncSeconds) + } + + _, err := cfg.ParseReconcileResourceResyncSeconds() + if err != nil { + return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceResyncSeconds, err) + } + return nil } @@ -244,3 +274,50 @@ func (cfg *Config) checkUnsafeEndpoint(endpoint *url.URL) error { } return nil } + +// ParseReconcileResourceResyncSeconds parses the values of the --reconcile-resource-resync-seconds +// flag and returns a map that maps resource names to resync periods. +// The flag arguments are expected to have the format "resource=seconds", where "resource" is the +// name of the resource and "seconds" is the number of seconds that the reconciler should wait before +// reconciling the resource again. +func (cfg *Config) ParseReconcileResourceResyncSeconds() (map[string]time.Duration, error) { + resourceResyncPeriods := make(map[string]time.Duration, len(cfg.ReconcileResourceResyncSeconds)) + for _, resourceResyncSecondsFlag := range cfg.ReconcileResourceResyncSeconds { + // Parse the resource name and resync period from the flag argument + resourceName, resyncSeconds, err := parseReconcileFlagArgument(resourceResyncSecondsFlag) + if err != nil { + return nil, fmt.Errorf("error parsing flag argument '%v': %v. Expected format: resource=seconds", resourceResyncSecondsFlag, err) + } + resourceResyncPeriods[strings.ToLower(resourceName)] = time.Duration(resyncSeconds) + } + return resourceResyncPeriods, nil +} + +// parseReconcileFlagArgument parses a flag argument of the form "key=value" into +// its individual elements. The key must be a non-empty string and the value must be +// a non-empty positive integer. If the flag argument is not in the expected format +// or has invalid elements, an error is returned. +// +// The function returns the parsed key and value as separate elements. +func parseReconcileFlagArgument(flagArgument string) (string, int, error) { + delimiter := "=" + elements := strings.Split(flagArgument, delimiter) + if len(elements) != 2 { + return "", 0, fmt.Errorf("invalid flag argument format: expected key=value") + } + if elements[0] == "" { + return "", 0, fmt.Errorf("missing key in flag argument") + } + if elements[1] == "" { + return "", 0, fmt.Errorf("missing value in flag argument") + } + + resyncSeconds, err := strconv.Atoi(elements[1]) + if err != nil { + return "", 0, fmt.Errorf("invalid value in flag argument: %v", err) + } + if resyncSeconds < 0 { + return "", 0, fmt.Errorf("invalid value in flag argument: expected non-negative integer, got %d", resyncSeconds) + } + return elements[0], resyncSeconds, nil +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go new file mode 100644 index 00000000..852f558c --- /dev/null +++ b/pkg/config/config_test.go @@ -0,0 +1,61 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package config + +import "testing" + +func TestParseReconcileFlagArgument(t *testing.T) { + tests := []struct { + flagArgument string + expectedKey string + expectedVal int + expectedErr bool + expectedErrMsg string + }{ + // Test valid flag arguments + {"key=1", "key", 1, false, ""}, + {"key=123456", "key", 123456, false, ""}, + {"key=600", "key", 600, false, ""}, + {"k=1", "k", 1, false, ""}, + {"ke_y=123456", "ke_y", 123456, false, ""}, + + // Test invalid flag arguments + {"key", "", 0, true, "invalid flag argument format: expected key=value"}, + {"key=", "", 0, true, "missing value in flag argument"}, + {"=value", "", 0, true, "missing key in flag argument"}, + {"key=value1=value2", "", 0, true, "invalid flag argument format: expected key=value"}, + {"key=a", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"a\": invalid syntax"}, + {"key=-1", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -1"}, + {"key=-123456", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -123456"}, + {"key=1.1", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"1.1\": invalid syntax"}, + } + for _, test := range tests { + key, val, err := parseReconcileFlagArgument(test.flagArgument) + if err != nil && !test.expectedErr { + t.Errorf("unexpected error for flag argument '%s': %v", test.flagArgument, err) + } + if err == nil && test.expectedErr { + t.Errorf("expected error for flag argument '%s', got nil", test.flagArgument) + } + if err != nil && err.Error() != test.expectedErrMsg { + t.Errorf("unexpected error message for flag argument '%s': expected '%s', got '%v'", test.flagArgument, test.expectedErrMsg, err) + } + if key != test.expectedKey { + t.Errorf("unexpected key for flag argument '%s': expected '%s', got '%s'", test.flagArgument, test.expectedKey, key) + } + if val != test.expectedVal { + t.Errorf("unexpected value for flag argument '%s': expected %d, got %d", test.flagArgument, test.expectedVal, val) + } + } +} diff --git a/pkg/runtime/reconciler.go b/pkg/runtime/reconciler.go index 1d5d916b..aa3ca3cd 100644 --- a/pkg/runtime/reconciler.go +++ b/pkg/runtime/reconciler.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" backoff "github.com/cenkalti/backoff/v4" @@ -47,7 +48,7 @@ const ( // The default duration to trigger the sync for an ACK resource after // the successful reconciliation. This behavior for a resource can be // overriden by RequeueOnSuccessSeconds configuration for that resource. - resyncPeriod = 10 * time.Hour + defaultResyncPeriod = 10 * time.Hour ) // reconciler describes a generic reconciler within ACK. @@ -70,8 +71,9 @@ type reconciler struct { // object)s and sharing watch and informer queues across those controllers. type resourceReconciler struct { reconciler - rmf acktypes.AWSResourceManagerFactory - rd acktypes.AWSResourceDescriptor + rmf acktypes.AWSResourceManagerFactory + rd acktypes.AWSResourceDescriptor + resyncPeriod time.Duration } // GroupKind returns the string containing the API group and kind reconciled by @@ -887,31 +889,8 @@ func (r *resourceReconciler) handleRequeues( } // The code below only executes for "ConditionTypeResourceSynced" if condition.Status == corev1.ConditionTrue { - if duration := r.rmf.RequeueOnSuccessSeconds(); duration > 0 { - rlog.Debug( - "requeueing resource after resource synced condition true", - ) - return latest, requeue.NeededAfter(nil, time.Duration(duration)*time.Second) - } - // Since RequeueOnSuccessSeconds <= 0, requeue the resource - // with "resyncPeriod" to perform periodic drift detection and - // sync the desired state. - // - // Upstream controller-runtime provides SyncPeriod functionality - // which flushes the go-client cache and triggers Sync for all - // the objects in cache every 10 hours by default. - // - // ACK controller use non-cached client to read objects - // from API Server, hence controller-runtime's SyncPeriod - // functionality does not work. - // https://github.com/aws-controllers-k8s/community/issues/1355 - // - // ACK controllers use api-reader(non-cached client) to avoid - // reading stale copies of ACK resource that can cause - // duplicate resource creation when resource identifier is - // not present in stale copy of resource. - // https://github.com/aws-controllers-k8s/community/issues/894#issuecomment-911876354 - return latest, requeue.NeededAfter(nil, resyncPeriod) + rlog.Debug("requeuing", "after", r.resyncPeriod) + return latest, requeue.NeededAfter(nil, r.resyncPeriod) } else { rlog.Debug( "requeueing resource after finding resource synced condition false", @@ -1103,6 +1082,55 @@ func (r *resourceReconciler) getEndpointURL( return r.cfg.EndpointURL } +// getResyncPeriod returns the period of the recurring reconciler process which ensures the desired +// state of custom resources is maintained. +// It attempts to retrieve the duration from the following sources, in this order: +// 1. A resource-specific reconciliation resync period specified in the reconciliation resync +// configuration map. +// 2. A resource-specific requeue on success period specified by the resource manager factory. +// The resource manager factory is controller-specific, and thus this period is to specified +// by controller authors. +// 3. The default reconciliation resync period period specified in the controller binary flags. +// 4. The default resync period defined in the ACK runtime package. Defined in defaultResyncPeriod +// within the same file +// +// Each reconciler has a unique value to use. This function should only be called during the +// instantiation of an AWSResourceReconciler and should not be called during the reconciliation +// function r.Sync +func getResyncPeriod(rmf acktypes.AWSResourceManagerFactory, cfg ackcfg.Config) time.Duration { + // The reconciliation resync period configuration has already been validated as + // a clean map. Therefore, we can safely ignore any errors that may occur while + // parsing it and avoid changing the signature of NewReconcilerWithClient. + drc, _ := cfg.ParseReconcileResourceResyncSeconds() + + // First, try to use a resource-specific resync period if provided in the resource + // resync period configuration. + resourceKind := rmf.ResourceDescriptor().GroupKind().Kind + if duration, ok := drc[strings.ToLower(resourceKind)]; ok && duration > 0 { + return time.Duration(duration) * time.Second + } + + // Second, try to use a resource-specific requeue on success period specified by the + // resource manager factory. This value is set during the code generation of the + // controller and takes precedence over the default resync period period because + // it allows existing controllers that rely on this value to maintain their intended + // behavior. + if duration := rmf.RequeueOnSuccessSeconds(); duration > 0 { + return time.Duration(duration) * time.Second + } + + // Third, try to use the default resync period resync period specified during controller + // start-up. + if cfg.ReconcileDefaultResyncSeconds > 0 { + return time.Duration(cfg.ReconcileDefaultResyncSeconds) * time.Second + } + + // If none of the above values are present or valid, use the default resync period + // defined in the ACK runtime package. Defined in `defaultResyncPeriod` within the + // same file + return defaultResyncPeriod +} + // NewReconciler returns a new reconciler object func NewReconciler( sc acktypes.ServiceController, @@ -1126,16 +1154,23 @@ func NewReconcilerWithClient( metrics *ackmetrics.Metrics, cache ackrtcache.Caches, ) acktypes.AWSResourceReconciler { + rtLog := log.WithName("ackrt") + resyncPeriod := getResyncPeriod(rmf, cfg) + rtLog.V(1).Info("Initiating reconciler", + "reconciler kind", rmf.ResourceDescriptor().GroupKind().Kind, + "resync period seconds", resyncPeriod.Seconds(), + ) return &resourceReconciler{ reconciler: reconciler{ sc: sc, kc: kc, - log: log.WithName("ackrt"), + log: rtLog, cfg: cfg, metrics: metrics, cache: cache, }, - rmf: rmf, - rd: rmf.ResourceDescriptor(), + rmf: rmf, + rd: rmf.ResourceDescriptor(), + resyncPeriod: resyncPeriod, } } diff --git a/pkg/runtime/reconciler_test.go b/pkg/runtime/reconciler_test.go index 9eeac515..53ea0c85 100644 --- a/pkg/runtime/reconciler_test.go +++ b/pkg/runtime/reconciler_test.go @@ -136,6 +136,7 @@ func managerFactoryMocks( rmf := &ackmocks.AWSResourceManagerFactory{} rmf.On("ResourceDescriptor").Return(rd) + rmf.On("RequeueOnSuccessSeconds").Return(0) reg := ackrt.NewRegistry() reg.RegisterResourceManagerFactory(rmf) diff --git a/pkg/runtime/service_controller_test.go b/pkg/runtime/service_controller_test.go index 667d45ce..e2bc6727 100644 --- a/pkg/runtime/service_controller_test.go +++ b/pkg/runtime/service_controller_test.go @@ -143,6 +143,7 @@ func TestServiceController(t *testing.T) { rmf := &mocks.AWSResourceManagerFactory{} rmf.On("ResourceDescriptor").Return(rd) + rmf.On("RequeueOnSuccessSeconds").Return(0) reg := ackrt.NewRegistry() reg.RegisterResourceManagerFactory(rmf) From 6656fa451a03eb4ab4920e7352a03a988fe69e34 Mon Sep 17 00:00:00 2001 From: Amine Hilaly Date: Fri, 3 Feb 2023 18:14:18 +0100 Subject: [PATCH 2/4] Default reconcile-default-resync-seconds to 0 Signed-off-by: Amine Hilaly --- pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 62ddb1bc..fb091d77 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -161,7 +161,7 @@ func (cfg *Config) BindFlags() { ) flag.IntVar( &cfg.ReconcileDefaultResyncSeconds, flagReconcileDefaultResyncSeconds, - 60, + 0, "The default duration, in seconds, to wait before resyncing desired state of custom resources. "+ "This value is used if no resource-specific override has been specified. Default is 60 seconds.", ) From e21c6cd0eccdd65a6029e121596a1e8a499142ec Mon Sep 17 00:00:00 2001 From: Amine Hilaly Date: Fri, 3 Feb 2023 18:18:54 +0100 Subject: [PATCH 3/4] Clarify more the source of the resyncPeriod in getResyncPeriod function comment Signed-off-by: Amine Hilaly --- pkg/runtime/reconciler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/runtime/reconciler.go b/pkg/runtime/reconciler.go index aa3ca3cd..262fc1b7 100644 --- a/pkg/runtime/reconciler.go +++ b/pkg/runtime/reconciler.go @@ -1086,11 +1086,12 @@ func (r *resourceReconciler) getEndpointURL( // state of custom resources is maintained. // It attempts to retrieve the duration from the following sources, in this order: // 1. A resource-specific reconciliation resync period specified in the reconciliation resync -// configuration map. +// configuration map (--reconcile-default-resync-seconds). // 2. A resource-specific requeue on success period specified by the resource manager factory. // The resource manager factory is controller-specific, and thus this period is to specified -// by controller authors. +// by controller authors (using ack-generate). // 3. The default reconciliation resync period period specified in the controller binary flags. +// (--reconcile-resource-resync-seconds) // 4. The default resync period defined in the ACK runtime package. Defined in defaultResyncPeriod // within the same file // From 27b87dbe1527b3a1a91836876f0ca3caa23295a5 Mon Sep 17 00:00:00 2001 From: Amine Date: Fri, 3 Feb 2023 20:44:25 +0100 Subject: [PATCH 4/4] Change flag description default to 10 hours Signed-off-by: Amine Hilaly --- pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index fb091d77..2cfe3ebb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -163,7 +163,7 @@ func (cfg *Config) BindFlags() { &cfg.ReconcileDefaultResyncSeconds, flagReconcileDefaultResyncSeconds, 0, "The default duration, in seconds, to wait before resyncing desired state of custom resources. "+ - "This value is used if no resource-specific override has been specified. Default is 60 seconds.", + "This value is used if no resource-specific override has been specified. Default is 10 hours.", ) flag.StringArrayVar( &cfg.ReconcileResourceResyncSeconds, flagReconcileResourceResyncSeconds,