Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 105 additions & 28 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -32,20 +35,22 @@ import (
)

// Names of the command-line flags (and the one environment variable) that
// configure the controller.
const (
flagEnableLeaderElection = "enable-leader-election"
flagMetricAddr = "metrics-addr"
flagEnableDevLogging = "enable-development-logging"
flagAWSRegion = "aws-region"
flagAWSEndpointURL = "aws-endpoint-url"
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
flagEnableWebhookServer = "enable-webhook-server"
flagWebhookServerAddr = "webhook-server-addr"
flagDeletionPolicy = "deletion-policy"
envVarAWSRegion = "AWS_REGION"
flagEnableLeaderElection = "enable-leader-election"
flagMetricAddr = "metrics-addr"
flagEnableDevLogging = "enable-development-logging"
flagAWSRegion = "aws-region"
flagAWSEndpointURL = "aws-endpoint-url"
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
flagEnableWebhookServer = "enable-webhook-server"
flagWebhookServerAddr = "webhook-server-addr"
flagDeletionPolicy = "deletion-policy"
// flagReconcileDefaultResyncSeconds is the flag bound to
// Config.ReconcileDefaultResyncSeconds: the default wait, in seconds, before
// resyncing the desired state of custom resources.
flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds"
// flagReconcileResourceResyncSeconds is the repeatable flag bound to
// Config.ReconcileResourceResyncSeconds: per-resource "resource=seconds"
// overrides of the default resync period.
flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds"
envVarAWSRegion = "AWS_REGION"
)

var (
Expand All @@ -63,20 +68,22 @@ var (

// Config contains configuration options for ACK service controllers.
// Its fields are populated from command-line flags by BindFlags and checked
// by Validate.
type Config struct {
MetricsAddr string
EnableLeaderElection bool
EnableDevelopmentLogging bool
AccountID string
Region string
IdentityEndpointURL string
EndpointURL string
AllowUnsafeEndpointURL bool
LogLevel string
ResourceTags []string
WatchNamespace string
EnableWebhookServer bool
WebhookServerAddr string
DeletionPolicy ackv1alpha1.DeletionPolicy
MetricsAddr string
EnableLeaderElection bool
EnableDevelopmentLogging bool
AccountID string
Region string
IdentityEndpointURL string
EndpointURL string
AllowUnsafeEndpointURL bool
LogLevel string
ResourceTags []string
WatchNamespace string
EnableWebhookServer bool
WebhookServerAddr string
// DeletionPolicy is the default deletion policy for all resources managed
// by the controller. Validate replaces an empty value with
// ackv1alpha1.DeletionPolicyDelete.
DeletionPolicy ackv1alpha1.DeletionPolicy
// ReconcileDefaultResyncSeconds is the default number of seconds to wait
// before resyncing the desired state of custom resources. The flag default
// is 0; Validate rejects negative values.
ReconcileDefaultResyncSeconds int
// ReconcileResourceResyncSeconds holds the raw "resource=seconds" flag
// arguments that map resource kinds to resync periods. Use
// ParseReconcileResourceResyncSeconds to turn them into a map.
ReconcileResourceResyncSeconds []string
}

// BindFlags defines CLI/runtime configuration options
Expand Down Expand Up @@ -152,6 +159,19 @@ func (cfg *Config) BindFlags() {
&cfg.DeletionPolicy, flagDeletionPolicy,
"The default deletion policy for all resources managed by the controller",
)
flag.IntVar(
&cfg.ReconcileDefaultResyncSeconds, flagReconcileDefaultResyncSeconds,
0,
"The default duration, in seconds, to wait before resyncing desired state of custom resources. "+
"This value is used if no resource-specific override has been specified. Default is 10 hours.",
)
flag.StringArrayVar(
&cfg.ReconcileResourceResyncSeconds, flagReconcileResourceResyncSeconds,
[]string{},
"A Key/Value list of strings representing the reconcile resync configuration for each resource. This"+
" configuration maps resource kinds to drift remediation periods in seconds. If provided, "+
" resource-specific resync periods take precedence over the default period.",
)
}

// SetupLogger initializes the logger used in the service controller
Expand Down Expand Up @@ -233,6 +253,16 @@ func (cfg *Config) Validate() error {
if cfg.DeletionPolicy == "" {
cfg.DeletionPolicy = ackv1alpha1.DeletionPolicyDelete
}

if cfg.ReconcileDefaultResyncSeconds < 0 {
return fmt.Errorf("invalid value for flag '%s': resync seconds default must be greater than 0", flagReconcileDefaultResyncSeconds)
}

_, err := cfg.ParseReconcileResourceResyncSeconds()
if err != nil {
return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceResyncSeconds, err)
}

return nil
}

