
Commit 641dc54

added example with parquet

1 parent 2f87980 commit 641dc54

File tree

4 files changed: +108 −3 lines changed

* README.adoc
* images/minio2.png
* kafka-connect-sink-s3/config/s3_parquet_sink.json
* scripts/bootstrap-connect-sink-s3-parquet.sh
README.adoc (+63 −3)
@@ -2074,8 +2074,9 @@ A S3 sink connector will be created with this link:kafka-connect-sink-s3/config/
 }
 ----
 
-Sink will read messages from topic _gaming-player-activity_ and store in S3 bucket _gaming-player-activity-bucket_.
-Sink will generate a new object storage entry every 100 messages (_flush_size_).
+The sink connector will read messages from topic _gaming-player-activity_ and store them in the S3 bucket _gaming-player-activity-bucket_, using _io.confluent.connect.s3.format.avro.AvroFormat_ as the format class.
+
+The sink connector will generate a new object storage entry every 100 messages (_flush.size_).
 
 To generate random records for topic _gaming-player-activity_ we will use the link:https://github.com/ugol/jr[jr] tool.
@@ -2086,7 +2087,7 @@ Send 1000 messages to _gaming-player-activity_ topic using jr:
 docker exec -it -w /home/jr/.jr jr jr template run gaming_player_activity -n 1000 -o kafka -t gaming-player-activity -s --serializer avro-generic
 ----
 
-Verify that 10 entries are stored in Minio into _gaming-player-activity-bucket_ bucket, connecting to MiniIO web console, http://localhost:9000 (admin/minioadmin):
+Verify that 10 entries are stored in the _gaming-player-activity-bucket_ bucket by connecting to the MinIO web console at http://localhost:9000 (admin/minioadmin):
 
 image::images/minio.png[gaming-player-activity-bucket]
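Ten objects is the expected count: 1000 messages divided by a _flush.size_ of 100. If you prefer the terminal to the web console, here is a minimal sketch using the MinIO client, assuming `mc` is installed on the host (the alias name `local` is arbitrary):

[source,bash]
----
# Register the local MinIO endpoint with the credentials shown above
mc alias set local http://localhost:9000 admin minioadmin

# List the objects the sink has written; expect 10 entries
mc ls --recursive local/gaming-player-activity-bucket
----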
@@ -2097,6 +2098,65 @@ Teardown:
 scripts/tear-down-connect-sink-s3.sh
 ----
 
+==== Parquet format
+
+This is the same example, but the sink connector will read Avro messages from topic _gaming-player-activity_ and store them in the S3 bucket _gaming-player-activity-bucket_ using _io.confluent.connect.s3.format.parquet.ParquetFormat_ as the format class.
+
+The data stored in MinIO will be in Parquet format.
+
+Run the example:
+
+[source,bash]
+----
+scripts/bootstrap-connect-sink-s3-parquet.sh
+----
+
+An S3 sink connector will be created with this link:kafka-connect-sink-s3/config/s3_parquet_sink.json[config]:
+
+[source,json]
+----
+{
+  "name": "sink-parquet-s3",
+  "config":
+  {
+    "topics": "gaming-player-activity",
+    "tasks.max": "1",
+    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+    "store.url": "http://minio:9000",
+    "s3.region": "us-west-2",
+    "s3.bucket.name": "gaming-player-activity-bucket",
+    "s3.part.size": "5242880",
+    "flush.size": "100",
+    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
+    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
+    "parquet.codec": "snappy",
+    "schema.registry.url": "http://schema-registry:8081",
+    "value.converter": "io.confluent.connect.avro.AvroConverter",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter.schema.registry.url": "http://schema-registry:8081"
+  }
+}
+----
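Before sending any data you can confirm that the connector registered and its task is running. A quick, hedged check using the standard Kafka Connect REST status endpoint (the connector name _sink-parquet-s3_ comes from the config above):

[source,bash]
----
# Expect "state": "RUNNING" for both the connector and its single task
curl -s http://localhost:8083/connectors/sink-parquet-s3/status
----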
+
+Send 1000 messages to the _gaming-player-activity_ topic using jr:
+
+[source,bash]
+----
+docker exec -it -w /home/jr/.jr jr jr template run gaming_player_activity -n 1000 -o kafka -t gaming-player-activity -s --serializer avro-generic
+----
+
+Verify that 10 entries are stored in the _gaming-player-activity-bucket_ bucket by connecting to the MinIO web console at http://localhost:9000 (admin/minioadmin):
+
+image::images/minio2.png[gaming-player-activity-bucket]
+
+Teardown:
+
+[source,bash]
+----
+scripts/tear-down-connect-sink-s3.sh
+----
+
 === SAP HANA Source Connector example
 
 Folder: link:kafka-connect-source-sap-hana/[kafka-connect-source-sap-hana]

images/minio2.png (new image, 118 KB)
kafka-connect-sink-s3/config/s3_parquet_sink.json (new file)

@@ -0,0 +1,22 @@
{
  "name": "sink-parquet-s3",
  "config":
  {
    "topics": "gaming-player-activity",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "store.url": "http://minio:9000",
    "s3.region": "us-west-2",
    "s3.bucket.name": "gaming-player-activity-bucket",
    "s3.part.size": "5242880",
    "flush.size": "100",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",
    "schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
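To spot-check that the stored objects really are Parquet, one option is to copy one out of MinIO and read it locally. A minimal sketch, assuming the `mc` alias from the earlier snippet, a local duckdb CLI, and the S3 sink's default object naming under the DefaultPartitioner layout (the exact path and file name are assumptions; adjust to whatever `mc ls` shows):

[source,bash]
----
# Copy the first Parquet object out of MinIO (hypothetical object name)
mc cp local/gaming-player-activity-bucket/topics/gaming-player-activity/partition=0/gaming-player-activity+0+0000000000.snappy.parquet /tmp/sample.parquet

# DuckDB reads Parquet natively; print a few rows
duckdb -c "SELECT * FROM '/tmp/sample.parquet' LIMIT 5;"
----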
scripts/bootstrap-connect-sink-s3-parquet.sh (new file)

@@ -0,0 +1,23 @@
#!/bin/bash

sh scripts/tear-down-connect-sink-s3.sh

echo "Starting Kafka cluster..."
docker-compose -f kafka-connect-sink-s3/docker-compose.yml --env-file .env up -d

echo "Wait 20 seconds..."

sleep 20

echo "Create gaming-player-activity..."
docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic gaming-player-activity --replication-factor 1 --partitions 1

echo "Installing s3 parquet format sink..."
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @kafka-connect-sink-s3/config/s3_parquet_sink.json

echo "Wait 3 seconds..."

sleep 3

echo "connectors status..."
curl -v http://localhost:8083/connectors?expand=status
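The fixed `sleep 20` works, but it is fragile on slower machines. A hedged alternative is to poll the Connect REST endpoint until it answers (here up to roughly 60 seconds) before creating the topic and connector:

[source,bash]
----
# Wait for Kafka Connect's REST API instead of sleeping a fixed time
for i in $(seq 1 30); do
  curl -sf http://localhost:8083/connectors > /dev/null && break
  echo "Waiting for Kafka Connect..."
  sleep 2
done
----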
