Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(wait): Only use MODIFIED events when waiting for Read == True #701

Merged
merged 1 commit into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/serving/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (cl *knServingClient) DeleteService(serviceName string, timeout time.Durati
waitC := make(chan error)
go func() {
waitForEvent := wait.NewWaitForEvent("service", cl.WatchService, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(serviceName, timeout, wait.NoopMessageCallback())
err, _ := waitForEvent.Wait(serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := cl.deleteService(serviceName, v1.DeletePropagationForeground)
Expand All @@ -292,7 +292,7 @@ func (cl *knServingClient) deleteService(serviceName string, propagationPolicy v
// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor)
return waitForReady.Wait(name, timeout, msgCallback)
return waitForReady.Wait(name, wait.Options{Timeout: &timeout}, msgCallback)
}

// Get the configuration for a service
Expand Down Expand Up @@ -382,7 +382,7 @@ func (cl *knServingClient) DeleteRevision(name string, timeout time.Duration) er
waitC := make(chan error)
go func() {
waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevision, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, timeout, wait.NoopMessageCallback())
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := cl.deleteRevision(name)
Expand Down
65 changes: 51 additions & 14 deletions pkg/wait/wait_for_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ import (
"knative.dev/pkg/apis"
)

// Window for how long a ReadyCondition == false has to stay
// for being considered as an error
var ErrorWindow = 2 * time.Second

// Callbacks and configuration used while waiting
type waitForReadyConfig struct {
watchMaker WatchMaker
Expand All @@ -50,10 +46,19 @@ type EventDone func(ev *watch.Event) bool
// state in its "Ready" condition.
type Wait interface {

// Wait on resource the resource with this name until a given timeout
// Wait on resource the resource with this name
// and write event messages for unknown event to the status writer.
// Returns an error (if any) and the overall time it took to wait
Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration)
Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration)
}

type Options struct {
// Window for how long a ReadyCondition == false has to stay
// for being considered as an error (useful for flaky reconciliation
ErrorWindow *time.Duration

// Timeout for how long to wait at maximum
Timeout *time.Duration
}

// Create watch which is used when waiting for Ready condition
Expand All @@ -65,7 +70,7 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
// Callback for event messages
type MessageCallback func(durationSinceState time.Duration, message string)

// Constructor with resource type specific configuration
// NewWaitForReady waits until the condition is set to Ready == True
func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait {
return &waitForReadyConfig{
kind: kind,
Expand All @@ -74,6 +79,8 @@ func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExt
}
}

// NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when
// the EventDone function returns true)
func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait {
return &waitForEvent{
kind: kind,
Expand All @@ -82,7 +89,7 @@ func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wa
}
}

// A simple message callback which prints out messages line by line
// SimpleMessageCallback returns a callback which prints out a simple event message to a given writer
func SimpleMessageCallback(out io.Writer) MessageCallback {
oldMessage := ""
return func(duration time.Duration, message string) {
Expand All @@ -95,7 +102,7 @@ func SimpleMessageCallback(out io.Writer) MessageCallback {
}
}

// Noop-callback
// NoopMessageCallback is callback which does nothing
func NoopMessageCallback() MessageCallback {
return func(durationSinceState time.Duration, message string) {}
}
Expand All @@ -105,12 +112,13 @@ func NoopMessageCallback() MessageCallback {
// (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no
// target state has been entered yet and `out` is used for printing out status messages
// msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message.
func (w *waitForReadyConfig) Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {

timeout := options.timeoutWithDefault()
floatingTimeout := timeout
for {
start := time.Now()
retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, ErrorWindow, msgCallback)
retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
if err != nil {
return err, time.Since(start)
}
Expand Down Expand Up @@ -158,8 +166,22 @@ func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string,
return true, false, nil
}

// Skip event if its not a MODIFIED event, as only MODIFIED events update the condition
// we are looking for.
// This will filter out all synthetic ADDED events that created bt the API server for
// the initial state. See https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter
// for details:
// "Get State and Start at Most Recent: Start a watch at the most recent resource version,
// which must be consistent (i.e. served from etcd via a quorum read). To establish initial state,
// the watch begins with synthetic “Added” events of all resources instances that exist at the starting
// resource version. All following watch events are for all changes that occurred after the resource
// version the watch started at."
if event.Type != watch.Modified {
continue
}

// Skip event if generations has not yet been consolidated
inSync, err := isGivenEqualsObservedGeneration(event.Object)
inSync, err := generationCheck(event.Object)
if err != nil {
return false, false, err
}
Expand Down Expand Up @@ -198,7 +220,8 @@ func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string,
}

// Wait until the expected EventDone is satisfied
func (w *waitForEvent) Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
timeout := options.timeoutWithDefault()
watcher, err := w.watchMaker(name, timeout)
if err != nil {
return err, 0
Expand All @@ -223,7 +246,7 @@ func (w *waitForEvent) Wait(name string, timeout time.Duration, msgCallback Mess
}
}

func isGivenEqualsObservedGeneration(object runtime.Object) (bool, error) {
func generationCheck(object runtime.Object) (bool, error) {
unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
if err != nil {
return false, err
Expand All @@ -247,3 +270,17 @@ func isGivenEqualsObservedGeneration(object runtime.Object) (bool, error) {
}
return givenGeneration == observedGeneration, nil
}

func (o Options) timeoutWithDefault() time.Duration {
if o.Timeout != nil {
return *o.Timeout
}
return 60 * time.Second
}

func (o Options) errorWindowWithDefault() time.Duration {
if o.ErrorWindow != nil {
return *o.ErrorWindow
}
return 2 * time.Second
}
16 changes: 8 additions & 8 deletions pkg/wait/wait_for_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestAddWaitForReady(t *testing.T) {
})
fakeWatchApi.Start()
var msgs []string
err, _ := waitForReady.Wait("foobar", tc.timeout, func(_ time.Duration, msg string) {
err, _ := waitForReady.Wait("foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
msgs = append(msgs, msg)
})
close(fakeWatchApi.eventChan)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestAddWaitForDelete(t *testing.T) {
func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
fakeWatchAPI.Start()

err, _ := waitForEvent.Wait("foobar", tc.timeout, NoopMessageCallback())
err, _ := waitForEvent.Wait("foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
close(fakeWatchAPI.eventChan)

if tc.errorText == "" && err != nil {
Expand All @@ -106,7 +106,6 @@ func TestAddWaitForDelete(t *testing.T) {
if fakeWatchAPI.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
}

}
}

Expand All @@ -130,7 +129,7 @@ func prepareDeleteTestCases(name string) []waitForReadyTestCase {

func errorTest(name string) waitForReadyTestCase {
events := []watch.Event{
{watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionFalse, corev1.ConditionTrue, "FakeError", "Test Error")},
}

Expand Down Expand Up @@ -164,6 +163,7 @@ func peNormal(name string) ([]watch.Event, int) {
messages := pMessages(2)
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionTrue, "", messages[1])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "", "")},
}, len(messages)
Expand All @@ -172,30 +172,30 @@ func peNormal(name string) ([]watch.Event, int) {
func peTimeout(name string) ([]watch.Event, int) {
messages := pMessages(1)
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])},
}, len(messages)
}

func peWrongGeneration(name string) ([]watch.Event, int) {
messages := pMessages(1)
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "", "", 1, 2)},
}, len(messages)
}

func peReadyFalseWithinErrorWindow(name string) ([]watch.Event, int) {
messages := pMessages(1)
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionFalse, corev1.ConditionFalse, "Route not ready", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionFalse, corev1.ConditionFalse, "Route not ready", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "Route ready", "")},
}, len(messages)
}

func deNormal(name string) ([]watch.Event, int) {
messages := pMessages(2)
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])},
{watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionTrue, "", messages[1])},
{watch.Deleted, CreateTestServiceWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "", "")},
}, len(messages)
Expand Down