Expand All @@ -244,3 +274,50 @@ func (cfg *Config) checkUnsafeEndpoint(endpoint *url.URL) error {
}
return nil
}

// ParseReconcileResourceResyncSeconds parses the values of the
// --reconcile-resource-resync-seconds flag and returns a map from resource
// name to resync period.
//
// Each flag argument is expected to have the format "resource=seconds", where
// "resource" is the name of the resource and "seconds" is the number of
// seconds the reconciler should wait before reconciling the resource again.
// Resource names are lower-cased before being used as map keys, so lookups
// must use the lower-cased name as well.
//
// NOTE: every time.Duration stored in the returned map holds the raw number
// of seconds; it is NOT scaled by time.Second. Callers must multiply the
// value by time.Second to obtain the actual duration.
//
// Parsing stops at the first malformed argument. In that case the returned
// map is nil and the returned error wraps the underlying parse error, so it
// can be inspected with errors.Is / errors.As.
func (cfg *Config) ParseReconcileResourceResyncSeconds() (map[string]time.Duration, error) {
	periods := make(map[string]time.Duration, len(cfg.ReconcileResourceResyncSeconds))
	for _, arg := range cfg.ReconcileResourceResyncSeconds {
		// Split the "resource=seconds" argument into its two validated parts.
		name, seconds, err := parseReconcileFlagArgument(arg)
		if err != nil {
			return nil, fmt.Errorf("error parsing flag argument '%v': %w. Expected format: resource=seconds", arg, err)
		}
		// Stored as a bare second count on purpose: existing callers scale
		// the value by time.Second themselves.
		periods[strings.ToLower(name)] = time.Duration(seconds)
	}
	return periods, nil
}

// parseReconcileFlagArgument parses a flag argument of the form "key=value" into
// its individual elements. The key must be a non-empty string and the value must be
// a non-negative integer (zero is accepted). If the flag argument is not in the
// expected format or has invalid elements, an error is returned together with an
// empty key and a zero value.
//
// The function returns the parsed key and value as separate elements.
func parseReconcileFlagArgument(flagArgument string) (string, int, error) {
delimiter := "="
// Exactly one delimiter is allowed: both "key" and "key=a=b" are rejected.
elements := strings.Split(flagArgument, delimiter)
if len(elements) != 2 {
return "", 0, fmt.Errorf("invalid flag argument format: expected key=value")
}
if elements[0] == "" {
return "", 0, fmt.Errorf("missing key in flag argument")
}
if elements[1] == "" {
return "", 0, fmt.Errorf("missing value in flag argument")
}

// The value must be a base-10 integer; anything else (e.g. "a", "1.1") fails here.
resyncSeconds, err := strconv.Atoi(elements[1])
if err != nil {
return "", 0, fmt.Errorf("invalid value in flag argument: %v", err)
}
// Only negative values are rejected; zero passes through unchanged.
if resyncSeconds < 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think resyncSeconds <= 0 should be the check. If there are 0 seconds of reconciliation, it'll be constantly reconciled with no exponential backoff. Maybe we can suggest at least 1 second of wait?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked here, if a used provides 0 seconds, the controller will use the package default value which is 10hours

return "", 0, fmt.Errorf("invalid value in flag argument: expected non-negative integer, got %d", resyncSeconds)
}
return elements[0], resyncSeconds, nil
}
61 changes: 61 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package config

