Commit d760ea7

[WIP] feat: replace StatusPoller w/ StatusWatcher

- Add DynamicClusterReader that wraps DynamicClient. This is now used to look up generated resources (e.g. Deployment > ReplicaSets > Pods).
- Add BlindWatcher, a no-op watcher used for testing and for disabling status watching during dry-run (see the interface sketch below).
- Add ObjectSetWatcher that wraps DynamicClient and handles building and starting informers for a set of predefined objects:
  - An informer is created for each unique GroupKind.
  - If the GroupKind does not yet exist, the informer is delayed until the CRD is applied.
  - If a CRD is deleted, the informer for that GroupKind is stopped.
- Delay for 15s before reporting pods that can't be scheduled. If an update is received within that window, reset or cancel the delay.
- Add a new DefaultStatusReader which uses a DelegatingStatusReader to wrap a list of conventional and specific StatusReaders. This should make it easier to extend the list of StatusReaders.
- Fix event sorting to work correctly with sort.SliceStable.
- Make some pending WaitEvents optional in testing, now that StatusWatcher can resolve their status before the WaitTask starts.
- Add a check to avoid redundant RESTMapper resets. The DynamicWatcher handles most resets now, so the WaitTask doesn't need to.

BREAKING CHANGE: Replace StatusPoller w/ StatusWatcher
BREAKING CHANGE: Remove PollInterval (obsolete with watcher)
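
The kstatus.Watcher interface itself is not shown in this diff; the following is a rough Go sketch of the contract implied by the changes (the Watch signature mirrors the fakeWatcher in common_test.go below; everything else, including the BlindWatcher body, is an assumption rather than the actual package code).

// Sketch only: the contract implied by this commit, not the real
// sigs.k8s.io/cli-utils/pkg/kstatus source.
package kstatus

import (
	"context"

	pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
	"sigs.k8s.io/cli-utils/pkg/object"
)

// Watcher streams status events for a set of object IDs until ctx is done.
type Watcher interface {
	Watch(ctx context.Context, ids object.ObjMetadataSet) <-chan pollevent.Event
}

// BlindWatcher is a no-op Watcher (used for dry runs and some tests):
// assumed behavior is to emit nothing and close the channel on cancellation.
type BlindWatcher struct{}

func (BlindWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet) <-chan pollevent.Event {
	ch := make(chan pollevent.Event)
	go func() {
		defer close(ch)
		<-ctx.Done() // nothing to report; just honor cancellation
	}()
	return ch
}
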
1 parent 8aaf739 commit d760ea7

24 files changed: +1109 -204 lines

cmd/apply/cmdapply.go

Lines changed: 0 additions & 4 deletions
@@ -45,8 +45,6 @@ func GetRunner(factory cmdutil.Factory, invFactory inventory.ClientFactory,
 
     cmd.Flags().StringVar(&r.output, "output", printers.DefaultPrinter(),
         fmt.Sprintf("Output format, must be one of %s", strings.Join(printers.SupportedPrinters(), ",")))
-    cmd.Flags().DurationVar(&r.period, "poll-period", 2*time.Second,
-        "Polling period for resource statuses.")
     cmd.Flags().DurationVar(&r.reconcileTimeout, "reconcile-timeout", time.Duration(0),
         "Timeout threshold for waiting for all resources to reach the Current status.")
     cmd.Flags().BoolVar(&r.noPrune, "no-prune", r.noPrune,
@@ -81,7 +79,6 @@ type Runner struct {
 
     serverSideOptions common.ServerSideOptions
     output string
-    period time.Duration
     reconcileTimeout time.Duration
     noPrune bool
     prunePropagationPolicy string
@@ -156,7 +153,6 @@ func (r *Runner) RunE(cmd *cobra.Command, args []string) error {
 
     ch := a.Run(ctx, inv, objs, apply.ApplierOptions{
         ServerSideOptions: r.serverSideOptions,
-        PollInterval: r.period,
         ReconcileTimeout: r.reconcileTimeout,
         // If we are not waiting for status, tell the applier to not
         // emit the events.

pkg/apply/applier.go

Lines changed: 8 additions & 13 deletions
@@ -20,18 +20,16 @@ import (
     "sigs.k8s.io/cli-utils/pkg/apply/filter"
     "sigs.k8s.io/cli-utils/pkg/apply/info"
     "sigs.k8s.io/cli-utils/pkg/apply/mutator"
-    "sigs.k8s.io/cli-utils/pkg/apply/poller"
     "sigs.k8s.io/cli-utils/pkg/apply/prune"
     "sigs.k8s.io/cli-utils/pkg/apply/solver"
     "sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
     "sigs.k8s.io/cli-utils/pkg/common"
     "sigs.k8s.io/cli-utils/pkg/inventory"
+    "sigs.k8s.io/cli-utils/pkg/kstatus"
     "sigs.k8s.io/cli-utils/pkg/object"
     "sigs.k8s.io/cli-utils/pkg/object/validation"
 )
 
-const defaultPollInterval = 2 * time.Second
-
 // Applier performs the step of applying a set of resources into a cluster,
 // conditionally waits for all of them to be fully reconciled and finally
 // performs prune to clean up any resources that has been deleted.
@@ -44,7 +42,7 @@ const defaultPollInterval = 2 * time.Second
 // cluster, different sets of tasks might be needed.
 type Applier struct {
     pruner *prune.Pruner
-    statusPoller poller.Poller
+    statusWatcher kstatus.Watcher
     invClient inventory.Client
     client dynamic.Interface
     openAPIGetter discovery.OpenAPISchemaInterface
@@ -236,10 +234,14 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
     // Create a new TaskStatusRunner to execute the taskQueue.
     klog.V(4).Infoln("applier building TaskStatusRunner...")
     allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
-    runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller)
+    statusWatcher := a.statusWatcher
+    // Disable watcher for dry runs
+    if opts.DryRunStrategy.ClientOrServerDryRun() {
+        statusWatcher = kstatus.BlindWatcher{}
+    }
+    runner := taskrunner.NewTaskStatusRunner(allIds, statusWatcher)
     klog.V(4).Infoln("applier running TaskStatusRunner...")
     err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
-        PollInterval: options.PollInterval,
         EmitStatusEvents: options.EmitStatusEvents,
     })
     if err != nil {
@@ -259,10 +261,6 @@ type ApplierOptions struct {
     // how long to wait.
     ReconcileTimeout time.Duration
 
-    // PollInterval defines how often we should poll for the status
-    // of resources.
-    PollInterval time.Duration
-
     // EmitStatusEvents defines whether status events should be
     // emitted on the eventChannel to the caller.
     EmitStatusEvents bool
@@ -295,9 +293,6 @@ type ApplierOptions struct {
 // setDefaults set the options to the default values if they
 // have not been provided.
 func setDefaults(o *ApplierOptions) {
-    if o.PollInterval == 0 {
-        o.PollInterval = defaultPollInterval
-    }
     if o.PrunePropagationPolicy == "" {
         o.PrunePropagationPolicy = metav1.DeletePropagationBackground
     }
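
For existing callers, the visible API change in this file is the removal of ApplierOptions.PollInterval. A hedged migration sketch follows; the surrounding variables (applier, ctx, inv, objs) are assumed caller-side code, and only the option fields come from this diff.

// Before this commit a caller typically set a poll interval:
//
//   apply.ApplierOptions{PollInterval: 2 * time.Second, ReconcileTimeout: time.Minute}
//
// After it, the watch stream replaces polling, so the field is simply dropped:
events := applier.Run(ctx, inv, objs, apply.ApplierOptions{
	ReconcileTimeout: time.Minute, // unchanged by this commit
})
for e := range events {
	_ = e // handle apply, prune, and status events as before
}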

pkg/apply/applier_builder.go

Lines changed: 11 additions & 13 deletions
@@ -6,20 +6,18 @@ package apply
 import (
     "errors"
     "fmt"
+    "time"
 
     "k8s.io/apimachinery/pkg/api/meta"
     "k8s.io/cli-runtime/pkg/resource"
     "k8s.io/client-go/discovery"
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/rest"
     "k8s.io/kubectl/pkg/cmd/util"
-    "k8s.io/kubectl/pkg/scheme"
     "sigs.k8s.io/cli-utils/pkg/apply/info"
-    "sigs.k8s.io/cli-utils/pkg/apply/poller"
     "sigs.k8s.io/cli-utils/pkg/apply/prune"
     "sigs.k8s.io/cli-utils/pkg/inventory"
-    "sigs.k8s.io/cli-utils/pkg/kstatus/polling"
-    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/cli-utils/pkg/kstatus"
 )
 
 type ApplierBuilder struct {
@@ -31,7 +29,7 @@ type ApplierBuilder struct {
     mapper meta.RESTMapper
     restConfig *rest.Config
     unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)
-    statusPoller poller.Poller
+    statusWatcher kstatus.Watcher
 }
 
 // NewApplierBuilder returns a new ApplierBuilder.
@@ -52,7 +50,7 @@ func (b *ApplierBuilder) Build() (*Applier, error) {
             Client: bx.client,
             Mapper: bx.mapper,
         },
-        statusPoller: bx.statusPoller,
+        statusWatcher: bx.statusWatcher,
         invClient: bx.invClient,
         client: bx.client,
         openAPIGetter: bx.discoClient,
@@ -109,12 +107,12 @@ func (b *ApplierBuilder) finalize() (*ApplierBuilder, error) {
         }
         bx.unstructuredClientForMapping = bx.factory.UnstructuredClientForMapping
     }
-    if bx.statusPoller == nil {
-        c, err := client.New(bx.restConfig, client.Options{Scheme: scheme.Scheme, Mapper: bx.mapper})
-        if err != nil {
-            return nil, fmt.Errorf("error creating client: %v", err)
+    if bx.statusWatcher == nil {
+        bx.statusWatcher = &kstatus.ObjectSetWatcher{
+            DynamicClient: bx.client,
+            Mapper: bx.mapper,
+            ResyncPeriod: 1 * time.Hour,
         }
-        bx.statusPoller = polling.NewStatusPoller(c, bx.mapper, polling.Options{})
     }
     return &bx, nil
 }
@@ -154,7 +152,7 @@ func (b *ApplierBuilder) WithUnstructuredClientForMapping(unstructuredClientForM
     return b
 }
 
-func (b *ApplierBuilder) WithStatusPoller(statusPoller poller.Poller) *ApplierBuilder {
-    b.statusPoller = statusPoller
+func (b *ApplierBuilder) WithStatusWatcher(statusWatcher kstatus.Watcher) *ApplierBuilder {
+    b.statusWatcher = statusWatcher
     return b
 }
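
The builder now defaults to an ObjectSetWatcher with a one-hour resync; a caller that wants different behavior can inject its own watcher via WithStatusWatcher. A rough usage sketch: the builder methods and the ObjectSetWatcher fields come from the diff above, while the factory, mapper, and dynamic client variables are assumed to exist in the caller.

// Override the default watcher, e.g. to shorten the informer resync period.
statusWatcher := &kstatus.ObjectSetWatcher{
	DynamicClient: dynamicClient, // assumed: a dynamic.Interface built elsewhere
	Mapper:        restMapper,    // assumed: a meta.RESTMapper built elsewhere
	ResyncPeriod:  30 * time.Minute,
}

applier, err := apply.NewApplierBuilder().
	WithFactory(factory). // assumed: a cmdutil.Factory
	WithInventoryClient(invClient).
	WithStatusWatcher(statusWatcher).
	Build()
if err != nil {
	return err
}
_ = applier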

pkg/apply/applier_test.go

Lines changed: 11 additions & 11 deletions
@@ -5,7 +5,6 @@ package apply
 
 import (
     "context"
-    "sort"
     "sync"
     "testing"
     "time"
@@ -17,6 +16,7 @@ import (
     "sigs.k8s.io/cli-utils/pkg/apis/actuation"
     "sigs.k8s.io/cli-utils/pkg/apply/event"
     "sigs.k8s.io/cli-utils/pkg/inventory"
+    "sigs.k8s.io/cli-utils/pkg/kstatus"
     pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
     "sigs.k8s.io/cli-utils/pkg/kstatus/status"
     "sigs.k8s.io/cli-utils/pkg/multierror"
@@ -98,7 +98,7 @@ func TestApplier(t *testing.T) {
         clusterObjs object.UnstructuredSet
         // options input to applier.Run
         options ApplierOptions
-        // fake input events from the status poller
+        // fake input events from the statusWatcher
         statusEvents []pollevent.Event
         // expected output status events (async)
         expectedStatusEvents []testutil.ExpEvent
@@ -1428,7 +1428,7 @@ func TestApplier(t *testing.T) {
 
     for tn, tc := range testCases {
         t.Run(tn, func(t *testing.T) {
-            poller := newFakePoller(tc.statusEvents)
+            statusWatcher := newFakeWatcher(tc.statusEvents)
 
             // Only feed valid objects into the TestApplier.
             // Invalid objects should not generate API requests.
@@ -1445,7 +1445,7 @@ func TestApplier(t *testing.T) {
                 tc.invInfo,
                 validObjs,
                 tc.clusterObjs,
-                poller,
+                statusWatcher,
             )
 
             // Context for Applier.Run
@@ -1490,7 +1490,7 @@ func TestApplier(t *testing.T) {
                     e.ActionGroupEvent.Action == event.PruneAction {
                     once.Do(func() {
                         // start events
-                        poller.Start()
+                        statusWatcher.Start()
                     })
                 }
             }
@@ -1511,7 +1511,7 @@ func TestApplier(t *testing.T) {
             }
 
             // sort to allow comparison of multiple apply/prune tasks in the same task group
-            sort.Sort(testutil.GroupedEventsByID(receivedEvents))
+            testutil.SortEvents(receivedEvents)
 
             // Validate the rest of the events
             testutil.AssertEqual(t, tc.expectedEvents, receivedEvents,
@@ -1546,7 +1546,7 @@ func TestApplierCancel(t *testing.T) {
         runTimeout time.Duration
         // timeout for the test
         testTimeout time.Duration
-        // fake input events from the status poller
+        // fake input events from the statusWatcher
         statusEvents []pollevent.Event
         // expected output status events (async)
         expectedStatusEvents []testutil.ExpEvent
@@ -1880,13 +1880,13 @@ func TestApplierCancel(t *testing.T) {
 
     for tn, tc := range testCases {
         t.Run(tn, func(t *testing.T) {
-            poller := newFakePoller(tc.statusEvents)
+            statusWatcher := newFakeWatcher(tc.statusEvents)
 
             applier := newTestApplier(t,
                 tc.invInfo,
                 tc.resources,
                 tc.clusterObjs,
-                poller,
+                statusWatcher,
             )
 
             // Context for Applier.Run
@@ -1928,7 +1928,7 @@ func TestApplierCancel(t *testing.T) {
                     e.ActionGroupEvent.Action == event.PruneAction {
                     once.Do(func() {
                         // start events
-                        poller.Start()
+                        statusWatcher.Start()
                     })
                 }
             }
@@ -2069,7 +2069,7 @@ func TestReadAndPrepareObjects(t *testing.T) {
             tc.resources,
             tc.clusterObjs,
             // no events needed for prepareObjects
-            newFakePoller([]pollevent.Event{}),
+            kstatus.BlindWatcher{},
        )
 
         applyObjs, pruneObjs, err := applier.prepareObjects(tc.invInfo.toWrapped(), tc.resources, ApplierOptions{})
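
The switch from sort.Sort(testutil.GroupedEventsByID(...)) to testutil.SortEvents lines up with the commit note about sort.SliceStable. A minimal sketch of the idea follows; the comparator name is assumed for illustration and is not the real testutil helper.

// Stable sort: events that compare equal (same task group and object ID)
// keep their original emission order, so apply and prune events from the
// same group remain comparable against the expected sequence.
sort.SliceStable(receivedEvents, func(i, j int) bool {
	return lessByGroupThenID(receivedEvents[i], receivedEvents[j]) // assumed comparator
})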

pkg/apply/common_test.go

Lines changed: 10 additions & 11 deletions
@@ -27,11 +27,10 @@ import (
     "k8s.io/klog/v2"
     cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
     "k8s.io/kubectl/pkg/scheme"
-    "sigs.k8s.io/cli-utils/pkg/apply/poller"
     "sigs.k8s.io/cli-utils/pkg/common"
     "sigs.k8s.io/cli-utils/pkg/inventory"
     "sigs.k8s.io/cli-utils/pkg/jsonpath"
-    "sigs.k8s.io/cli-utils/pkg/kstatus/polling"
+    "sigs.k8s.io/cli-utils/pkg/kstatus"
     pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
     "sigs.k8s.io/cli-utils/pkg/object"
 )
@@ -74,7 +73,7 @@ func newTestApplier(
     invInfo inventoryInfo,
     resources object.UnstructuredSet,
     clusterObjs object.UnstructuredSet,
-    statusPoller poller.Poller,
+    statusWatcher kstatus.Watcher,
 ) *Applier {
     tf := newTestFactory(t, invInfo, resources, clusterObjs)
     defer tf.Cleanup()
@@ -88,7 +87,7 @@ func newTestApplier(
     applier, err := NewApplierBuilder().
         WithFactory(tf).
         WithInventoryClient(invClient).
-        WithStatusPoller(statusPoller).
+        WithStatusWatcher(statusWatcher).
         Build()
     require.NoError(t, err)
 
@@ -103,7 +102,7 @@ func newTestDestroyer(
     t *testing.T,
     invInfo inventoryInfo,
     clusterObjs object.UnstructuredSet,
-    statusPoller poller.Poller,
+    statusWatcher kstatus.Watcher,
 ) *Destroyer {
     tf := newTestFactory(t, invInfo, object.UnstructuredSet{}, clusterObjs)
     defer tf.Cleanup()
@@ -112,7 +111,7 @@
 
     destroyer, err := NewDestroyer(tf, invClient)
     require.NoError(t, err)
-    destroyer.StatusPoller = statusPoller
+    destroyer.statusWatcher = statusWatcher
 
     return destroyer
 }
@@ -345,24 +344,24 @@ func (n *nsHandler) handle(t *testing.T, req *http.Request) (*http.Response, boo
     return nil, false, nil
 }
 
-type fakePoller struct {
+type fakeWatcher struct {
     start chan struct{}
     events []pollevent.Event
 }
 
-func newFakePoller(statusEvents []pollevent.Event) *fakePoller {
-    return &fakePoller{
+func newFakeWatcher(statusEvents []pollevent.Event) *fakeWatcher {
+    return &fakeWatcher{
         events: statusEvents,
         start: make(chan struct{}),
     }
 }
 
 // Start events being sent on the status channel
-func (f *fakePoller) Start() {
+func (f *fakeWatcher) Start() {
     close(f.start)
 }
 
-func (f *fakePoller) Poll(ctx context.Context, _ object.ObjMetadataSet, _ polling.PollOptions) <-chan pollevent.Event {
+func (f *fakeWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet) <-chan pollevent.Event {
     eventChannel := make(chan pollevent.Event)
     go func() {
         defer close(eventChannel)

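The hunk above is cut off inside fakeWatcher.Watch by the diff context window. A plausible completion of the goroutine body, mirroring the old fakePoller's hold-then-replay behavior, is sketched here; it is an assumption, not part of the diff.

func (f *fakeWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet) <-chan pollevent.Event {
	eventChannel := make(chan pollevent.Event)
	go func() {
		defer close(eventChannel)
		// Hold events until the test calls Start(), then replay them in order.
		<-f.start
		for _, e := range f.events {
			select {
			case eventChannel <- e:
			case <-ctx.Done():
				return
			}
		}
		// Keep the channel open until the test cancels the context.
		<-ctx.Done()
	}()
	return eventChannel
}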