Skip to content

Commit

Permalink
feat: change Errors to RecentErrors
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jun 15, 2021
1 parent fac8051 commit 88f77f7
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 21 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
const (
// conditions
ConditionCompleted = "Completed" // the pipeline completed
ConditionErrors = "Errors" // added if any step encounters an error
ConditionRecentErrors = "RecentErrors" // added if any step encountered an error recently
ConditionRunning = "Running" // added if any step is currently running
ConditionSunkMessages = "SunkMessages" // added if any messages have been written to a sink for any step
ConditionTerminating = "Terminating" // added if any terminator step terminated
Expand Down
6 changes: 4 additions & 2 deletions api/v1alpha1/source_status.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package v1alpha1

import "time"

type SourceStatus struct {
LastMessage *Message `json:"lastMessage,omitempty" protobuf:"bytes,2,opt,name=lastMessage"`
LastError *Error `json:"lastError,omitempty" protobuf:"bytes,5,opt,name=lastError"`
Expand Down Expand Up @@ -34,6 +36,6 @@ func (in SourceStatus) AnySunk() bool {
return in.GetTotal() > 0
}

func (in SourceStatus) AnyErrors() bool {
return in.GetErrors() > 0
func (in SourceStatus) RecentErrors() bool {
return in.LastError != nil && time.Since(in.LastError.Time.Time) < 15*time.Minute
}
26 changes: 22 additions & 4 deletions api/v1alpha1/source_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package v1alpha1
import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/stretchr/testify/assert"
)

Expand All @@ -14,8 +16,24 @@ func TestSourceStatus_GetTotal(t *testing.T) {
}

func TestSourceStatus_GetErrors(t *testing.T) {
assert.Equal(t, uint64(0), (SourceStatus{}).GetErrors())
assert.Equal(t, uint64(1), (SourceStatus{
Metrics: map[string]Metrics{"": {Errors: 1}},
}).GetErrors())
t.Run("None", func(t *testing.T) {
x := SourceStatus{}
assert.Equal(t, uint64(0), x.GetErrors())
})
t.Run("NotRecent", func(t *testing.T) {
x := SourceStatus{
LastError: &Error{Time: metav1.Time{}},
Metrics: map[string]Metrics{"": {Errors: 1}},
}
assert.Equal(t, uint64(1), x.GetErrors())
assert.False(t, x.RecentErrors())
})
t.Run("Recent", func(t *testing.T) {
x := SourceStatus{
LastError: &Error{Time: metav1.Now()},
Metrics: map[string]Metrics{"": {Errors: 1}},
}
assert.Equal(t, uint64(1), x.GetErrors())
assert.True(t, x.RecentErrors())
})
}
4 changes: 2 additions & 2 deletions api/v1alpha1/source_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func (in SourceStatuses) GetPending() uint64 {
return v
}

func (in SourceStatuses) AnyErrors() bool {
func (in SourceStatuses) RecentErrors() bool {
for _, s := range in {
if s.AnyErrors() {
if s.RecentErrors() {
return true
}
}
Expand Down
8 changes: 5 additions & 3 deletions api/v1alpha1/source_statuses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"strings"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -106,7 +108,7 @@ func TestSourceStatus_GetPending(t *testing.T) {
assert.Equal(t, uint64(1), SourceStatuses{"0": {Pending: &v}}.GetPending())
}

func TestSourceStatus_AnyErrors(t *testing.T) {
assert.False(t, SourceStatuses{}.AnyErrors())
assert.True(t, SourceStatuses{"0": {Metrics: map[string]Metrics{"0": {Errors: 1}}}}.AnyErrors())
func TestSourceStatus_RecentErrors(t *testing.T) {
assert.False(t, SourceStatuses{}.RecentErrors())
assert.True(t, SourceStatuses{"0": {LastError: &Error{Time: metav1.Now()}}}.RecentErrors())
}
4 changes: 2 additions & 2 deletions api/v1alpha1/step_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ func (m StepStatus) GetReplicas() int {
return int(m.Replicas)
}

func (in StepStatus) AnyErrors() bool {
return in.SinkStatues.AnyErrors() || in.SourceStatuses.AnyErrors()
func (in StepStatus) RecentErrors() bool {
return in.SinkStatues.RecentErrors() || in.SourceStatuses.RecentErrors()
}
4 changes: 2 additions & 2 deletions api/v1alpha1/step_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestStepStatus_AnyErrors(t *testing.T) {
assert.False(t, StepStatus{}.AnyErrors())
func TestStepStatus_RecentErrors(t *testing.T) {
assert.False(t, StepStatus{}.RecentErrors())
}
2 changes: 1 addition & 1 deletion examples/301-erroring-pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def handler(msg):
(pipeline("301-erroring")
.owner('argoproj-labs')
.describe("""This example showcases retry policies.""")
.annotate('dataflow.argoproj.io/wait-for', 'Errors')
.annotate('dataflow.argoproj.io/wait-for', 'RecentErrors')
.step(
(cron('*/3 * * * * *', retryPolicy='Always')
.handler('always', handler=handler)
Expand Down
2 changes: 1 addition & 1 deletion examples/301-erroring-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
annotations:
dataflow.argoproj.io/description: This example showcases retry policies.
dataflow.argoproj.io/owner: argoproj-labs
dataflow.argoproj.io/wait-for: Errors
dataflow.argoproj.io/wait-for: RecentErrors
name: 301-erroring
spec:
steps:
Expand Down
6 changes: 3 additions & 3 deletions manager/controllers/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
pending, running, succeeded, failed := 0, 0, 0, 0
newStatus := *pipeline.Status.DeepCopy()
newStatus.Phase = dfv1.PipelineUnknown
terminate, sunkMessages, errors := false, false, false
terminate, sunkMessages, recentErrors := false, false, false
for _, step := range steps.Items {
stepName := step.Spec.Name
if !pipeline.Spec.HasStep(stepName) { // this happens when a pipeline changes and a step is removed
Expand Down Expand Up @@ -139,7 +139,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
terminate = terminate || step.Status.Phase.Completed() && step.Spec.Terminator
sunkMessages = sunkMessages || step.Status.SinkStatues.AnySunk()
errors = errors || step.Status.AnyErrors()
recentErrors = recentErrors || step.Status.RecentErrors()
}

if newStatus.Phase.Completed() {
Expand Down Expand Up @@ -167,7 +167,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
dfv1.ConditionRunning: newStatus.Phase == dfv1.PipelineRunning,
dfv1.ConditionCompleted: newStatus.Phase.Completed(),
dfv1.ConditionSunkMessages: sunkMessages,
dfv1.ConditionErrors: errors,
dfv1.ConditionRecentErrors: recentErrors,
dfv1.ConditionTerminating: terminate,
} {
if ok {
Expand Down

0 comments on commit 88f77f7

Please sign in to comment.