feat: add retryPolicy
alexec committed Jun 8, 2021
1 parent 8170a88 commit c6a9e6e
Showing 60 changed files with 471 additions and 244 deletions.
389 changes: 215 additions & 174 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions api/v1alpha1/retry_policy.go
@@ -0,0 +1,9 @@
package v1alpha1

// +kubebuilder:validation:Enum=Always;Never
type RetryPolicy string

const (
	RetryNever  RetryPolicy = "Never"
	RetryAlways RetryPolicy = "Always"
)
2 changes: 2 additions & 0 deletions api/v1alpha1/source.go
@@ -7,4 +7,6 @@ type Source struct {
	STAN *STAN `json:"stan,omitempty" protobuf:"bytes,3,opt,name=stan"`
	Kafka *Kafka `json:"kafka,omitempty" protobuf:"bytes,4,opt,name=kafka"`
	HTTP *HTTPSource `json:"http,omitempty" protobuf:"bytes,5,opt,name=http"`
	// +kubebuilder:default=Never
	RetryPolicy RetryPolicy `json:"retryPolicy,omitempty" protobuf:"bytes,6,opt,name=retryPolicy,casttype=RetryPolicy"`
}
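
To make the new field concrete, here is a minimal, hypothetical pipeline manifest fragment. The pipeline name, step name, and Kafka topic are invented for illustration; only the `retryPolicy` field and its `Always`/`Never` values (with `Never` as the default) come from this commit.

```
apiVersion: dataflow.argoproj.io/v1alpha1
kind: Pipeline
metadata:
  name: retry-example           # hypothetical name
spec:
  steps:
    - name: main
      cat: {}
      sources:
        - kafka:
            topic: input-topic  # hypothetical topic
          retryPolicy: Always   # new field; omit it to keep the default of Never
      sinks:
        - log: {}
```

Because `retryPolicy` sits on the source itself, alongside `kafka`, `stan`, `http`, and `cron`, each source in a step can choose its own policy.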
12 changes: 12 additions & 0 deletions config/ci.yaml
@@ -820,6 +820,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
@@ -2670,6 +2676,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
6 changes: 6 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_pipelines.yaml
@@ -1186,6 +1186,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
6 changes: 6 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
@@ -1146,6 +1146,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
12 changes: 12 additions & 0 deletions config/default.yaml
@@ -820,6 +820,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
@@ -2670,6 +2676,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
12 changes: 12 additions & 0 deletions config/dev.yaml
@@ -820,6 +820,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
@@ -2670,6 +2676,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
12 changes: 12 additions & 0 deletions config/quick-start.yaml
@@ -820,6 +820,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
@@ -2670,6 +2676,12 @@ spec:
name:
default: default
type: string
retryPolicy:
default: Never
enum:
- Always
- Never
type: string
stan:
properties:
clusterId:
7 changes: 3 additions & 4 deletions docs/EXAMPLES.md
@@ -94,7 +94,7 @@ kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/m

### [103-scaling](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/103-scaling-pipeline.yaml)

This is an example of having multiple replicas for a single step.

Steps can be manually scaled using `kubectl`:

@@ -265,10 +265,9 @@ This logs the message.
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-cron-log-pipeline.yaml
```

### [erroring](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-erroring-pipeline.yaml)

This example creates errors randomly
### [301-erroring](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-erroring-pipeline.yaml)

This example showcases retry policies.

```
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-erroring-pipeline.yaml
57 changes: 35 additions & 22 deletions dsls/python/__init__.py
@@ -1,9 +1,10 @@
import getpass
import inspect
import yaml

DEFAULT_RUNTIME = 'python3-9'

GROUPS_VOLUME_NAME = 'groups'
USER = getpass.getuser()


def str_presenter(dumper, data):
@@ -20,11 +21,15 @@ def __init__(self, name):
self._name = name
self._annotations = {}
self._steps = []
self.owner(USER)

def annotate(self, name, value):
self._annotations[name] = value
return self

def owner(self, value):
return self.annotate('dataflow.argoproj.io/owner', value)

def describe(self, value):
return self.annotate('dataflow.argoproj.io/description', value)

@@ -324,13 +329,16 @@ def dump(self):


class Source:
def __init__(self, name=None):
def __init__(self, name=None, retryPolicy=None):
self._name = name
self._retryPolicy = retryPolicy

def dump(self):
x = {}
if self._name:
x['name'] = self._name
if self._retryPolicy:
x['retryPolicy'] = self._retryPolicy
return x

def cat(self, name):
@@ -400,22 +408,23 @@ def map(name, map):


class CronSource(Source):
def __init__(self, schedule, layout, name=None):
super().__init__(name)
def __init__(self, schedule, layout, name=None, retryPolicy=None):
super().__init__(name=name, retryPolicy=retryPolicy)
self._schedule = schedule
self._layout = layout

def dump(self):
x = super().dump()
x['schedule'] = self._schedule
y = {'schedule': self._schedule}
if self._layout:
x['layout'] = self._layout
return {'cron': x}
y['layout'] = self._layout
x['cron'] = y
return x


class HTTPSource(Source):
def __init__(self, name=None):
super().__init__(name)
def __init__(self, name=None, retryPolicy=None):
super().__init__(name=name, retryPolicy=retryPolicy)

def dump(self):
x = super().dump()
@@ -424,8 +433,8 @@ def dump(self):


class KafkaSource(Source):
def __init__(self, topic, parallel=None, name=None):
super().__init__(name)
def __init__(self, topic, parallel=None, name=None, retryPolicy=None):
super().__init__(name=name, retryPolicy=retryPolicy)
self._topic = topic
self._parallel = parallel

@@ -438,27 +447,31 @@ def dump(self):


class STANSource(Source):
def __init__(self, subject, name=None):
super().__init__(name)
def __init__(self, subject, name=None, parallel=None, retryPolicy=None):
super().__init__(name=name, retryPolicy=retryPolicy)
self._subject = subject
self._parallel = parallel

def dump(self):
x = super().dump()
x['stan'] = {'subject': self._subject}
y = {'subject': self._subject}
if self._parallel:
y['parallel'] = self._parallel
x['stan'] = y
return x


def cron(schedule, layout=None, name=None):
return CronSource(schedule, layout=layout, name=name)
def cron(schedule, layout=None, name=None, retryPolicy=None):
return CronSource(schedule, layout=layout, name=name, retryPolicy=retryPolicy)


def http(name=None):
return HTTPSource(name=name)
def http(name=None, retryPolicy=None):
return HTTPSource(name=name, retryPolicy=retryPolicy)


def kafka(topic, parallel=None, name=None):
return KafkaSource(topic, parallel, name=name)
def kafka(topic, parallel=None, name=None, retryPolicy=None):
return KafkaSource(topic, parallel, name=name, retryPolicy=retryPolicy)


def stan(subject, name=None):
return STANSource(subject, name=name)
def stan(subject, name=None, parallel=None, retryPolicy=None):
return STANSource(subject, name=name, parallel=parallel, retryPolicy=retryPolicy)
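
For reference, a sketch of the source stanzas these new keyword arguments would presumably serialise to when a pipeline is dumped to YAML. The topic and subject names are invented, `KafkaSource.dump` is assumed to nest the topic under `kafka` the same way `STANSource.dump` nests its fields under `stan`, and the exact key order may differ:

```
sources:
  - retryPolicy: Always    # kafka('input-topic', retryPolicy='Always')
    kafka:
      topic: input-topic
  - retryPolicy: Always    # stan('a-subject', parallel=2, retryPolicy='Always')
    stan:
      subject: a-subject
      parallel: 2
```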
1 change: 1 addition & 0 deletions examples/101-hello-pipeline.py
@@ -2,6 +2,7 @@

if __name__ == '__main__':
(pipeline("101-hello")
.owner('argoproj-labs')
.describe("""This is the hello world of pipelines.
It uses a cron schedule as a source and then just cat the message to a log""")
1 change: 1 addition & 0 deletions examples/101-hello-pipeline.yaml
@@ -6,6 +6,7 @@ metadata:
This is the hello world of pipelines.
It uses a cron schedule as a source and then just cat the message to a log
dataflow.argoproj.io/owner: argoproj-labs
dataflow.argoproj.io/test: 'true'
name: 101-hello
spec:
1 change: 1 addition & 0 deletions examples/101-two-node-pipeline.py
@@ -2,6 +2,7 @@

if __name__ == '__main__':
(pipeline("101-two-node")
.owner('argoproj-labs')
.describe("""This example shows a example of having two nodes in a pipeline.
While they read from Kafka, they are connected by a NATS Streaming subject.""")
4 changes: 2 additions & 2 deletions examples/101-two-node-pipeline.yaml
@@ -6,6 +6,7 @@ metadata:
This example shows a example of having two nodes in a pipeline.
While they read from Kafka, they are connected by a NATS Streaming subject.
dataflow.argoproj.io/owner: argoproj-labs
name: 101-two-node
spec:
steps:
@@ -23,5 +24,4 @@ spec:
- kafka:
topic: output-topic
sources:
- stan:
subject: a-b
- {}
1 change: 1 addition & 0 deletions examples/102-filter-pipeline.py
@@ -2,6 +2,7 @@

if __name__ == '__main__':
(pipeline("102-filter")
.owner('argoproj-labs')
.describe("""This is an example of built-in filtering.
Filters are written using expression syntax and must return a boolean.
1 change: 1 addition & 0 deletions examples/102-filter-pipeline.yaml
@@ -10,6 +10,7 @@ metadata:
They have a single variable, `msg`, which is a byte array.
[Learn about expressions](../docs/EXPRESSIONS.md)
dataflow.argoproj.io/owner: argoproj-labs
name: 102-filter
spec:
steps:
1 change: 1 addition & 0 deletions examples/102-flatten-expand-pipeline.py
@@ -2,6 +2,7 @@

if __name__ == '__main__':
(pipeline("102-flatten-expand")
.owner('argoproj-labs')
.describe("""This is an example of built-in flattening and expanding.""")
.step(
cron('*/3 * * * * *')
7 changes: 3 additions & 4 deletions examples/102-flatten-expand-pipeline.yaml
@@ -4,6 +4,7 @@ metadata:
annotations:
dataflow.argoproj.io/description: This is an example of built-in flattening and
expanding.
dataflow.argoproj.io/owner: argoproj-labs
name: 102-flatten-expand
spec:
steps:
@@ -22,12 +23,10 @@ spec:
- stan:
subject: flattened
sources:
- stan:
subject: data
- {}
- expand: {}
name: expand
sinks:
- log: {}
sources:
- stan:
subject: flattened
- {}