Skip to content

Commit 5e25656

Browse files
committed
feat: Changed to use watch for wait rollout
Signed-off-by: Steve Hipwell <[email protected]>
1 parent abed1de commit 5e25656

File tree

3 files changed

+393
-82
lines changed

3 files changed

+393
-82
lines changed

docs/resources/kubectl_manifest.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ YAML
8888
* `override_namespace` - Optional. Override the namespace to apply the kubernetes resource to, ignoring any declared namespace in the `yaml_body`.
8989
* `validate_schema` - Optional. Setting to `false` will mimic `kubectl apply --validate=false` mode. Default `true`.
9090
* `wait` - Optional. Set this flag to wait or not for finalized to complete for deleted objects. Default `false`.
91-
* `wait_for_rollout` - Optional. Set this flag to wait or not for Deployments and APIService to complete rollout. Default `true`.
91+
* `wait_for_rollout` - Optional. Set this flag to wait or not for `Deployment`, `DaemonSet`, `StatefulSet` & `APIService` resources to complete rollout. Default `true`.
9292
* `wait_for` - Optional. If set, will wait until either all conditions are satisfied, or until timeout is reached (see [below for nested schema](#wait_for)). Under the hood [gojsonq](https://github.com/thedevsaddam/gojsonq) is used for querying, see the related syntax and examples.
9393
* `delete_cascade` - Optional; `Background` or `Foreground` are valid options. If set this overrides the default provider behaviour which is to use `Background` unless `wait` is `true` when `Foreground` will be used. To duplicate the default behaviour of `kubectl` this should be explicitly set to `Background`.
9494

kubernetes/resource_kubectl_manifest.go

+242-81
Original file line numberDiff line numberDiff line change
@@ -611,16 +611,26 @@ func resourceKubectlManifestApply(ctx context.Context, d *schema.ResourceData, m
611611

612612
switch {
613613
case manifest.GetKind() == "Deployment":
614-
log.Printf("[INFO] %v waiting for deployment rollout for %vmin", manifest, timeout.Minutes())
615-
err = resource.RetryContext(ctx, timeout,
616-
waitForDeploymentReplicasFunc(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName()))
614+
log.Printf("[INFO] %v waiting for Deployment rollout for %vmin", manifest, timeout.Minutes())
615+
err = waitForDeploymentRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
616+
if err != nil {
617+
return err
618+
}
619+
case manifest.GetKind() == "DaemonSet":
620+
log.Printf("[INFO] %v waiting for DaemonSet rollout for %vmin", manifest, timeout.Minutes())
621+
err = waitForDaemonSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
622+
if err != nil {
623+
return err
624+
}
625+
case manifest.GetKind() == "StatefulSet":
626+
log.Printf("[INFO] %v waiting for v rollout for %vmin", manifest, timeout.Minutes())
627+
err = waitForStatefulSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
617628
if err != nil {
618629
return err
619630
}
620631
case manifest.GetKind() == "APIService" && manifest.GetAPIVersion() == "apiregistration.k8s.io/v1":
621-
log.Printf("[INFO] %v waiting for APIService rollout for %vmin", manifest, timeout.Minutes())
622-
err = resource.RetryContext(ctx, timeout,
623-
waitForAPIServiceAvailableFunc(ctx, meta.(*KubeProvider), manifest.GetName()))
632+
log.Printf("[INFO] %v waiting for APIService for %vmin", manifest, timeout.Minutes())
633+
err = waitForApiService(ctx, meta.(*KubeProvider), manifest.GetName(), timeout)
624634
if err != nil {
625635
return err
626636
}
@@ -728,20 +738,20 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData,
728738

729739
log.Printf("[INFO] %s perform delete of manifest", manifest)
730740

731-
waitForDelete := d.Get("wait").(bool)
741+
wait := d.Get("wait").(bool)
732742

733743
var propagationPolicy meta_v1.DeletionPropagation
734744
cascadeInput := d.Get("delete_cascade").(string)
735745
if len(cascadeInput) > 0 {
736746
propagationPolicy = meta_v1.DeletionPropagation(cascadeInput)
737-
} else if waitForDelete {
747+
} else if wait {
738748
propagationPolicy = meta_v1.DeletePropagationForeground
739749
} else {
740750
propagationPolicy = meta_v1.DeletePropagationBackground
741751
}
742752

743753
var resourceVersion string
744-
if waitForDelete {
754+
if wait {
745755
rawResponse, err := restClient.ResourceInterface.Get(ctx, manifest.GetName(), meta_v1.GetOptions{})
746756
if err != nil {
747757
return err
@@ -758,28 +768,15 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData,
758768
}
759769

760770
// The rest client doesn't wait for the delete so we need custom logic
761-
if waitForDelete {
771+
if wait {
762772
log.Printf("[INFO] %s waiting for delete of manifest to complete", manifest)
763773

764-
watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, FieldSelector: fields.OneTermEqualSelector("metadata.name", manifest.GetName()).String(), ResourceVersion: resourceVersion})
774+
timeout := d.Timeout(schema.TimeoutDelete)
775+
776+
err = waitForDelete(ctx, restClient, manifest.GetName(), resourceVersion, timeout)
765777
if err != nil {
766778
return err
767779
}
768-
769-
defer watcher.Stop()
770-
771-
deleted := false
772-
for !deleted {
773-
select {
774-
case event := <-watcher.ResultChan():
775-
if event.Type == watch.Deleted {
776-
deleted = true
777-
}
778-
779-
case <-ctx.Done():
780-
return fmt.Errorf("%s failed to delete kubernetes resource", manifest)
781-
}
782-
}
783780
}
784781

785782
// Success remove it from state
@@ -928,9 +925,84 @@ func checkAPIResourceIsPresent(available []*meta_v1.APIResourceList, resource me
928925
return nil, false
929926
}
930927

931-
// GetDeploymentConditionInternal returns the condition with the provided type.
932-
// Borrowed from: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/util/deployment_util.go#L135
933-
func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition {
928+
func waitForDelete(ctx context.Context, restClient *RestClientResult, name string, resourceVersion string, timeout time.Duration) error {
929+
timeoutSeconds := int64(timeout.Seconds())
930+
931+
watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), ResourceVersion: resourceVersion})
932+
if err != nil {
933+
return err
934+
}
935+
936+
defer watcher.Stop()
937+
938+
deleted := false
939+
for !deleted {
940+
select {
941+
case event := <-watcher.ResultChan():
942+
if event.Type == watch.Deleted {
943+
deleted = true
944+
}
945+
946+
case <-ctx.Done():
947+
return fmt.Errorf("%s failed to delete resource", name)
948+
}
949+
}
950+
951+
return nil
952+
}
953+
954+
func waitForDeploymentRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
955+
timeoutSeconds := int64(timeout.Seconds())
956+
957+
watcher, err := provider.MainClientset.AppsV1().Deployments(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()})
958+
if err != nil {
959+
return err
960+
}
961+
962+
defer watcher.Stop()
963+
964+
done := false
965+
for !done {
966+
select {
967+
case event := <-watcher.ResultChan():
968+
if event.Type == watch.Modified {
969+
deployment, ok := event.Object.(*apps_v1.Deployment)
970+
if !ok {
971+
return fmt.Errorf("%s could not cast to Deployment", name)
972+
}
973+
974+
if deployment.Generation <= deployment.Status.ObservedGeneration {
975+
condition := getDeploymentCondition(deployment.Status, apps_v1.DeploymentProgressing)
976+
if condition != nil && condition.Reason == TimedOutReason {
977+
continue
978+
}
979+
980+
if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
981+
continue
982+
}
983+
984+
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
985+
continue
986+
}
987+
988+
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
989+
continue
990+
}
991+
992+
done = true
993+
}
994+
}
995+
996+
case <-ctx.Done():
997+
return fmt.Errorf("%s failed to rollout Deployment", name)
998+
}
999+
}
1000+
1001+
return nil
1002+
}
1003+
1004+
func getDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition {
1005+
// Borrowed from: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/util/deployment_util.go#L135
9341006
for i := range status.Conditions {
9351007
c := status.Conditions[i]
9361008
if c.Type == condType {
@@ -940,6 +1012,147 @@ func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.De
9401012
return nil
9411013
}
9421014

1015+
func waitForDaemonSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
1016+
timeoutSeconds := int64(timeout.Seconds())
1017+
1018+
watcher, err := provider.MainClientset.AppsV1().DaemonSets(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()})
1019+
if err != nil {
1020+
return err
1021+
}
1022+
1023+
defer watcher.Stop()
1024+
1025+
done := false
1026+
for !done {
1027+
select {
1028+
case event := <-watcher.ResultChan():
1029+
if event.Type == watch.Modified {
1030+
daemon, ok := event.Object.(*apps_v1.DaemonSet)
1031+
if !ok {
1032+
return fmt.Errorf("%s could not cast to DaemonSet", name)
1033+
}
1034+
1035+
if daemon.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateDaemonSetStrategyType {
1036+
done = true
1037+
continue
1038+
}
1039+
1040+
if daemon.Generation <= daemon.Status.ObservedGeneration {
1041+
if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled {
1042+
continue
1043+
}
1044+
1045+
if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled {
1046+
continue
1047+
}
1048+
1049+
done = true
1050+
}
1051+
}
1052+
1053+
case <-ctx.Done():
1054+
return fmt.Errorf("%s failed to rollout DaemonSet", name)
1055+
}
1056+
}
1057+
1058+
return nil
1059+
}
1060+
1061+
func waitForStatefulSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
1062+
timeoutSeconds := int64(timeout.Seconds())
1063+
1064+
watcher, err := provider.MainClientset.AppsV1().StatefulSets(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()})
1065+
if err != nil {
1066+
return err
1067+
}
1068+
1069+
defer watcher.Stop()
1070+
1071+
done := false
1072+
for !done {
1073+
select {
1074+
case event := <-watcher.ResultChan():
1075+
if event.Type == watch.Modified {
1076+
sts, ok := event.Object.(*apps_v1.StatefulSet)
1077+
if !ok {
1078+
return fmt.Errorf("%s could not cast to StatefulSet", name)
1079+
}
1080+
1081+
if sts.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateStatefulSetStrategyType {
1082+
done = true
1083+
continue
1084+
}
1085+
1086+
if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration {
1087+
continue
1088+
}
1089+
1090+
if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas {
1091+
continue
1092+
}
1093+
1094+
if sts.Spec.UpdateStrategy.Type == apps_v1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil {
1095+
if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
1096+
if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
1097+
continue
1098+
}
1099+
}
1100+
1101+
done = true
1102+
continue
1103+
}
1104+
1105+
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
1106+
continue
1107+
}
1108+
1109+
done = true
1110+
}
1111+
1112+
case <-ctx.Done():
1113+
return fmt.Errorf("%s failed to rollout StatefulSet", name)
1114+
}
1115+
}
1116+
1117+
return nil
1118+
}
1119+
1120+
func waitForApiService(ctx context.Context, provider *KubeProvider, name string, timeout time.Duration) error {
1121+
timeoutSeconds := int64(timeout.Seconds())
1122+
1123+
watcher, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()})
1124+
if err != nil {
1125+
return err
1126+
}
1127+
1128+
defer watcher.Stop()
1129+
1130+
done := false
1131+
for !done {
1132+
select {
1133+
case event := <-watcher.ResultChan():
1134+
if event.Type == watch.Modified {
1135+
apiService, ok := event.Object.(*apiregistration.APIService)
1136+
if !ok {
1137+
return fmt.Errorf("%s could not cast to APIService", name)
1138+
}
1139+
1140+
for i := range apiService.Status.Conditions {
1141+
if apiService.Status.Conditions[i].Type == apiregistration.Available {
1142+
done = true
1143+
continue
1144+
}
1145+
}
1146+
}
1147+
1148+
case <-ctx.Done():
1149+
return fmt.Errorf("%s failed to wait for APIService", name)
1150+
}
1151+
}
1152+
1153+
return nil
1154+
}
1155+
9431156
func waitForFields(ctx context.Context, provider *RestClientResult, conditions []types.WaitForField, ns, name string) resource.RetryFunc {
9441157
return func() *resource.RetryError {
9451158
rawResponse, err := provider.ResourceInterface.Get(ctx, name, meta_v1.GetOptions{})
@@ -982,58 +1195,6 @@ func waitForFields(ctx context.Context, provider *RestClientResult, conditions [
9821195
}
9831196
}
9841197

985-
func waitForDeploymentReplicasFunc(ctx context.Context, provider *KubeProvider, ns, name string) resource.RetryFunc {
986-
return func() *resource.RetryError {
987-
988-
// Query the deployment to get a status update.
989-
dply, err := provider.MainClientset.AppsV1().Deployments(ns).Get(ctx, name, meta_v1.GetOptions{})
990-
if err != nil {
991-
return resource.NonRetryableError(err)
992-
}
993-
994-
if dply.Generation <= dply.Status.ObservedGeneration {
995-
cond := GetDeploymentCondition(dply.Status, apps_v1.DeploymentProgressing)
996-
if cond != nil && cond.Reason == TimedOutReason {
997-
err := fmt.Errorf("Deployment exceeded its progress deadline: %v", cond.String())
998-
return resource.NonRetryableError(err)
999-
}
1000-
1001-
if dply.Status.UpdatedReplicas < *dply.Spec.Replicas {
1002-
return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d out of %d new replicas have been updated...", dply.Status.UpdatedReplicas, dply.Spec.Replicas))
1003-
}
1004-
1005-
if dply.Status.Replicas > dply.Status.UpdatedReplicas {
1006-
return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d old replicas are pending termination...", dply.Status.Replicas-dply.Status.UpdatedReplicas))
1007-
}
1008-
1009-
if dply.Status.AvailableReplicas < dply.Status.UpdatedReplicas {
1010-
return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d of %d updated replicas are available...", dply.Status.AvailableReplicas, dply.Status.UpdatedReplicas))
1011-
}
1012-
} else if dply.Status.ObservedGeneration == 0 {
1013-
return resource.RetryableError(fmt.Errorf("Waiting for rollout to start"))
1014-
}
1015-
return nil
1016-
}
1017-
}
1018-
1019-
func waitForAPIServiceAvailableFunc(ctx context.Context, provider *KubeProvider, name string) resource.RetryFunc {
1020-
return func() *resource.RetryError {
1021-
1022-
apiService, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Get(ctx, name, meta_v1.GetOptions{})
1023-
if err != nil {
1024-
return resource.NonRetryableError(err)
1025-
}
1026-
1027-
for i := range apiService.Status.Conditions {
1028-
if apiService.Status.Conditions[i].Type == apiregistration.Available {
1029-
return nil
1030-
}
1031-
}
1032-
1033-
return resource.RetryableError(fmt.Errorf("Waiting for APIService %v to be Available", name))
1034-
}
1035-
}
1036-
10371198
// Takes the result of flatmap.Expand for an array of strings
10381199
// and returns a []*string
10391200
func expandStringList(configured []interface{}) []string {

0 commit comments

Comments
 (0)