feat: automatic GC of pipelines 30m after completion
alexec committed Jul 2, 2021
1 parent be5e5ab commit 810aef7
Showing 14 changed files with 328 additions and 209 deletions.
3 changes: 2 additions & 1 deletion api/v1alpha1/const.go
@@ -16,7 +16,8 @@ const (
CtrMain = "main"
CtrSidecar = "sidecar"
// env vars
- EnvImageFormat = "ARGO_DATAFLOW_IMAGE_FORMAT" // default "quay.io/argoproj/%s:latest"
+ EnvImageFormat   = "ARGO_DATAFLOW_IMAGE_FORMAT" // default "quay.io/argoproj/%s:latest"
+ EnvDeletionDelay = "ARGO_DATAFLOW_DELETION_DELAY" // default "30m"
EnvNamespace = "ARGO_DATAFLOW_NAMESPACE"
EnvPipelineName = "ARGO_DATAFLOW_PIPELINE_NAME"
EnvReplica = "ARGO_DATAFLOW_REPLICA"
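The new ARGO_DATAFLOW_DELETION_DELAY variable is read as a Go duration (default "30m") by util.GetEnvDuration in manager/controllers/config.go below. The helper's implementation is not part of this diff; here is a minimal sketch of what such a helper typically looks like — names and error handling are assumptions, the real function may differ.

```go
// Illustrative sketch only — the repository's util.GetEnvDuration may differ.
package util

import (
	"fmt"
	"os"
	"time"
)

// GetEnvDuration returns the duration parsed from the named environment
// variable, or def when the variable is unset. It panics on an unparsable
// value so misconfiguration is caught at startup; the real helper may
// handle errors differently.
func GetEnvDuration(key string, def time.Duration) time.Duration {
	v, ok := os.LookupEnv(key)
	if !ok {
		return def
	}
	d, err := time.ParseDuration(v)
	if err != nil {
		panic(fmt.Errorf("%s=%q is not a valid duration: %w", key, v, err))
	}
	return d
}
```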
430 changes: 239 additions & 191 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion api/v1alpha1/pipeline_status.go
@@ -6,5 +6,5 @@ type PipelineStatus struct {
Phase PipelinePhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=PipelinePhase"`
Message string `json:"message,omitempty" protobuf:"bytes,2,opt,name=message"`
Conditions []metav1.Condition `json:"conditions,omitempty" protobuf:"bytes,3,rep,name=conditions"`
- LastUpdated metav1.Time `json:"lastUpdated"`
+ LastUpdated metav1.Time `json:"lastUpdated" protobuf:"bytes,4,opt,name=lastUpdated"`
}
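LastUpdated is the reference point for the deletion delay. This hunk only adds the protobuf tag; how the field is stamped is not shown in this commit, but presumably the controller sets it whenever it writes status, along the lines of the hypothetical helper below.

```go
// Hypothetical helper — not part of this commit's visible changes.
package controllers

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
)

// markUpdated stamps the status with the current time so that the
// deletion delay can be measured from the last observed change.
func markUpdated(status *dfv1.PipelineStatus) {
	status.LastUpdated = metav1.Now()
}
```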
5 changes: 5 additions & 0 deletions config/ci.yaml
@@ -1881,6 +1881,9 @@ spec:
- type
type: object
type: array
+ lastUpdated:
+ format: date-time
+ type: string
message:
type: string
phase:
@@ -1891,6 +1894,8 @@
- Succeeded
- Failed
type: string
+ required:
+ - lastUpdated
type: object
required:
- spec
5 changes: 5 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_pipelines.yaml
@@ -2929,6 +2929,9 @@ spec:
- type
type: object
type: array
+ lastUpdated:
+ format: date-time
+ type: string
message:
type: string
phase:
@@ -2939,6 +2942,8 @@
- Succeeded
- Failed
type: string
+ required:
+ - lastUpdated
type: object
required:
- spec
5 changes: 5 additions & 0 deletions config/default.yaml
@@ -1881,6 +1881,9 @@ spec:
- type
type: object
type: array
+ lastUpdated:
+ format: date-time
+ type: string
message:
type: string
phase:
@@ -1891,6 +1894,8 @@
- Succeeded
- Failed
type: string
+ required:
+ - lastUpdated
type: object
required:
- spec
14 changes: 8 additions & 6 deletions config/dev.yaml
@@ -1881,6 +1881,9 @@ spec:
- type
type: object
type: array
+ lastUpdated:
+ format: date-time
+ type: string
message:
type: string
phase:
@@ -1891,6 +1894,8 @@
- Succeeded
- Failed
type: string
+ required:
+ - lastUpdated
type: object
required:
- spec
@@ -4489,7 +4494,6 @@ metadata:
name: kafka-broker
namespace: argo-dataflow-system
spec:
- replicas: 2
selector:
matchLabels:
app: kafka-broker
@@ -4504,13 +4508,11 @@ spec:
- name: KAFKA_ADVERTISED_PORT
value: "9092"
- name: KAFKA_ADVERTISED_HOST_NAME
- valueFrom:
- fieldRef:
- fieldPath: status.podIP
+ value: kafka-broker
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181
- - name: KAFKA_CREATE_TOPICS
- value: __consumer_offsets:1:1
+ - name: KAFKA_BROKER_ID
+ value: "0"
image: wurstmeister/kafka
imagePullPolicy: IfNotPresent
name: main
9 changes: 3 additions & 6 deletions config/kafka-dev.yaml
@@ -31,7 +31,6 @@ metadata:
name: kafka-broker
namespace: argo-dataflow-system
spec:
- replicas: 2
selector:
matchLabels:
app: kafka-broker
@@ -46,13 +45,11 @@ spec:
- name: KAFKA_ADVERTISED_PORT
value: "9092"
- name: KAFKA_ADVERTISED_HOST_NAME
- valueFrom:
- fieldRef:
- fieldPath: status.podIP
+ value: kafka-broker
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181
- - name: KAFKA_CREATE_TOPICS
- value: __consumer_offsets:1:1
+ - name: KAFKA_BROKER_ID
+ value: "0"
image: wurstmeister/kafka
imagePullPolicy: IfNotPresent
name: main
5 changes: 5 additions & 0 deletions config/quick-start.yaml
@@ -1881,6 +1881,9 @@ spec:
- type
type: object
type: array
+ lastUpdated:
+ format: date-time
+ type: string
message:
type: string
phase:
@@ -1891,6 +1894,8 @@
- Succeeded
- Failed
type: string
+ required:
+ - lastUpdated
type: object
required:
- spec
8 changes: 5 additions & 3 deletions manager/controllers/config.go
@@ -29,7 +29,8 @@ var (
"memory": resource.MustParse("64Mi"),
},
}
- logger = util.NewLogger()
+ deletionDelay = util.GetEnvDuration(dfv1.EnvDeletionDelay, 30*time.Minute)
+ logger        = util.NewLogger()
)

func init() {
@@ -52,8 +53,9 @@ func init() {
"runnerImage", runnerImage,
"pullPolicy", pullPolicy,
"updateInterval", updateInterval.String(),
"scalingDelay", scalingDelay,
"peekDelay", peekDelay,
"scalingDelay", scalingDelay.String(),
"peekDelay", peekDelay.String(),
"defaultResourceRequirements", defaultResourceRequirements,
"deletionDelay", deletionDelay.String(),
)
}
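deletionDelay is the knob behind the commit title: a completed pipeline is garbage-collected roughly 30 minutes (or ARGO_DATAFLOW_DELETION_DELAY) after it last changed. The reconciler change that applies it is not among the hunks shown on this page, so the following is only a hedged sketch of the decision it has to make; the function name and placement are assumptions.

```go
// Sketch under assumptions — not the actual controller code from this commit.
package controllers

import "time"

// gcDecision reports whether a completed pipeline whose status last changed
// at lastUpdated should be deleted now, and otherwise how long the
// reconciler should wait before checking again.
func gcDecision(lastUpdated time.Time, deletionDelay time.Duration) (deleteNow bool, requeueAfter time.Duration) {
	age := time.Since(lastUpdated)
	if age >= deletionDelay {
		return true, 0
	}
	return false, deletionDelay - age
}
```

A controller-runtime reconciler would typically feed requeueAfter into ctrl.Result{RequeueAfter: ...} and issue a client Delete once deleteNow is true.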
10 changes: 9 additions & 1 deletion runner/main.go
@@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"github.com/argoproj-labs/argo-dataflow/runner/dedupe"
"io/ioutil"
@@ -60,7 +61,14 @@ func main() {
return fmt.Errorf("unknown comand")
}
}()
- if err != nil && err != context.Canceled {
+ is := errors.Is(err, context.Canceled)
+ logger.Info("is", "is", is)
+
+
+
+ if is {
+ println(fmt.Errorf("ignoring context cancelled error, expected"))
+ } else if err != nil {
if err := ioutil.WriteFile("/dev/termination-log", []byte(err.Error()), 0o600); err != nil {
println(fmt.Sprintf("failed to write termination-log: %v", err))
}
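The switch from err != context.Canceled to errors.Is matters once the cancellation error arrives wrapped (for example via fmt.Errorf with %w): plain equality only matches the exact sentinel, while errors.Is walks the wrap chain. A standalone illustration:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	err := fmt.Errorf("source terminated: %w", context.Canceled)
	fmt.Println(err == context.Canceled)          // false: the sentinel is wrapped
	fmt.Println(errors.Is(err, context.Canceled)) // true: Is unwraps the chain
}
```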
35 changes: 35 additions & 0 deletions test/e2e/completion_test.go
@@ -0,0 +1,35 @@
// +build test

package e2e

import (
"testing"

. "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
. "github.com/argoproj-labs/argo-dataflow/test"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestCompletion(t *testing.T) {

Setup(t)
defer Teardown(t)

CreatePipeline(Pipeline{
ObjectMeta: metav1.ObjectMeta{Name: "completion"},
Spec: PipelineSpec{
Steps: []StepSpec{{
Name: "main",
Container: &Container{
Image: "alpine",
Command: []string{"sh"},
Args: []string{"-c", "exit 0"},
},
}},
},
})

WaitForPipeline(UntilSucceeded)
DeletePipelines()
WaitForPodsToBeDeleted()
}
4 changes: 4 additions & 0 deletions test/pipeline.go
@@ -27,6 +27,10 @@ func UntilMessagesSunk(pl Pipeline) bool {
return meta.FindStatusCondition(pl.Status.Conditions, ConditionSunkMessages) != nil
}

+ func UntilSucceeded(pl Pipeline) bool {
+ return pl.Status.Phase == PipelineSucceeded
+ }
+
func DeletePipelines() {
log.Printf("deleting pipelines\n")
ctx := context.Background()
