From fdf34c79686927476c4224133e2f031ebdff29ae Mon Sep 17 00:00:00 2001 From: Steve Hipwell Date: Thu, 22 Aug 2024 17:20:48 +0100 Subject: [PATCH] feat: Changed to use watch for wait rollout Signed-off-by: Steve Hipwell --- docs/resources/kubectl_manifest.md | 4 +- kubernetes/resource_kubectl_manifest.go | 338 ++++++++++++++----- kubernetes/resource_kubectl_manifest_test.go | 159 ++++++++- 3 files changed, 409 insertions(+), 92 deletions(-) diff --git a/docs/resources/kubectl_manifest.md b/docs/resources/kubectl_manifest.md index 35c55c34..b406c41f 100644 --- a/docs/resources/kubectl_manifest.md +++ b/docs/resources/kubectl_manifest.md @@ -88,7 +88,7 @@ YAML * `override_namespace` - Optional. Override the namespace to apply the kubernetes resource to, ignoring any declared namespace in the `yaml_body`. * `validate_schema` - Optional. Setting to `false` will mimic `kubectl apply --validate=false` mode. Default `true`. * `wait` - Optional. Set this flag to wait or not for finalized to complete for deleted objects. Default `false`. -* `wait_for_rollout` - Optional. Set this flag to wait or not for Deployments and APIService to complete rollout. Default `true`. +* `wait_for_rollout` - Optional. Set this flag to wait or not for `Deployment`, `DaemonSet`, `StatefulSet` & `APIService` resources to complete rollout. Default `true`. * `wait_for` - Optional. If set, will wait until either all conditions are satisfied, or until timeout is reached (see [below for nested schema](#wait_for)). Under the hood [gojsonq](https://github.com/thedevsaddam/gojsonq) is used for querying, see the related syntax and examples. * `delete_cascade` - Optional; `Background` or `Foreground` are valid options. If set this overrides the default provider behaviour which is to use `Background` unless `wait` is `true` when `Foreground` will be used. To duplicate the default behaviour of `kubectl` this should be explicitly set to `Background`. 
@@ -210,7 +210,7 @@ More examples can be found in the provider tests. ## Waiting for Rollout -By default, this resource will wait for Deployments and APIServices to complete their rollout before proceeding. +By default, this resource will wait for `Deployment`, `DaemonSet`, `StatefulSet` & `APIService` to complete their rollout before proceeding. You can disable this behavior by setting the `wait_for_rollout` field to `false`. ## Import diff --git a/kubernetes/resource_kubectl_manifest.go b/kubernetes/resource_kubectl_manifest.go index e000a855..d15e3344 100644 --- a/kubernetes/resource_kubectl_manifest.go +++ b/kubernetes/resource_kubectl_manifest.go @@ -43,6 +43,7 @@ import ( apps_v1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" meta_v1_unstruct "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" k8sschema "k8s.io/apimachinery/pkg/runtime/schema" yamlWriter "sigs.k8s.io/yaml" @@ -631,16 +632,26 @@ func resourceKubectlManifestApply(ctx context.Context, d *schema.ResourceData, m switch { case manifest.GetKind() == "Deployment": - log.Printf("[INFO] %v waiting for deployment rollout for %vmin", manifest, timeout.Minutes()) - err = resource.RetryContext(ctx, timeout, - waitForDeploymentReplicasFunc(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName())) + log.Printf("[INFO] %v waiting for Deployment rollout for %vmin", manifest, timeout.Minutes()) + err = waitForDeploymentRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout) + if err != nil { + return err + } + case manifest.GetKind() == "DaemonSet": + log.Printf("[INFO] %v waiting for DaemonSet rollout for %vmin", manifest, timeout.Minutes()) + err = waitForDaemonSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout) + if err != nil { + return err + } + case manifest.GetKind() == "StatefulSet": + log.Printf("[INFO] %v 
waiting for StatefulSet rollout for %vmin", manifest, timeout.Minutes()) + err = waitForStatefulSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout) if err != nil { return err } case manifest.GetKind() == "APIService" && manifest.GetAPIVersion() == "apiregistration.k8s.io/v1": - log.Printf("[INFO] %v waiting for APIService rollout for %vmin", manifest, timeout.Minutes()) - err = resource.RetryContext(ctx, timeout, - waitForAPIServiceAvailableFunc(ctx, meta.(*KubeProvider), manifest.GetName())) + log.Printf("[INFO] %v waiting for APIService for %vmin", manifest, timeout.Minutes()) + err = waitForApiService(ctx, meta.(*KubeProvider), manifest.GetName(), timeout) if err != nil { return err } @@ -751,27 +762,29 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData, log.Printf("[INFO] %s perform delete of manifest", manifest) - waitForDelete := d.Get("wait").(bool) + wait := d.Get("wait").(bool) var propagationPolicy meta_v1.DeletionPropagation cascadeInput := d.Get("delete_cascade").(string) if len(cascadeInput) > 0 { propagationPolicy = meta_v1.DeletionPropagation(cascadeInput) - } else if waitForDelete { + } else if wait { propagationPolicy = meta_v1.DeletePropagationForeground } else { propagationPolicy = meta_v1.DeletePropagationBackground } var resourceVersion string - if waitForDelete { - rawResponse, err := restClient.ResourceInterface.Get(ctx, manifest.GetName(), meta_v1.GetOptions{}) + if wait { + rawResponse, err := restClient.ResourceInterface.List(ctx, meta_v1.ListOptions{FieldSelector: fields.OneTermEqualSelector("metadata.name", manifest.GetName()).String()}) if err != nil { return err } - response := yaml.NewFromUnstructured(rawResponse) - resourceVersion = response.GetResourceVersion() + resourceVersion, _, err = unstructured.NestedString(rawResponse.Object, "metadata", "resourceVersion") + if err != nil { + return err + } } err = restClient.ResourceInterface.Delete(ctx, manifest.GetName(), 
meta_v1.DeleteOptions{PropagationPolicy: &propagationPolicy}) @@ -781,28 +794,15 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData, } // The rest client doesn't wait for the delete so we need custom logic - if waitForDelete { + if wait { log.Printf("[INFO] %s waiting for delete of manifest to complete", manifest) - watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, FieldSelector: fields.OneTermEqualSelector("metadata.name", manifest.GetName()).String(), ResourceVersion: resourceVersion}) + timeout := d.Timeout(schema.TimeoutDelete) + + err = waitForDelete(ctx, restClient, manifest.GetName(), resourceVersion, timeout) if err != nil { return err } - - defer watcher.Stop() - - deleted := false - for !deleted { - select { - case event := <-watcher.ResultChan(): - if event.Type == watch.Deleted { - deleted = true - } - - case <-ctx.Done(): - return fmt.Errorf("%s failed to delete kubernetes resource", manifest) - } - } } // Success remove it from state @@ -951,9 +951,86 @@ func checkAPIResourceIsPresent(available []*meta_v1.APIResourceList, resource me return nil, false } -// GetDeploymentConditionInternal returns the condition with the provided type. 
-// Borrowed from: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/util/deployment_util.go#L135 -func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition { +func waitForDelete(ctx context.Context, restClient *RestClientResult, name string, resourceVersion string, timeout time.Duration) error { + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), ResourceVersion: resourceVersion}) + if err != nil { + return err + } + + defer watcher.Stop() + + deleted := false + for !deleted { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Deleted { + deleted = true + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to delete resource", name) + } + } + + return nil +} + +func waitForDeploymentRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error { + // Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L59 + + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := provider.MainClientset.AppsV1().Deployments(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()}) + if err != nil { + return err + } + + defer watcher.Stop() + + done := false + for !done { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + deployment, ok := event.Object.(*apps_v1.Deployment) + if !ok { + return fmt.Errorf("%s could not cast to Deployment", name) + } + + if deployment.Generation <= deployment.Status.ObservedGeneration { + condition := getDeploymentCondition(deployment.Status, 
apps_v1.DeploymentProgressing) + if condition != nil && condition.Reason == TimedOutReason { + continue + } + + if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas { + continue + } + + if deployment.Status.Replicas > deployment.Status.UpdatedReplicas { + continue + } + + if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas { + continue + } + + done = true + } + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to rollout Deployment", name) + } + } + + return nil +} + +func getDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition { + // Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/util/deployment/deployment.go#L60 for i := range status.Conditions { c := status.Conditions[i] if c.Type == condType { @@ -963,6 +1040,151 @@ func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.De return nil } +func waitForDaemonSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error { + // Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L95 + + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := provider.MainClientset.AppsV1().DaemonSets(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()}) + if err != nil { + return err + } + + defer watcher.Stop() + + done := false + for !done { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + daemon, ok := event.Object.(*apps_v1.DaemonSet) + if !ok { + return fmt.Errorf("%s could not cast to DaemonSet", name) + } + + if daemon.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateDaemonSetStrategyType { + done = true + continue + } + + 
if daemon.Generation <= daemon.Status.ObservedGeneration { + if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled { + continue + } + + if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled { + continue + } + + done = true + } + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to rollout DaemonSet", name) + } + } + + return nil +} + +func waitForStatefulSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error { + // Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L120 + + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := provider.MainClientset.AppsV1().StatefulSets(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()}) + if err != nil { + return err + } + + defer watcher.Stop() + + done := false + for !done { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + sts, ok := event.Object.(*apps_v1.StatefulSet) + if !ok { + return fmt.Errorf("%s could not cast to StatefulSet", name) + } + + if sts.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateStatefulSetStrategyType { + done = true + continue + } + + if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration { + continue + } + + if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas { + continue + } + + if sts.Spec.UpdateStrategy.Type == apps_v1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil { + if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) { + continue + } + } + + done = true + continue + } + + if sts.Status.UpdateRevision != 
sts.Status.CurrentRevision { + continue + } + + done = true + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to rollout StatefulSet", name) + } + } + + return nil +} + +func waitForApiService(ctx context.Context, provider *KubeProvider, name string, timeout time.Duration) error { + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()}) + if err != nil { + return err + } + + defer watcher.Stop() + + done := false + for !done { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + apiService, ok := event.Object.(*apiregistration.APIService) + if !ok { + return fmt.Errorf("%s could not cast to APIService", name) + } + + for i := range apiService.Status.Conditions { + if apiService.Status.Conditions[i].Type == apiregistration.Available { + done = true + continue + } + } + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to wait for APIService", name) + } + } + + return nil +} + func waitForConditions(ctx context.Context, provider *RestClientResult, fields []types.WaitForField, conditions []types.WaitForStatusCondition, ns, name string) resource.RetryFunc { return func() *resource.RetryError { rawResponse, err := provider.ResourceInterface.Get(ctx, name, meta_v1.GetOptions{}) @@ -1013,58 +1235,6 @@ func waitForConditions(ctx context.Context, provider *RestClientResult, fields [ } } -func waitForDeploymentReplicasFunc(ctx context.Context, provider *KubeProvider, ns, name string) resource.RetryFunc { - return func() *resource.RetryError { - - // Query the deployment to get a status update. 
- dply, err := provider.MainClientset.AppsV1().Deployments(ns).Get(ctx, name, meta_v1.GetOptions{}) - if err != nil { - return resource.NonRetryableError(err) - } - - if dply.Generation <= dply.Status.ObservedGeneration { - cond := GetDeploymentCondition(dply.Status, apps_v1.DeploymentProgressing) - if cond != nil && cond.Reason == TimedOutReason { - err := fmt.Errorf("Deployment exceeded its progress deadline: %v", cond.String()) - return resource.NonRetryableError(err) - } - - if dply.Status.UpdatedReplicas < *dply.Spec.Replicas { - return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d out of %d new replicas have been updated...", dply.Status.UpdatedReplicas, dply.Spec.Replicas)) - } - - if dply.Status.Replicas > dply.Status.UpdatedReplicas { - return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d old replicas are pending termination...", dply.Status.Replicas-dply.Status.UpdatedReplicas)) - } - - if dply.Status.AvailableReplicas < dply.Status.UpdatedReplicas { - return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d of %d updated replicas are available...", dply.Status.AvailableReplicas, dply.Status.UpdatedReplicas)) - } - } else if dply.Status.ObservedGeneration == 0 { - return resource.RetryableError(fmt.Errorf("Waiting for rollout to start")) - } - return nil - } -} - -func waitForAPIServiceAvailableFunc(ctx context.Context, provider *KubeProvider, name string) resource.RetryFunc { - return func() *resource.RetryError { - - apiService, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Get(ctx, name, meta_v1.GetOptions{}) - if err != nil { - return resource.NonRetryableError(err) - } - - for i := range apiService.Status.Conditions { - if apiService.Status.Conditions[i].Type == apiregistration.Available { - return nil - } - } - - return resource.RetryableError(fmt.Errorf("Waiting for APIService %v to be Available", name)) - } -} - // Takes the result of flatmap.Expand for an 
array of strings // and returns a []*string func expandStringList(configured []interface{}) []string { diff --git a/kubernetes/resource_kubectl_manifest_test.go b/kubernetes/resource_kubectl_manifest_test.go index f13c6f20..1d930f39 100644 --- a/kubernetes/resource_kubectl_manifest_test.go +++ b/kubernetes/resource_kubectl_manifest_test.go @@ -224,6 +224,156 @@ YAML }) } +func TestAccKubectl_WaitForRolloutDeployment(t *testing.T) { + //language=hcl + config := ` +resource "kubectl_manifest" "test" { + wait_for_rollout = true + yaml_body = <