Skip to content

Commit

Permalink
feat: report rate
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 20, 2021
1 parent b28c3a3 commit 5c6832b
Show file tree
Hide file tree
Showing 30 changed files with 831 additions and 466 deletions.
1 change: 1 addition & 0 deletions api/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
EnvPipelineName = "ARGO_DATAFLOW_PIPELINE_NAME"
EnvReplica = "ARGO_DATAFLOW_REPLICA"
EnvStepSpec = "ARGO_DATAFLOW_STEP_SPEC"
EnvStepStatus = "ARGO_DATAFLOW_STEP_STATUS"
EnvPeekDelay = "ARGO_DATAFLOW_PEEK_DELAY" // how long between peeking (default 4m)
EnvPullPolicy = "ARGO_DATAFLOW_PULL_POLICY" // default ""
EnvScalingDelay = "ARGO_DATAFLOW_SCALING_DELAY" // // how long to wait between any scaling events (including peeking) default "4m"
Expand Down
600 changes: 350 additions & 250 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/v1alpha1/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ type Kafka struct {
Topic string `json:"topic" protobuf:"bytes,3,opt,name=topic"`
Version string `json:"version,omitempty" protobuf:"bytes,4,opt,name=version"`
NET *KafkaNET `json:"net,omitempty" protobuf:"bytes,5,opt,name=net"`
// +kubebuilder:validation:Minimum=1
// +kubebuilder:default=1
Parallel uint32 `json:"parallel,omitempty" protobuf:"varint,6,opt,name=parallel"`
}
1 change: 1 addition & 0 deletions api/v1alpha1/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package v1alpha1
type Metrics struct {
Total uint64 `json:"total,omitempty" protobuf:"varint,1,opt,name=total"`
Errors uint64 `json:"errors,omitempty" protobuf:"varint,5,opt,name=errors"`
Rate uint64 `json:"rate,omitempty" protobuf:"varint,6,opt,name=rate"` // current rate of messages per second
}
4 changes: 2 additions & 2 deletions api/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Pipeline struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

Spec PipelineSpec `json:"spec" protobuf:"bytes,2,opt,name=spec"`
Status *PipelineStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
Spec PipelineSpec `json:"spec" protobuf:"bytes,2,opt,name=spec"`
Status PipelineStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

// +kubebuilder:object:root=true
Expand Down
5 changes: 3 additions & 2 deletions api/v1alpha1/source_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type SourceStatuses map[string]SourceStatus
type SourceStatuses map[string]SourceStatus // key is replica

