Skip to content

Commit ea4b9ac

Browse files
committed
feat: Changed to use watch for wait rollout
Signed-off-by: Steve Hipwell <[email protected]>
1 parent abed1de commit ea4b9ac

File tree

3 files changed

+399
-82
lines changed

3 files changed

+399
-82
lines changed

docs/resources/kubectl_manifest.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ YAML
8888
* `override_namespace` - Optional. Override the namespace to apply the kubernetes resource to, ignoring any declared namespace in the `yaml_body`.
8989
* `validate_schema` - Optional. Setting to `false` will mimic `kubectl apply --validate=false` mode. Default `true`.
9090
* `wait` - Optional. Set this flag to wait for finalizers to complete when objects are deleted. Default `false`.
91-
* `wait_for_rollout` - Optional. Set this flag to wait or not for Deployments and APIService to complete rollout. Default `true`.
91+
* `wait_for_rollout` - Optional. Set this flag to wait or not for `Deployment`, `DaemonSet`, `StatefulSet` & `APIService` resources to complete rollout. Default `true`.
9292
* `wait_for` - Optional. If set, will wait until either all conditions are satisfied, or until timeout is reached (see [below for nested schema](#wait_for)). Under the hood, [gojsonq](https://github.com/thedevsaddam/gojsonq) is used for querying; see its documentation for the related syntax and examples.
9393
* `delete_cascade` - Optional; `Background` or `Foreground` are valid options. If set this overrides the default provider behaviour which is to use `Background` unless `wait` is `true` when `Foreground` will be used. To duplicate the default behaviour of `kubectl` this should be explicitly set to `Background`.
9494

kubernetes/resource_kubectl_manifest.go

+248-81
Original file line numberDiff line numberDiff line change
@@ -611,16 +611,26 @@ func resourceKubectlManifestApply(ctx context.Context, d *schema.ResourceData, m
611611

612612
switch {
613613
case manifest.GetKind() == "Deployment":
614-
log.Printf("[INFO] %v waiting for deployment rollout for %vmin", manifest, timeout.Minutes())
615-
err = resource.RetryContext(ctx, timeout,
616-
waitForDeploymentReplicasFunc(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName()))
614+
log.Printf("[INFO] %v waiting for Deployment rollout for %vmin", manifest, timeout.Minutes())
615+
err = waitForDeploymentRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
616+
if err != nil {
617+
return err
618+
}
619+
case manifest.GetKind() == "DaemonSet":
620+
log.Printf("[INFO] %v waiting for DaemonSet rollout for %vmin", manifest, timeout.Minutes())
621+
err = waitForDaemonSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
622+
if err != nil {
623+
return err
624+
}
625+
case manifest.GetKind() == "StatefulSet":
626+
// Name the kind being waited on, matching the Deployment and DaemonSet arms.
log.Printf("[INFO] %v waiting for StatefulSet rollout for %vmin", manifest, timeout.Minutes())
627+
err = waitForStatefulSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
617628
if err != nil {
618629
return err
619630
}
620631
case manifest.GetKind() == "APIService" && manifest.GetAPIVersion() == "apiregistration.k8s.io/v1":
621-
log.Printf("[INFO] %v waiting for APIService rollout for %vmin", manifest, timeout.Minutes())
622-
err = resource.RetryContext(ctx, timeout,
623-
waitForAPIServiceAvailableFunc(ctx, meta.(*KubeProvider), manifest.GetName()))
632+
log.Printf("[INFO] %v waiting for APIService for %vmin", manifest, timeout.Minutes())
633+
err = waitForApiService(ctx, meta.(*KubeProvider), manifest.GetName(), timeout)
624634
if err != nil {
625635
return err
626636
}
@@ -728,20 +738,20 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData,
728738

729739
log.Printf("[INFO] %s perform delete of manifest", manifest)
730740

731-
waitForDelete := d.Get("wait").(bool)
741+
wait := d.Get("wait").(bool)
732742

733743
var propagationPolicy meta_v1.DeletionPropagation
734744
cascadeInput := d.Get("delete_cascade").(string)
735745
if len(cascadeInput) > 0 {
736746
propagationPolicy = meta_v1.DeletionPropagation(cascadeInput)
737-
} else if waitForDelete {
747+
} else if wait {
738748
propagationPolicy = meta_v1.DeletePropagationForeground
739749
} else {
740750
propagationPolicy = meta_v1.DeletePropagationBackground
741751
}
742752

743753
var resourceVersion string
744-
if waitForDelete {
754+
if wait {
745755
rawResponse, err := restClient.ResourceInterface.Get(ctx, manifest.GetName(), meta_v1.GetOptions{})
746756
if err != nil {
747757
return err
@@ -758,28 +768,15 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData,
758768
}
759769

760770
// The rest client doesn't wait for the delete so we need custom logic
761-
if waitForDelete {
771+
if wait {
762772
log.Printf("[INFO] %s waiting for delete of manifest to complete", manifest)
763773

764-
watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, FieldSelector: fields.OneTermEqualSelector("metadata.name", manifest.GetName()).String(), ResourceVersion: resourceVersion})
774+
timeout := d.Timeout(schema.TimeoutDelete)
775+
776+
err = waitForDelete(ctx, restClient, manifest.GetName(), resourceVersion, timeout)
765777
if err != nil {
766778
return err
767779
}
768-
769-
defer watcher.Stop()
770-
771-
deleted := false
772-
for !deleted {
773-
select {
774-
case event := <-watcher.ResultChan():
775-
if event.Type == watch.Deleted {
776-
deleted = true
777-
}
778-
779-
case <-ctx.Done():
780-
return fmt.Errorf("%s failed to delete kubernetes resource", manifest)
781-
}
782-
}
783780
}
784781

785782
// Success remove it from state
@@ -928,9 +925,86 @@ func checkAPIResourceIsPresent(available []*meta_v1.APIResourceList, resource me
928925
return nil, false
929926
}
930927

931-
// GetDeploymentConditionInternal returns the condition with the provided type.
932-
// Borrowed from: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/util/deployment_util.go#L135
933-
func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition {
928+
func waitForDelete(ctx context.Context, restClient *RestClientResult, name string, resourceVersion string, timeout time.Duration) error {
929+
timeoutSeconds := int64(timeout.Seconds())
930+
931+
watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), ResourceVersion: resourceVersion})
932+
if err != nil {
933+
return err
934+
}
935+
936+
defer watcher.Stop()
937+
938+
deleted := false
939+
for !deleted {
940+
select {
941+
case event := <-watcher.ResultChan():
942+
if event.Type == watch.Deleted {
943+
deleted = true
944+
}
945+
946+
case <-ctx.Done():
947+
return fmt.Errorf("%s failed to delete resource", name)
948+
}
949+
}
950+
951+
return nil
952+
}
953+
954+
func waitForDeploymentRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
955+
// Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L59
956+
957+
timeoutSeconds := int64(timeout.Seconds())
958+
959+
watcher, err := provider.MainClientset.AppsV1().Deployments(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()})
960+
if err != nil {
961+
return err
962+
}
963+
964+
defer watcher.Stop()
965+
966+
done := false
967+
for !done {
968+
select {
969+
case event := <-watcher.ResultChan():
970+
if event.Type == watch.Modified {
971+
deployment, ok := event.Object.(*apps_v1.Deployment)
972+
if !ok {
973+
return fmt.Errorf("%s could not cast to Deployment", name)
974+
}
975+
976+
if deployment.Generation <= deployment.Status.ObservedGeneration {
977+
condition := getDeploymentCondition(deployment.Status, apps_v1.DeploymentProgressing)
978+
if condition != nil && condition.Reason == TimedOutReason {
979+
continue
980+
}
981+
982+
if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
983+
continue
984+
}
985+
986+
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
987+
continue
988+
}
989+
990+
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
991+
continue
992+
}
993+
994+
done = true
995+
}
996+
}
997+
998+
case <-ctx.Done():
999+
return fmt.Errorf("%s failed to rollout Deployment", name)
1000+
}
1001+
}
1002+
1003+
return nil
1004+
}
1005+
1006+
func getDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition {
1007+
// Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/util/deployment/deployment.go#L60
9341008
for i := range status.Conditions {
9351009
c := status.Conditions[i]
9361010
if c.Type == condType {
@@ -940,6 +1014,151 @@ func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.De
9401014
return nil
9411015
}
9421016

1017+
func waitForDaemonSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
1018+
// Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L95
1019+
1020+
timeoutSeconds := int64(timeout.Seconds())
1021+
1022+
watcher, err := provider.MainClientset.AppsV1().DaemonSets(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()})
1023+
if err != nil {
1024+
return err
1025+
}
1026+
1027+
defer watcher.Stop()
1028+
1029+
done := false
1030+
for !done {
1031+
select {
1032+
case event := <-watcher.ResultChan():
1033+
if event.Type == watch.Modified {
1034+
daemon, ok := event.Object.(*apps_v1.DaemonSet)
1035+
if !ok {
1036+
return fmt.Errorf("%s could not cast to DaemonSet", name)
1037+
}
1038+
1039+
if daemon.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateDaemonSetStrategyType {
1040+
done = true
1041+
continue
1042+
}
1043+
1044+
if daemon.Generation <= daemon.Status.ObservedGeneration {
1045+
if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled {
1046+
continue
1047+
}
1048+
1049+
if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled {
1050+
continue
1051+
}
1052+
1053+
done = true
1054+
}
1055+
}
1056+
1057+
case <-ctx.Done():
1058+
return fmt.Errorf("%s failed to rollout DaemonSet", name)
1059+
}
1060+
}
1061+
1062+
return nil
1063+
}
1064+
1065+
func waitForStatefulSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
1066+
// Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L120
1067+
1068+
timeoutSeconds := int64(timeout.Seconds())
1069+
1070+
watcher, err := provider.MainClientset.AppsV1().StatefulSets(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()})
1071+
if err != nil {
1072+
return err
1073+
}
1074+
1075+
defer watcher.Stop()
1076+
1077+
done := false
1078+
for !done {
1079+
select {
1080+
case event := <-watcher.ResultChan():
1081+
if event.Type == watch.Modified {
1082+
sts, ok := event.Object.(*apps_v1.StatefulSet)
1083+
if !ok {
1084+
return fmt.Errorf("%s could not cast to StatefulSet", name)
1085+
}
1086+
1087+
if sts.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateStatefulSetStrategyType {
1088+
done = true
1089+
continue
1090+
}
1091+
1092+
if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration {
1093+
continue
1094+
}
1095+
1096+
if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas {
1097+
continue
1098+
}
1099+
1100+
if sts.Spec.UpdateStrategy.Type == apps_v1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil {
1101+
if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
1102+
if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
1103+
continue
1104+
}
1105+
}
1106+
1107+
done = true
1108+
continue
1109+
}
1110+
1111+
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
1112+
continue
1113+
}
1114+
1115+
done = true
1116+
}
1117+
1118+
case <-ctx.Done():
1119+
return fmt.Errorf("%s failed to rollout StatefulSet", name)
1120+
}
1121+
}
1122+
1123+
return nil
1124+
}
1125+
1126+
func waitForApiService(ctx context.Context, provider *KubeProvider, name string, timeout time.Duration) error {
1127+
timeoutSeconds := int64(timeout.Seconds())
1128+
1129+
watcher, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()})
1130+
if err != nil {
1131+
return err
1132+
}
1133+
1134+
defer watcher.Stop()
1135+
1136+
done := false
1137+
for !done {
1138+
select {
1139+
case event := <-watcher.ResultChan():
1140+
if event.Type == watch.Modified {
1141+
apiService, ok := event.Object.(*apiregistration.APIService)
1142+
if !ok {
1143+
return fmt.Errorf("%s could not cast to APIService", name)
1144+
}
1145+
1146+
for i := range apiService.Status.Conditions {
1147+
if apiService.Status.Conditions[i].Type == apiregistration.Available {
1148+
done = true
1149+
continue
1150+
}
1151+
}
1152+
}
1153+
1154+
case <-ctx.Done():
1155+
return fmt.Errorf("%s failed to wait for APIService", name)
1156+
}
1157+
}
1158+
1159+
return nil
1160+
}
1161+
9431162
func waitForFields(ctx context.Context, provider *RestClientResult, conditions []types.WaitForField, ns, name string) resource.RetryFunc {
9441163
return func() *resource.RetryError {
9451164
rawResponse, err := provider.ResourceInterface.Get(ctx, name, meta_v1.GetOptions{})
@@ -982,58 +1201,6 @@ func waitForFields(ctx context.Context, provider *RestClientResult, conditions [
9821201
}
9831202
}
9841203

985-
func waitForDeploymentReplicasFunc(ctx context.Context, provider *KubeProvider, ns, name string) resource.RetryFunc {
986-
return func() *resource.RetryError {
987-
988-
// Query the deployment to get a status update.
989-
dply, err := provider.MainClientset.AppsV1().Deployments(ns).Get(ctx, name, meta_v1.GetOptions{})
990-
if err != nil {
991-
return resource.NonRetryableError(err)
992-
}
993-
994-
if dply.Generation <= dply.Status.ObservedGeneration {
995-
cond := GetDeploymentCondition(dply.Status, apps_v1.DeploymentProgressing)
996-
if cond != nil && cond.Reason == TimedOutReason {
997-
err := fmt.Errorf("Deployment exceeded its progress deadline: %v", cond.String())
998-
return resource.NonRetryableError(err)
999-
}
1000-
1001-
if dply.Status.UpdatedReplicas < *dply.Spec.Replicas {
1002-
return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d out of %d new replicas have been updated...", dply.Status.UpdatedReplicas, dply.Spec.Replicas))
1003-
}
1004-
1005-
if dply.Status.Replicas > dply.Status.UpdatedReplicas {
1006-
return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d old replicas are pending termination...", dply.Status.Replicas-dply.Status.UpdatedReplicas))
1007-
}
1008-
1009-
if dply.Status.AvailableReplicas < dply.Status.UpdatedReplicas {
1010-
return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d of %d updated replicas are available...", dply.Status.AvailableReplicas, dply.Status.UpdatedReplicas))
1011-
}
1012-
} else if dply.Status.ObservedGeneration == 0 {
1013-
return resource.RetryableError(fmt.Errorf("Waiting for rollout to start"))
1014-
}
1015-
return nil
1016-
}
1017-
}
1018-
1019-
func waitForAPIServiceAvailableFunc(ctx context.Context, provider *KubeProvider, name string) resource.RetryFunc {
1020-
return func() *resource.RetryError {
1021-
1022-
apiService, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Get(ctx, name, meta_v1.GetOptions{})
1023-
if err != nil {
1024-
return resource.NonRetryableError(err)
1025-
}
1026-
1027-
for i := range apiService.Status.Conditions {
1028-
if apiService.Status.Conditions[i].Type == apiregistration.Available {
1029-
return nil
1030-
}
1031-
}
1032-
1033-
return resource.RetryableError(fmt.Errorf("Waiting for APIService %v to be Available", name))
1034-
}
1035-
}
1036-
10371204
// Takes the result of flatmap.Expand for an array of strings
10381205
// and returns a []*string
10391206
func expandStringList(configured []interface{}) []string {

0 commit comments

Comments
 (0)