Skip to content

Commit

Permalink
Merge pull request #5121 from medyagh/refactor_util_kube
Browse files Browse the repository at this point in the history
Add wait-timeout flag to start command and refactor util/kubernetes
  • Loading branch information
medyagh authored Aug 20, 2019
2 parents c5a06fd + dd94e15 commit c3cfedf
Show file tree
Hide file tree
Showing 22 changed files with 189 additions and 279 deletions.
11 changes: 7 additions & 4 deletions cmd/minikube/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"k8s.io/minikube/pkg/minikube/bootstrapper/kubeadm"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/minikube/command"
"k8s.io/minikube/pkg/minikube/config"
cfg "k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/cruntime"
Expand Down Expand Up @@ -102,6 +103,7 @@ const (
dnsProxy = "dns-proxy"
hostDNSResolver = "host-dns-resolver"
waitUntilHealthy = "wait"
waitTimeout = "wait-timeout"
)

var (
Expand All @@ -111,7 +113,7 @@ var (
insecureRegistry []string
apiServerNames []string
apiServerIPs []net.IP
extraOptions pkgutil.ExtraOptionSlice
extraOptions config.ExtraOptionSlice
)

func init() {
Expand Down Expand Up @@ -148,6 +150,7 @@ func initMinikubeFlags() {
startCmd.Flags().String(networkPlugin, "", "The name of the network plugin.")
startCmd.Flags().Bool(enableDefaultCNI, false, "Enable the default CNI plugin (/etc/cni/net.d/k8s.conf). Used in conjunction with \"--network-plugin=cni\".")
startCmd.Flags().Bool(waitUntilHealthy, true, "Wait until Kubernetes core services are healthy before exiting.")
startCmd.Flags().Duration(waitTimeout, 3*time.Minute, "max time to wait per Kubernetes core services to be healthy.")
}

// initKubernetesFlags inits the commandline flags for kubernetes related options
Expand Down Expand Up @@ -321,7 +324,7 @@ func runStart(cmd *cobra.Command, args []string) {
// special ops for none driver, like change minikube directory.
prepareNone(viper.GetString(vmDriver))
if viper.GetBool(waitUntilHealthy) {
if err := bs.WaitCluster(config.KubernetesConfig); err != nil {
if err := bs.WaitCluster(config.KubernetesConfig, viper.GetDuration(waitTimeout)); err != nil {
exit.WithError("Wait failed", err)
}
}
Expand Down Expand Up @@ -538,8 +541,8 @@ func validateConfig() {

// check that kubeadm extra args contain only whitelisted parameters
for param := range extraOptions.AsMap().Get(kubeadm.Kubeadm) {
if !pkgutil.ContainsString(kubeadm.KubeadmExtraArgsWhitelist[kubeadm.KubeadmCmdParam], param) &&
!pkgutil.ContainsString(kubeadm.KubeadmExtraArgsWhitelist[kubeadm.KubeadmConfigParam], param) {
if !cfg.ContainsParam(kubeadm.KubeadmExtraArgsWhitelist[kubeadm.KubeadmCmdParam], param) &&
!cfg.ContainsParam(kubeadm.KubeadmExtraArgsWhitelist[kubeadm.KubeadmConfigParam], param) {
exit.UsageT("Sorry, the kubeadm.{{.parameter_name}} parameter is currently not supported by --extra-config", out.V{"parameter_name": param})
}
}
Expand Down
8 changes: 5 additions & 3 deletions hack/jenkins/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ export MINIKUBE_HOME="${TEST_HOME}/.minikube"
export MINIKUBE_WANTREPORTERRORPROMPT=False
export KUBECONFIG="${TEST_HOME}/kubeconfig"

# Build the gvisor image. This will be copied into minikube and loaded by ctr.
# Used by TestContainerd for Gvisor Test.
docker build -t gcr.io/k8s-minikube/gvisor-addon:latest -f testdata/gvisor-addon-Dockerfile out


# Display the default image URL
echo ""
echo ">> ISO URL"
Expand Down Expand Up @@ -295,9 +300,6 @@ ${SUDO_PREFIX} rm -f "${KUBECONFIG}" || true
rmdir "${TEST_HOME}"
echo ">> ${TEST_HOME} completed at $(date)"

# Build the gvisor image. This will be copied into minikube and loaded by ctr.
docker build -t gcr.io/k8s-minikube/gvisor-addon:latest -f testdata/gvisor-addon-Dockerfile out

if [[ "${MINIKUBE_LOCATION}" != "master" ]]; then
readonly target_url="https://storage.googleapis.com/minikube-builds/logs/${MINIKUBE_LOCATION}/${JOB_NAME}.txt"
curl -s "https://api.github.com/repos/kubernetes/minikube/statuses/${COMMIT}?access_token=$access_token" \
Expand Down
144 changes: 19 additions & 125 deletions pkg/util/kubernetes.go → pkg/kapi/kapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package util
package kapi

import (
"context"
Expand All @@ -29,15 +29,13 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/minikube/pkg/minikube/proxy"
)

Expand All @@ -48,30 +46,8 @@ var (
ReasonableStartTime = time.Minute * 5
)

// PodStore stores pods
type PodStore struct {
cache.Store
stopCh chan struct{}
Reflector *cache.Reflector
}

// List lists the pods
func (s *PodStore) List() []*core.Pod {
objects := s.Store.List()
pods := make([]*core.Pod, 0)
for _, o := range objects {
pods = append(pods, o.(*core.Pod))
}
return pods
}

// Stop stops the pods
func (s *PodStore) Stop() {
close(s.stopCh)
}

// GetClient gets the client from config
func GetClient(kubectlContext ...string) (kubernetes.Interface, error) {
// Client gets the kuberentes client from default kubeconfig
func Client(kubectlContext ...string) (kubernetes.Interface, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
configOverrides := &clientcmd.ConfigOverrides{}
if kubectlContext != nil {
Expand All @@ -92,50 +68,16 @@ func GetClient(kubectlContext ...string) (kubernetes.Interface, error) {
return client, nil
}

// NewPodStore creates a new PodStore
func NewPodStore(c kubernetes.Interface, namespace string, label fmt.Stringer, field fmt.Stringer) *PodStore {
lw := &cache.ListWatch{
ListFunc: func(options meta.ListOptions) (runtime.Object, error) {
options.LabelSelector = label.String()
options.FieldSelector = field.String()
obj, err := c.CoreV1().Pods(namespace).List(options)
return runtime.Object(obj), err
},
WatchFunc: func(options meta.ListOptions) (watch.Interface, error) {
options.LabelSelector = label.String()
options.FieldSelector = field.String()
return c.CoreV1().Pods(namespace).Watch(options)
},
}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
stopCh := make(chan struct{})
reflector := cache.NewReflector(lw, &core.Pod{}, store, 0)
go reflector.Run(stopCh)
return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
}

// StartPods starts all pods
func StartPods(c kubernetes.Interface, namespace string, pod core.Pod, waitForRunning bool) error {
pod.ObjectMeta.Labels["name"] = pod.Name
if waitForRunning {
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": pod.Name}))
err := WaitForPodsWithLabelRunning(c, namespace, label)
if err != nil {
return fmt.Errorf("error waiting for pod %s to be running: %v", pod.Name, err)
}
}
return nil
}

// WaitForPodsWithLabelRunning waits for all matching pods to become Running and at least one matching pod exists.
func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels.Selector) error {
func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels.Selector, timeOut ...time.Duration) error {
start := time.Now()
glog.Infof("Waiting for pod with label %q in ns %q ...", ns, label)
lastKnownPodNumber := -1
return wait.PollImmediate(constants.APICallRetryInterval, ReasonableStartTime, func() (bool, error) {
f := func() (bool, error) {
listOpts := meta.ListOptions{LabelSelector: label.String()}
pods, err := c.CoreV1().Pods(ns).List(listOpts)
if err != nil {
glog.Infof("error getting Pods with label selector %q [%v]\n", label.String(), err)
glog.Infof("temporary error: getting Pods with label selector %q : [%v]\n", label.String(), err)
return false, nil
}

Expand All @@ -150,45 +92,23 @@ func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels

for _, pod := range pods.Items {
if pod.Status.Phase != core.PodRunning {
glog.Infof("waiting for pod %q, current state: %s: [%v]\n", label.String(), pod.Status.Phase, err)
return false, nil
}
}

return true, nil
})
}

// WaitForPodDelete waits for a pod to be deleted
func WaitForPodDelete(c kubernetes.Interface, ns string, label fmt.Stringer) error {
return wait.PollImmediate(constants.APICallRetryInterval, ReasonableMutateTime, func() (bool, error) {
listOpts := meta.ListOptions{LabelSelector: label.String()}
pods, err := c.CoreV1().Pods(ns).List(listOpts)
if err != nil {
glog.Infof("error getting Pods with label selector %q [%v]\n", label.String(), err)
return false, nil
}
return len(pods.Items) == 0, nil
})
}

// WaitForEvent waits for the given event to appear
func WaitForEvent(c kubernetes.Interface, ns string, reason string) error {
return wait.PollImmediate(constants.APICallRetryInterval, ReasonableMutateTime, func() (bool, error) {
events, err := c.EventsV1beta1().Events("default").List(meta.ListOptions{})
if err != nil {
glog.Infof("error getting events: %v", err)
return false, nil
}
for _, e := range events.Items {
if e.Reason == reason {
return true, nil
}
}
return false, nil
})
}
t := ReasonableStartTime
if timeOut != nil {
t = timeOut[0]
}
err := wait.PollImmediate(kconst.APICallRetryInterval, t, f)
glog.Infof("duration metric: took %s to wait for %s ...", time.Since(start), label)
return err
}

// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status.
// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status. used by integration tests
func WaitForRCToStabilize(c kubernetes.Interface, ns, name string, timeout time.Duration) error {
options := meta.ListOptions{FieldSelector: fields.Set{
"metadata.name": name,
Expand Down Expand Up @@ -222,7 +142,7 @@ func WaitForRCToStabilize(c kubernetes.Interface, ns, name string, timeout time.
return err
}

// WaitForDeploymentToStabilize waits till the Deployment has a matching generation/replica count between spec and status.
// WaitForDeploymentToStabilize waits till the Deployment has a matching generation/replica count between spec and status. used by integrationt tests
func WaitForDeploymentToStabilize(c kubernetes.Interface, ns, name string, timeout time.Duration) error {
options := meta.ListOptions{FieldSelector: fields.Set{
"metadata.name": name,
Expand Down Expand Up @@ -281,32 +201,6 @@ func WaitForService(c kubernetes.Interface, namespace, name string, exist bool,
return nil
}

// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
func WaitForServiceEndpointsNum(c kubernetes.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
return wait.Poll(interval, timeout, func() (bool, error) {
glog.Infof("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
list, err := c.CoreV1().Endpoints(namespace).List(meta.ListOptions{})
if err != nil {
return false, err
}

for _, e := range list.Items {
if e.Name == serviceName && countEndpointsNum(&e) == expectNum {
return true, nil
}
}
return false, nil
})
}

func countEndpointsNum(e *core.Endpoints) int {
num := 0
for _, sub := range e.Subsets {
num += len(sub.Addresses)
}
return num
}

// IsRetryableAPIError returns if this error is retryable or not
func IsRetryableAPIError(err error) bool {
return apierr.IsTimeout(err) || apierr.IsServerTimeout(err) || apierr.IsTooManyRequests(err) || apierr.IsInternalError(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/minikube/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bootstrapper

import (
"net"
"time"

"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
Expand All @@ -39,7 +40,7 @@ type Bootstrapper interface {
UpdateCluster(config.KubernetesConfig) error
RestartCluster(config.KubernetesConfig) error
DeleteCluster(config.KubernetesConfig) error
WaitCluster(config.KubernetesConfig) error
WaitCluster(config.KubernetesConfig, time.Duration) error
// LogCommands returns a map of log type to a command which will display that log.
LogCommands(LogOptions) map[string]string
SetupCerts(cfg config.KubernetesConfig) error
Expand Down
Loading

0 comments on commit c3cfedf

Please sign in to comment.