Skip to content

Commit

Permalink
Merge pull request #2799 from alvaroaleman/workq
Browse files Browse the repository at this point in the history
⚠️ Add TypedReconciler
  • Loading branch information
k8s-ci-robot authored Jul 6, 2024
2 parents e28a842 + 64e0f0b commit 33446bc
Show file tree
Hide file tree
Showing 29 changed files with 954 additions and 567 deletions.
4 changes: 0 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ issues:
- linters:
- staticcheck
text: "SA1019: .*The component config package has been deprecated and will be removed in a future release."
- linters:
- staticcheck
# Will be addressed separately.
text: "SA1019: workqueue.(RateLimitingInterface|DefaultControllerRateLimiter|New|NewItemExponentialFailureRateLimiter|NewRateLimitingQueueWithConfig|DefaultItemBasedRateLimiter|RateLimitingQueueConfig) is deprecated:"
# With Go 1.16, the new embed directive can be used with an un-named import,
# revive (previously, golint) only allows these to be imported in a main.go, which wouldn't work for us.
# This directive allows the embed package to be imported with an underscore everywhere.
Expand Down
178 changes: 178 additions & 0 deletions examples/multiclustersync/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package main

import (
"context"
"fmt"
"os"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// main is a thin shim: run the example and translate any error into a
// message on stderr plus a non-zero exit code.
func main() {
	err := run()
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "%v\n", err)
	os.Exit(1)
}

// Namespaces used by this example: every Secret found in sourceNamespace on
// the source cluster is mirrored into targetNamespace on every target cluster.
const (
	sourceNamespace = "namespace-to-sync-all-secrets-from"
	targetNamespace = "namespace-to-sync-all-secrets-to"
)

// run wires up a manager with a single typed controller that mirrors Secrets
// from sourceNamespace on the source cluster into targetNamespace on every
// registered target cluster, then blocks until the manager is stopped.
func run() error {
	log.SetLogger(zap.New())

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		return fmt.Errorf("failed to construct manager: %w", err)
	}

	allTargets := map[string]cluster.Cluster{}

	// Named selfCluster rather than cluster: the original
	// `cluster, err := cluster.New(...)` shadowed the imported "cluster"
	// package, making it unreachable for the rest of the function.
	selfCluster, err := cluster.New(ctrl.GetConfigOrDie())
	if err != nil {
		return fmt.Errorf("failed to construct clusters: %w", err)
	}
	if err := mgr.Add(selfCluster); err != nil {
		return fmt.Errorf("failed to add cluster to manager: %w", err)
	}

	// Add more target clusters here as needed.
	allTargets["self"] = selfCluster

	b := builder.TypedControllerManagedBy[request](mgr).
		Named("secret-sync").
		// Watch secrets in the source namespace of the source cluster and
		// fan out one request per target cluster.
		WatchesRawSource(source.TypedKind(
			mgr.GetCache(),
			&corev1.Secret{},
			handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []request {
				if s.Namespace != sourceNamespace {
					return nil
				}

				result := make([]request, 0, len(allTargets))
				for targetCluster := range allTargets {
					result = append(result, request{
						NamespacedName: types.NamespacedName{Namespace: s.Namespace, Name: s.Name},
						clusterName:    targetCluster,
					})
				}

				return result
			}),
		)).
		WithOptions(controller.TypedOptions[request]{MaxConcurrentReconciles: 10})

	for targetClusterName, targetCluster := range allTargets {
		// Watch secrets in the target namespace of each target cluster and
		// enqueue a request for that cluster itself, so drift in a target is
		// reconciled back to the source's state.
		b = b.WatchesRawSource(source.TypedKind(
			targetCluster.GetCache(),
			&corev1.Secret{},
			handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []request {
				if s.Namespace != targetNamespace {
					return nil
				}

				return []request{{
					NamespacedName: types.NamespacedName{Namespace: sourceNamespace, Name: s.Name},
					clusterName:    targetClusterName,
				}}
			}),
		))
	}

	clients := make(map[string]client.Client, len(allTargets))
	for targetClusterName, targetCluster := range allTargets {
		clients[targetClusterName] = targetCluster.GetClient()
	}

	if err := b.Complete(&secretSyncReconcier{
		source:  mgr.GetClient(),
		targets: clients,
	}); err != nil {
		return fmt.Errorf("failed to build reconciler: %w", err)
	}

	ctx := signals.SetupSignalHandler()
	if err := mgr.Start(ctx); err != nil {
		return fmt.Errorf("failed to start manager: %w", err)
	}

	return nil
}

// request is the custom reconcile-request type used by the typed builder: a
// standard namespaced name plus the name of the target cluster to sync into.
type request struct {
	types.NamespacedName
	// clusterName is the key into the reconciler's map of target clusters.
	clusterName string
}