import "testing"

// TestParseReconcileFlagArgument runs parseReconcileFlagArgument over a table
// of well-formed and malformed "key=value" arguments and checks the returned
// key, value and error message for each of them.
func TestParseReconcileFlagArgument(t *testing.T) {
	type testCase struct {
		arg        string
		wantKey    string
		wantVal    int
		wantErr    bool
		wantErrMsg string
	}
	cases := []testCase{
		// Well-formed arguments.
		{arg: "key=1", wantKey: "key", wantVal: 1},
		{arg: "key=123456", wantKey: "key", wantVal: 123456},
		{arg: "key=600", wantKey: "key", wantVal: 600},
		{arg: "k=1", wantKey: "k", wantVal: 1},
		{arg: "ke_y=123456", wantKey: "ke_y", wantVal: 123456},

		// Malformed arguments.
		{arg: "key", wantErr: true, wantErrMsg: "invalid flag argument format: expected key=value"},
		{arg: "key=", wantErr: true, wantErrMsg: "missing value in flag argument"},
		{arg: "=value", wantErr: true, wantErrMsg: "missing key in flag argument"},
		{arg: "key=value1=value2", wantErr: true, wantErrMsg: "invalid flag argument format: expected key=value"},
		{arg: "key=a", wantErr: true, wantErrMsg: "invalid value in flag argument: strconv.Atoi: parsing \"a\": invalid syntax"},
		{arg: "key=-1", wantErr: true, wantErrMsg: "invalid value in flag argument: expected non-negative integer, got -1"},
		{arg: "key=-123456", wantErr: true, wantErrMsg: "invalid value in flag argument: expected non-negative integer, got -123456"},
		{arg: "key=1.1", wantErr: true, wantErrMsg: "invalid value in flag argument: strconv.Atoi: parsing \"1.1\": invalid syntax"},
	}

	for _, tc := range cases {
		gotKey, gotVal, err := parseReconcileFlagArgument(tc.arg)

		if err != nil {
			// An error came back: it must have been expected, and its text must match.
			if !tc.wantErr {
				t.Errorf("unexpected error for flag argument '%s': %v", tc.arg, err)
			}
			if err.Error() != tc.wantErrMsg {
				t.Errorf("unexpected error message for flag argument '%s': expected '%s', got '%v'", tc.arg, tc.wantErrMsg, err)
			}
		} else if tc.wantErr {
			t.Errorf("expected error for flag argument '%s', got nil", tc.arg)
		}

		// Key and value are compared in every case; failures yield "" and 0.
		if gotKey != tc.wantKey {
			t.Errorf("unexpected key for flag argument '%s': expected '%s', got '%s'", tc.arg, tc.wantKey, gotKey)
		}
		if gotVal != tc.wantVal {
			t.Errorf("unexpected value for flag argument '%s': expected %d, got %d", tc.arg, tc.wantVal, gotVal)
		}
	}
}
98 changes: 67 additions & 31 deletions pkg/runtime/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

backoff "github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -47,7 +48,7 @@ const (
// The default duration to trigger the sync for an ACK resource after
// the successful reconciliation. This behavior for a resource can be
// overridden by RequeueOnSuccessSeconds configuration for that resource.
resyncPeriod = 10 * time.Hour
defaultResyncPeriod = 10 * time.Hour
)

