diff --git a/operator/api/pulumi/shared/stack_types.go b/operator/api/pulumi/shared/stack_types.go index ade27468..9333df53 100644 --- a/operator/api/pulumi/shared/stack_types.go +++ b/operator/api/pulumi/shared/stack_types.go @@ -388,6 +388,10 @@ type StackUpdateState struct { Permalink Permalink `json:"permalink,omitempty"` // LastResyncTime contains a timestamp for the last time a resync of the stack took place. LastResyncTime metav1.Time `json:"lastResyncTime,omitempty"` + // Failures records how many times the update has been attempted and + // failed. Failed updates are periodically retried with exponential backoff + // in case the failure was due to transient conditions. + Failures int64 `json:"failures"` } // StackUpdateStatus is the status code for the result of a Stack Update run. diff --git a/operator/config/crd/bases/pulumi.com_stacks.yaml b/operator/config/crd/bases/pulumi.com_stacks.yaml index 841d8cfd..e04d54f5 100644 --- a/operator/config/crd/bases/pulumi.com_stacks.yaml +++ b/operator/config/crd/bases/pulumi.com_stacks.yaml @@ -9506,6 +9506,13 @@ spec: description: LastUpdate contains details of the status of the last update. properties: + failures: + description: |- + Failures records how many times the update has been attempted and + failed. Failed updates are periodically retried with exponential backoff + in case the failure was due to transient conditions. + format: int64 + type: integer generation: description: Generation is the stack generation associated with the update. @@ -9536,6 +9543,8 @@ spec: type: description: Type is the type of update. type: string + required: + - failures type: object observedGeneration: description: ObservedGeneration records the value of .meta.generation @@ -18962,6 +18971,13 @@ spec: description: LastUpdate contains details of the status of the last update. properties: + failures: + description: |- + Failures records how many times the update has been attempted and + failed. Failed updates are periodically retried with exponential backoff + in case the failure was due to transient conditions. + format: int64 + type: integer generation: description: Generation is the stack generation associated with the update. @@ -18992,6 +19008,8 @@ spec: type: description: Type is the type of update. 
type: string + required: + - failures type: object outputs: additionalProperties: diff --git a/operator/internal/controller/auto/update_controller.go b/operator/internal/controller/auto/update_controller.go index e09e9261..798191ba 100644 --- a/operator/internal/controller/auto/update_controller.go +++ b/operator/internal/controller/auto/update_controller.go @@ -19,14 +19,17 @@ package controller import ( "context" "encoding/json" + "errors" "fmt" "io" "time" + "github.com/go-logr/logr" agentpb "github.com/pulumi/pulumi-kubernetes-operator/v2/agent/pkg/proto" autov1alpha1 "github.com/pulumi/pulumi-kubernetes-operator/v2/operator/api/auto/v1alpha1" "github.com/pulumi/pulumi/sdk/v3/go/common/apitype" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" @@ -81,38 +84,7 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, client.IgnoreNotFound(err) } - rs := &reconcileSession{} - rs.progressing = meta.FindStatusCondition(obj.Status.Conditions, UpdateConditionTypeProgressing) - if rs.progressing == nil { - rs.progressing = &metav1.Condition{ - Type: UpdateConditionTypeProgressing, - Status: metav1.ConditionUnknown, - } - } - rs.failed = meta.FindStatusCondition(obj.Status.Conditions, UpdateConditionTypeFailed) - if rs.failed == nil { - rs.failed = &metav1.Condition{ - Type: UpdateConditionTypeFailed, - Status: metav1.ConditionUnknown, - } - } - rs.complete = meta.FindStatusCondition(obj.Status.Conditions, UpdateConditionTypeComplete) - if rs.complete == nil { - rs.complete = &metav1.Condition{ - Type: UpdateConditionTypeComplete, - Status: metav1.ConditionUnknown, - } - } - rs.updateStatus = func() error { - obj.Status.ObservedGeneration = obj.Generation - rs.progressing.ObservedGeneration = obj.Generation - meta.SetStatusCondition(&obj.Status.Conditions, *rs.progressing) - rs.failed.ObservedGeneration = obj.Generation - meta.SetStatusCondition(&obj.Status.Conditions, *rs.failed) - rs.complete.ObservedGeneration = obj.Generation - meta.SetStatusCondition(&obj.Status.Conditions, *rs.complete) - return r.Status().Update(ctx, obj) - } + rs := newReconcileSession(r.Client, obj) if rs.complete.Status == metav1.ConditionTrue { l.V(1).Info("Ignoring completed update") @@ -128,7 +100,7 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr rs.failed.Reason = "unknown" rs.complete.Status = metav1.ConditionTrue rs.complete.Reason = "Aborted" - return ctrl.Result{}, rs.updateStatus() + return ctrl.Result{}, rs.updateStatus(ctx, obj) } l.Info("Updating the status") @@ -138,7 +110,7 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr rs.failed.Reason = UpdateConditionReasonProgressing rs.complete.Status = metav1.ConditionFalse rs.complete.Reason = UpdateConditionReasonProgressing - err = rs.updateStatus() + err = rs.updateStatus(ctx, obj) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to update the status: %w", err) } @@ -164,7 +136,7 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr rs.failed.Reason = UpdateConditionReasonProgressing rs.complete.Status = metav1.ConditionFalse rs.complete.Reason = UpdateConditionReasonProgressing - return ctrl.Result{RequeueAfter: 5 * time.Second}, rs.updateStatus() + return ctrl.Result{RequeueAfter: 5 * time.Second}, rs.updateStatus(ctx, obj) } defer func() { _ = conn.Close() @@ -196,10 +168,52 @@ func (r 
*UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } type reconcileSession struct { - progressing *metav1.Condition - complete *metav1.Condition - failed *metav1.Condition - updateStatus func() error + progressing *metav1.Condition + complete *metav1.Condition + failed *metav1.Condition + client client.Client +} + +// newReconcileSession creates a new reconcileSession. +func newReconcileSession(client client.Client, obj *autov1alpha1.Update) *reconcileSession { + rs := &reconcileSession{client: client} + rs.progressing = meta.FindStatusCondition(obj.Status.Conditions, UpdateConditionTypeProgressing) + if rs.progressing == nil { + rs.progressing = &metav1.Condition{ + Type: UpdateConditionTypeProgressing, + Status: metav1.ConditionUnknown, + } + } + rs.failed = meta.FindStatusCondition(obj.Status.Conditions, UpdateConditionTypeFailed) + if rs.failed == nil { + rs.failed = &metav1.Condition{ + Type: UpdateConditionTypeFailed, + Status: metav1.ConditionUnknown, + } + } + rs.complete = meta.FindStatusCondition(obj.Status.Conditions, UpdateConditionTypeComplete) + if rs.complete == nil { + rs.complete = &metav1.Condition{ + Type: UpdateConditionTypeComplete, + Status: metav1.ConditionUnknown, + } + } + return rs +} + +func (rs *reconcileSession) updateStatus(ctx context.Context, obj *autov1alpha1.Update) error { + obj.Status.ObservedGeneration = obj.Generation + rs.progressing.ObservedGeneration = obj.Generation + meta.SetStatusCondition(&obj.Status.Conditions, *rs.progressing) + rs.failed.ObservedGeneration = obj.Generation + meta.SetStatusCondition(&obj.Status.Conditions, *rs.failed) + rs.complete.ObservedGeneration = obj.Generation + meta.SetStatusCondition(&obj.Status.Conditions, *rs.complete) + err := rs.client.Status().Update(ctx, obj) + if err == nil { + return nil + } + return fmt.Errorf("updating status: %w", err) } func (u *reconcileSession) Preview(ctx context.Context, obj *autov1alpha1.Update, client agentpb.AutomationServiceClient) (ctrl.Result, error) { @@ -221,53 +235,15 @@ func (u *reconcileSession) Preview(ctx context.Context, obj *autov1alpha1.Update if err != nil { return ctrl.Result{}, fmt.Errorf("failed request to workspace: %w", err) } - done := make(chan error) - go func() { - for { - stream, err := res.Recv() - if err == io.EOF { - close(done) - return - } - - switch r := stream.Response.(type) { - case *agentpb.PreviewStream_Event: - continue - case *agentpb.PreviewStream_Result: - l.Info("Result received", "result", r.Result) - - obj.Status.StartTime = metav1.NewTime(r.Result.Summary.StartTime.AsTime()) - obj.Status.EndTime = metav1.NewTime(r.Result.Summary.EndTime.AsTime()) - if r.Result.Permalink != nil { - obj.Status.Permalink = *r.Result.Permalink - } - obj.Status.Message = r.Result.Summary.Message - u.progressing.Status = metav1.ConditionFalse - u.progressing.Reason = UpdateConditionReasonComplete - u.complete.Status = metav1.ConditionTrue - u.complete.Reason = UpdateConditionReasonUpdated - switch r.Result.Summary.Result { - case string(apitype.StatusSucceeded): - u.failed.Status = metav1.ConditionFalse - u.failed.Reason = r.Result.Summary.Result - default: - u.failed.Status = metav1.ConditionTrue - u.failed.Reason = r.Result.Summary.Result - } - err = u.updateStatus() - if err != nil { - done <- fmt.Errorf("failed to update the status: %w", err) - return - } - } - } - }() - err = <-done + defer func() { _ = res.CloseSend() }() + + reader := streamReader[agentpb.PreviewStream]{receiver: res, l: l, u: u, obj: obj} + _, err = reader.Result() 
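+	// Errors surfaced here are either transient gRPC failures (retried via
+	// requeue) or a stream that closed without a result; terminal operation
+	// failures were already folded into the session's conditions.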
if err != nil { - return ctrl.Result{}, fmt.Errorf("response error: %w", err) + return ctrl.Result{}, err } - return ctrl.Result{}, nil + return ctrl.Result{}, u.updateStatus(ctx, obj) } type upper interface { @@ -302,77 +278,31 @@ func (u *reconcileSession) Update(ctx context.Context, obj *autov1alpha1.Update, l.Info("Executing update operation", "request", autoReq) res, err := client.Up(ctx, autoReq, grpc.WaitForReady(true)) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed request to workspace: %w", err) + } defer func() { _ = res.CloseSend() }() + reader := streamReader[agentpb.UpStream]{receiver: res, l: l, u: u, obj: obj} + result, err := reader.Result() if err != nil { - return ctrl.Result{}, fmt.Errorf("failed request to workspace: %w", err) + return ctrl.Result{}, err } - for { - stream, err := res.Recv() - if err == io.EOF { - break - } + // Create a secret with result.Outputs + if r, ok := result.(*agentpb.UpResult); ok && r.Outputs != nil { + secret, err := outputsToSecret(obj, r.Outputs) if err != nil { - // Update failed - obj.Status.Message = status.Convert(err).Message() - u.progressing.Status = metav1.ConditionFalse - u.progressing.Reason = UpdateConditionReasonComplete - u.complete.Status = metav1.ConditionTrue - u.complete.Reason = UpdateConditionReasonComplete - u.failed.Status = metav1.ConditionTrue - u.failed.Reason = status.Code(err).String() - u.failed.Message = obj.Status.Message - return ctrl.Result{}, u.updateStatus() - } - - result := stream.GetResult() - if result == nil { - continue - } - - l.Info("Result received", "result", result) - - // Create a secret with result.Outputs - if result.Outputs != nil { - secret, err := outputsToSecret(obj, result.Outputs) - if err != nil { - return ctrl.Result{}, fmt.Errorf("marshaling outputs: %w", err) - } - err = kclient.Create(ctx, secret) - if err != nil { - return ctrl.Result{}, fmt.Errorf("creating output secret: %w", err) - } - obj.Status.Outputs = secret.Name - } - - obj.Status.StartTime = metav1.NewTime(result.Summary.StartTime.AsTime()) - obj.Status.EndTime = metav1.NewTime(result.Summary.EndTime.AsTime()) - if result.Permalink != nil { - obj.Status.Permalink = *result.Permalink - } - obj.Status.Message = result.Summary.Message - u.progressing.Status = metav1.ConditionFalse - u.progressing.Reason = UpdateConditionReasonComplete - u.complete.Status = metav1.ConditionTrue - u.complete.Reason = UpdateConditionReasonUpdated - switch result.Summary.Result { - case string(apitype.StatusSucceeded): - u.failed.Status = metav1.ConditionFalse - u.failed.Reason = result.Summary.Result - default: - u.failed.Status = metav1.ConditionTrue - u.failed.Reason = result.Summary.Result - u.failed.Message = result.Summary.Message + return ctrl.Result{}, fmt.Errorf("marshaling outputs: %w", err) } - err = u.updateStatus() + err = kclient.Create(ctx, secret) if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to update the status: %w", err) + return ctrl.Result{}, fmt.Errorf("creating output secret: %w", err) } - return ctrl.Result{}, nil + obj.Status.Outputs = secret.Name } - return ctrl.Result{}, fmt.Errorf("didn't receive a result") + return ctrl.Result{}, u.updateStatus(ctx, obj) } func (u *reconcileSession) Refresh(ctx context.Context, obj *autov1alpha1.Update, client agentpb.AutomationServiceClient) (ctrl.Result, error) { @@ -391,53 +321,15 @@ func (u *reconcileSession) Refresh(ctx context.Context, obj *autov1alpha1.Update if err != nil { return ctrl.Result{}, fmt.Errorf("failed request to workspace: %w", err) } 
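+	// Consume the response stream synchronously; see the generic streamReader
+	// at the bottom of this file.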
- done := make(chan error) - go func() { - for { - stream, err := res.Recv() - if err == io.EOF { - close(done) - return - } - - switch r := stream.Response.(type) { - case *agentpb.RefreshStream_Event: - continue - case *agentpb.RefreshStream_Result: - l.Info("Result received", "result", r.Result) - - obj.Status.StartTime = metav1.NewTime(r.Result.Summary.StartTime.AsTime()) - obj.Status.EndTime = metav1.NewTime(r.Result.Summary.EndTime.AsTime()) - if r.Result.Permalink != nil { - obj.Status.Permalink = *r.Result.Permalink - } - obj.Status.Message = r.Result.Summary.Message - u.progressing.Status = metav1.ConditionFalse - u.progressing.Reason = UpdateConditionReasonComplete - u.complete.Status = metav1.ConditionTrue - u.complete.Reason = UpdateConditionReasonUpdated - switch r.Result.Summary.Result { - case string(apitype.StatusSucceeded): - u.failed.Status = metav1.ConditionFalse - u.failed.Reason = r.Result.Summary.Result - default: - u.failed.Status = metav1.ConditionTrue - u.failed.Reason = r.Result.Summary.Result - } - err = u.updateStatus() - if err != nil { - done <- fmt.Errorf("failed to update the status: %w", err) - return - } - } - } - }() - err = <-done + defer func() { _ = res.CloseSend() }() + + reader := streamReader[agentpb.RefreshStream]{receiver: res, l: l, u: u, obj: obj} + _, err = reader.Result() if err != nil { - return ctrl.Result{}, fmt.Errorf("response error: %w", err) + return ctrl.Result{}, err } - return ctrl.Result{}, nil + return ctrl.Result{}, u.updateStatus(ctx, obj) } func (u *reconcileSession) Destroy(ctx context.Context, obj *autov1alpha1.Update, client agentpb.AutomationServiceClient) (ctrl.Result, error) { @@ -454,58 +346,20 @@ func (u *reconcileSession) Destroy(ctx context.Context, obj *autov1alpha1.Update Remove: obj.Spec.Remove, } - l.Info("Executing refresh operation", "request", autoReq) + l.Info("Executing destroy operation", "request", autoReq) res, err := client.Destroy(ctx, autoReq, grpc.WaitForReady(true)) if err != nil { return ctrl.Result{}, fmt.Errorf("failed request to workspace: %w", err) } - done := make(chan error) - go func() { - for { - stream, err := res.Recv() - if err == io.EOF { - close(done) - return - } - - switch r := stream.Response.(type) { - case *agentpb.DestroyStream_Event: - continue - case *agentpb.DestroyStream_Result: - l.Info("Result received", "result", r.Result) - - obj.Status.StartTime = metav1.NewTime(r.Result.Summary.StartTime.AsTime()) - obj.Status.EndTime = metav1.NewTime(r.Result.Summary.EndTime.AsTime()) - if r.Result.Permalink != nil { - obj.Status.Permalink = *r.Result.Permalink - } - obj.Status.Message = r.Result.Summary.Message - u.progressing.Status = metav1.ConditionFalse - u.progressing.Reason = UpdateConditionReasonComplete - u.complete.Status = metav1.ConditionTrue - u.complete.Reason = UpdateConditionReasonUpdated - switch r.Result.Summary.Result { - case string(apitype.StatusSucceeded): - u.failed.Status = metav1.ConditionFalse - u.failed.Reason = r.Result.Summary.Result - default: - u.failed.Status = metav1.ConditionTrue - u.failed.Reason = r.Result.Summary.Result - } - err = u.updateStatus() - if err != nil { - done <- fmt.Errorf("failed to update the status: %w", err) - return - } - } - } - }() - err = <-done + defer func() { _ = res.CloseSend() }() + + reader := streamReader[agentpb.DestroyStream]{receiver: res, l: l, u: u, obj: obj} + _, err = reader.Result() if err != nil { - return ctrl.Result{}, fmt.Errorf("response error: %w", err) + return ctrl.Result{}, err } - return ctrl.Result{}, nil 
+	return ctrl.Result{}, u.updateStatus(ctx, obj)
 }
 
 // SetupWithManager sets up the controller with the Manager.
@@ -579,3 +433,147 @@ func outputsToSecret(owner *autov1alpha1.Update, outputs map[string]*agentpb.Out
 
 	return s, nil
 }
+
+// stream is an interface constraint for the response streams consumable by a
+// streamReader.
+type stream interface {
+	agentpb.UpStream | agentpb.DestroyStream | agentpb.PreviewStream | agentpb.RefreshStream
+}
+
+// streamReader reads an update stream until a result is received. The
+// reconcile session and underlying Update object are updated to reflect the
+// result, but no changes are written back to the API server.
+type streamReader[T stream] struct {
+	receiver grpc.ServerStreamingClient[T]
+	obj      *autov1alpha1.Update
+	u        *reconcileSession
+	l        logr.Logger
+}
+
+// Recv reads one message from the stream which may or may not contain a
+// result.
+func (s streamReader[T]) Recv() (getResulter[T], error) {
+	stream, err := s.receiver.Recv()
+	return getResulter[T]{stream}, err
+}
+
+// Result reads from the underlying stream until a Result is received or an
+// error is encountered. A non-nil error is returned only for transient
+// failures, or if the stream closes without ever delivering a result.
+// Importantly, if the Automation API returns a non-transient error (e.g.
+// Unknown) the operation is assumed to have failed: the session's conditions
+// are marked failed and a nil result is returned with a nil error.
+func (s streamReader[T]) Result() (result, error) {
+	var res result
+
+	for {
+		stream, err := s.Recv()
+		if err == io.EOF {
+			break
+		}
+		if transient(err) {
+			// Surface transient errors to trigger another reconcile.
+			return nil, err
+		}
+		if err != nil {
+			// For all other errors treat the operation as failed.
+			s.l.Error(err, "Update failed")
+			s.obj.Status.Message = status.Convert(err).Message()
+			s.u.progressing.Status = metav1.ConditionFalse
+			s.u.progressing.Reason = UpdateConditionReasonComplete
+			s.u.complete.Status = metav1.ConditionTrue
+			s.u.complete.Reason = UpdateConditionReasonComplete
+			s.u.failed.Status = metav1.ConditionTrue
+			s.u.failed.Reason = status.Code(err).String()
+			s.u.failed.Message = s.obj.Status.Message
+			return res, nil
+		}
+
+		res = stream.GetResult()
+		if res == nil {
+			continue // No result yet.
+		}
+
+		s.l.Info("Result received", "result", res)
+
+		s.obj.Status.StartTime = metav1.NewTime(res.GetSummary().StartTime.AsTime())
+		s.obj.Status.EndTime = metav1.NewTime(res.GetSummary().EndTime.AsTime())
+		if link := res.GetPermalink(); link != "" {
+			s.obj.Status.Permalink = link
+		}
+		s.obj.Status.Message = res.GetSummary().Message
+		s.u.progressing.Status = metav1.ConditionFalse
+		s.u.progressing.Reason = UpdateConditionReasonComplete
+		s.u.complete.Status = metav1.ConditionTrue
+		s.u.complete.Reason = UpdateConditionReasonUpdated
+		switch res.GetSummary().Result {
+		case string(apitype.StatusSucceeded):
+			s.u.failed.Status = metav1.ConditionFalse
+			s.u.failed.Reason = res.GetSummary().Result
+			s.u.failed.Message = res.GetSummary().Message
+		default:
+			s.u.failed.Status = metav1.ConditionTrue
+			s.u.failed.Reason = res.GetSummary().Result
+			s.u.failed.Message = res.GetSummary().Message
+		}
+		return res, nil
+	}
+
+	return res, fmt.Errorf("didn't receive a result")
+}
+
+// transient returns false when the given error is nil, or when the error
+// represents a condition that is not likely to resolve quickly. This is used
+// to determine whether to retry immediately or much more slowly.
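+//
+// An illustrative (hypothetical) caller, showing the intended split:
+//
+//	if transient(err) {
+//		return nil, err // surface the error so the controller requeues soon
+//	}
+//	// ...otherwise record a failed result and let the Stack's exponential
+//	// backoff schedule a retry later.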
+func transient(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return true + } + + code := status.Code(err) + switch code { + case codes.Unknown, codes.Unauthenticated, codes.PermissionDenied, codes.InvalidArgument: + return false + default: + return true + } +} + +// getResulter glues our various result types to a common interface. +type getResulter[T stream] struct { + stream *T +} + +// result captures behavior for all of our stream results. +type result interface { + GetSummary() *agentpb.UpdateSummary + GetPermalink() string +} + +// getResult returns nil if the underlying stream doesn't yet have a result; +// otherwise it returns a result interface wrapping the underlying type. See +// Update for an example of how to customize result handling. +func (gr getResulter[T]) GetResult() result { + var res result + switch s := any(gr.stream).(type) { + case *agentpb.UpStream: + if r := s.GetResult(); r != nil { + res = r + } + case *agentpb.DestroyStream: + if r := s.GetResult(); r != nil { + res = r + } + case *agentpb.PreviewStream: + if r := s.GetResult(); r != nil { + res = r + } + case *agentpb.RefreshStream: + if r := s.GetResult(); r != nil { + res = r + } + } + return res +} diff --git a/operator/internal/controller/auto/update_controller_test.go b/operator/internal/controller/auto/update_controller_test.go index 49bf1ec0..ad5f0f2e 100644 --- a/operator/internal/controller/auto/update_controller_test.go +++ b/operator/internal/controller/auto/update_controller_test.go @@ -25,13 +25,19 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" grpc "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -100,21 +106,18 @@ func TestUpdate(t *testing.T) { client func(*gomock.Controller) upper kclient func(*gomock.Controller) creater - want autov1alpha1.UpdateStatus + want autov1alpha1.UpdateStatus + wantErr string }{ { - name: "with outputs", - obj: autov1alpha1.Update{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "uid"}, - Spec: autov1alpha1.UpdateSpec{}, - Status: autov1alpha1.UpdateStatus{}, - }, + name: "update success result with outputs", + obj: autov1alpha1.Update{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "uid"}}, client: func(ctrl *gomock.Controller) upper { upper := NewMockupper(ctrl) recver := NewMockrecver[agentpb.UpStream](ctrl) result := &agentpb.UpStream_Result{Result: &agentpb.UpResult{ - Summary: &agentpb.UpdateSummary{}, + Summary: &agentpb.UpdateSummary{Result: "succeeded"}, Outputs: map[string]*agentpb.OutputValue{ "username": {Value: []byte("username")}, "password": {Value: []byte("hunter2"), Secret: true}, @@ -156,15 +159,63 @@ func TestUpdate(t *testing.T) { Outputs: "foo-stack-outputs", StartTime: metav1.NewTime(time.Unix(0, 0).UTC()), EndTime: metav1.NewTime(time.Unix(0, 0).UTC()), + Conditions: []metav1.Condition{ + {Type: "Progressing", Status: "False", Reason: "Complete"}, + {Type: "Failed", Status: "False", Reason: "succeeded"}, + 
{Type: "Complete", Status: "True", Reason: "Updated"}, + }, }, }, { - name: "update failure", - obj: autov1alpha1.Update{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "uid"}, - Spec: autov1alpha1.UpdateSpec{}, - Status: autov1alpha1.UpdateStatus{}, + // Auto API failures are currently returned as non-nil errors, but + // we should also be able to handle explicit "failed" results. + name: "update failed result", + obj: autov1alpha1.Update{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "uid"}}, + client: func(ctrl *gomock.Controller) upper { + upper := NewMockupper(ctrl) + recver := NewMockrecver[agentpb.UpStream](ctrl) + + result := &agentpb.UpStream_Result{Result: &agentpb.UpResult{ + Summary: &agentpb.UpdateSummary{ + Result: "failed", + Message: "something went wrong", + }, + }} + + gomock.InOrder( + upper.EXPECT(). + Up(gomock.Any(), protoMatcher{&agentpb.UpRequest{}}, grpc.WaitForReady(true)). + Return(recver, nil), + recver.EXPECT(). + Recv(). + Return(&agentpb.UpStream{Response: &agentpb.UpStream_Event{}}, nil), + recver.EXPECT().Recv().Return(&agentpb.UpStream{Response: result}, nil), + recver.EXPECT().CloseSend().Return(nil), + ) + return upper + }, + kclient: func(_ *gomock.Controller) creater { return nil }, + want: autov1alpha1.UpdateStatus{ + Message: "something went wrong", + StartTime: metav1.NewTime(time.Unix(0, 0).UTC()), + EndTime: metav1.NewTime(time.Unix(0, 0).UTC()), + Conditions: []metav1.Condition{ + {Type: "Progressing", Status: "False", Reason: "Complete"}, + { + Type: "Failed", + Status: "True", + Reason: "failed", + Message: "something went wrong", + }, + {Type: "Complete", Status: "True", Reason: "Updated"}, + }, }, + }, + { + // Auto API failures are currently returned as non-nil errors, + // which we translate into "failed" Results. + name: "update failed error", + obj: autov1alpha1.Update{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "uid"}}, client: func(ctrl *gomock.Controller) upper { upper := NewMockupper(ctrl) recver := NewMockrecver[agentpb.UpStream](ctrl) @@ -181,27 +232,94 @@ func TestUpdate(t *testing.T) { return upper }, kclient: func(*gomock.Controller) creater { return nil }, - want: autov1alpha1.UpdateStatus{Message: "failed to run update: exit status 255"}, + want: autov1alpha1.UpdateStatus{ + Message: "failed to run update: exit status 255", + Conditions: []metav1.Condition{ + {Type: "Progressing", Status: "False", Reason: "Complete"}, + { + Type: "Failed", + Status: "True", + Reason: "Unknown", + Message: "failed to run update: exit status 255", + }, + {Type: "Complete", Status: "True", Reason: "Complete"}, + }, + }, + }, + { + name: "workspace grpc failure", + obj: autov1alpha1.Update{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "uid"}}, + client: func(ctrl *gomock.Controller) upper { + upper := NewMockupper(ctrl) + + gomock.InOrder( + upper.EXPECT(). + Up(gomock.Any(), protoMatcher{&agentpb.UpRequest{}}, grpc.WaitForReady(true)). + Return(nil, status.Error(codes.Unavailable, "transient workspace error")), + ) + return upper + }, + kclient: func(*gomock.Controller) creater { return nil }, + wantErr: "transient workspace error", + }, + { + name: "response stream grpc failure", + obj: autov1alpha1.Update{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "uid"}}, + client: func(ctrl *gomock.Controller) upper { + upper := NewMockupper(ctrl) + recver := NewMockrecver[agentpb.UpStream](ctrl) + + gomock.InOrder( + upper.EXPECT(). + Up(gomock.Any(), protoMatcher{&agentpb.UpRequest{}}, grpc.WaitForReady(true)). 
+ Return(recver, nil), + recver.EXPECT(). + Recv(). + Return(nil, status.Error(codes.Unavailable, "transient stream error")), + recver.EXPECT().CloseSend().Return(nil), + ) + return upper + }, + kclient: func(*gomock.Controller) creater { return nil }, + wantErr: "transient stream error", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := &reconcileSession{ - progressing: &metav1.Condition{}, - complete: &metav1.Condition{}, - failed: &metav1.Condition{}, - updateStatus: func() error { return nil }, - } + ctx := context.Background() + scheme.Scheme.AddKnownTypes( + schema.GroupVersion{Group: "auto.pulumi.com", Version: "v1alpha1"}, + &autov1alpha1.Update{}, + ) + builder := fake.NewClientBuilder(). + WithObjects(&tt.obj). + WithStatusSubresource(&tt.obj) + rs := newReconcileSession(builder.Build(), &tt.obj) + ctrl := gomock.NewController(t) - _, err := u.Update( - context.Background(), + _, err := rs.Update( + ctx, &tt.obj, tt.client(ctrl), tt.kclient(ctrl), ) + if tt.wantErr != "" { + assert.ErrorContains(t, err, tt.wantErr) + return + } assert.NoError(t, err) - assert.Equal(t, tt.want, tt.obj.Status) + + var res autov1alpha1.Update + require.NoError( + t, + rs.client.Get( + ctx, + types.NamespacedName{Namespace: tt.obj.Namespace, Name: tt.obj.Name}, + &res, + ), + ) + assert.EqualExportedValues(t, tt.want, res.Status) }) } } diff --git a/operator/internal/controller/pulumi/flux.go b/operator/internal/controller/pulumi/flux.go index fcb34418..6ccfc5c2 100644 --- a/operator/internal/controller/pulumi/flux.go +++ b/operator/internal/controller/pulumi/flux.go @@ -13,8 +13,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ) -func (sess *StackReconcilerSession) SetupWorkspaceFromFluxSource(ctx context.Context, source unstructured.Unstructured, fluxSource *shared.FluxSource) (string, error) { - +func (sess *stackReconcilerSession) SetupWorkspaceFromFluxSource(ctx context.Context, source unstructured.Unstructured, fluxSource *shared.FluxSource) (string, error) { // this source artifact fetching code is based closely on // https://github.com/fluxcd/kustomize-controller/blob/db3c321163522259595894ca6c19ed44a876976d/controllers/kustomization_controller.go#L529 sess.logger.V(1).Info("Setting up pulumi workspace for stack", "stack", sess.stack, "workspace", sess.ws.Name) diff --git a/operator/internal/controller/pulumi/git.go b/operator/internal/controller/pulumi/git.go index 0a1a23b9..98538e41 100644 --- a/operator/internal/controller/pulumi/git.go +++ b/operator/internal/controller/pulumi/git.go @@ -147,7 +147,7 @@ func (gs gitSource) authMethod() (transport.AuthMethod, error) { // // The workspace pod is also mutated to mount these references at some // well-known paths. 
-func (sess *StackReconcilerSession) resolveGitAuth(ctx context.Context) (*auto.GitAuth, error) {
+func (sess *stackReconcilerSession) resolveGitAuth(ctx context.Context) (*auto.GitAuth, error) {
 	auth := &auto.GitAuth{}
 
 	if sess.stack.GitSource == nil {
@@ -240,7 +240,7 @@ func (sess *StackReconcilerSession) resolveGitAuth(ctx context.Context) (*auto.G
 	return auth, nil
 }
 
-func (sess *StackReconcilerSession) setupWorkspaceFromGitSource(ctx context.Context, commit string) error {
+func (sess *stackReconcilerSession) setupWorkspaceFromGitSource(ctx context.Context, commit string) error {
 	gs := sess.stack.GitSource
 	if gs == nil {
 		return fmt.Errorf("missing gitSource")
diff --git a/operator/internal/controller/pulumi/stack_controller.go b/operator/internal/controller/pulumi/stack_controller.go
index ce12c993..5fdf922c 100644
--- a/operator/internal/controller/pulumi/stack_controller.go
+++ b/operator/internal/controller/pulumi/stack_controller.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"maps"
+	"math"
 	"os"
 	"path"
 	"slices"
@@ -671,25 +672,14 @@ func (r *StackReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
 	sess.SetSecretEnvs(ctx, stack.SecretEnvs, request.Namespace)
 
 	// Step 3: Evaluate whether an update is needed. If not, we transition to Ready.
-
-	// determine whether the stack is in sync with respect to the specification.
-	// i.e. the current spec generation has been applied, the update was successful,
-	// the latest commit has been applied, and (if resync is enabled) has been resynced recently.
-	resyncFreq := time.Duration(sess.stack.ResyncFrequencySeconds) * time.Second
-	if resyncFreq.Seconds() < 60 {
-		resyncFreq = time.Duration(60) * time.Second
-	}
-	synced := instance.Status.LastUpdate != nil &&
-		instance.Status.LastUpdate.Generation == instance.Generation &&
-		instance.Status.LastUpdate.State == shared.SucceededStackStateMessage &&
-		(isStackMarkedToBeDeleted ||
-			(instance.Status.LastUpdate.LastSuccessfulCommit == currentCommit &&
-				(!sess.stack.ContinueResyncOnCommitMatch || time.Since(instance.Status.LastUpdate.LastResyncTime.Time) < resyncFreq)))
-
-	if synced {
-		// transition to ready, and requeue reconciliation as necessary to detect
-		// branch updates and resyncs.
-		instance.Status.MarkReadyCondition()
+	if isSynced(instance, currentCommit) {
+		// We don't mark the stack as ready if its update failed, so that
+		// downstream Stack dependencies aren't triggered.
+		if instance.Status.LastUpdate.State == shared.SucceededStackStateMessage {
+			instance.Status.MarkReadyCondition()
+		} else {
+			instance.Status.MarkReconcilingCondition(pulumiv1.ReconcilingRetryReason, fmt.Sprintf("%d update failure(s)", instance.Status.LastUpdate.Failures))
+		}
 
 		if isStackMarkedToBeDeleted {
 			log.Info("Stack was destroyed; finalizing now.")
@@ -700,15 +690,22 @@ func (r *StackReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
 			return reconcile.Result{}, nil
 		}
 
+		// Requeue reconciliation as necessary to detect branch updates and
+		// resyncs.
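+		// A failed update waits out its cooldown; when resync-on-commit-match
+		// is enabled the resync window takes precedence, and branch polling
+		// below may shorten the wait further.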
+ requeueAfter := time.Duration(0) + + if instance.Status.LastUpdate.State == shared.FailedStackStateMessage { + requeueAfter = max(1*time.Second, time.Until(instance.Status.LastUpdate.LastResyncTime.Add(cooldown(instance)))) + } if sess.stack.ContinueResyncOnCommitMatch { - requeueAfter = max(1*time.Second, time.Until(instance.Status.LastUpdate.LastResyncTime.Add(resyncFreq))) + requeueAfter = max(1*time.Second, time.Until(instance.Status.LastUpdate.LastResyncTime.Add(resyncFreq(instance)))) } if stack.GitSource != nil { trackBranch := len(stack.GitSource.Branch) > 0 if trackBranch { // Reconcile every resyncFreq to check for new commits to the branch. - pollFreq := resyncFreq + pollFreq := resyncFreq(instance) log.Info("Commit hash unchanged. Will poll for new commits.", "pollFrequency", pollFreq) requeueAfter = min(requeueAfter, pollFreq) } else { @@ -719,6 +716,7 @@ func (r *StackReconciler) Reconcile(ctx context.Context, request ctrl.Request) ( } else if stack.ProgramRef != nil { log.Info("Commit hash unchanged. Will wait for Program update or resync.") } + return reconcile.Result{RequeueAfter: requeueAfter}, saveStatus() } @@ -785,7 +783,7 @@ func (r *StackReconciler) emitEvent(instance *pulumiv1.Stack, event pulumiv1.Sta } // markStackFailed updates the status of the Stack object `instance` locally, to reflect a failure to process the stack. -func (r *StackReconciler) markStackFailed(sess *StackReconcilerSession, instance *pulumiv1.Stack, current *shared.CurrentStackUpdate, update *autov1alpha1.Update) { +func (r *StackReconciler) markStackFailed(sess *stackReconcilerSession, instance *pulumiv1.Stack, current *shared.CurrentStackUpdate, update *autov1alpha1.Update) { sess.logger.Info("Failed to update Stack", "Stack.Name", sess.stack.Stack, "Message", update.Status.Message) // Update Stack status with failed state @@ -802,6 +800,9 @@ func (r *StackReconciler) markStackFailed(sess *StackReconcilerSession, instance if last != nil { instance.Status.LastUpdate.LastSuccessfulCommit = last.LastSuccessfulCommit } + if last != nil && last.Generation == current.Generation { + instance.Status.LastUpdate.Failures = last.Failures + 1 + } r.emitEvent(instance, pulumiv1.StackUpdateFailureEvent(), "Failed to update Stack: %s", update.Status.Message) } @@ -841,13 +842,14 @@ func (r *StackReconciler) markStackSucceeded(ctx context.Context, instance *pulu LastSuccessfulCommit: current.Commit, Permalink: shared.Permalink(update.Status.Permalink), LastResyncTime: metav1.Now(), + Failures: 0, } r.emitEvent(instance, pulumiv1.StackUpdateSuccessfulEvent(), "Successfully updated stack.") return nil } -type StackReconcilerSession struct { +type stackReconcilerSession struct { logger logr.Logger kubeClient client.Client scheme *runtime.Scheme @@ -865,8 +867,8 @@ func newStackReconcilerSession( kubeClient client.Client, scheme *runtime.Scheme, namespace string, -) *StackReconcilerSession { - return &StackReconcilerSession{ +) *stackReconcilerSession { + return &stackReconcilerSession{ logger: logger, kubeClient: kubeClient, scheme: scheme, @@ -875,9 +877,72 @@ func newStackReconcilerSession( } } +// isSynced determines whether the stack is in sync with respect to the +// specification. i.e. the current spec generation has been applied, the update +// was successful, the latest commit has been applied, and (if resync is +// enabled) has been resynced recently. 
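+//
+// Failed updates also count as synced while they wait out their retry
+// cooldown, so that a new attempt isn't launched prematurely. For example
+// (illustrative numbers), an update with 3 recorded failures stays synced
+// for 1m * 2^3 = 8m after its last resync; see cooldown below.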
+func isSynced(stack *pulumiv1.Stack, currentCommit string) bool { + if stack.Status.LastUpdate == nil { + return false + } + + if stack.Status.LastUpdate.Generation != stack.Generation { + return false + } + + if stack.Status.LastUpdate.State == shared.SucceededStackStateMessage { + if stack.DeletionTimestamp != nil { // Marked for deletion. + return true + } + if stack.Status.LastUpdate.LastSuccessfulCommit != currentCommit { + return false + } + if !stack.Spec.ContinueResyncOnCommitMatch { + return true + } + return time.Since(stack.Status.LastUpdate.LastResyncTime.Time) < resyncFreq(stack) + } + + if stack.Status.LastUpdate.State == shared.FailedStackStateMessage { + return time.Since(stack.Status.LastUpdate.LastResyncTime.Time) < cooldown(stack) + } + + // We should never get here; if we do the Update is in an unknown state so + // don't trigger work for it. + return true +} + +// cooldown returns the amount of time to wait before a failed Update should be +// retried. We start with a 1-minute cooldown and double that for each failed +// attempt, up to a max of 24 hours. Failed Updates are considered synced while +// inside this cooldown period. A zero-value duration is returned if the update +// succeeded. +func cooldown(stack *pulumiv1.Stack) time.Duration { + cooldown := time.Duration(0) + if stack.Status.LastUpdate == nil { + return cooldown + } + if stack.Status.LastUpdate.State == shared.FailedStackStateMessage { + cooldown = 1 * time.Minute + cooldown *= time.Duration(math.Exp2(float64(stack.Status.LastUpdate.Failures))) + cooldown = min(24*time.Hour, cooldown) + } + return cooldown +} + +// resyncFreq determines how often a stack should be re-synced, for example to +// poll for new commits. +func resyncFreq(stack *pulumiv1.Stack) time.Duration { + resyncFreq := time.Duration(stack.Spec.ResyncFrequencySeconds) * time.Second + if resyncFreq.Seconds() < 60 { + resyncFreq = time.Duration(60) * time.Second + } + return resyncFreq +} + // SetEnvs populates the environment the stack run with values // from an array of Kubernetes ConfigMaps in a Namespace. -func (sess *StackReconcilerSession) SetEnvs(ctx context.Context, configMapNames []string, _ string) { +func (sess *stackReconcilerSession) SetEnvs(ctx context.Context, configMapNames []string, _ string) { for _, name := range configMapNames { sess.ws.Spec.EnvFrom = append(sess.ws.Spec.EnvFrom, corev1.EnvFromSource{ ConfigMapRef: &corev1.ConfigMapEnvSource{ @@ -889,7 +954,7 @@ func (sess *StackReconcilerSession) SetEnvs(ctx context.Context, configMapNames // SetSecretEnvs populates the environment of the stack run with values // from an array of Kubernetes Secrets in a Namespace. -func (sess *StackReconcilerSession) SetSecretEnvs(ctx context.Context, secretNames []string, _ string) { +func (sess *stackReconcilerSession) SetSecretEnvs(ctx context.Context, secretNames []string, _ string) { for _, name := range secretNames { sess.ws.Spec.EnvFrom = append(sess.ws.Spec.EnvFrom, corev1.EnvFromSource{ SecretRef: &corev1.SecretEnvSource{ @@ -901,7 +966,7 @@ func (sess *StackReconcilerSession) SetSecretEnvs(ctx context.Context, secretNam // SetEnvRefsForWorkspace populates environment variables for workspace using items in // the EnvRefs field in the stack specification. 
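+//
+// A hypothetical manifest snippet, to illustrate the shape being resolved
+// (field names follow the Stack CRD's envRefs schema):
+//
+//	envRefs:
+//	  PULUMI_ACCESS_TOKEN:
+//	    type: Secret
+//	    secret:
+//	      name: pulumi-api-secret
+//	      key: accessToken
+//
+// Each entry becomes an environment variable on the workspace pod, sourced
+// from the referenced object.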
-func (sess *StackReconcilerSession) SetEnvRefsForWorkspace(ctx context.Context) error { +func (sess *stackReconcilerSession) SetEnvRefsForWorkspace(ctx context.Context) error { envRefs := sess.stack.EnvRefs // envRefs is an unordered map, but we need to constrct env vars @@ -924,7 +989,7 @@ func (sess *StackReconcilerSession) SetEnvRefsForWorkspace(ctx context.Context) return nil } -func (sess *StackReconcilerSession) resolveResourceRefAsEnvVar(_ context.Context, ref *shared.ResourceRef) (string, *corev1.EnvVarSource, error) { +func (sess *stackReconcilerSession) resolveResourceRefAsEnvVar(_ context.Context, ref *shared.ResourceRef) (string, *corev1.EnvVarSource, error) { switch ref.SelectorType { case shared.ResourceSelectorEnv: // DEPRECATED: this reads from the operator's own environment @@ -974,7 +1039,7 @@ func makeSecretRefMountPath(secretRef *shared.SecretSelector) string { return "/var/run/secrets/stacks.pulumi.com/secrets/" + secretRef.Name } -func (sess *StackReconcilerSession) resolveResourceRefAsConfigItem(ctx context.Context, ref *shared.ResourceRef) (*string, *autov1alpha1.ConfigValueFrom, error) { +func (sess *stackReconcilerSession) resolveResourceRefAsConfigItem(ctx context.Context, ref *shared.ResourceRef) (*string, *autov1alpha1.ConfigValueFrom, error) { switch ref.SelectorType { case shared.ResourceSelectorEnv: // DEPRECATED: this reads from the operator's own environment @@ -1041,7 +1106,7 @@ func (sess *StackReconcilerSession) resolveResourceRefAsConfigItem(ctx context.C // a string. The v1 controller allowed env and filesystem references which no // longer make sense in the v2 agent/manager model, so only secret refs are // currently supported. -func (sess *StackReconcilerSession) resolveSecretResourceRef(ctx context.Context, ref *shared.ResourceRef) (string, error) { +func (sess *stackReconcilerSession) resolveSecretResourceRef(ctx context.Context, ref *shared.ResourceRef) (string, error) { switch ref.SelectorType { case shared.ResourceSelectorSecret: if ref.SecretRef == nil { @@ -1083,7 +1148,7 @@ func labelsForWorkspace(stack *metav1.ObjectMeta) map[string]string { } // NewWorkspace makes a new workspace for the given stack. -func (sess *StackReconcilerSession) NewWorkspace(stack *pulumiv1.Stack) error { +func (sess *stackReconcilerSession) NewWorkspace(stack *pulumiv1.Stack) error { labels := labelsForWorkspace(&stack.ObjectMeta) sess.ws = &autov1alpha1.Workspace{ TypeMeta: metav1.TypeMeta{ @@ -1115,7 +1180,7 @@ func (sess *StackReconcilerSession) NewWorkspace(stack *pulumiv1.Stack) error { return nil } -func (sess *StackReconcilerSession) CreateWorkspace(ctx context.Context) error { +func (sess *stackReconcilerSession) CreateWorkspace(ctx context.Context) error { sess.ws.Spec.Stacks = append(sess.ws.Spec.Stacks, *sess.wss) if err := sess.kubeClient.Patch(ctx, sess.ws, client.Apply, client.FieldOwner(FieldManager)); err != nil { @@ -1125,7 +1190,7 @@ func (sess *StackReconcilerSession) CreateWorkspace(ctx context.Context) error { return nil } -func (sess *StackReconcilerSession) isWorkspaceReady() bool { +func (sess *stackReconcilerSession) isWorkspaceReady() bool { if sess.ws == nil { return false } @@ -1137,7 +1202,7 @@ func (sess *StackReconcilerSession) isWorkspaceReady() bool { // setupWorkspace sets all the extra configuration specified by the Stack object, after you have // constructed a workspace from a source. 
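+//
+// For example, a non-empty spec.backend is surfaced to the workspace pod as
+// an environment variable (see the Backend handling at the top of the
+// function body).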
-func (sess *StackReconcilerSession) setupWorkspace(ctx context.Context) error { +func (sess *stackReconcilerSession) setupWorkspace(ctx context.Context) error { w := sess.ws if sess.stack.Backend != "" { w.Spec.Env = append(w.Spec.Env, corev1.EnvVar{ @@ -1204,7 +1269,7 @@ func (sess *StackReconcilerSession) setupWorkspace(ctx context.Context) error { return nil } -func (sess *StackReconcilerSession) UpdateConfig(ctx context.Context) error { +func (sess *stackReconcilerSession) UpdateConfig(ctx context.Context) error { ws := sess.wss // m := make(auto.ConfigMap) @@ -1241,7 +1306,7 @@ func (sess *StackReconcilerSession) UpdateConfig(ctx context.Context) error { } // newUp runs `pulumi up` on the stack. -func (sess *StackReconcilerSession) newUp(ctx context.Context, o *pulumiv1.Stack, message string) (*autov1alpha1.Update, error) { +func (sess *stackReconcilerSession) newUp(ctx context.Context, o *pulumiv1.Stack, message string) (*autov1alpha1.Update, error) { update := &autov1alpha1.Update{ TypeMeta: metav1.TypeMeta{ APIVersion: autov1alpha1.GroupVersion.String(), @@ -1271,7 +1336,7 @@ func (sess *StackReconcilerSession) newUp(ctx context.Context, o *pulumiv1.Stack } // newUp runs `pulumi destroy` on the stack. -func (sess *StackReconcilerSession) newDestroy(ctx context.Context, o *pulumiv1.Stack, message string) (*autov1alpha1.Update, error) { +func (sess *stackReconcilerSession) newDestroy(ctx context.Context, o *pulumiv1.Stack, message string) (*autov1alpha1.Update, error) { update := &autov1alpha1.Update{ TypeMeta: metav1.TypeMeta{ APIVersion: autov1alpha1.GroupVersion.String(), @@ -1300,7 +1365,7 @@ func makeUpdateName(o *pulumiv1.Stack) string { return fmt.Sprintf("%s-%s", o.Name, utilrand.String(8)) } -func (sess *StackReconcilerSession) readCurrentUpdate(ctx context.Context, name types.NamespacedName) error { +func (sess *stackReconcilerSession) readCurrentUpdate(ctx context.Context, name types.NamespacedName) error { u := &autov1alpha1.Update{} if err := sess.kubeClient.Get(ctx, name, u); err != nil { return err diff --git a/operator/internal/controller/pulumi/stack_controller_test.go b/operator/internal/controller/pulumi/stack_controller_test.go index e4dd0ad0..11f341e5 100644 --- a/operator/internal/controller/pulumi/stack_controller_test.go +++ b/operator/internal/controller/pulumi/stack_controller_test.go @@ -19,6 +19,7 @@ package pulumi import ( "context" "fmt" + "testing" "time" fluxsourcev1 "github.com/fluxcd/source-controller/api/v1" @@ -29,6 +30,7 @@ import ( autov1alpha1 "github.com/pulumi/pulumi-kubernetes-operator/v2/operator/api/auto/v1alpha1" "github.com/pulumi/pulumi-kubernetes-operator/v2/operator/api/pulumi/shared" pulumiv1 "github.com/pulumi/pulumi-kubernetes-operator/v2/operator/api/pulumi/v1" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -400,7 +402,7 @@ var _ = Describe("Stack Controller", func() { State: shared.FailedStackStateMessage, Name: "update-abcdef", Type: autov1alpha1.DestroyType, - LastResyncTime: metav1.Now(), + LastResyncTime: metav1.NewTime(time.Now().Add(-1 * time.Hour)), LastAttemptedCommit: "abcdef", LastSuccessfulCommit: "abcdef", }), @@ -526,6 +528,50 @@ var _ = Describe("Stack Controller", func() { By("emitting an event") Expect(r.Recorder.(*record.FakeRecorder).Events).To(Receive(matchEvent(pulumiv1.StackUpdateFailure))) }) + + When("retrying", func() { + JustBeforeEach(func(ctx context.Context) { + obj.Status.LastUpdate = 
&shared.StackUpdateState{ + Generation: 1, + State: shared.FailedStackStateMessage, + Name: "update-retried", + Type: autov1alpha1.UpType, + LastAttemptedCommit: obj.Status.CurrentUpdate.Commit, + LastSuccessfulCommit: "", + Failures: 2, + } + Expect(k8sClient.Status().Update(ctx, obj)).To(Succeed()) + }) + It("increments failures", func(ctx context.Context) { + _, err := reconcileF(ctx) + Expect(err).NotTo(HaveOccurred()) + + Expect(obj.Status.LastUpdate).To(Not(BeNil())) + Expect(obj.Status.LastUpdate.Failures).To(Equal(int64(3))) + }) + + When("with a new generation", func() { + JustBeforeEach(func(ctx context.Context) { + obj.Status.LastUpdate = &shared.StackUpdateState{ + Generation: 2, + State: shared.FailedStackStateMessage, + Name: "update-retried-with-new-sha", + Type: autov1alpha1.UpType, + LastAttemptedCommit: obj.Status.CurrentUpdate.Commit, + LastSuccessfulCommit: "", + Failures: 2, + } + Expect(k8sClient.Status().Update(ctx, obj)).To(Succeed()) + }) + It("resets failures", func(ctx context.Context) { + _, err := reconcileF(ctx) + Expect(err).NotTo(HaveOccurred()) + + Expect(obj.Status.LastUpdate).To(Not(BeNil())) + Expect(obj.Status.LastUpdate.Failures).To(Equal(int64(0))) + }) + }) + }) }) When("the update succeeded", func() { @@ -567,6 +613,48 @@ var _ = Describe("Stack Controller", func() { Expect(r.Recorder.(*record.FakeRecorder).Events).To(Receive(matchEvent(pulumiv1.StackUpdateSuccessful))) }) + When("after retrying", func() { + JustBeforeEach(func(ctx context.Context) { + obj.Status.LastUpdate = &shared.StackUpdateState{ + Generation: 1, + State: shared.FailedStackStateMessage, + Name: "update-retried", + Type: autov1alpha1.UpType, + LastAttemptedCommit: obj.Status.CurrentUpdate.Commit, + Failures: 2, + } + Expect(k8sClient.Status().Update(ctx, obj)).To(Succeed()) + }) + It("resets failures", func(ctx context.Context) { + _, err := reconcileF(ctx) + Expect(err).NotTo(HaveOccurred()) + + Expect(obj.Status.LastUpdate).To(Not(BeNil())) + Expect(obj.Status.LastUpdate.Failures).To(Equal(int64(0))) + }) + + When("with a new generation", func() { + JustBeforeEach(func(ctx context.Context) { + obj.Status.LastUpdate = &shared.StackUpdateState{ + Generation: 2, + State: shared.FailedStackStateMessage, + Name: "update-retried-with-new-sha", + Type: autov1alpha1.UpType, + LastAttemptedCommit: obj.Status.CurrentUpdate.Commit, + Failures: 2, + } + Expect(k8sClient.Status().Update(ctx, obj)).To(Succeed()) + }) + It("resets failures", func(ctx context.Context) { + _, err := reconcileF(ctx) + Expect(err).NotTo(HaveOccurred()) + + Expect(obj.Status.LastUpdate).To(Not(BeNil())) + Expect(obj.Status.LastUpdate.Failures).To(Equal(int64(0))) + }) + }) + }) + When("the update produced outputs", func() { BeforeEach(func(ctx context.Context) { secret := &corev1.Secret{ @@ -623,12 +711,27 @@ var _ = Describe("Stack Controller", func() { LastResyncTime: metav1.Now(), LastAttemptedCommit: fluxRepo.Status.Artifact.Digest, LastSuccessfulCommit: "", + Failures: 3, } }) - It("reconciles", func(ctx context.Context) { - _, err := reconcileF(ctx) - Expect(err).NotTo(HaveOccurred()) - ByResyncing() + When("within cooldown period", func() { + It("backs off exponentially", func(ctx context.Context) { + res, err := reconcileF(ctx) + Expect(err).NotTo(HaveOccurred()) + // 1 minute * 2^3 + Expect(res.RequeueAfter).To(BeNumerically("~", time.Duration(8*time.Minute), time.Minute)) + ByMarkingAsReconciling(pulumiv1.ReconcilingRetryReason, Equal("3 update failure(s)")) + }) + }) + When("done cooling down", func() 
{
+				BeforeEach(func() {
+					obj.Status.LastUpdate.LastResyncTime = metav1.NewTime(time.Now().Add(-1 * time.Hour))
+				})
+				It("reconciles", func(ctx context.Context) {
+					_, err := reconcileF(ctx)
+					Expect(err).NotTo(HaveOccurred())
+					ByResyncing()
+				})
+			})
 		})
@@ -1155,3 +1258,197 @@ var _ = Describe("Stack Controller", func() {
 func matchEvent(reason pulumiv1.StackEventReason) gtypes.GomegaMatcher {
 	return ContainSubstring(string(reason))
 }
+
+func TestExactlyOneOf(t *testing.T) {
+	tests := []struct {
+		name     string
+		input    []bool
+		expected bool
+	}{
+		{
+			name:     "No true values",
+			input:    []bool{false, false, false},
+			expected: false,
+		},
+		{
+			name:     "One true value",
+			input:    []bool{false, true, false},
+			expected: true,
+		},
+		{
+			name:     "Multiple true values",
+			input:    []bool{true, true, false},
+			expected: false,
+		},
+		{
+			name:     "All true values",
+			input:    []bool{true, true, true},
+			expected: false,
+		},
+		{
+			name:     "Empty input",
+			input:    []bool{},
+			expected: false,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			result := exactlyOneOf(tc.input...)
+			if result != tc.expected {
+				t.Errorf("exactlyOneOf(%v) = %v; want %v", tc.input, result, tc.expected)
+			}
+		})
+	}
+}
+
+func TestIsSynced(t *testing.T) {
+	tests := []struct {
+		name          string
+		stack         pulumiv1.Stack
+		currentCommit string
+
+		want bool
+	}{
+		{
+			name:  "no update yet",
+			stack: pulumiv1.Stack{},
+			want:  false,
+		},
+		{
+			name: "generation mismatch",
+			stack: pulumiv1.Stack{
+				ObjectMeta: metav1.ObjectMeta{
+					Generation: int64(2),
+				},
+				Status: pulumiv1.StackStatus{
+					LastUpdate: &shared.StackUpdateState{
+						Generation: int64(1),
+					},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "marked for deletion",
+			stack: pulumiv1.Stack{
+				ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: ptr.To(metav1.Now())},
+				Status: pulumiv1.StackStatus{
+					LastUpdate: &shared.StackUpdateState{
+						State:                shared.SucceededStackStateMessage,
+						LastSuccessfulCommit: "something-else",
+					},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "last update succeeded but a new commit is available",
+			stack: pulumiv1.Stack{
+				Status: pulumiv1.StackStatus{
+					LastUpdate: &shared.StackUpdateState{
+						State:                shared.SucceededStackStateMessage,
+						LastSuccessfulCommit: "old-sha",
+					},
+				},
+			},
+			currentCommit: "new-sha",
+			want:          false,
+		},
+		{
+			name: "last update succeeded and we don't continue on commit match",
+			stack: pulumiv1.Stack{
+				Status: pulumiv1.StackStatus{
+					LastUpdate: &shared.StackUpdateState{
+						State:                shared.SucceededStackStateMessage,
+						LastSuccessfulCommit: "sha",
+					},
+				},
+			},
+			currentCommit: "sha",
+			want:          true,
+		},
+		{
+			name: "last update succeeded and we continue on commit match but we're inside the resync interval",
+			stack: pulumiv1.Stack{
+				Spec: shared.StackSpec{
+					ContinueResyncOnCommitMatch: true,
+				},
+				Status: pulumiv1.StackStatus{
+					LastUpdate: &shared.StackUpdateState{
+						State:                shared.SucceededStackStateMessage,
+						LastSuccessfulCommit: "sha",
+						LastResyncTime:       metav1.Now(),
+					},
+				},
+			},
+			currentCommit: "sha",
+			want:          true,
+		},
+		{
+			name: "last update succeeded and we continue on commit match and we're outside the resync interval",
+			stack: pulumiv1.Stack{
+				Spec: shared.StackSpec{
+					ContinueResyncOnCommitMatch: true,
+				},
+				Status: pulumiv1.StackStatus{
+					LastUpdate: &shared.StackUpdateState{
+						State:                shared.SucceededStackStateMessage,
+						LastSuccessfulCommit: "sha",
+						LastResyncTime:       metav1.NewTime(time.Now().Add(-1 * time.Hour)),
+					},
+				},
+			},
+			currentCommit: "sha",
+			want:          false,
+		},
+		{
+			name: 
"last update failed but we're inside the cooldown interval", + stack: pulumiv1.Stack{ + Spec: shared.StackSpec{ + ContinueResyncOnCommitMatch: true, + }, + Status: pulumiv1.StackStatus{ + LastUpdate: &shared.StackUpdateState{ + State: shared.FailedStackStateMessage, + LastResyncTime: metav1.Now(), + }, + }, + }, + want: true, + }, + { + name: "last update failed and we're outside the cooldown interval", + stack: pulumiv1.Stack{ + Spec: shared.StackSpec{ + ContinueResyncOnCommitMatch: true, + }, + Status: pulumiv1.StackStatus{ + LastUpdate: &shared.StackUpdateState{ + State: shared.FailedStackStateMessage, + LastResyncTime: metav1.NewTime(time.Now().Add(-1 * time.Hour)), + }, + }, + }, + want: false, + }, + { + name: "unrecognized state", + stack: pulumiv1.Stack{ + Spec: shared.StackSpec{}, + Status: pulumiv1.StackStatus{ + LastUpdate: &shared.StackUpdateState{ + State: "unknown", + }, + }, + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, isSynced(&tt.stack, tt.currentCommit)) + }) + } +} diff --git a/operator/internal/controller/pulumi/utils_test.go b/operator/internal/controller/pulumi/utils_test.go deleted file mode 100644 index 20241261..00000000 --- a/operator/internal/controller/pulumi/utils_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2016-2024, Pulumi Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulumi - -import "testing" - -func TestExactlyOneOf(t *testing.T) { - tests := []struct { - name string - input []bool - expected bool - }{ - { - name: "No true values", - input: []bool{false, false, false}, - expected: false, - }, - { - name: "One true value", - input: []bool{false, true, false}, - expected: true, - }, - { - name: "Multiple true values", - input: []bool{true, true, false}, - expected: false, - }, - { - name: "All true values", - input: []bool{true, true, true}, - expected: false, - }, - { - name: "Empty input", - input: []bool{}, - expected: false, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - result := exactlyOneOf(tc.input...) - if result != tc.expected { - t.Errorf("exactlyOneOf(%v) = %v; want %v", tc.input, result, tc.expected) - } - }) - } -}