Skip to content

Commit

Permalink
Attempt watch of sources after seeing an instance
Browse files Browse the repository at this point in the history
To be able to react to a source changing, we have to `Watch` the
source's kind; but since we don't know the kinds people will want to use
ahead of time, we have to start watching them after we've seen it being
used.

There's no point watching until we've actually seen an instance of the
kind (meaning there's something to watch, and that the type exists); so,
this commit rearranges things a little so that watches are called during
Reconcile, once the source has been fetched.

Signed-off-by: Michael Bridgen <[email protected]>
  • Loading branch information
squaremo committed Oct 25, 2022
1 parent bf8cdbc commit 2960a27
Showing 1 changed file with 52 additions and 45 deletions.
97 changes: 52 additions & 45 deletions pkg/controller/stack/stack_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Add(mgr manager.Manager) error {
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
func newReconciler(mgr manager.Manager) *ReconcileStack {
return &ReconcileStack{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
Expand All @@ -114,7 +114,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(mgr manager.Manager, r *ReconcileStack) error {
var err error
maxConcurrentReconciles := defaultMaxConcurrentReconciles
maxConcurrentReconcilesStr, set := os.LookupEnv("MAX_CONCURRENT_RECONCILES")
Expand Down Expand Up @@ -232,57 +232,53 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
// We can't watch a specific type (i.e., using source.Kind) here; what we have to do is wait
// until we see stacks that refer to particular kinds, then watch those. Technically this can
// "leak" watches -- we may end up watching kinds that are no longer mentioned in stacks. My
// assumption is that the number of distinct types that might be mentioned (including typos,
// which are likely to just fail to start) is low enough that this remains acceptably cheap.
// assumption is that the number of distinct types that might be mentioned (including typos) is
// low enough that this remains acceptably cheap.

// Keep track of types we've already watched, so we don't install more handlers than needed.
// Keep track of types we've already watched, so we don't install more than one handler for a
// type.
watched := make(map[schema.GroupVersionKind]struct{})
watchedMu := sync.Mutex{}

// this will install a watch for any new kinds it sees
maybeWatchFluxSourceKind := func(stack *pulumiv1.Stack) {
if src := stack.Spec.FluxSource; src != nil {
gvk, err := getSourceGVK(src.SourceRef)
if err != nil {
mgr.GetLogger().Error(err, "unable to parse .sourceRef.apiVersion in Flux source")
return
}
watchedMu.Lock()
_, ok := watched[gvk]
if !ok {
watched[gvk] = struct{}{}
}
watchedMu.Unlock()
if !ok {
// Using PartialObjectMetadata means we don't need the actual types, and it bypasses
var sourceKind metav1.PartialObjectMetadata
sourceKind.SetGroupVersionKind(gvk)
mgr.GetLogger().Info("installing watcher for newly seen source kind", "gvk", gvk)
if err := c.Watch(&source.Kind{Type: &sourceKind},
ctrlhandler.EnqueueRequestsFromMapFunc(
enqueueStacksForSourceFunc(fluxSourceIndexFieldName, func(obj client.Object) string {
gvk := obj.GetObjectKind().GroupVersionKind()
return fluxSourceKey(gvk, obj.GetName())
}))); err != nil {
mgr.GetLogger().Error(err, "failed to watch newly seen source kind", "gvk", gvk)
}
// Calling this will attempt to install a watch for the kind given in the source reference. It
// will return an error if there's something wrong with the source reference or if the watch
// could not be attempted otherwise. If the kind cannot be found then this will keep trying in
// the background until the context given to controller.Start is cancelled, rather than return
// an error.
r.maybeWatchFluxSourceKind = func(src shared.FluxSourceReference) error {
gvk, err := getSourceGVK(src)
if err != nil {
return err
}
watchedMu.Lock()
_, ok := watched[gvk]
if !ok {
watched[gvk] = struct{}{}
}
watchedMu.Unlock()
if !ok {
// Using PartialObjectMetadata means we don't need the actual types registered in the
// schema.
var sourceKind metav1.PartialObjectMetadata
sourceKind.SetGroupVersionKind(gvk)
mgr.GetLogger().Info("installing watcher for newly seen source kind", "GroupVersionKind", gvk)
if err := c.Watch(&source.Kind{Type: &sourceKind},
ctrlhandler.EnqueueRequestsFromMapFunc(
enqueueStacksForSourceFunc(fluxSourceIndexFieldName, func(obj client.Object) string {
gvk := obj.GetObjectKind().GroupVersionKind()
return fluxSourceKey(gvk, obj.GetName())
}))); err != nil {
watchedMu.Lock()
delete(watched, gvk)
watchedMu.Unlock()
mgr.GetLogger().Error(err, "failed to watch source kind", "GroupVersionKind", gvk)
return err
}
}
return nil
}

// scan every Stack that goes past to see if it has a new kind to watch
stackInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
stack := obj.(*pulumiv1.Stack)
maybeWatchFluxSourceKind(stack)
},
UpdateFunc: func(_oldObj, newObj interface{}) {
stack := newObj.(*pulumiv1.Stack)
maybeWatchFluxSourceKind(stack)
},
})

return err
return nil
}

// blank assignment to verify that ReconcileStack implements reconcile.Reconciler
Expand All @@ -295,6 +291,9 @@ type ReconcileStack struct {
client client.Client
scheme *runtime.Scheme
recorder record.EventRecorder

// this is initialised by add(), to be available to Reconcile
maybeWatchFluxSourceKind func(shared.FluxSourceReference) error
}

// StallError represents a problem that makes a Stack spec unprocessable, while otherwise being
Expand Down Expand Up @@ -468,6 +467,14 @@ func (r *ReconcileStack) Reconcile(ctx context.Context, request reconcile.Reques
return reconcile.Result{}, nil
}

// Watch this kind of source, if we haven't already.
if err := r.maybeWatchFluxSourceKind(fluxSource.SourceRef); err != nil {
reterr := fmt.Errorf("cannot process source reference: %w", err)
r.markStackFailed(sess, instance, reterr, "", "")
instance.Status.MarkStalledCondition(pulumiv1.StalledSpecInvalidReason, reterr.Error())
return reconcile.Result{}, nil
}

if err := checkFluxSourceReady(sourceObject); err != nil {
r.markStackFailed(sess, instance, err, "", "")
// This is marked as retrying, but we're really waiting until the source is ready, at
Expand Down

0 comments on commit 2960a27

Please sign in to comment.