// reconciler describes a generic reconciler within ACK.
Expand All @@ -70,8 +71,9 @@ type reconciler struct {
// object)s and sharing watch and informer queues across those controllers.
type resourceReconciler struct {
reconciler
rmf acktypes.AWSResourceManagerFactory
rd acktypes.AWSResourceDescriptor
// rmf is the resource manager factory this reconciler was created with.
rmf acktypes.AWSResourceManagerFactory
// rd is the resource descriptor obtained from rmf.ResourceDescriptor().
rd acktypes.AWSResourceDescriptor
// resyncPeriod is the delay used when requeueing a resource whose
// ResourceSynced condition is true. It is computed once, when the
// reconciler is constructed.
resyncPeriod time.Duration
}

// GroupKind returns the string containing the API group and kind reconciled by
Expand Down Expand Up @@ -887,31 +889,8 @@ func (r *resourceReconciler) handleRequeues(
}
// The code below only executes for "ConditionTypeResourceSynced"
if condition.Status == corev1.ConditionTrue {
if duration := r.rmf.RequeueOnSuccessSeconds(); duration > 0 {
rlog.Debug(
"requeueing resource after resource synced condition true",
)
return latest, requeue.NeededAfter(nil, time.Duration(duration)*time.Second)
}
// Since RequeueOnSuccessSeconds <= 0, requeue the resource
// with "resyncPeriod" to perform periodic drift detection and
// sync the desired state.
//
// Upstream controller-runtime provides SyncPeriod functionality
// which flushes the go-client cache and triggers Sync for all
// the objects in cache every 10 hours by default.
//
// ACK controller use non-cached client to read objects
// from API Server, hence controller-runtime's SyncPeriod
// functionality does not work.
// https://github.com/aws-controllers-k8s/community/issues/1355
//
// ACK controllers use api-reader(non-cached client) to avoid
// reading stale copies of ACK resource that can cause
// duplicate resource creation when resource identifier is
// not present in stale copy of resource.
// https://github.com/aws-controllers-k8s/community/issues/894#issuecomment-911876354
return latest, requeue.NeededAfter(nil, resyncPeriod)
rlog.Debug("requeuing", "after", r.resyncPeriod)
return latest, requeue.NeededAfter(nil, r.resyncPeriod)
} else {
rlog.Debug(
"requeueing resource after finding resource synced condition false",
Expand Down Expand Up @@ -1103,6 +1082,56 @@ func (r *resourceReconciler) getEndpointURL(
return r.cfg.EndpointURL
}

// getResyncPeriod returns the period of the recurring reconciler process which ensures the desired
// state of custom resources is maintained.
// It attempts to retrieve the duration from the following sources, in this order:
// 1. A resource-specific reconciliation resync period specified in the reconciliation resync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a super fan of resource-level drift remediation control - but happy to hear what other folks think about it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why resync configuration map?

can we use annotation on the resource like other existing features e.g.

* https://aws-controllers-k8s.github.io/community/docs/user-docs/multi-region-resource-management/

* https://aws-controllers-k8s.github.io/community/docs/user-docs/deletion-policy/

@surajkota for both of those annotations, there is a corresponding controller CLI flag:

flag.StringVar(
&cfg.Region, flagAWSRegion,
envutil.WithDefault(envVarAWSRegion, ""),
"The AWS Region in which the service controller will create its resources",
)

flag.Var(
&cfg.DeletionPolicy, flagDeletionPolicy,
"The default deletion policy for all resources managed by the controller",
)

The CLI flags serve as defaults if the annotation is not present.

Copy link
Contributor

@surajkota surajkota Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack on the controller flag, my question is related to the highest in order of precedence.

Annotation or configmap, both are giving control to the user to configure the resync period. Since we don't have a common configmap or a CRD to define all controller configurations, my preference is to keep the experience consistent and not introduce another place~~

Copy link
Contributor

@surajkota surajkota Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi folks, Nick helped me with clarification that resync configuration map != ConfigMap, so we can resolve this comment with one suggestion, rename the comment to say resource resync CLI flag or something along those lines

I do think even a CLI flag is not a great option, because changing it requires restarting the controller or reinstalling the helm chart, but that's a discussion for another time. This is good to get started

see my comment on No 3 here. We should drop it

// configuration flag (--reconcile-resource-resync-seconds).
// 2. A resource-specific requeue on success period specified by the resource manager factory.
// The resource manager factory is controller-specific, and thus this period is to be specified
// by controller authors (using ack-generate).
// 3. The default reconciliation resync period specified in the controller binary flags
// (--reconcile-default-resync-seconds).
// 4. The default resync period defined in the ACK runtime package. Defined in defaultResyncPeriod
// within the same file
//
// Each reconciler has a unique value to use. This function should only be called during the
// instantiation of an AWSResourceReconciler and should not be called during the reconciliation
// function r.Sync
func getResyncPeriod(rmf acktypes.AWSResourceManagerFactory, cfg ackcfg.Config) time.Duration {
	// The per-resource flag values were already checked when the configuration
	// was validated, so the parse error is deliberately discarded here. This
	// also keeps the signature of NewReconcilerWithClient unchanged.
	perResource, _ := cfg.ParseReconcileResourceResyncSeconds()

	// Highest precedence: a per-resource override, looked up by the
	// lower-cased resource kind. The map stores bare second counts, hence the
	// scaling by time.Second.
	kind := strings.ToLower(rmf.ResourceDescriptor().GroupKind().Kind)
	if seconds, found := perResource[kind]; found && seconds > 0 {
		return seconds * time.Second
	}

	// Next: the requeue-on-success period reported by the resource manager
	// factory. It is set when the controller is generated and outranks the
	// default flag so that controllers relying on it keep their behavior.
	if seconds := rmf.RequeueOnSuccessSeconds(); seconds > 0 {
		return time.Duration(seconds) * time.Second
	}

	// Next: the default resync period given at controller start-up.
	if seconds := cfg.ReconcileDefaultResyncSeconds; seconds > 0 {
		return time.Duration(seconds) * time.Second
	}

	// Nothing was configured: fall back to defaultResyncPeriod, the default
	// defined by the ACK runtime package.
	return defaultResyncPeriod
}

// NewReconciler returns a new reconciler object
func NewReconciler(
sc acktypes.ServiceController,
Expand All @@ -1126,16 +1155,23 @@ func NewReconcilerWithClient(
metrics *ackmetrics.Metrics,
cache ackrtcache.Caches,
) acktypes.AWSResourceReconciler {
rtLog := log.WithName("ackrt")
resyncPeriod := getResyncPeriod(rmf, cfg)
rtLog.V(1).Info("Initiating reconciler",
"reconciler kind", rmf.ResourceDescriptor().GroupKind().Kind,
"resync period seconds", resyncPeriod.Seconds(),
)
return &resourceReconciler{
reconciler: reconciler{
sc: sc,
kc: kc,
log: log.WithName("ackrt"),
log: rtLog,
cfg: cfg,
metrics: metrics,
cache: cache,
},
rmf: rmf,
rd: rmf.ResourceDescriptor(),
rmf: rmf,
rd: rmf.ResourceDescriptor(),
resyncPeriod: resyncPeriod,
}
}
1 change: 1 addition & 0 deletions pkg/runtime/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func managerFactoryMocks(

rmf := &ackmocks.AWSResourceManagerFactory{}
rmf.On("ResourceDescriptor").Return(rd)
rmf.On("RequeueOnSuccessSeconds").Return(0)

reg := ackrt.NewRegistry()
reg.RegisterResourceManagerFactory(rmf)
Expand Down
1 change: 1 addition & 0 deletions pkg/runtime/service_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func TestServiceController(t *testing.T) {

rmf := &mocks.AWSResourceManagerFactory{}
rmf.On("ResourceDescriptor").Return(rd)
rmf.On("RequeueOnSuccessSeconds").Return(0)

reg := ackrt.NewRegistry()
reg.RegisterResourceManagerFactory(rmf)
Expand Down