Skip to content

Commit

Permalink
feat: support HPA
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 17, 2021
1 parent bb32570 commit 290cde4
Show file tree
Hide file tree
Showing 67 changed files with 2,606 additions and 850 deletions.
4 changes: 3 additions & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
ignore:
- .github
- shared/util/containerkiller/container_killer.go
- api/v1alpha1/generated.pb.go
- api/v1alpha1/zz_generated.deepcopy.go
- bin
- config
- docs
- examples
- hack
- kill
- manager
- runner
- runtimes
- shared/containerkiller
coverage:
status:
# we've found this not to be useful
Expand Down
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
*.iml
.github
.idea
bin
docs
examples
18 changes: 11 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ build: generate manifests
test:
go test -v ./... -coverprofile cover.out

pre-commit: codegen test install lint
pre-commit: codegen test install lint start

codegen: generate manifests proto config/ci.yaml config/default.yaml config/dev.yaml config/kafka-default.yaml config/quick-start.yaml config/stan-default.yaml docs/EXAMPLES.md
codegen: generate manifests proto config/ci.yaml config/default.yaml config/dev.yaml config/kafka-default.yaml config/quick-start.yaml config/stan-default.yaml docs/EXAMPLES.md CHANGELOG.md
go generate ./...
./hack/changelog.sh > CHANGELOG.md


$(GOBIN)/goreman:
go install github.com/mattn/[email protected]
Expand Down Expand Up @@ -78,8 +78,12 @@ generate: $(GOBIN)/controller-gen
$(GOBIN)/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."

.PHONY: docs/EXAMPLES.md
docs/EXAMPLES.md:
go run ./docs/examples > docs/EXAMPLES.md
docs/EXAMPLES.md: /dev/null
go run ./examples > docs/EXAMPLES.md

.PHONY: CHANGELOG.md
CHANGELOG.md: /dev/null
./hack/changelog.sh > CHANGELOG.md

