Merged
87 changes: 63 additions & 24 deletions hudi-kafka-connect/README.md
@@ -27,7 +27,7 @@ The first thing you need to do to start using this connector is building it. In
- [Java 1.8+](https://openjdk.java.net/)
- [Apache Maven](https://maven.apache.org/)
- Install [kcat](https://github.com/edenhill/kcat)
= Install jq. `brew install jq`
- Install jq. `brew install jq`


## Trying the connector
@@ -42,23 +42,28 @@ Either use confluent-hub to install the connector or download it from [here](htt
Copy the entire folder to the classpath that will be used by the Hudi Kafka Connector.

```bash
confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
cp confluentinc-kafka-connect-hdfs-10.1.0/* /usr/local/share/kafka/plugins/
export CONFLUENT_DIR=/path/to/confluent_install_dir
mkdir -p /usr/local/share/kafka/plugins
$CONFLUENT_DIR/bin/confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
cp -r $CONFLUENT_DIR/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/* /usr/local/share/kafka/plugins/
```

Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it
to the plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`)

```bash
cd ${HUDI_DIR}
cd $HUDI_DIR
mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
mkdir -p /usr/local/share/kafka/plugins/lib
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
```
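The jar name embeds the Hudi version, which changes between releases (this change moves the example from `0.10.0-SNAPSHOT` to `0.11.0-SNAPSHOT`). A minimal sketch of a version-independent lookup follows. The helper name `find_bundle_jar` is hypothetical, and the sketch assumes the build leaves a single `hudi-kafka-connect-bundle-<version>.jar` in the `target` directory.

```bash
# Print the first Hudi Kafka Connect bundle jar found in the given directory.
# Assumption: the jar is named hudi-kafka-connect-bundle-<version>.jar, as in
# the cp command above. find_bundle_jar is an illustrative helper only.
find_bundle_jar() {
  local target_dir="$1"
  local jar
  for jar in "$target_dir"/hudi-kafka-connect-bundle-*.jar; do
    # Skip the unexpanded glob (no match) and anything that is not a regular file.
    [ -f "$jar" ] || continue
    printf '%s\n' "$jar"
    return 0
  done
  echo "no hudi-kafka-connect-bundle jar found in $target_dir" >&2
  return 1
}

# Example (not run here):
#   cp "$(find_bundle_jar "$HUDI_DIR/packaging/hudi-kafka-connect-bundle/target")" \
#      /usr/local/share/kafka/plugins/lib
```

If the build produces more than one matching jar, check which one the helper prints before copying it.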

Set up a Kafka broker locally. Download the latest apache kafka from [here](https://kafka.apache.org/downloads).
Once downloaded and built, run the Zookeeper server and Kafka server using the command line tools.
```bash
export KAFKA_HOME=/path/to/kafka_install_dir
cd $KAFKA_HOME
# Run the following commands in separate terminals to keep them running
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
```
@@ -71,11 +76,13 @@ registries, we use Confluent schema registry. Download the
latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema
registry service.

NOTE: You must change the port from `8081` (default) to `8082` to avoid conflict.
NOTE: You must change the port from `8081` (default) to `8082` to avoid conflict, i.e.,
using `listeners=http://0.0.0.0:8082` in the properties file `etc/schema-registry/schema-registry.properties`.

```bash
cd $CONFLUENT_DIR
/bin/kafka-configs --zookeeper localhost --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact
./bin/kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact
# Make sure you have changed the listener port as above
./bin/schema-registry-start etc/schema-registry/schema-registry.properties
```
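The listener change described in the NOTE above can also be scripted instead of edited by hand. The following is a sketch only: `set_listener_port` is a hypothetical helper, and it assumes the properties file holds at most one uncommented `listeners=` line.

```bash
# Set the schema registry listener port in a properties file.
# Assumption: the file has at most one uncommented "listeners=" line.
# set_listener_port is an illustrative helper, not part of Confluent or Hudi.
set_listener_port() {
  local props_file="$1"
  local port="$2"
  local tmp_file
  tmp_file="$(mktemp)" || return 1
  if grep -q '^listeners=' "$props_file"; then
    # Rewrite the existing listeners line in a temporary copy.
    sed "s|^listeners=.*|listeners=http://0.0.0.0:${port}|" "$props_file" > "$tmp_file"
  else
    # No listeners line yet: keep the file as-is and append one.
    cat "$props_file" > "$tmp_file"
    printf 'listeners=http://0.0.0.0:%s\n' "$port" >> "$tmp_file"
  fi
  # Write back through the original path so its permissions are preserved.
  cat "$tmp_file" > "$props_file"
  rm -f "$tmp_file"
}

# Example (not run here):
#   set_listener_port "$CONFLUENT_DIR/etc/schema-registry/schema-registry.properties" 8082
```

The helper goes through a temporary file rather than `sed -i` because the in-place flag behaves differently between GNU and BSD `sed`.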

@@ -85,6 +92,8 @@ The control topic should only have `1` partition, since its used to coordinate t

```bash
cd $KAFKA_HOME
# The following command is expected to throw an error if the control topic does not exist.
# "Error while executing topic command : Topic 'hudi-control-topic' does not exist as expected"
./bin/kafka-topics.sh --delete --topic hudi-control-topic --bootstrap-server localhost:9092
./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
```
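Since the control topic is expected to have exactly `1` partition, it can be worth confirming that after creation. The sketch below extracts the partition count from the text printed by `kafka-topics.sh --describe`. The helper name `partition_count` is hypothetical, and the sketch assumes the summary line contains a `PartitionCount: <n>` field, which may differ across Kafka versions.

```bash
# Read `kafka-topics.sh --describe` output on stdin and print the partition count.
# Assumption: the summary line contains "PartitionCount: <n>".
# partition_count is an illustrative helper only.
partition_count() {
  sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Example (not run here):
#   ./bin/kafka-topics.sh --describe --topic hudi-control-topic \
#     --bootstrap-server localhost:9092 | partition_count
```

If the helper prints anything other than `1`, delete and recreate the control topic with the commands above.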
@@ -115,8 +124,8 @@ Note that if multiple workers need to be run, the webserver needs to be reconfig
successful running of the workers.

```bash
cd ${KAFKA_HOME}
./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties
cd $KAFKA_HOME
./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
```

### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
@@ -133,21 +142,54 @@ curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect
Now, you should see that the connector is created and tasks are running.

```bash
mkdir /tmp/hoodie/
curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors
> curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors
["hudi-sink"]
curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq

> curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 329 100 329 0 0 21096 0 --:--:-- --:--:-- --:--:-- 36555
{
"name": "hudi-sink",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 3,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "sink"
}
```
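Rather than reading the status output by eye, the check can be automated. The following is a rough sketch that succeeds only when every `"state"` field in the status JSON is `RUNNING`. The helper name `all_states_running` is hypothetical, and it deliberately uses only `grep` text matching rather than a JSON parser, so treat it as a quick sanity check.

```bash
# Read a Kafka Connect status JSON document on stdin.
# Succeeds only if at least one "state" field is present and all are RUNNING.
# all_states_running is an illustrative helper; it matches text, not JSON structure.
all_states_running() {
  local states
  # Collect every "state": "<VALUE>" pair; fail if there are none.
  states="$(grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"')" || return 1
  # Fail if any collected state is something other than RUNNING.
  if printf '%s\n' "$states" | grep -qv '"RUNNING"'; then
    return 1
  fi
  return 0
}

# Example (not run here):
#   curl -s http://localhost:8083/connectors/hudi-sink/status | all_states_running \
#     && echo "connector and tasks are running"
```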

And, you should see your Hudi table created, which you can query using Spark/Flink.
Note: HUDI-2325 tracks Hive sync, which will unlock pretty much every other query engine.
And, you should see your Hudi table created, which you can query using Spark/Flink. Note: HUDI-2325 tracks Hive sync,
which will unlock pretty much every other query engine.

```bash
ls -a /tmp/hoodie/hudi-test-topic
> ls -a /tmp/hoodie/hudi-test-topic
. .hoodie partition_1 partition_3
.. partition_0 partition_2 partition_4

ls -lt /tmp/hoodie/hudi-test-topic/.hoodie
> ls -lt /tmp/hoodie/hudi-test-topic/.hoodie
total 72
-rw-r--r-- 1 user wheel 346 Sep 14 10:32 hoodie.properties
-rw-r--r-- 1 user wheel 0 Sep 13 23:18 20210913231805.inflight
@@ -160,7 +202,7 @@ total 72
-rw-r--r-- 1 user wheel 0 Sep 13 21:41 20210913214114.commit.requested
drwxr-xr-x 2 user wheel 64 Sep 13 21:41 archived

ls -l /tmp/hoodie/hudi-test-topic/partition_0
> ls -l /tmp/hoodie/hudi-test-topic/partition_0
total 5168
-rw-r--r-- 1 user wheel 439332 Sep 13 21:43 2E0E6DB44ACC8479059574A2C71C7A7E-0_0-0-0_20210913214114.parquet
-rw-r--r-- 1 user wheel 440179 Sep 13 21:42 3B56FAAAE2BDD04E480C1CBACD463D3E-0_0-0-0_20210913214114.parquet
@@ -185,7 +227,7 @@ After the compaction is scheduled, you can see the requested compaction instant
below:

```
ls -l /tmp/hoodie/hudi-test-topic/.hoodie
> ls -l /tmp/hoodie/hudi-test-topic/.hoodie
total 280
-rw-r--r-- 1 user wheel 21172 Nov 11 11:09 20211111110807.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.inflight
@@ -245,7 +287,7 @@ Similarly, you can see the requested clustering instant (`20211111111813.replace
by the Sink:

```
ls -l /tmp/hoodie/hudi-test-topic/.hoodie
> ls -l /tmp/hoodie/hudi-test-topic/.hoodie
total 736
-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight
@@ -300,7 +342,7 @@ hoodie.write.concurrency.mode=single_writer
Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled
clustering is going to be executed.

### 8- Querying via Hive
### 8 - Querying via Hive

In this section we explain how one can test syncing of the Hudi table with Hive server/ Hive Metastore,
that enable querying via Hive, Presto etc.
@@ -326,7 +368,7 @@ Essentially, follow the steps listed here:

Bring up the docker containers
```bash
cd ${HUDI_DIR}/docker
cd $HUDI_DIR/docker
./setup_demo.sh
```

@@ -345,7 +387,6 @@ beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false


# List Tables
0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+
@@ -357,7 +398,6 @@ beeline -u jdbc:hive2://hiveserver:10000 \
3 rows selected (1.199 seconds)
0: jdbc:hive2://hiveserver:10000>


# Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_ro;
+-------------------+--+
@@ -371,7 +411,6 @@ beeline -u jdbc:hive2://hiveserver:10000 \
+-------------------+--+
1 row selected (0.24 seconds)


0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_ro;
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |