Skip to content

Commit

Permalink
feat: Add dedupe to Python DSL (#341)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Sep 17, 2021
1 parent afd4583 commit 489ded6
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ kubebuilder:
mv kubebuilder_$(version)_$(name)_$(arch) kubebuilder && sudo mv kubebuilder /usr/local/

.PHONY: examples
examples: $(shell find examples -name '*-pipeline.yaml' | sort) docs/EXAMPLES.md
examples: $(shell find examples -name '*-pipeline.yaml' | sort) docs/EXAMPLES.md test/examples/examples_test.go

.PHONY: tests
tests: test/examples/examples_test.go
Expand Down
8 changes: 8 additions & 0 deletions docs/EXAMPLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ While they read from Kafka, they are connected by a NATS Streaming subject.
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/101-two-node-pipeline.yaml
```

### [102-dedupe](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-dedupe-pipeline.yaml)

This is an example of built-in de-duplication step.

```
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-dedupe-pipeline.yaml
```

### [102-filter](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-filter-pipeline.yaml)

This is an example of built-in filtering.
Expand Down
17 changes: 17 additions & 0 deletions dsls/python/argo_dataflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,16 @@ def dump(self):
return x


class DedupeStep(Step):
def __init__(self, name, sources=[]):
super().__init__(name, sources=sources)

def dump(self):
x = super().dump()
x['dedupe'] = {}
return x


class ExpandStep(Step):
def __init__(self, name, sources=[]):
super().__init__(name, sources=sources)
Expand Down Expand Up @@ -438,6 +448,9 @@ def container(self, name, image, args=[], fifo=False, volumes=[], volumeMounts=[
return ContainerStep(name, sources=[self], image=image, args=args, fifo=fifo, volumes=volumes,
volumeMounts=volumeMounts, env=env, resources=resources, terminator=terminator)

def dedupe(self, name):
return DedupeStep(name, sources=[self])

def expand(self, name):
return ExpandStep(name, sources=[self])

Expand Down Expand Up @@ -469,6 +482,10 @@ def container(name, image, args, fifo=False, volumes=[], volumeMounts=[], env={}
volumeMounts=volumeMounts, env=env, resources=resources)


def dedupe(name):
return DedupeStep(name)


def expand(name):
return ExpandStep(name)

Expand Down
12 changes: 12 additions & 0 deletions examples/102-dedupe-pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from argo_dataflow import pipeline, kafka

if __name__ == '__main__':
(pipeline("102-dedupe")
.owner('argoproj-labs')
.describe("""This is an example of built-in de-duplication step.""")
.step(
(kafka('input-topic')
.dedupe('main')
.kafka('output-topic'))
)
.save())
18 changes: 18 additions & 0 deletions examples/102-dedupe-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: dataflow.argoproj.io/v1alpha1
kind: Pipeline
metadata:
annotations:
dataflow.argoproj.io/description: This is an example of built-in de-duplication
step.
dataflow.argoproj.io/owner: argoproj-labs
name: 102-dedupe
spec:
steps:
- dedupe: {}
name: main
sinks:
- kafka:
topic: output-topic
sources:
- kafka:
topic: input-topic
12 changes: 12 additions & 0 deletions test/examples/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ func Test_101_two_node_pipeline(t *testing.T) {
WaitForPodsToBeDeleted()
}

func Test_102_dedupe_pipeline(t *testing.T) {
defer Setup(t)()

CreatePipelineFromFile("../../examples/102-dedupe-pipeline.yaml")

WaitForPipeline()
WaitForPipeline(UntilRunning, 90*time.Second)

DeletePipelines()
WaitForPodsToBeDeleted()
}

func Test_102_filter_pipeline(t *testing.T) {
defer Setup(t)()

Expand Down

0 comments on commit 489ded6

Please sign in to comment.