# not dependant on api/v1alpha1/generated.proto because it often does not change when this target runs, so results in remakes when they are not needed
proto: api/v1alpha1/generated.pb.go
Expand All @@ -100,7 +104,7 @@ api/v1alpha1/generated.%: $(shell find api/v1alpha1 -type f -name '*.go' -not -n
lint:
go mod tidy
golangci-lint run --fix
kubectl apply --dry-run=client -f docs/examples
kubectl apply --dry-run=client -f examples

.PHONY: controller
controller: controller-image
Expand Down Expand Up @@ -153,7 +157,7 @@ config/stan/single-server-stan.yml:

.PHONY: test-examples
test-examples:
go test -timeout 20m -v -tags examples -count 1 ./docs/examples
go test -timeout 20m -v -count 1 ./examples

argocli:
cd ../../argoproj/argo-workflows && git checkout dev-dataflow && make ./dist/argo DEV_BRANCH=true && ./dist/argo server --secure=false --namespaced --auth-mode=server --namespace=argo-dataflow-system
Expand Down
673 changes: 458 additions & 215 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

24 changes: 16 additions & 8 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions api/v1alpha1/scale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package v1alpha1

type Scale struct {
MinReplicas int32 `json:"minReplicas,omitempty" protobuf:"varint,1,opt,name=minReplicas"`
MaxReplicas *uint32 `json:"maxReplicas,omitempty" protobuf:"varint,2,opt,name=maxReplicas"` // takes precedence over min
ReplicaRatio uint32 `json:"replicaRatio,omitempty" protobuf:"varint,3,opt,name=replicaRatio"`
}

// Used to calculate the number of replicas.
// min(r.max, max(r.min, pending/ratio))
// Example:
// min=1, max=4, ratio=100
// pending=0, replicas=1
// pending=100, replicas=1
// pending=200, replicas=2
// pending=300, replicas=3
// pending=400, replicas=4
// pending=500, replicas=4
func (in Scale) Calculate(pending int) int {
n := 0
if in.ReplicaRatio > 0 {
n = pending / int(in.ReplicaRatio)
}
if n < int(in.MinReplicas) {
n = int(in.MinReplicas)
}
if in.MaxReplicas != nil && n > int(*in.MaxReplicas) {
n = int(*in.MaxReplicas)
}
return n
}
17 changes: 17 additions & 0 deletions api/v1alpha1/scale_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestScale_Calculate(t *testing.T) {
assert.Equal(t, 0, Scale{MinReplicas: 0}.Calculate(0))
assert.Equal(t, 0, Scale{MinReplicas: 0}.Calculate(0))
max := uint32(0)
assert.Equal(t, 0, Scale{MinReplicas: 1, MaxReplicas: &max}.Calculate(0))
assert.Equal(t, 2, Scale{MinReplicas: 1, ReplicaRatio: 2}.Calculate(4))
max = uint32(1)
assert.Equal(t, 1, Scale{MinReplicas: 1, MaxReplicas: &max, ReplicaRatio: 2}.Calculate(4))
}
4 changes: 2 additions & 2 deletions api/v1alpha1/sink_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type SinkStatuses map[string]SinkStatus

func (in SinkStatuses) Set(name string, replica int, msg string) {
x := in[name]
x.LastMessage = &Message{Data: msg, Time: metav1.Now()}
x.LastMessage = &Message{Data: trunc(msg), Time: metav1.Now()}
if x.Metrics == nil {
x.Metrics = map[string]Metrics{}
}
Expand All @@ -22,7 +22,7 @@ func (in SinkStatuses) Set(name string, replica int, msg string) {

func (in SinkStatuses) IncErrors(name string, replica int, err error) {
x := in[name]
x.LastError = &Error{Message: err.Error(), Time: metav1.Now()}
x.LastError = &Error{Message: trunc(err.Error()), Time: metav1.Now()}
if x.Metrics == nil {
x.Metrics = map[string]Metrics{}
}
Expand Down
31 changes: 7 additions & 24 deletions api/v1alpha1/step_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ type StepSpec struct {
Map Map `json:"map,omitempty" protobuf:"bytes,9,opt,name=map,casttype=Map"`
Group *Group `json:"group,omitempty" protobuf:"bytes,11,opt,name=group"`

// +kubebuilder:default=1
MinReplicas int32 `json:"minReplicas" protobuf:"varint,20,opt,name=minReplicas"` // this is both the min, and the initial value
MaxReplicas *uint32 `json:"maxReplicas,omitempty" protobuf:"varint,21,opt,name=maxReplicas"` // takes precedence over min
ReplicaRatio uint32 `json:"replicaRatio,omitempty" protobuf:"varint,22,opt,name=replicaRatio"`

Replicas *uint32 `json:"replicas,omitempty" protobuf:"varint,23,opt,name=replicas"`
Scale *Scale `json:"scale,omitempty" protobuf:"bytes,24,opt,name=scale"`
// +patchStrategy=merge
// +patchMergeKey=name
Sources []Source `json:"sources,omitempty" protobuf:"bytes,3,rep,name=sources"`
Expand Down Expand Up @@ -148,26 +145,12 @@ func (in *StepSpec) getType() containerSupplier {
}
}

// Used to calculate the number of replicas.
// min(r.max, max(r.min, pending/ratio))
// Example:
// min=1, max=4, ratio=100
// pending=0, replicas=1
// pending=100, replicas=1
// pending=200, replicas=2
// pending=300, replicas=3
// pending=400, replicas=4
// pending=500, replicas=4
func (in *StepSpec) CalculateReplicas(pending int) int {
n := 0
if in.ReplicaRatio > 0 {
n = pending / int(in.ReplicaRatio)
}
if n < int(in.MinReplicas) {
n = int(in.MinReplicas)
if in.Replicas != nil {
return int(*in.Replicas)
}
if in.MaxReplicas != nil && n > int(*in.MaxReplicas) {
n = int(*in.MaxReplicas)
if in.Scale == nil {
return 1
}
return n
return in.Scale.Calculate(pending)
}
5 changes: 3 additions & 2 deletions api/v1alpha1/step_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

type StepStatus struct {
Phase StepPhase `json:"phase" protobuf:"bytes,1,opt,name=phase,casttype=StepPhase"`
Phase StepPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=StepPhase"`
Message string `json:"message,omitempty" protobuf:"bytes,2,opt,name=message"`
Replicas uint32 `json:"replicas" protobuf:"varint,5,opt,name=replicas"`
Replicas uint32 `json:"replicas,omitempty" protobuf:"varint,5,opt,name=replicas"`
Selector string `json:"selector,omitempty" protobuf:"bytes,7,opt,name=selector"`
LastScaledAt *metav1.Time `json:"lastScaledAt,omitempty" protobuf:"bytes,6,opt,name=lastScaledAt"`
SourceStatues SourceStatuses `json:"sourceStatuses,omitempty" protobuf:"bytes,3,rep,name=sourceStatuses"`
SinkStatues SinkStatuses `json:"sinkStatuses,omitempty" protobuf:"bytes,4,rep,name=sinkStatuses"`
Expand Down
4 changes: 3 additions & 1 deletion api/v1alpha1/step_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector
// +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Message",type=string,JSONPath=`.status.message`
// +kubebuilder:printcolumn:name="Replicas",type=string,JSONPath=`.status.replicas`
// +kubebuilder:printcolumn:name="Desired",type=string,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Current",type=string,JSONPath=`.status.replicas`
type Step struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
Expand Down
56 changes: 28 additions & 28 deletions api/v1alpha1/step_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,68 +15,68 @@ func TestStep_GetTargetReplicas(t *testing.T) {
scalingDelay := time.Minute
peekDelay := 4 * time.Minute
t.Run("Init", func(t *testing.T) {
t.Run("MinReplicas=0", func(t *testing.T) {
s := &Step{Spec: StepSpec{MinReplicas: 0}}
t.Run("Min=0", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 0}}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=1", func(t *testing.T) {
s := &Step{Spec: StepSpec{MinReplicas: 1}}
t.Run("Min=1", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
t.Run("ScalingUp", func(t *testing.T) {
t.Run("MinReplicas=2,Replicas=1,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{MinReplicas: 2}, Status: &StepStatus{Replicas: 1, LastScaledAt: old}}
t.Run("Min=2,Replicas=1,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: &StepStatus{Replicas: 1, LastScaledAt: old}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=2,Replicas=1,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{MinReplicas: 2}, Status: &StepStatus{Replicas: 1, LastScaledAt: recent}}
t.Run("Min=2,Replicas=1,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: &StepStatus{Replicas: 1, LastScaledAt: recent}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=2,Replicas=1,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{MinReplicas: 2}, Status: &StepStatus{Replicas: 1, LastScaledAt: now}}
t.Run("Min=2,Replicas=1,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: &StepStatus{Replicas: 1, LastScaledAt: now}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
t.Run("ScalingDown", func(t *testing.T) {
t.Run("MinReplicas=1,Replicas=2,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{MinReplicas: 1}, Status: &StepStatus{Replicas: 2, LastScaledAt: old}}
t.Run("Min=1,Replicas=2,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: &StepStatus{Replicas: 2, LastScaledAt: old}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=1,Replicas=2,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{MinReplicas: 1}, Status: &StepStatus{Replicas: 2, LastScaledAt: recent}}
t.Run("Min=1,Replicas=2,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: &StepStatus{Replicas: 2, LastScaledAt: recent}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=1,Replicas=2,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{MinReplicas: 1}, Status: &StepStatus{Replicas: 2, LastScaledAt: now}}
t.Run("Min=1,Replicas=2,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: &StepStatus{Replicas: 2, LastScaledAt: now}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
t.Run("ScaleToZero", func(t *testing.T) {
t.Run("MinReplicas=0,Replicas=1,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{}, Status: &StepStatus{Replicas: 1, LastScaledAt: old}}
t.Run("Min=0,Replicas=1,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 1, LastScaledAt: old}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=0,Replicas=1,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{}, Status: &StepStatus{Replicas: 1, LastScaledAt: recent}}
t.Run("Min=0,Replicas=1,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 1, LastScaledAt: recent}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=0,Replicas=1,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{}, Status: &StepStatus{Replicas: 1, LastScaledAt: now}}
t.Run("Min=0,Replicas=1,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 1, LastScaledAt: now}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
t.Run("Peek", func(t *testing.T) {
t.Run("MinReplicas=0,Replicas=0,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{}, Status: &StepStatus{Replicas: 0, LastScaledAt: old}}
t.Run("Min=0,Replicas=0,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 0, LastScaledAt: old}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=0,Replicas=0,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{}, Status: &StepStatus{Replicas: 0, LastScaledAt: now}}
t.Run("Min=0,Replicas=0,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 0, LastScaledAt: now}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
t.Run("MinReplicas=0,Replicas=0,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{}, Status: &StepStatus{Replicas: 0, LastScaledAt: now}}
t.Run("Min=0,Replicas=0,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: &StepStatus{Replicas: 0, LastScaledAt: now}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
})
})
Expand Down
12 changes: 12 additions & 0 deletions api/v1alpha1/trunc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package v1alpha1

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_trunc(t *testing.T) {
assert.Len(t, trunc(strings.Repeat("x", 99)), 32)
}
Loading

0 comments on commit 290cde4

Please sign in to comment.