Skip to content

Commit

Permalink
feat: Expose async and sidecarResources in Python DSL
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Aug 9, 2021
1 parent b3bb0c5 commit 95f8cb2
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,8 @@ examples: $(shell find examples -name '*-pipeline.yaml' | sort) docs/EXAMPLES.md
install-dsls:
pip3 install dsls/python

.PHONY: example-yamls
example-yamls: install-dsls
cd examples && python3 *.py
examples/%-pipeline.yaml: examples/%-pipeline.py example-yamls
examples/%-pipeline.yaml: examples/%-pipeline.py dsls/python/*.py install-dsls
cd examples && python3 $*-pipeline.py

argocli:
cd ../../argoproj/argo-workflows && git checkout dev-dataflow && make ./dist/argo DEV_BRANCH=true && ./dist/argo server --secure=false --namespaced --auth-mode=server --namespace=argo-dataflow-system
Expand Down
26 changes: 20 additions & 6 deletions dsls/python/argo_dataflow/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import getpass
import inspect
import json

import kubernetes
import yaml

Expand Down Expand Up @@ -149,13 +150,16 @@ def dump(self):


class KafkaSink(Sink):
def __init__(self, subject, name=None):
def __init__(self, subject, name=None, a_sync=False):
super().__init__(name)
self._subject = subject
self._a_sync = a_sync

def dump(self):
x = super().dump()
x['kafka'] = {'topic': self._subject}
if self._a_sync:
x['kafka']['async'] = True
return x


Expand All @@ -171,14 +175,15 @@ def dump(self):


class Step:
def __init__(self, name, sources=[], volumes=[], terminator=False):
def __init__(self, name, sources=[], volumes=[], terminator=False, sidecarResource=None):
self._name = name
self._sources = sources
self._sinks = []
self._scale = {}
self._volumes = volumes
self._terminator = terminator
self._annotations = []
self._sidecarResources = sidecarResource

def log(self, name=None):
self._sinks.append(LogSink(name=name))
Expand All @@ -188,8 +193,8 @@ def http(self, url, name=None):
self._sinks.append(HTTPSink(url, name=name))
return self

def kafka(self, subject, name=None):
self._sinks.append(KafkaSink(subject, name=name))
def kafka(self, subject, name=None, a_sync=False):
self._sinks.append(KafkaSink(subject, name=name, a_sync=a_sync))
return self

def scale(self, minReplicas, maxReplicas, replicaRatio):
Expand All @@ -212,6 +217,10 @@ def annotations(self, annotations):
self._annotations = annotations
return self

def sidecarResources(self, sidecarResources):
self._sidecarResources = sidecarResources
return self

def dump(self):
y = {
'name': self._name,
Expand All @@ -231,6 +240,10 @@ def dump(self):
y['metadata'] = {
'annotations': self._annotations
}
if self._sidecarResources:
y['sidecar'] = {
'resources': self._sidecarResources
}
return y


Expand Down Expand Up @@ -408,9 +421,10 @@ def dump(self):
def cat(self, name):
return CatStep(name, sources=[self])

def container(self, name, image, args=[], fifo=False, volumes=[], volumeMounts=[], env={}, resources={},terminator=False):
def container(self, name, image, args=[], fifo=False, volumes=[], volumeMounts=[], env={}, resources={},
terminator=False):
return ContainerStep(name, sources=[self], image=image, args=args, fifo=fifo, volumes=volumes,
volumeMounts=volumeMounts, env=env, resources=resources,terminator=terminator)
volumeMounts=volumeMounts, env=env, resources=resources, terminator=terminator)

def expand(self, name):
return ExpandStep(name, sources=[self])
Expand Down
2 changes: 1 addition & 1 deletion examples/301-kafka-pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
.step(
(kafka('input-topic')
.cat('main')
.kafka('output-topic')
.kafka('output-topic', a_sync=True)
))
.save())
1 change: 1 addition & 0 deletions examples/301-kafka-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ spec:
name: main
sinks:
- kafka:
async: true
topic: output-topic
sources:
- kafka:
Expand Down

0 comments on commit 95f8cb2

Please sign in to comment.