Skip to content

Commit

Permalink
feat: Enable the Message retry metrics (#75)
Browse files Browse the repository at this point in the history
Signed-off-by: Saravanan Balasubramanian <[email protected]>
  • Loading branch information
sarabala1979 authored Jun 30, 2021
1 parent 0299cd8 commit 9db646a
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 6 deletions.
7 changes: 4 additions & 3 deletions api/v1alpha1/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
)

type Metrics struct {
Total uint64 `json:"total,omitempty" protobuf:"varint,1,opt,name=total"`
Errors uint64 `json:"errors,omitempty" protobuf:"varint,5,opt,name=errors"`
Rate resource.Quantity `json:"rate,omitempty" protobuf:"bytes,6,opt,name=rate"` // current rate of messages per second
Total uint64 `json:"total,omitempty" protobuf:"varint,1,opt,name=total"`
Errors uint64 `json:"errors,omitempty" protobuf:"varint,2,opt,name=errors"`
Rate resource.Quantity `json:"rate,omitempty" protobuf:"bytes,3,opt,name=rate"` // current rate of messages per second
Retries uint64 `json:"retries,omitempty" protobuf:"bytes,4,opt,name=retries"`
}
16 changes: 13 additions & 3 deletions api/v1alpha1/source_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ type SourceStatus struct {
Metrics map[string]Metrics `json:"metrics,omitempty" protobuf:"bytes,4,rep,name=metrics"`
}

func (m SourceStatus) GetPending() uint64 {
if m.Pending != nil {
return *m.Pending
// GetPending returns pending counts
func (in SourceStatus) GetPending() uint64 {
if in.Pending != nil {
return *in.Pending
}
return 0
}
Expand Down Expand Up @@ -39,3 +40,12 @@ func (in SourceStatus) AnySunk() bool {
func (in SourceStatus) RecentErrors() bool {
return in.LastError != nil && time.Since(in.LastError.Time.Time) < 15*time.Minute
}

// GetRetryCount returns total Retries metrics
func (in SourceStatus) GetRetryCount() uint64 {
var x uint64
for _, m := range in.Metrics {
x += m.Retries
}
return x
}
19 changes: 19 additions & 0 deletions api/v1alpha1/source_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,22 @@ func TestSourceStatus_GetErrors(t *testing.T) {
assert.True(t, x.RecentErrors())
})
}

func TestSourceStatus_GetRetryCount(t *testing.T){
t.Run("None", func(t *testing.T) {
x := SourceStatus{}
assert.Equal(t, uint64(0), x.GetRetryCount())
})
t.Run("One", func(t *testing.T) {
x := SourceStatus{
Metrics: map[string]Metrics{"one": {Retries: 1}},
}
assert.Equal(t, uint64(1), x.GetRetryCount())
})
t.Run("Two", func(t *testing.T) {
x := SourceStatus{
Metrics: map[string]Metrics{"one": {Retries: 1}, "two": {Retries: 1}},
}
assert.Equal(t, uint64(2), x.GetRetryCount())
})
}
14 changes: 14 additions & 0 deletions api/v1alpha1/source_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,17 @@ func (in SourceStatuses) AnySunk() bool {
}
return false
}

// IncrRetryCount increase the retry_count metrics by 1
func (in SourceStatuses) IncrRetryCount(name string, replica int) {
x := in[name]
if x.Metrics == nil {
x.Metrics = map[string]Metrics{}
}
m := x.Metrics[strconv.Itoa(replica)]
m.Retries++
x.Metrics[strconv.Itoa(replica)] = m
in[name] = x
}


13 changes: 13 additions & 0 deletions api/v1alpha1/source_statuses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,16 @@ func TestSourceStatus_RecentErrors(t *testing.T) {
assert.False(t, SourceStatuses{}.RecentErrors())
assert.True(t, SourceStatuses{"0": {LastError: &Error{Time: metav1.Now()}}}.RecentErrors())
}

func TestSourceStatuses_IncrRetryCount(t *testing.T) {
sources := SourceStatuses{}
sources.IncrRetryCount("one", 1)
assert.Equal(t, uint64(1), sources.Get("one").GetRetryCount())
sources.IncrRetryCount("one", 2)
assert.Equal(t, uint64(2), sources.Get("one").GetRetryCount())
sources.IncrRetryCount("one", 1)
assert.Equal(t, uint64(3), sources.Get("one").GetRetryCount())
sources.IncrRetryCount("two", 1)
assert.Equal(t, uint64(3), sources.Get("one").GetRetryCount())
assert.Equal(t, uint64(1), sources.Get("two").GetRetryCount())
}
6 changes: 6 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2770,6 +2770,9 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
retries:
format: int64
type: integer
total:
format: int64
type: integer
Expand Down Expand Up @@ -2819,6 +2822,9 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
retries:
format: int64
type: integer
total:
format: int64
type: integer
Expand Down
6 changes: 6 additions & 0 deletions docs/METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ Use this metric to determine how long messages are taking to be processed.

Golden metric type: latency.

### message_retry_count

Use this metric to determine how many retries performed for message processing.

Golden metric type: error.

### replicas

Use this to track scaling events.
Expand Down
10 changes: 10 additions & 0 deletions runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func connectSources(ctx context.Context, toMain func(context.Context, []byte) er
case dfv1.RetryNever:
return true, err
default:
withLock(func() {
step.Status.SinkStatues.IncrRetryCount(sourceName, replica)
})
return false, nil
}
} else {
Expand Down Expand Up @@ -351,4 +354,11 @@ func newSourceMetrics(source dfv1.Source, sourceName string) {
Help: "Total number of errors, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_errors",
ConstLabels: map[string]string{"sourceName": source.Name},
}, func() float64 { return float64(step.Status.SourceStatuses.Get(sourceName).GetErrors()) })

promauto.NewCounterFunc(prometheus.CounterOpts{
Subsystem: "message",
Name: "retry_count",
Help: "Number of retry, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#message_retry_count",
}, func() float64 { return float64(step.Status.SourceStatuses.Get(sourceName).GetRetryCount()) })

}

0 comments on commit 9db646a

Please sign in to comment.