Commit d7070f8

[Issue-571] Add Python datastream API support (#576)
Signed-off-by: thekingofcity <[email protected]>
1 parent dd7a3f0 commit d7070f8

7 files changed: +566 -5 lines changed

.gitignore

+2
@@ -16,3 +16,5 @@ log/
 .metadata/
 .recommenders/
 *.log
+*.pyc
+__pycache__/

documentation/src/docs/index.md

+7-5
@@ -13,22 +13,24 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 -->
-## Apache Flink Connectors for Pravega
+
+# Apache Flink Connectors for Pravega
 
 This documentation describes the connectors API and its usage to read and write [Pravega](http://pravega.io/) streams with the [Apache Flink](http://flink.apache.org/) stream processing framework.
 
-Build end-to-end stream processing pipelines that use Pravega as the stream storage and message bus, and Apache Flink for computation over the streams. See the [Pravega Concepts](http://pravega.io/docs/pravega-concepts/) page for more information.
+Build end-to-end stream processing pipelines that use Pravega as the stream storage and message bus, and Apache Flink for computation over the streams. See the [Pravega Concepts](http://pravega.io/docs/pravega-concepts/) page for more information.
 
 ## Table of Contents
 
 - [Getting Started](getting-started.md)
 - [Quick Start](quickstart.md)
 - [Dev Guide](dev-guide.md)
 - Features
-- [Streaming](streaming.md)
-- [Batch](batch.md)
-- [Table API/SQL](table-api.md)
+- [Streaming](streaming.md)
+- [Batch](batch.md)
+- [Table API/SQL](table-api.md)
 - [Catalog](catalog.md)
+- [Python Datastream API](python.md)
 - [Metrics](metrics.md)
 - [Configurations](configurations.md)
 - [Serialization](serialization.md)

documentation/src/docs/python.md

+125
@@ -0,0 +1,125 @@
# Pravega Python DataStream connector

This Pravega Python DataStream connector provides a data source and data sink for Flink streaming jobs.

Your Flink streaming jobs can use Pravega as their storage through these [Python API Wrappers](https://github.com/pravega/flink-connectors/tree/master/src/main/python). This page describes only the API usage; for the concepts behind the parameters, refer to [Configurations](configurations.md) and [Streaming](streaming.md).

**DISCLAIMER: This python wrapper is an IMPLEMENTATION REFERENCE and is not officially published.**

* [How to use](#How-to-use)
* [PravegaConfig](#PravegaConfig)
* [StreamCut](#StreamCut)
* [FlinkPravegaReader](#FlinkPravegaReader)
* [FlinkPravegaWriter](#FlinkPravegaWriter)
* [Metrics](#Metrics)
* [Serialization](#Serialization)

## How to use

Submit your job, whose main compute code lives in `application.py`, together with the connector jar and the Python wrapper files:

```bash
flink run --python ./application.py --pyFiles <connector-repo>/src/main/python/ --jarfile /path/to/pravega-connectors-flink.jar
```
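
The same submission can be scripted. The sketch below assembles the `flink run` argument list from the command above in Python. The helper name `build_flink_run_command` and the example paths are illustrative assumptions and are not part of the connector; only the three flags already shown are used.

```python
import shlex
from typing import List


def build_flink_run_command(application: str, py_files: str, jar: str) -> List[str]:
    """Assemble the `flink run` arguments used to submit a PyFlink job.

    Hypothetical helper: it only uses the flags shown in the command above.
    """
    return [
        "flink", "run",
        "--python", application,
        "--pyFiles", py_files,
        "--jarfile", jar,
    ]


command = build_flink_run_command(
    "./application.py",
    "/path/to/flink-connectors/src/main/python/",  # placeholder checkout path
    "/path/to/pravega-connectors-flink.jar",       # placeholder jar path
)
# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(command))
```

Keeping the arguments as a list means the command can be handed to `subprocess.run(command, check=True)` without any shell quoting concerns.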

## PravegaConfig

A top-level config object, `PravegaConfig`, is provided to establish a Pravega context for the Flink connector.

```python
from pravega_config import PravegaConfig

pravega_config = PravegaConfig(uri=uri, scope=scope)
```

|parameter|type|required|default value|description|
|-|-|-|-|-|
|uri|str|Yes|N/A|The Pravega controller RPC URI.|
|scope|str|Yes|N/A|The self-defined Pravega scope.|
|schema_registry_uri|str|No|None|The Pravega schema registry URI.|
|trust_store|str|No|None|The truststore value.|
|default_scope|str|No|None|The default Pravega scope, to resolve unqualified stream names and to support reader groups.|
|credentials|DefaultCredentials|No|None|The Pravega credentials to use.|
|validate_hostname|bool|No|True|Whether to enable TLS hostname validation.|
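
The `credentials` parameter takes a `DefaultCredentials` object built from a username and a password. According to the wrapper's docstring, the underlying Java object encapsulates a Basic authentication token, namely the Base64 encoding of `<username>:<password>`. The sketch below reproduces only that encoding with the standard library, to show what the token looks like. The function name `basic_auth_token` and the credentials are hypothetical; in a real job you would pass `DefaultCredentials(username, password)` and let the Java side build the token.

```python
import base64


def basic_auth_token(username: str, password: str) -> str:
    """Return the Base64 encoding of "<username>:<password>"."""
    raw = f"{username}:{password}".encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


# Placeholder credentials, for illustration only.
token = basic_auth_token("admin", "secret")
print(token)

# Decoding the token gives back the original pair, so it is an encoding,
# not encryption: protect the connection with TLS.
assert base64.b64decode(token).decode("utf-8") == "admin:secret"
```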

## StreamCut

A `StreamCut` object can be constructed with the `from_base64` class method, which takes a base64 string as its only parameter.

By default, `FlinkPravegaReader` uses the `UNBOUNDED` `StreamCut`, which lets the reader read from the HEAD to the TAIL of the stream.

## FlinkPravegaReader

Use `FlinkPravegaReader` as a datastream source. Add it to the job with `env.add_source`.

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment

from pravega_config import PravegaConfig
from pravega_reader import FlinkPravegaReader

env = StreamExecutionEnvironment.get_execution_environment()

# uri, scope and stream are placeholders for your own values.
pravega_config = PravegaConfig(uri=uri, scope=scope)
pravega_reader = FlinkPravegaReader(
    stream=stream,
    pravega_config=pravega_config,
    deserialization_schema=SimpleStringSchema())

ds = env.add_source(pravega_reader)
```

|parameter|type|required|default value|description|
|-|-|-|-|-|
|stream|Union[str, Stream]|Yes|N/A|The stream to be read from.|
|pravega_config|PravegaConfig|Yes|N/A|The Pravega client configuration, which includes connection info, security info, and a default scope.|
|deserialization_schema|DeserializationSchema|Yes|N/A|The deserialization schema, which describes how to turn byte messages into events.|
|start_stream_cut|StreamCut|No|StreamCut.UNBOUNDED|Read from the given start position in the stream.|
|end_stream_cut|StreamCut|No|StreamCut.UNBOUNDED|Read to the given end position in the stream.|
|enable_metrics|bool|No|True|Whether to report Pravega reader metrics.|
|uid|str|No|None (randomly generated on the Java side)|The uid to identify the checkpoint state of this source.|
|reader_group_scope|str|No|pravega_config.default_scope|The scope to store the Reader Group synchronization stream into.|
|reader_group_name|str|No|None (auto-generated on the Java side)|The Reader Group name for display purposes.|
|reader_group_refresh_time|timedelta|No|None (3 seconds on the Java side)|The interval for synchronizing the Reader Group state across parallel source instances.|
|checkpoint_initiate_timeout|timedelta|No|None (5 seconds on the Java side)|The timeout for executing a checkpoint of the Reader Group state.|
|event_read_timeout|timedelta|No|None (1 second on the Java side)|The timeout for the call to read events from Pravega. After the timeout expires without an event being returned, another call is made.|
|max_outstanding_checkpoint_request|int|No|None (3 on the Java side)|The maximum number of outstanding checkpoint requests to Pravega.|
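
The timeout parameters in the table above take `datetime.timedelta` values, and `None` leaves the choice to the Java side. The sketch below shows one way to reason about the value that ends up in effect, using the defaults documented in the table. `JAVA_SIDE_DEFAULTS`, `effective_timeout`, and `to_millis` are hypothetical helpers written for this illustration; how the wrapper actually forwards durations to Java is not shown on this page.

```python
from datetime import timedelta
from typing import Optional

# Java-side defaults as listed in the table above.
JAVA_SIDE_DEFAULTS = {
    "reader_group_refresh_time": timedelta(seconds=3),
    "checkpoint_initiate_timeout": timedelta(seconds=5),
    "event_read_timeout": timedelta(seconds=1),
}


def effective_timeout(name: str, value: Optional[timedelta] = None) -> timedelta:
    """Return the explicit value, or the documented default when it is None."""
    return JAVA_SIDE_DEFAULTS[name] if value is None else value


def to_millis(value: timedelta) -> int:
    """Convert a timedelta to whole milliseconds."""
    return value // timedelta(milliseconds=1)


default_ms = to_millis(effective_timeout("event_read_timeout"))
custom_ms = to_millis(
    effective_timeout("event_read_timeout", timedelta(milliseconds=250)))
print(default_ms, custom_ms)
```

Passing a `timedelta` rather than a bare number keeps the unit explicit at the call site, for example `event_read_timeout=timedelta(milliseconds=250)`.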

## FlinkPravegaWriter

Use `FlinkPravegaWriter` as a datastream sink. Add it to a `DataStream` with `ds.add_sink`.

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment

from pravega_config import PravegaConfig
from pravega_writer import FlinkPravegaWriter

env = StreamExecutionEnvironment.get_execution_environment()

pravega_config = PravegaConfig(uri=uri, scope=scope)
pravega_writer = FlinkPravegaWriter(stream=stream,
                                    pravega_config=pravega_config,
                                    serialization_schema=SimpleStringSchema())

# `ds` is an existing DataStream, e.g. the one returned by env.add_source(...).
ds.add_sink(pravega_writer)
```

|parameter|type|required|default value|description|
|-|-|-|-|-|
|stream|Union[str, Stream]|Yes|N/A|The stream to be written to.|
|pravega_config|PravegaConfig|Yes|N/A|The Pravega client configuration, which includes connection info, security info, and a default scope.|
|serialization_schema|SerializationSchema|Yes|N/A|The serialization schema, which describes how to turn events into byte messages.|
|enable_metrics|bool|No|True|Whether to report Pravega writer metrics.|
|writer_mode|PravegaWriterMode|No|PravegaWriterMode.ATLEAST_ONCE|The writer mode to provide *Best-effort*, *At-least-once*, or *Exactly-once* guarantees.|
|enable_watermark|bool|No|False|Emit Flink watermarks in event-time semantics to Pravega streams.|
|txn_lease_renewal_period|timedelta|No|None (30 seconds on the Java side)|The lease renewal period for Pravega transactions.|

## Metrics

Metrics are reported by default unless explicitly disabled by passing `enable_metrics=False`. See the [Metrics](metrics.md) page for details on the types of metrics that are reported.

## Serialization

See the [Data Types](https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/python/datastream/data_types/) page of PyFlink for more information.

documentation/src/mkdocs.yml

+1
@@ -42,6 +42,7 @@ nav:
 - 'Batch': 'batch.md'
 - 'Table API': 'table-api.md'
 - 'Catalog': 'catalog.md'
+- 'Python Datastream API': 'python.md'
 - Metrics: 'metrics.md'
 - Configurations: 'configurations.md'
 - Serialization: 'serialization.md'

src/main/python/pravega_config.py

+107
@@ -0,0 +1,107 @@
"""
Copyright Pravega Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Optional

from py4j.java_gateway import JavaObject
from pyflink.java_gateway import get_gateway


class Stream():
    """A stream can be thought of as an unbounded sequence of events."""

    _j_stream: JavaObject

    def __init__(self, scope: str, stream: str) -> None:
        """Helper utility to create a Stream object.

        Args:
            scope (str): Scope of the stream.
            stream (str): Name of the stream.
        """
        self._j_stream = get_gateway().jvm.io.pravega.client.stream.Stream.of(
            scope, stream)


class DefaultCredentials():
    """Pravega authentication credential"""

    _j_default_credentials: JavaObject

    def __init__(self, username: str, password: str) -> None:
        """Creates a new object containing a token that is valid for
        Basic authentication scheme. The object encapsulates a token
        value comprising of a Base64 encoded value of "<username>:<password>".

        Args:
            username (str): The username.
            password (str): The password.
        """
        self._j_default_credentials = get_gateway(
        ).jvm.io.pravega.shared.security.auth.DefaultCredentials(
            username, password)


class PravegaConfig():
    """The Pravega client configuration."""

    _j_pravega_config: JavaObject

    def __init__(self,
                 uri: str,
                 scope: str,
                 schema_registry_uri: Optional[str] = None,
                 trust_store: Optional[str] = None,
                 default_scope: Optional[str] = None,
                 credentials: Optional[DefaultCredentials] = None,
                 validate_hostname: bool = True) -> None:
        """Create a pravega client configuration.

        Args:
            uri (str):
                The Pravega controller RPC URI.
            scope (str):
                The self-defined Pravega scope.
            schema_registry_uri (Optional[str], optional):
                The Pravega schema registry URI. Defaults to None.
            trust_store (Optional[str], optional):
                The truststore value. Defaults to None.
            default_scope (Optional[str], optional):
                The default Pravega scope, to resolve unqualified stream names
                and to support reader groups. Defaults to None.
            credentials (Optional[DefaultCredentials], optional):
                The Pravega credentials to use. Defaults to None.
            validate_hostname (bool, optional):
                TLS hostname validation. Defaults to True.
        """
        j_uri = get_gateway().jvm.java.net.URI.create(uri)
        self._j_pravega_config = get_gateway().jvm \
            .io.pravega.connectors.flink \
            .PravegaConfig.fromDefaults() \
            .withControllerURI(j_uri) \
            .withDefaultScope(scope) \
            .withHostnameValidation(validate_hostname)

        if schema_registry_uri:
            j_schema_registry_uri = get_gateway().jvm.java.net.URI.create(
                schema_registry_uri)
            self._j_pravega_config.withSchemaRegistryURI(j_schema_registry_uri)
        if trust_store: self._j_pravega_config.withTrustStore(trust_store)
        if default_scope:
            self._j_pravega_config.withDefaultScope(default_scope)
        if credentials:
            self._j_pravega_config.withCredentials(
                credentials._j_default_credentials)
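
One detail of the constructor above is easy to miss: `scope` is applied through `withDefaultScope`, and a non-empty `default_scope` is then applied through the same method. The sketch below replays that call order against a pure-Python stand-in so it runs without a JVM. `FakeConfigBuilder` and `configure` are hypothetical names, and the stand-in assumes that the Java `with...` methods mutate the config in place and that a later call overwrites an earlier one, which is what the unassigned `with...` calls in the wrapper imply.

```python
from typing import List, Optional, Tuple


class FakeConfigBuilder:
    """Stand-in for the Java PravegaConfig; records each `with...` call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, object]] = []
        self.default_scope: Optional[str] = None

    def withDefaultScope(self, scope: str) -> "FakeConfigBuilder":
        self.calls.append(("withDefaultScope", scope))
        self.default_scope = scope  # assumption: a later call overwrites
        return self

    def withHostnameValidation(self, validate: bool) -> "FakeConfigBuilder":
        self.calls.append(("withHostnameValidation", validate))
        return self


def configure(scope: str,
              default_scope: Optional[str] = None,
              validate_hostname: bool = True) -> FakeConfigBuilder:
    """Mirror the call order of PravegaConfig.__init__ for the scope settings."""
    builder = FakeConfigBuilder() \
        .withDefaultScope(scope) \
        .withHostnameValidation(validate_hostname)
    if default_scope:
        builder.withDefaultScope(default_scope)
    return builder


only_scope = configure("scope-a")
both = configure("scope-a", default_scope="scope-b")
print(only_scope.default_scope, both.default_scope)
```

Under these assumptions, passing both arguments leaves `default_scope` in effect, so `scope` alone is enough unless the two are meant to differ.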
