Skip to content

Commit

Permalink
fix!: change rate to resource.Quantity
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 27, 2021
1 parent 859abb7 commit 98eadec
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 215 deletions.
386 changes: 205 additions & 181 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions api/v1alpha1/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package v1alpha1

import (
"k8s.io/apimachinery/pkg/api/resource"
)

type Metrics struct {
Total uint64 `json:"total,omitempty" protobuf:"varint,1,opt,name=total"`
Errors uint64 `json:"errors,omitempty" protobuf:"varint,5,opt,name=errors"`
Rate uint64 `json:"rate,omitempty" protobuf:"varint,6,opt,name=rate"` // current rate of messages per second
Total uint64 `json:"total,omitempty" protobuf:"varint,1,opt,name=total"`
Errors uint64 `json:"errors,omitempty" protobuf:"varint,5,opt,name=errors"`
Rate resource.Quantity `json:"rate,omitempty" protobuf:"bytes,6,opt,name=rate"` // current rate of messages per second
}
4 changes: 3 additions & 1 deletion api/v1alpha1/source_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package v1alpha1
import (
"strconv"

"k8s.io/apimachinery/pkg/api/resource"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type SourceStatuses map[string]SourceStatus // key is replica

func (in SourceStatuses) Set(name string, replica int, msg string, rate uint64) {
func (in SourceStatuses) Set(name string, replica int, msg string, rate resource.Quantity) {
x := in[name]
x.LastMessage = &Message{Data: trunc(msg), Time: metav1.Now()}
if x.Metrics == nil {
Expand Down
14 changes: 8 additions & 6 deletions api/v1alpha1/source_statuses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"strings"
"testing"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/stretchr/testify/assert"
)

func TestSourceStatuses_Set(t *testing.T) {
ss := SourceStatuses{}

ss.Set("bar", 1, strings.Repeat("x", 33), 1)
ss.Set("bar", 1, strings.Repeat("x", 33), resource.MustParse("1"))

if assert.Len(t, ss, 1) {
s := ss["bar"]
Expand All @@ -20,11 +22,11 @@ func TestSourceStatuses_Set(t *testing.T) {
}
if assert.Len(t, s.Metrics, 1) {
assert.Equal(t, uint64(1), s.Metrics["1"].Total)
assert.Equal(t, uint64(1), s.Metrics["1"].Rate)
assert.Equal(t, resource.MustParse("1"), s.Metrics["1"].Rate)
}
}

ss.Set("bar", 1, "bar", 1)
ss.Set("bar", 1, "bar", resource.MustParse("1"))

if assert.Len(t, ss, 1) {
s := ss["bar"]
Expand All @@ -33,11 +35,11 @@ func TestSourceStatuses_Set(t *testing.T) {
}
if assert.Len(t, s.Metrics, 1) {
assert.Equal(t, uint64(2), s.Metrics["1"].Total)
assert.Equal(t, uint64(1), s.Metrics["1"].Rate)
assert.Equal(t, resource.MustParse("1"), s.Metrics["1"].Rate)
}
}

ss.Set("bar", 0, "foo", 0)
ss.Set("bar", 0, "foo", resource.MustParse("1"))

if assert.Len(t, ss, 1) {
s := ss["bar"]
Expand All @@ -50,7 +52,7 @@ func TestSourceStatuses_Set(t *testing.T) {
}
}

ss.Set("baz", 0, "foo", 0)
ss.Set("baz", 0, "foo", resource.MustParse("1"))

if assert.Len(t, ss, 2) {
s := ss["baz"]
Expand Down
5 changes: 3 additions & 2 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3683,8 +3683,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down Expand Up @@ -3726,8 +3729,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down
14 changes: 10 additions & 4 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2710,8 +2710,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down Expand Up @@ -2753,8 +2756,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down
14 changes: 10 additions & 4 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3683,8 +3683,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down Expand Up @@ -3726,8 +3729,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down
14 changes: 10 additions & 4 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3683,8 +3683,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down Expand Up @@ -3726,8 +3729,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down
14 changes: 10 additions & 4 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3683,8 +3683,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down Expand Up @@ -3726,8 +3729,11 @@ spec:
format: int64
type: integer
rate:
format: int64
type: integer
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
total:
format: int64
type: integer
Expand Down
5 changes: 4 additions & 1 deletion runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/Shopify/sarama"
"github.com/nats-io/stan.go"
"github.com/paulbellamy/ratecounter"
Expand Down Expand Up @@ -280,7 +282,8 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
f := func(msg []byte) error {
rateCounter.Incr(1)
withLock(func() {
status.SourceStatuses.Set(sourceName, replica, printable(msg), uint64(rateCounter.Rate()/int64(updateInterval/time.Second)))
rate := float64(rateCounter.Rate()) / updateInterval.Seconds()
status.SourceStatuses.Set(sourceName, replica, printable(msg), resource.MustParse(fmt.Sprintf("%.3f", rate)))
})
if err := toMain(msg); err != nil {
logger.Error(err, "⚠ →", "source", sourceName)
Expand Down

0 comments on commit 98eadec

Please sign in to comment.