// secretSyncReconcier is a simple reconciler that keeps all secrets in the source namespace of a given
// source cluster in sync with the secrets in the target namespace of all target clusters.
//
// NOTE(review): the type name is missing an "l" ("Reconcier" → "Reconciler");
// renaming would touch every usage, so it is left as-is here.
type secretSyncReconcier struct {
	// source reads the authoritative secrets from the source cluster.
	source client.Client
	// targets maps cluster name (request.clusterName) to that cluster's client.
	targets map[string]client.Client
}

// Reconcile mirrors one secret into one target cluster: if the source secret
// exists it is upserted into targetNamespace of the target cluster, otherwise
// any stale copy there is deleted. An unknown cluster name is terminal — no
// amount of retrying will make it appear.
func (s *secretSyncReconcier) Reconcile(ctx context.Context, req request) (reconcile.Result, error) {
	dst, ok := s.targets[req.clusterName]
	if !ok {
		return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("target cluster %s not found", req.clusterName))
	}

	var src corev1.Secret
	err := s.source.Get(ctx, req.NamespacedName, &src)
	switch {
	case apierrors.IsNotFound(err):
		// The source secret is gone; remove its copy from the target cluster.
		stale := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
			Name:      req.Name,
			Namespace: targetNamespace,
		}}
		if err := dst.Delete(ctx, stale); err != nil {
			if apierrors.IsNotFound(err) {
				// Already absent in the target — nothing to do.
				return reconcile.Result{}, nil
			}
			return reconcile.Result{}, fmt.Errorf("failed to delete secret %s/%s in cluster %s: %w", targetNamespace, req.Name, req.clusterName, err)
		}

		log.FromContext(ctx).Info("Deleted secret", "cluster", req.clusterName, "namespace", targetNamespace, "name", req.Name)
		return reconcile.Result{}, nil
	case err != nil:
		return reconcile.Result{}, fmt.Errorf("failed to get secret %s from reference cluster: %w", req.String(), err)
	}

	// Source exists: create or update the copy, syncing only the data.
	desired := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
		Name:      src.Name,
		Namespace: targetNamespace,
	}}
	opResult, err := controllerutil.CreateOrUpdate(ctx, dst, desired, func() error {
		desired.Data = src.Data
		return nil
	})
	if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to upsert target secret %s/%s: %w", desired.Namespace, desired.Name, err)
	}

	if opResult != controllerutil.OperationResultNone {
		log.FromContext(ctx).Info("Upserted secret", "cluster", req.clusterName, "namespace", targetNamespace, "name", req.Name, "result", opResult)
	}

	return reconcile.Result{}, nil
}
66 changes: 66 additions & 0 deletions examples/typed/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"fmt"
"os"

networkingv1 "k8s.io/api/networking/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// main runs the example and exits non-zero if it fails.
func main() {
	err := run()
	if err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

// run builds a manager plus a typed ingress controller whose request type
// carries no identifying data, so all ingress events collapse into a single
// pending rebuild of the aggregate config, then runs the manager to shutdown.
func run() error {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		return fmt.Errorf("failed to construct manager: %w", err)
	}

	// An empty struct is always equal to itself, so the workqueue
	// de-duplicates every enqueued event down to at most one request.
	// Useful e.g. for an ingress-controller that generates its config from
	// all ingresses at once rather than from individual ones.
	type request struct{}

	rec := reconcile.TypedFunc[request](func(ctx context.Context, _ request) (reconcile.Result, error) {
		var ingressList networkingv1.IngressList
		if err := mgr.GetClient().List(ctx, &ingressList); err != nil {
			return reconcile.Result{}, fmt.Errorf("failed to list ingresses: %w", err)
		}

		buildIngressConfig(&ingressList)
		return reconcile.Result{}, nil
	})

	// Every ingress event maps to the same (empty) request.
	mapper := handler.TypedEnqueueRequestsFromMapFunc(func(context.Context, *networkingv1.Ingress) []request {
		return []request{{}}
	})
	err = builder.TypedControllerManagedBy[request](mgr).
		Named("ingress_controller").
		WatchesRawSource(source.TypedKind(mgr.GetCache(), &networkingv1.Ingress{}, mapper)).
		Complete(rec)
	if err != nil {
		return fmt.Errorf("failed to construct ingress-controller: %w", err)
	}

	if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
		return fmt.Errorf("failed to start manager: %w", err)
	}

	return nil
}

// buildIngressConfig is a stub standing in for whatever config generation a
// real ingress-controller would perform from the full list of ingresses.
func buildIngressConfig(*networkingv1.IngressList) {}
Loading

0 comments on commit 33446bc

Please sign in to comment.