⚠️ Add TypedReconciler #2799

Merged · 1 commit · Jul 6, 2024
4 changes: 0 additions & 4 deletions .golangci.yml
@@ -122,10 +122,6 @@ issues:
     - linters:
         - staticcheck
       text: "SA1019: .*The component config package has been deprecated and will be removed in a future release."
-    - linters:
-        - staticcheck
-      # Will be addressed separately.
-      text: "SA1019: workqueue.(RateLimitingInterface|DefaultControllerRateLimiter|New|NewItemExponentialFailureRateLimiter|NewRateLimitingQueueWithConfig|DefaultItemBasedRateLimiter|RateLimitingQueueConfig) is deprecated:"
     # With Go 1.16, the new embed directive can be used with an un-named import,
     # revive (previously, golint) only allows these to be imported in a main.go, which wouldn't work for us.
     # This directive allows the embed package to be imported with an underscore everywhere.
178 changes: 178 additions & 0 deletions examples/multiclustersync/main.go
@@ -0,0 +1,178 @@
package main

import (
	"context"
	"fmt"
	"os"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

const (
	sourceNamespace = "namespace-to-sync-all-secrets-from"
	targetNamespace = "namespace-to-sync-all-secrets-to"
)

func run() error {
	log.SetLogger(zap.New())

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		return fmt.Errorf("failed to construct manager: %w", err)
	}

	allTargets := map[string]cluster.Cluster{}

	cl, err := cluster.New(ctrl.GetConfigOrDie())
	if err != nil {
		return fmt.Errorf("failed to construct cluster: %w", err)
	}
	if err := mgr.Add(cl); err != nil {
		return fmt.Errorf("failed to add cluster to manager: %w", err)
	}

	// Add more target clusters here as needed
	allTargets["self"] = cl

	b := builder.TypedControllerManagedBy[request](mgr).
		Named("secret-sync").
		// Watch secrets in the source namespace of the source cluster and
		// create requests for each target cluster
		WatchesRawSource(source.TypedKind(
			mgr.GetCache(),
			&corev1.Secret{},
			handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []request {
				if s.Namespace != sourceNamespace {
					return nil
				}

				result := make([]request, 0, len(allTargets))
				for targetCluster := range allTargets {
					result = append(result, request{
						NamespacedName: types.NamespacedName{Namespace: s.Namespace, Name: s.Name},
						clusterName:    targetCluster,
					})
				}

				return result
			}),
		)).
		WithOptions(controller.TypedOptions[request]{MaxConcurrentReconciles: 10})

	for targetClusterName, targetCluster := range allTargets {
		// Watch secrets in the target namespace of each target cluster
		// and create a request for itself.
		b = b.WatchesRawSource(source.TypedKind(
			targetCluster.GetCache(),
			&corev1.Secret{},
			handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []request {
				if s.Namespace != targetNamespace {
					return nil
				}

				return []request{{
					NamespacedName: types.NamespacedName{Namespace: sourceNamespace, Name: s.Name},
					clusterName:    targetClusterName,
				}}
			}),
		))
	}

	clients := make(map[string]client.Client, len(allTargets))
	for targetClusterName, targetCluster := range allTargets {
		clients[targetClusterName] = targetCluster.GetClient()
	}

	if err := b.Complete(&secretSyncReconciler{
		source:  mgr.GetClient(),
		targets: clients,
	}); err != nil {
		return fmt.Errorf("failed to build reconciler: %w", err)
	}

	ctx := signals.SetupSignalHandler()
	if err := mgr.Start(ctx); err != nil {
		return fmt.Errorf("failed to start manager: %w", err)
	}

	return nil
}

type request struct {
	types.NamespacedName
	clusterName string
}

// secretSyncReconciler is a simple reconciler that keeps all secrets in the source namespace of a given
// source cluster in sync with the secrets in the target namespace of all target clusters.
type secretSyncReconciler struct {
	source  client.Client
	targets map[string]client.Client
}

func (s *secretSyncReconciler) Reconcile(ctx context.Context, req request) (reconcile.Result, error) {
	targetClient, found := s.targets[req.clusterName]
	if !found {
		return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("target cluster %s not found", req.clusterName))
	}

	var reference corev1.Secret
	if err := s.source.Get(ctx, req.NamespacedName, &reference); err != nil {
		if !apierrors.IsNotFound(err) {
			return reconcile.Result{}, fmt.Errorf("failed to get secret %s from reference cluster: %w", req.String(), err)
		}

		// The source secret is gone; delete the copy in the target cluster.
		if err := targetClient.Delete(ctx, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
			Name:      req.Name,
			Namespace: targetNamespace,
		}}); err != nil {
			if !apierrors.IsNotFound(err) {
				return reconcile.Result{}, fmt.Errorf("failed to delete secret %s/%s in cluster %s: %w", targetNamespace, req.Name, req.clusterName, err)
			}

			return reconcile.Result{}, nil
		}

		log.FromContext(ctx).Info("Deleted secret", "cluster", req.clusterName, "namespace", targetNamespace, "name", req.Name)
		return reconcile.Result{}, nil
	}

	target := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
		Name:      reference.Name,
		Namespace: targetNamespace,
	}}
	result, err := controllerutil.CreateOrUpdate(ctx, targetClient, target, func() error {
		target.Data = reference.Data
		return nil
	})
	if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to upsert target secret %s/%s: %w", target.Namespace, target.Name, err)
	}

	if result != controllerutil.OperationResultNone {
		log.FromContext(ctx).Info("Upserted secret", "cluster", req.clusterName, "namespace", targetNamespace, "name", req.Name, "result", result)
	}

	return reconcile.Result{}, nil
}
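Note that the custom request type is a plain comparable struct: it embeds types.NamespacedName and adds a cluster name, so the workqueue de-duplicates per (cluster, namespace, name). The example registers only the local cluster under the name "self", and the comment in run notes that more target clusters can be added. A hedged sketch of what that could look like, assuming one kubeconfig per target; the addTarget helper and its parameters are hypothetical and not part of this PR:

package multicluster

import (
	"fmt"

	"k8s.io/client-go/tools/clientcmd"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// addTarget is a hypothetical helper, not part of this PR: it constructs an
// additional target cluster from a kubeconfig file and registers it with the
// manager so its caches start and stop together with the manager.
func addTarget(mgr ctrl.Manager, targets map[string]cluster.Cluster, name, kubeconfig string) error {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return fmt.Errorf("failed to load kubeconfig for %s: %w", name, err)
	}
	cl, err := cluster.New(cfg)
	if err != nil {
		return fmt.Errorf("failed to construct cluster %s: %w", name, err)
	}
	if err := mgr.Add(cl); err != nil {
		return fmt.Errorf("failed to add cluster %s to manager: %w", name, err)
	}
	targets[name] = cl
	return nil
}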
66 changes: 66 additions & 0 deletions examples/typed/main.go
@@ -0,0 +1,66 @@
package main
Review comment (Member): The PR description mentions this can be a good use for multiple clusters. Could we add an example that expands on the current reconcile.Request to add Cluster information?

Reply (Member, Author): I've added one under examples/multiclustersync.


import (
	"context"
	"fmt"
	"os"

	networkingv1 "k8s.io/api/networking/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

func run() error {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		return fmt.Errorf("failed to construct manager: %w", err)
	}

	// Use a request type that is always equal to itself so the workqueue
	// de-duplicates all events.
	// This can for example be useful for an ingress-controller that
	// generates a config from all ingresses, rather than individual ones.
	type request struct{}

	r := reconcile.TypedFunc[request](func(ctx context.Context, _ request) (reconcile.Result, error) {
Review comment (Member) on lines +34 to +36: To expand on this a bit more, can we add some reasoning on why this is better than, say, having a normal reconciler that just ignores the incoming request?

Reply (Member, Author): From the code comment a few lines above:

	// Use a request type that is always equal to itself so the workqueue
	// de-duplicates all events.
	// This can for example be useful for an ingress-controller that
	// generates a config from all ingresses, rather than individual ones.
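To make the de-duplication concrete: requests serve as workqueue keys, and an empty struct has exactly one value, so any number of events collapses into a single pending reconcile. A minimal standalone sketch of that set semantics (not part of the example file, and only a stand-in for the real workqueue):

package main

import "fmt"

// request has exactly one value, so a set keyed on it can never hold
// more than one pending item.
type request struct{}

func main() {
	pending := map[request]struct{}{}
	for i := 0; i < 100; i++ { // 100 ingress events arrive...
		pending[request{}] = struct{}{}
	}
	fmt.Println(len(pending)) // ...but only one reconcile is queued: prints 1
}

By contrast, with reconcile.Request every distinct ingress name remains a separate queue item, so a reconciler that merely ignores the request would still run its full rebuild once per item.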

		ingressList := &networkingv1.IngressList{}
		if err := mgr.GetClient().List(ctx, ingressList); err != nil {
			return reconcile.Result{}, fmt.Errorf("failed to list ingresses: %w", err)
		}

		buildIngressConfig(ingressList)
		return reconcile.Result{}, nil
	})
	if err := builder.TypedControllerManagedBy[request](mgr).
		WatchesRawSource(source.TypedKind(
			mgr.GetCache(),
			&networkingv1.Ingress{},
			handler.TypedEnqueueRequestsFromMapFunc(func(context.Context, *networkingv1.Ingress) []request {
Review comment (Member): I assume here is one of the most important parts? The custom requests, outside of reconcile.Request, can only be used with a custom enqueuer?

Reply (Member, Author): Yes, you need a custom event handler to be able to use a custom request type.

				return []request{{}}
			})),
		).
		Named("ingress_controller").
		Complete(r); err != nil {
		return fmt.Errorf("failed to construct ingress-controller: %w", err)
	}

	ctx := signals.SetupSignalHandler()
	if err := mgr.Start(ctx); err != nil {
		return fmt.Errorf("failed to start manager: %w", err)
	}

	return nil
}
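As the review discussion above notes, a custom request type requires a typed event handler. For orientation, a hedged sketch of how the classic, non-generic API plausibly relates to the typed one introduced here; the aliases below are an assumption inferred from the PR title and the usage in these examples, not verbatim library code:

package sketch

import (
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Assumed relationship, sketched for orientation: the pre-existing
// non-generic types would be aliases of the typed ones instantiated with
// reconcile.Request, keeping existing controllers source-compatible.
type Reconciler = reconcile.TypedReconciler[reconcile.Request]

type EventHandler = handler.TypedEventHandler[client.Object, reconcile.Request]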

// buildIngressConfig is a stub standing in for rendering a config artifact
// from the full set of ingresses.
func buildIngressConfig(*networkingv1.IngressList) {}
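Purely as an illustration of the whole-world rebuild the code comment describes, a hypothetical fill-in for that stub might aggregate state across every ingress; the function name, return type, and field choices below are illustrative only:

package ingresscfg

import networkingv1 "k8s.io/api/networking/v1"

// buildIngressConfigHosts is a hypothetical stand-in for the stub above:
// it collects every host across all ingresses into one flat list, the kind
// of global artifact a real ingress-controller would render into its proxy
// configuration.
func buildIngressConfigHosts(list *networkingv1.IngressList) []string {
	var hosts []string
	for _, ing := range list.Items {
		for _, rule := range ing.Spec.Rules {
			if rule.Host != "" {
				hosts = append(hosts, rule.Host)
			}
		}
	}
	return hosts
}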