func (in SourceStatuses) Set(name string, replica int, msg string) {
func (in SourceStatuses) Set(name string, replica int, msg string, rate uint64) {
x := in[name]
x.LastMessage = &Message{Data: trunc(msg), Time: metav1.Now()}
if x.Metrics == nil {
x.Metrics = map[string]Metrics{}
}
m := x.Metrics[strconv.Itoa(replica)]
m.Total++
m.Rate = rate
x.Metrics[strconv.Itoa(replica)] = m
in[name] = x
}
Expand Down
10 changes: 6 additions & 4 deletions api/v1alpha1/source_statuses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestSourceStatuses_Set(t *testing.T) {
ss := SourceStatuses{}

ss.Set("bar", 1, strings.Repeat("x", 33))
ss.Set("bar", 1, strings.Repeat("x", 33), 1)

if assert.Len(t, ss, 1) {
s := ss["bar"]
Expand All @@ -20,10 +20,11 @@ func TestSourceStatuses_Set(t *testing.T) {
}
if assert.Len(t, s.Metrics, 1) {
assert.Equal(t, uint64(1), s.Metrics["1"].Total)
assert.Equal(t, uint64(1), s.Metrics["1"].Rate)
}
}

ss.Set("bar", 1, "bar")
ss.Set("bar", 1, "bar", 1)

if assert.Len(t, ss, 1) {
s := ss["bar"]
Expand All @@ -32,10 +33,11 @@ func TestSourceStatuses_Set(t *testing.T) {
}
if assert.Len(t, s.Metrics, 1) {
assert.Equal(t, uint64(2), s.Metrics["1"].Total)
assert.Equal(t, uint64(1), s.Metrics["1"].Rate)
}
}

ss.Set("bar", 0, "foo")
ss.Set("bar", 0, "foo", 0)

if assert.Len(t, ss, 1) {
s := ss["bar"]
Expand All @@ -48,7 +50,7 @@ func TestSourceStatuses_Set(t *testing.T) {
}
}

ss.Set("baz", 0, "foo")
ss.Set("baz", 0, "foo", 0)

if assert.Len(t, ss, 2) {
s := ss["baz"]
Expand Down
3 changes: 3 additions & 0 deletions api/v1alpha1/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ type STAN struct {
ClusterID string `json:"clusterId,omitempty" protobuf:"bytes,5,opt,name=clusterId"`
Subject string `json:"subject" protobuf:"bytes,3,opt,name=subject"`
SubjectPrefix SubjectPrefix `json:"subjectPrefix,omitempty" protobuf:"bytes,6,opt,name=subjectPrefix,casttype=SubjectPrefix"`
// +kubebuilder:validation:Minimum=1
// +kubebuilder:default=1
Parallel uint32 `json:"parallel,omitempty" protobuf:"varint,7,opt,name=parallel"`
}
8 changes: 5 additions & 3 deletions api/v1alpha1/step_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type StepSpec struct {
Tolerations []corev1.Toleration `json:"tolerations,omitempty" protobuf:"bytes,19,rep,name=tolerations"`
}

// +kubebuilder:skipversion
type GetPodSpecReq struct {
PipelineName string `protobuf:"bytes,1,opt,name=pipelineName"`
Namespace string `protobuf:"bytes,2,opt,name=namespace"`
Expand All @@ -53,20 +52,23 @@ type GetPodSpecReq struct {
RunnerImage string `protobuf:"bytes,5,opt,name=runnerImage"`
PullPolicy corev1.PullPolicy `protobuf:"bytes,6,opt,name=pullPolicy,casttype=k8s.io/api/core/v1.PullPolicy"`
UpdateInterval time.Duration `protobuf:"varint,7,opt,name=updateInterval,casttype=time.Duration"`
StepStatus StepStatus `protobuf:"bytes,8,opt,name=stepStatus"`
}

func (in *StepSpec) GetPodSpec(req GetPodSpecReq) corev1.PodSpec {
volume := corev1.Volume{
Name: "var-run-argo-dataflow",
VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
}
data, _ := json.Marshal(in)
stepSpec, _ := json.Marshal(in)
stepStatus, _ := json.Marshal(req.StepStatus)
volumeMounts := []corev1.VolumeMount{{Name: volume.Name, MountPath: PathVarRun}}
envVars := []corev1.EnvVar{
{Name: EnvPipelineName, Value: req.PipelineName},
{Name: EnvNamespace, Value: req.Namespace},
{Name: EnvReplica, Value: strconv.Itoa(int(req.Replica))},
{Name: EnvStepSpec, Value: string(data)},
{Name: EnvStepSpec, Value: string(stepSpec)},
{Name: EnvStepStatus, Value: string(stepStatus)},
{Name: EnvUpdateInterval, Value: req.UpdateInterval.String()},
}
return corev1.PodSpec{
Expand Down
41 changes: 11 additions & 30 deletions api/v1alpha1/step_status.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,24 @@
package v1alpha1

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type StepStatus struct {
Phase StepPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=StepPhase"`
Reason string `json:"reason,omitempty" protobuf:"bytes,8,opt,name=reason"`
Message string `json:"message,omitempty" protobuf:"bytes,2,opt,name=message"`
Replicas uint32 `json:"replicas,omitempty" protobuf:"varint,5,opt,name=replicas"`
Selector string `json:"selector,omitempty" protobuf:"bytes,7,opt,name=selector"`
LastScaledAt *metav1.Time `json:"lastScaledAt,omitempty" protobuf:"bytes,6,opt,name=lastScaledAt"`
SourceStatues SourceStatuses `json:"sourceStatuses,omitempty" protobuf:"bytes,3,rep,name=sourceStatuses"`
SinkStatues SinkStatuses `json:"sinkStatuses,omitempty" protobuf:"bytes,4,rep,name=sinkStatuses"`
}

func (m *StepStatus) GetSourceStatues() SourceStatuses {
if m == nil {
return nil
}
return m.SourceStatues
Phase StepPhase `json:"phase" protobuf:"bytes,1,opt,name=phase,casttype=StepPhase"`
Reason string `json:"reason" protobuf:"bytes,8,opt,name=reason"`
Message string `json:"message" protobuf:"bytes,2,opt,name=message"`
Replicas uint32 `json:"replicas" protobuf:"varint,5,opt,name=replicas"`
Selector string `json:"selector" protobuf:"bytes,7,opt,name=selector"`
LastScaledAt metav1.Time `json:"lastScaledAt" protobuf:"bytes,6,opt,name=lastScaledAt"`
SourceStatuses SourceStatuses `json:"sourceStatuses" protobuf:"bytes,3,rep,name=sourceStatuses"`
SinkStatues SinkStatuses `json:"sinkStatuses" protobuf:"bytes,4,rep,name=sinkStatuses"`
}

func (m *StepStatus) GetReplicas() int {
if m == nil {
return -1
}
func (m StepStatus) GetReplicas() int {
return int(m.Replicas)
}

func (m *StepStatus) GetLastScaledAt() time.Time {
if m == nil || m.LastScaledAt == nil {
return time.Time{}
}
return m.LastScaledAt.Time
}

func (in *StepStatus) AnyErrors() bool {
return in.SinkStatues.AnyErrors() || in.SourceStatues.AnyErrors()
func (in StepStatus) AnyErrors() bool {
return in.SinkStatues.AnyErrors() || in.SourceStatuses.AnyErrors()
}
12 changes: 1 addition & 11 deletions api/v1alpha1/step_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,10 @@ package v1alpha1

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestStepStatus_AnyErrors(t *testing.T) {
assert.False(t, (&StepStatus{}).AnyErrors())
}

func TestStepStatus_GetLastScaledAt(t *testing.T) {
assert.Equal(t, time.Time{}, (&StepStatus{}).GetLastScaledAt())
}

func TestStepStatus_GetReplicas(t *testing.T) {
var x *StepStatus
assert.Equal(t, -1, x.GetReplicas())
assert.False(t, StepStatus{}.AnyErrors())
}
8 changes: 4 additions & 4 deletions api/v1alpha1/step_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ type Step struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

Spec StepSpec `json:"spec" protobuf:"bytes,2,opt,name=spec"`
Status *StepStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
Spec StepSpec `json:"spec" protobuf:"bytes,2,opt,name=spec"`
Status StepStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

func (in *Step) GetTargetReplicas(scalingDelay, peekDelay time.Duration) int {
lastScaledAt := in.Status.GetLastScaledAt()
lastScaledAt := in.Status.LastScaledAt.Time
currentReplicas := in.Status.GetReplicas() // can be -1

if time.Since(lastScaledAt) < scalingDelay {
return currentReplicas
}

pending := in.Status.GetSourceStatues().GetPending()
pending := in.Status.SourceStatuses.GetPending()
targetReplicas := in.Spec.CalculateReplicas(pending)

// do we need to peek? currentReplicas and targetReplicas must both be zero
Expand Down
30 changes: 15 additions & 15 deletions api/v1alpha1/step_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
)

func TestStep_GetTargetReplicas(t *testing.T) {
old := &metav1.Time{}
recent := &metav1.Time{Time: time.Now().Add(-2 * time.Minute)}
now := &metav1.Time{Time: time.Now()}
old := metav1.Time{}
recent := metav1.Time{Time: time.Now().Add(-2 * time.Minute)}
now := metav1.Time{Time: time.Now()}
scalingDelay := time.Minute
peekDelay := 4 * time.Minute
t.Run("Init", func(t *testing.T) {
Expand All @@ -26,57 +26,57 @@ func TestStep_GetTargetReplicas(t *testing.T) {
})
t.Run("ScalingUp", func(t *testing.T) {
t.Run("Min=2,Replicas=1,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: &StepStatus{Replicas: 1, LastScaledAt: old}}
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{Replicas: 1, LastScaledAt: old}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("Min=2,Replicas=1,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: &StepStatus{Replicas: 1, LastScaledAt: recent}}
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{Replicas: 1, LastScaledAt: recent}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("Min=2,Replicas=1,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: &StepStatus{Replicas: 1, LastScaledAt: now}}
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{Replicas: 1, LastScaledAt: now}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
t.Run("ScalingDown", func(t *testing.T) {
t.Run("Min=1,Replicas=2,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: &StepStatus{Replicas: 2, LastScaledAt: old}}
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{Replicas: 2, LastScaledAt: old}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("Min=1,Replicas=2,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: &StepStatus{Replicas: 2, LastScaledAt: recent}}
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{Replicas: 2, LastScaledAt: recent}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("Min=1,Replicas=2,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: &StepStatus{Replicas: 2, LastScaledAt: now}}
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{Replicas: 2, LastScaledAt: now}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
t.Run("ScaleToZero", func(t *testing.T) {
t.Run("Min=0,Replicas=1,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 1, LastScaledAt: old}}
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 1, LastScaledAt: old}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("Min=0,Replicas=1,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 1, LastScaledAt: recent}}
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 1, LastScaledAt: recent}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("Min=0,Replicas=1,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 1, LastScaledAt: now}}
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 1, LastScaledAt: now}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
t.Run("Peek", func(t *testing.T) {
t.Run("Min=0,Replicas=0,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 0, LastScaledAt: old}}
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 0, LastScaledAt: old}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("Min=0,Replicas=0,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 0, LastScaledAt: now}}
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 0, LastScaledAt: now}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("Min=0,Replicas=0,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 0, LastScaledAt: now}}
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 0, LastScaledAt: now}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
Expand Down
Loading

0 comments on commit 5c6832b

Please sign in to comment.