Skip to content

Commit

Permalink
WIP: wait with resourceVersion
Browse files Browse the repository at this point in the history
  • Loading branch information
dsimansk committed Mar 18, 2021
1 parent 431637a commit 646fe33
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
21 changes: 18 additions & 3 deletions pkg/serving/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ func (cl *knServingClient) WatchService(name string, timeout time.Duration) (wat
cl.client.RESTClient(), cl.namespace, "services", name, timeout)
}

func (cl *knServingClient) WatchServiceWithVersion(name, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcherWithVersion(cl.client.Services(cl.namespace).Watch,
cl.client.RESTClient(), initialVersion, cl.namespace, "services", name, timeout)
}

func (cl *knServingClient) WatchRevision(name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(cl.client.Revisions(cl.namespace).Watch,
cl.client.RESTClient(), cl.namespace, "revision", name, timeout)
Expand Down Expand Up @@ -320,20 +325,30 @@ func (cl *knServingClient) DeleteService(serviceName string, timeout time.Durati
return cl.deleteService(serviceName, v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := cl.WatchService(serviceName, timeout)
svc, err := cl.GetService(serviceName)
if err != nil {
return nil
}
defer watcher.Stop()
fmt.Println("Main: Before magic...")
go func() {
fmt.Println("Goroutine: Sleeping...")
time.Sleep(time.Second * 20)
fmt.Println("Goroutine: Waky waky starting to watch...")
watcher, err := cl.WatchServiceWithVersion(serviceName, svc.ResourceVersion, timeout)
if err != nil {
waitC <- err
}
defer watcher.Stop()
waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
err, _ = waitForEvent.Wait(watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
fmt.Println("Main: Calling deleteService...")
err = cl.deleteService(serviceName, v1.DeletePropagationForeground)
if err != nil {
return err
}
fmt.Println("Main: Waiting for result channel...")
return <-waitC
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/wait/poll_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ func NewWatcher(watchFunc watchF, c rest.Interface, ns string, resource string,
return polling, nil
}

func NewWatcherWithVersion(watchFunc watchF, c rest.Interface, initialResourceVersion string, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) {
native, err := nativeWatchWithVersion(watchFunc, name, initialResourceVersion, timeout)
if err == nil {
return native, nil
}
polling := &pollingWatcher{
c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
newTickerPollInterval(time.Second), nativePoll(c, ns, resource, name)}
polling.start()
return polling, nil
}

func (w *pollingWatcher) start() {
w.wg.Add(1)

Expand Down Expand Up @@ -159,6 +171,16 @@ func (w *pollingWatcher) Stop() {
close(w.done)
}

func nativeWatchWithVersion(watchFunc watchF, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
opts := v1.ListOptions{
ResourceVersion: initialVersion,
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
}
opts.Watch = true
addWatchTimeout(&opts, timeout)
return watchFunc(context.TODO(), opts)
}

func nativeWatch(watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) {
opts := v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
Expand Down
6 changes: 5 additions & 1 deletion pkg/wait/wait_for_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,11 @@ func (w *waitForEvent) Wait(watcher watch.Interface, name string, options Option
return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start)
case err := <-errChan:
return err, time.Since(start)
case event := <-watcher.ResultChan():
case event, ok := <-watcher.ResultChan():
if !ok {
fmt.Println("ResultChan closed")
}
fmt.Println("Event: " + event.Type)
if w.eventDone(&event) {
return nil, time.Since(start)
}
Expand Down

0 comments on commit 646fe33

Please sign in to comment.