diff --git a/Makefile b/Makefile index 80cf0d9c..26af6f62 100644 --- a/Makefile +++ b/Makefile @@ -154,9 +154,12 @@ config/nats/single-server-nats.yml: config/stan/single-server-stan.yml: curl -o config/stan/single-server-stan.yml https://raw.githubusercontent.com/nats-io/k8s/v0.7.4/nats-streaming-server/single-server-stan.yml -examples: $(shell find examples -name '*-pipeline.yaml') +examples: $(shell find examples -name '*-pipeline.yaml' | sort) examples/101-hello-pipeline.yaml: +examples/101-two-node-pipeline.yaml: +examples/102-filter-pipeline.yaml: +examples/102-flatten-expand-pipeline.yaml: examples/%-pipeline.yaml: examples/%-pipeline.py dsls/python/__init__.py PYTHONPATH=. python3 examples/$*-pipeline.py > $@ diff --git a/docs/EXAMPLES.md b/docs/EXAMPLES.md index e3dc00ed..2bdee537 100644 --- a/docs/EXAMPLES.md +++ b/docs/EXAMPLES.md @@ -1,17 +1,16 @@ ### Examples -### [Hello](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/101-hello-pipeline.yaml) +### [hello](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/101-hello-pipeline.yaml) This is the hello world of pipelines. It uses a cron schedule as a source and then just cat the message to a log - ``` kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/101-hello-pipeline.yaml ``` -### [Two nodes pipeline](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/101-two-node-pipeline.yaml) +### [two-node](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/101-two-node-pipeline.yaml) This example shows an example of having two nodes in a pipeline. @@ -22,7 +21,7 @@ While they read from Kafka, they are connected by a NATS Streaming subject. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/101-two-node-pipeline.yaml ``` -### [Filter messages](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-filter-pipeline.yaml) +### [filter](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-filter-pipeline.yaml) This is an example of built-in filtering. @@ -37,7 +36,7 @@ They have a single variable, `msg`, which is a byte array. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-filter-pipeline.yaml ``` -### [Flatten and expand](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-flatten-expand-pipeline.yaml) +### [flatten-expand](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-flatten-expand-pipeline.yaml) This is an example of built-in flattening and expanding. @@ -46,7 +45,7 @@ This is an example of built-in flattening and expanding. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-flatten-expand-pipeline.yaml ``` -### [Map messages](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-map-pipeline.yaml) +### [map](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-map-pipeline.yaml) This is an example of built-in mapping. @@ -61,7 +60,7 @@ They have a single variable, `msg`, which is a byte array.
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-map-pipeline.yaml ``` -### [Using replicas and auto-scaling](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/103-autoscaling-pipeline.yaml) +### [autoscaling](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/103-autoscaling-pipeline.yaml) This is an example of having multiple replicas for a single step. @@ -98,7 +97,22 @@ of replicas re-calculated. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/103-autoscaling-pipeline.yaml ``` -### [Go 1.16 handler](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-go1-16-pipeline.yaml) +### [scaling](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/103-scaling-pipeline.yaml) + +This is an example of having multiple replicas for a single step. + +Steps can be manually scaled using `kubectl`: + +``` +kubectl scale step/scaling-main --replicas 3 +``` + + +``` +kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/103-scaling-pipeline.yaml +``` + +### [go1-16](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-go1-16-pipeline.yaml) This example is of the Go 1.16 handler. @@ -109,7 +123,7 @@ This example of Go 1.16 handler. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-go1-16-pipeline.yaml ``` -### [Java 16 handler](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-java16-pipeline.yaml) +### [java16](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-java16-pipeline.yaml) This example is of the Java 16 handler. @@ -120,7 +134,7 @@ This example is of the Java 16 handler. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-java16-pipeline.yaml ``` -### [Python 3.9 handler](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-python-pipeline.yaml) +### [python3-9](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-python-pipeline.yaml) This example is of the Python 3.9 handler. @@ -131,7 +145,7 @@ This example is of the Python 3.9 handler. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/104-python-pipeline.yaml ``` -### [Git handler](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/106-git-pipeline.yaml) +### [git](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/106-git-pipeline.yaml) This example is of a pipeline using Git. @@ -145,7 +159,7 @@ your code when the step starts. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/106-git-pipeline.yaml ``` -### [Runs to completion](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/107-completion-pipeline.yaml) +### [completion](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/107-completion-pipeline.yaml) This example shows a pipeline running to completion.
@@ -161,7 +175,7 @@ For a pipeline to terminate one of two things must happen: kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/107-completion-pipeline.yaml ``` -### [Terminator](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/107-terminator-pipeline.yaml) +### [terminator](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/107-terminator-pipeline.yaml) This example demonstrates having a terminator step, and then terminating other steps using different termination strategies. @@ -171,7 +185,7 @@ using different terminations strategies. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/107-terminator-pipeline.yaml ``` -### [Using FIFOs for input and outputs](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/108-fifos-pipeline.yaml) +### [fifos](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/108-fifos-pipeline.yaml) This example uses named pipes to send and receive messages. @@ -187,7 +201,7 @@ You MUST escape new lines. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/108-fifos-pipeline.yaml ``` -### [Group messages](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/109-group-pipeline.yaml) +### [group](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/109-group-pipeline.yaml) This is an example of built-in grouping. @@ -212,7 +226,7 @@ Storage can either be: kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/109-group-pipeline.yaml ``` -### [Vetinary](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/201-vetinary-pipeline.yaml) +### [vet](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/201-vetinary-pipeline.yaml) This pipeline processes pets (cats and dogs). @@ -221,7 +235,7 @@ This pipeline processes pets (cats and dogs). kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/201-vetinary-pipeline.yaml ``` -### [Word count](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/201-word-count-pipeline.yaml) +### [word-count](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/201-word-count-pipeline.yaml) This pipeline counts the number of words in a document, not the count of each word as you might expect. @@ -232,7 +246,7 @@ It also shows an example of a pipelines terminates based on a single step's stat kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/201-word-count-pipeline.yaml ``` -### [Cron](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-cron-pipeline.yaml) +### [cron](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-cron-pipeline.yaml) This example uses a cron source. @@ -247,7 +261,7 @@ By deafult, the layout is RFC3339.
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-cron-pipeline.yaml ``` -### [Erroring](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-erroring-pipeline.yaml) +### [erroring](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-erroring-pipeline.yaml) This example creates errors randomly @@ -256,7 +270,7 @@ This example creates errors randomly kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-erroring-pipeline.yaml ``` -### [HTTP](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-http-pipeline.yaml) +### [http](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-http-pipeline.yaml) This example uses HTTP sources and sinks. @@ -269,7 +283,7 @@ Also, this is sync, not async, so it can be slow due to the time taken to delive kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-http-pipeline.yaml ``` -### [Kafka](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-kafka-pipeline.yaml) +### [kafka-1](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-kafka-pipeline.yaml) This example shows reading and writing to Kafka. @@ -278,7 +292,7 @@ This example shows reading and writing to cafe. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-kafka-pipeline.yaml ``` -### [Parallel](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-parallel-pipeline.yaml) +### [parallel](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-parallel-pipeline.yaml) This example uses parallel to 2x the amount of data it processes. @@ -287,7 +301,7 @@ This example uses parallel to 2x the amount of data it processes. kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-parallel-pipeline.yaml ``` -### [Two sinks](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-two-sinks-pipeline.yaml) +### [two-sinks](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-two-sinks-pipeline.yaml) This example has two sinks @@ -296,7 +310,7 @@ This example has two sinks kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-two-sinks-pipeline.yaml ``` -### [Two sources](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-two-sources-pipeline.yaml) +### [two-sources](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-two-sources-pipeline.yaml) This example has two sources @@ -305,7 +319,7 @@ This example has two sources kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-two-sources-pipeline.yaml ``` -### [Default Kafka config](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/dataflow-kafka-default-secret.yaml) +### [dataflow-kafka-default](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/dataflow-kafka-default-secret.yaml) This is an example of providing a default Kafka configuration for a namespace.
@@ -325,7 +339,7 @@ version: "2.0.0" kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/dataflow-kafka-default-secret.yaml ``` -### [Default NATS Streaming (STAN) configuration](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/dataflow-stan-default-secret.yaml) +### [dataflow-stan-default](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/dataflow-stan-default-secret.yaml) This is an example of providing a default NATS Streaming configuration for a namespace. diff --git a/dsls/python/__init__.py b/dsls/python/__init__.py index c9e8f6b8..109daa3c 100644 --- a/dsls/python/__init__.py +++ b/dsls/python/__init__.py @@ -1,6 +1,17 @@ +import inspect +import sys import yaml +def str_presenter(dumper, data): + if len(data.splitlines()) > 1 or '"' in data or "'" in data: + return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|') + return dumper.represent_scalar('tag:yaml.org,2002:str', data) + + +yaml.add_representer(str, str_presenter) + + class PipelineBuilder(): def __init__(self, name): self.name = name @@ -11,6 +22,9 @@ def annotate(self, name, value): self.annotations[name] = value return self + def describe(self, value): + return self.annotate('dataflow.argoproj.io/description', value) + def step(self, step): self.steps.append(step) return self @@ -24,12 +38,12 @@ def build(self): 'annotations': self.annotations }, 'spec': { - 'steps': [x.build(i) for i, x in enumerate(self.steps)] + 'steps': [x.build() for x in self.steps] } } def dump(self): - print(yaml.dump(self.build())) + sys.stdout.write(yaml.dump(self.build())) def pipeline(name): @@ -41,8 +55,25 @@ def build(self): return {'log': {}} -class CatStep(): - def __init__(self, sources): +class KafkaSink: + def __init__(self, topic): + self.topic = topic + + def build(self): + return {'kafka': {'topic': self.topic}} + + +class STANSink: + def __init__(self, subject): + self.subject = subject + + def build(self): + return {'stan': {'subject': self.subject}} + + +class Step: + def __init__(self, name, sources): + self.name = name self.sources = sources self.sinks = [] @@ -50,29 +81,143 @@ def log(self): self.sinks.append(LogSink()) return self - def build(self, i): + def kafka(self, topic): + self.sinks.append(KafkaSink(topic)) + return self + + def stan(self, subject): + self.sinks.append(STANSink(subject)) + return self + + def build(self): return { - 'name': 's{i}'.format(i=i), + 'name': self.name, 'sources': [x.build() for x in self.sources], 'sinks': [x.build() for x in self.sinks], - 'cat': {} } -class CronSource: - def __init__(self, schedule): - self.schedule = schedule +class CatStep(Step): + def __init__(self, name, sources): + super().__init__(name, sources) + + def build(self): + x = super().build() + x['cat'] = {} + return x + - def cat(self): - return CatStep([self]) +class ExpandStep(Step): + def __init__(self, name, sources): + super().__init__(name, sources) def build(self): - return { - 'cron': { - 'schedule': self.schedule - } + x = super().build() + x['expand'] = {} + return x + + +class FilterStep(Step): + def __init__(self, name, sources, filter): + super().__init__(name, sources) + self.filter = filter + + def build(self): + x = super().build() + x['filter'] = self.filter + return x + + +class FlattenStep(Step): + def __init__(self, name, sources): + super().__init__(name, sources) + + def build(self): + x = super().build() + x['flatten'] = {} + return x + + +class HandlerStep(Step): + def __init__(self, name,
sources, handler): + super().__init__(name, sources) + self.handler = handler + + def build(self): + x = super().build() + x['handler'] = { + 'runtime': 'python3-9', + 'code': inspect.getsource(self.handler) } + return x + + +class MapStep(Step): + def __init__(self, name, sources, map): + super().__init__(name, sources) + self.map = map + + def build(self): + x = super().build() + x['map'] = self.map + return x + + +class Source: + def cat(self, name): + return CatStep(name, [self]) + + def expand(self, name): + return ExpandStep(name, [self]) + + def filter(self, name, filter): + return FilterStep(name, [self], filter) + + def flatten(self, name): + return FlattenStep(name, [self]) + + def handler(self, name, handler): + return HandlerStep(name, [self], handler) + + def map(self, name, map): + return MapStep(name, [self], map) + + +class CronSource(Source): + def __init__(self, schedule, layout): + self.schedule = schedule + self.layout = layout + + def build(self): + x = {'schedule': self.schedule} + if self.layout: + x['layout'] = self.layout + return {'cron': x} + + +class KafkaSource(Source): + def __init__(self, topic): + self.topic = topic + + def build(self): + return {'kafka': {'topic': self.topic}} + + +class STANSource(Source): + def __init__(self, subject): + self.subject = subject + + def build(self): + return {'stan': {'subject': self.subject}} + + +def cron(schedule, layout=''): + return CronSource(schedule, layout) + + +def kafka(topic): + return KafkaSource(topic) -def cron(schedule): - return CronSource(schedule) +def stan(subject): + return STANSource(subject) diff --git a/examples/101-hello-pipeline.py b/examples/101-hello-pipeline.py index e820dcb9..b83d3f16 100644 --- a/examples/101-hello-pipeline.py +++ b/examples/101-hello-pipeline.py @@ -1,13 +1,14 @@ from dsls.python import cron, pipeline if __name__ == "__main__": - pipeline("hello") \ - .annotate('dataflow.argoproj.io/description', """This is the hello world of pipelines. + (pipeline("hello") + .describe("""This is the hello world of pipelines. -It uses a cron schedule as a source and then just cat the message to a log""") \ - .annotate('dataflow.argoproj.io/test', "true") \ - .step( - cron('*/3 * * * * *') - .cat() - .log()) \ - .dump() +It uses a cron schedule as a source and then just cat the message to a log""") + .annotate('dataflow.argoproj.io/test', "true") + .step( + (cron('*/3 * * * * *') + .cat('main') + .log()) + ) + .dump()) diff --git a/examples/101-hello-pipeline.yaml b/examples/101-hello-pipeline.yaml index f533f0a0..779d3dac 100644 --- a/examples/101-hello-pipeline.yaml +++ b/examples/101-hello-pipeline.yaml @@ -2,19 +2,18 @@ apiVersion: dataflow.argoproj.io/v1alpha1 kind: Pipeline metadata: annotations: - dataflow.argoproj.io/description: 'This is the hello world of pipelines. + dataflow.argoproj.io/description: |- + This is the hello world of pipelines. 
- - It uses a cron schedule as a source and then just cat the message to a log' + It uses a cron schedule as a source and then just cat the message to a log dataflow.argoproj.io/test: 'true' name: hello spec: steps: - cat: {} - name: s0 + name: main sinks: - log: {} sources: - cron: schedule: '*/3 * * * * *' - diff --git a/examples/101-two-node-pipeline.py b/examples/101-two-node-pipeline.py new file mode 100644 index 00000000..d7124a6a --- /dev/null +++ b/examples/101-two-node-pipeline.py @@ -0,0 +1,18 @@ +from dsls.python import kafka, pipeline, stan + +if __name__ == "__main__": + (pipeline("two-node") + .describe("""This example shows an example of having two nodes in a pipeline. + +While they read from Kafka, they are connected by a NATS Streaming subject.""") + .step( + (kafka('input-topic') + .cat('a') + .stan('a-b')) + ) + .step( + (stan('a-b') + .cat('b') + .kafka('output-topic')) + ) + .dump()) diff --git a/examples/101-two-node-pipeline.yaml b/examples/101-two-node-pipeline.yaml index 118934dc..31705e29 100644 --- a/examples/101-two-node-pipeline.yaml +++ b/examples/101-two-node-pipeline.yaml @@ -2,16 +2,15 @@ apiVersion: dataflow.argoproj.io/v1alpha1 kind: Pipeline metadata: annotations: - dataflow.argoproj.io/description: | + dataflow.argoproj.io/description: |- This example shows an example of having two nodes in a pipeline. While they read from Kafka, they are connected by a NATS Streaming subject. - creationTimestamp: null name: two-node spec: steps: - cat: {} name: a sinks: - stan: subject: a-b @@ -19,7 +18,7 @@ spec: - kafka: topic: input-topic - cat: {} name: b sinks: - kafka: topic: output-topic diff --git a/examples/102-filter-pipeline.py b/examples/102-filter-pipeline.py new file mode 100644 index 00000000..ed70cf21 --- /dev/null +++ b/examples/102-filter-pipeline.py @@ -0,0 +1,17 @@ +from dsls.python import kafka, pipeline + +if __name__ == "__main__": + (pipeline("filter") + .describe("""This is an example of built-in filtering. + +Filters are written using expression syntax and must return a boolean. + +They have a single variable, `msg`, which is a byte array. + +[Learn about expressions](../docs/EXPRESSIONS.md)""") + .step( + kafka('input-topic') + .filter('main', 'string(msg) contains "capybara"') + .kafka('output-topic') + ) + .dump()) diff --git a/examples/102-filter-pipeline.yaml b/examples/102-filter-pipeline.yaml index b2e4bb94..9db301e6 100644 --- a/examples/102-filter-pipeline.yaml +++ b/examples/102-filter-pipeline.yaml @@ -2,7 +2,7 @@ apiVersion: dataflow.argoproj.io/v1alpha1 kind: Pipeline metadata: annotations: - dataflow.argoproj.io/description: | + dataflow.argoproj.io/description: |- This is an example of built-in filtering. Filters are written using expression syntax and must return a boolean. @@ -10,11 +10,11 @@ metadata: They have a single variable, `msg`, which is a byte array.
[Learn about expressions](../docs/EXPRESSIONS.md) - creationTimestamp: null name: filter spec: steps: - - filter: string(msg) contains "capybara" + - filter: |- + string(msg) contains "capybara" name: main sinks: - kafka: diff --git a/examples/102-flatten-expand-pipeline.py b/examples/102-flatten-expand-pipeline.py new file mode 100644 index 00000000..b6a39762 --- /dev/null +++ b/examples/102-flatten-expand-pipeline.py @@ -0,0 +1,20 @@ +from dsls.python import cron, pipeline, stan + +if __name__ == "__main__": + (pipeline("flatten-expand") + .describe("""This is an example of built-in flattening and expanding.""") + .step( + cron('*/3 * * * * *') + .map('generate', """bytes('{"foo": {"bar": "' + string(msg) + '"}}')""") + .stan('data')) + .step( + stan('data') + .flatten('flatten') + .stan('flattened') + ) + .step( + stan('flattened') + .expand('expand') + .log() + ) + .dump()) diff --git a/examples/102-flatten-expand-pipeline.yaml b/examples/102-flatten-expand-pipeline.yaml index 1da6f6eb..1f80d853 100644 --- a/examples/102-flatten-expand-pipeline.yaml +++ b/examples/102-flatten-expand-pipeline.yaml @@ -2,13 +2,12 @@ apiVersion: dataflow.argoproj.io/v1alpha1 kind: Pipeline metadata: annotations: - dataflow.argoproj.io/description: | - This is an example of built-in flattening and expanding. - creationTimestamp: null + dataflow.argoproj.io/description: This is an example of built-in flattening and + expanding. name: flatten-expand spec: steps: - - map: | + - map: |- bytes('{"foo": {"bar": "' + string(msg) + '"}}') name: generate sinks: - stan: subject: data sources: - cron: - layout: "15:04:05" schedule: '*/3 * * * * *' - flatten: {} name: flatten diff --git a/examples/102-map-pipeline.py b/examples/102-map-pipeline.py new file mode 100644 index 00000000..ecb1b351 --- /dev/null +++ b/examples/102-map-pipeline.py @@ -0,0 +1,17 @@ +from dsls.python import pipeline, kafka + +if __name__ == "__main__": + (pipeline("map") + .describe("""This is an example of built-in mapping. + +Maps are written using expression syntax and must return a byte array. + +They have a single variable, `msg`, which is a byte array. + +[Learn about expressions](../docs/EXPRESSIONS.md)""") + .step( + kafka('input-topic') + .map('main', "bytes('hi ' + string(msg))") + .kafka('output-topic') + ) + .dump()) diff --git a/examples/102-map-pipeline.yaml b/examples/102-map-pipeline.yaml index d264a124..ee9b82d2 100644 --- a/examples/102-map-pipeline.yaml +++ b/examples/102-map-pipeline.yaml @@ -2,7 +2,7 @@ apiVersion: dataflow.argoproj.io/v1alpha1 kind: Pipeline metadata: annotations: - dataflow.argoproj.io/description: | + dataflow.argoproj.io/description: |- This is an example of built-in mapping. Maps are written using expression syntax and must return a byte array. @@ -10,11 +10,11 @@ metadata: They have a single variable, `msg`, which is a byte array. [Learn about expressions](../docs/EXPRESSIONS.md) - creationTimestamp: null name: map spec: steps: - - map: bytes('hi ' + string(msg)) + - map: |- + bytes('hi ' + string(msg)) name: main sinks: - kafka: diff --git a/examples/104-python-pipeline.py b/examples/104-python-pipeline.py new file mode 100644 index 00000000..f1463368 --- /dev/null +++ b/examples/104-python-pipeline.py @@ -0,0 +1,19 @@ +from dsls.python import pipeline, kafka + + +def handler(msg): + return msg + + +if __name__ == "__main__": + (pipeline("python") + .describe("""This example is of the Python 3.9 handler.
+ +[Learn about handlers](../docs/HANDLERS.md)""") + .annotate('dataflow.argoproj.io/timeout', '2m') + .step( + (kafka('input-topic') + .handler('main', handler) + .kafka('output-topic') + )) + .dump()) diff --git a/examples/104-python-pipeline.yaml b/examples/104-python-pipeline.yaml index 1a486056..18c3501f 100644 --- a/examples/104-python-pipeline.yaml +++ b/examples/104-python-pipeline.yaml @@ -2,13 +2,12 @@ apiVersion: dataflow.argoproj.io/v1alpha1 kind: Pipeline metadata: annotations: - dataflow.argoproj.io/description: | + dataflow.argoproj.io/description: |- This example is of the Python 3.9 handler. [Learn about handlers](../docs/HANDLERS.md) dataflow.argoproj.io/timeout: 2m - creationTimestamp: null - name: python3-9 + name: python spec: steps: - handler:
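Why the regenerated manifests above switch to `|-` block scalars: the `str_presenter` registered in `dsls/python/__init__.py` dumps any string that is multi-line or contains quotes in block style, and `HandlerStep.build()` embeds the handler's source via `inspect.getsource`. A minimal standalone sketch of that behaviour (illustrative only; the `handler` function and the dumped mapping below are hypothetical and not part of the change set):

```
import inspect

import yaml


def str_presenter(dumper, data):
    # Same rule as dsls/python/__init__.py: multi-line strings, or strings
    # containing quotes, are emitted as YAML block scalars.
    if len(data.splitlines()) > 1 or '"' in data or "'" in data:
        return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
    return dumper.represent_scalar('tag:yaml.org,2002:str', data)


yaml.add_representer(str, str_presenter)


def handler(msg):
    return msg


# HandlerStep.build() does the same getsource embedding for the 'code' field.
print(yaml.dump({
    'filter': 'string(msg) contains "capybara"',
    'handler': {'runtime': 'python3-9', 'code': inspect.getsource(handler)},
}))
# filter: |-
#   string(msg) contains "capybara"
# handler:
#   code: |
#     def handler(msg):
#         return msg
#   runtime: python3-9
```

Expressions such as `string(msg) contains "capybara"` and embedded handler code therefore round-trip without extra quoting, which is what the `filter: |-` and handler `code: |` fields in the regenerated example manifests show.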