@@ -18,17 +18,17 @@

package org.apache.hudi.table;

import java.util.Properties;
import org.apache.hudi.common.config.TypedProperties;

public abstract class FileIdPrefixProvider {

private final Properties props;
private final TypedProperties props;

public FileIdPrefixProvider(Properties props) {
public FileIdPrefixProvider(TypedProperties props) {
this.props = props;
}

public Properties getProps() {
public TypedProperties getProps() {
return props;
}

@@ -18,13 +18,12 @@

package org.apache.hudi.table;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;

import java.util.Properties;

public class RandomFileIdPrefixProvider extends FileIdPrefixProvider {

public RandomFileIdPrefixProvider(Properties props) {
public RandomFileIdPrefixProvider(TypedProperties props) {
super(props);
}

153 changes: 85 additions & 68 deletions hudi-kafka-connect/README.md
@@ -29,34 +29,30 @@ The first thing you need to do to start using this connector is building it. In
- Install [kcat](https://github.com/edenhill/kcat)
- Install jq. `brew install jq`

After installing these dependencies, execute the following commands. This will install all the Hudi dependency jars,
including the fat packaged jar that contains all the dependencies required for a functional Hudi Kafka Connect Sink.


```bash
mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
```

Next, we need to make sure that the hudi sink connector bundle jar is in Kafka Connect classpath. Note that the connect
classpath should be same as the one configured in the connector configuration file.

```bash
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/java/hudi-kafka-connect/
```


## Trying the connector

After building the package, we need to install the Apache Kafka
After installing these dependencies, follow the steps below based on your requirements.

### 1 - Starting the environment

For runtime dependencies, we encourage using the Confluent HDFS connector jars. We have tested our setup with version `10.1.0`.
After downloading the connector, copy the jars from the lib folder to the Kafka Connect classpath.
Either use confluent-hub to install the connector or download it from [here](https://tinyurl.com/yb472f79).

Copy the entire folder to the classpath that will be used by the Hudi Kafka Connector.

```bash
confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
cp confluentinc-kafka-connect-hdfs-10.1.0/* /usr/local/share/kafka/plugins/
```

Now, build the packaged jar that contains all the Hudi classes, including the Hudi Kafka Connector, and copy it
to the plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`).
```bash
cd ${HUDI_DIR}
mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
```
Add `confluentinc-kafka-connect-hdfs-10.1.0/lib` to the `plugin.path` (comma-separated) in `$HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties`, as sketched below.
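
For illustration, one way to make that edit (a sketch; the connector location is a placeholder for wherever you extracted it):

```bash
# HDFS_CONNECTOR_LIB is an assumption: point it at the lib folder of the
# extracted confluentinc-kafka-connect-hdfs-10.1.0 connector.
HDFS_CONNECTOR_LIB=/path/to/confluentinc-kafka-connect-hdfs-10.1.0/lib
PROPS=${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties

# Append the connector lib folder to the existing plugin.path entry.
sed -i.bak "s|^plugin.path=.*|&,${HDFS_CONNECTOR_LIB}|" "$PROPS"
grep '^plugin.path=' "$PROPS"
```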

### 2 - Set up the docker containers

@@ -80,7 +76,7 @@ Essentially, follow the steps listed here:

Bring up the docker containers
```bash
cd $HUDI_DIR/docker
cd ${HUDI_DIR}/docker
./setup_demo.sh
```
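
Optionally, a quick sanity check that the demo containers came up (a sketch; the names below are assumptions based on hostnames referenced elsewhere in this demo, such as `adhoc-2`, `hiveserver`, and `namenode`):

```bash
# List running demo containers; expect to see entries like adhoc-2,
# hiveserver, hivemetastore, namenode and kafkabroker (names may differ).
docker ps --format '{{.Names}}' | grep -E 'adhoc|hive|namenode|kafka'
```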

Expand All @@ -93,7 +89,7 @@ registries, we use Confluent schema registry. Download the
latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema
registry service.

NOTE: You might need to change the port from `8081` to `8082`.
NOTE: You must change the port from the default `8081` to `8082` to avoid a port conflict; one way to do this is sketched below.
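
As a sketch of that change (the properties file location and the `listeners` property are assumptions based on a stock Confluent Platform layout):

```bash
# Switch the schema registry listener from the default 8081 to 8082.
# The file path below is an assumption; adjust it for your install.
sed -i.bak 's|^listeners=.*|listeners=http://0.0.0.0:8082|' \
  ${CONFLUENT_DIR}/etc/schema-registry/schema-registry.properties

# After the registry is started (next step), this should list registered subjects.
curl http://localhost:8082/subjects
```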

```bash
cd $CONFLUENT_DIR
@@ -137,8 +133,8 @@ Note that if multiple workers need to be run, the webserver needs to be reconfigured for
successful running of the workers.

```bash
cd $KAFKA_HOME
./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
cd ${KAFKA_HOME}
./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties
```

### 7 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
@@ -155,6 +151,7 @@ curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect
Now, you should see that the connector is created and tasks are running.

```bash
mkdir /tmp/hoodie/
curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors
["hudi-sink"]
curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq
@@ -191,52 +188,7 @@ total 5168
-rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
```
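
If helpful, the connector and task states can also be checked compactly with `jq` (already listed as a prerequisite):

```bash
# Print just the connector and task states; each should report RUNNING.
curl -s http://localhost:8083/connectors/hudi-sink/status \
  | jq -r '.connector.state, .tasks[].state'
```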

### 8- Querying via Hive

```bash
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false


# List Tables
0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+
| tab_name |
+---------------------+--+
| huditesttopic_ro |
| huditesttopic_rt |
+---------------------+--+
3 rows selected (1.199 seconds)
0: jdbc:hive2://hiveserver:10000>


# Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_rt;
+-------------------+--+
| partition |
+-------------------+--+
| date=partition_0 |
| date=partition_1 |
| date=partition_2 |
| date=partition_3 |
| date=partition_4 |
+-------------------+--+
1 row selected (0.24 seconds)


0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_rt;
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
```


### 9 - Run async compaction and clustering if scheduled
### 8 - Run async compaction and clustering if scheduled

When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink is
running. Inline compaction and clustering are disabled by default due to performance reasons. By default, async
@@ -365,3 +317,68 @@ hoodie.write.concurrency.mode=single_writer

Note that you don't have to provide the instant time through `--instant-time`; in that case, the earliest scheduled
clustering is executed.

### 9 - Querying via Hive

In this section we explain how one can test syncing of the Hudi table with the Hive server/ Hive Metastore,
which enables querying via Hive, Presto, etc.

First, (re-)install a different connector configuration that writes the Hudi table to HDFS instead of the local filesystem.

```bash
curl -X DELETE http://localhost:8083/connectors/hudi-sink
curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink-hive.json http://localhost:8083/connectors
```
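
Optionally, the re-registered connector's configuration can be inspected before querying (a sketch using the standard Kafka Connect REST `config` endpoint):

```bash
# Confirm the HDFS base path and Hive sync settings were picked up.
curl -s http://localhost:8083/connectors/hudi-sink/config \
  | jq '."hoodie.base.path", ."hoodie.meta.sync.enable"'
```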

After the connector is running, you can query the Hive server using the following steps:

```bash
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false


# List Tables
0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+
| tab_name |
+---------------------+--+
| huditesttopic_ro |
| huditesttopic_rt |
+---------------------+--+
3 rows selected (1.199 seconds)
0: jdbc:hive2://hiveserver:10000>


# Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_ro;
+-------------------+--+
| partition |
+-------------------+--+
| date=partition_0 |
| date=partition_1 |
| date=partition_2 |
| date=partition_3 |
| date=partition_4 |
+-------------------+--+
1 row selected (0.24 seconds)


0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_ro;
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
```

`Current Limitation:` The Hudi Kafka Connect sink uses `Merge-On-Read` by default and appends the Kafka records
directly to the log file(s). An asynchronous compaction service can then be executed to merge the log files into base files (Parquet format).
Hudi generally supports both Read-Optimized queries, which read only the Parquet base files, and Snapshot queries, which read and merge
records across base and log files. However, there is currently a limitation where records cannot be read from
log files alone. Hence, queries against the Hudi Kafka Connect table will only return results after compaction merges the records into base files. Alternatively,
users have the option to reconfigure the table type to `COPY_ON_WRITE` in config-sink.json, as sketched below.
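
For illustration, one way to make that switch using `jq` and the REST calls shown earlier (the `/tmp/config-sink-cow.json` file name is just an example):

```bash
# Rewrite the table type in a copy of the sink config, then re-register the sink.
jq '.config."hoodie.table.type" = "COPY_ON_WRITE"' \
  $HUDI_DIR/hudi-kafka-connect/demo/config-sink.json > /tmp/config-sink-cow.json
curl -X DELETE http://localhost:8083/connectors/hudi-sink
curl -X POST -H "Content-Type:application/json" \
  -d @/tmp/config-sink-cow.json http://localhost:8083/connectors
```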


31 changes: 31 additions & 0 deletions hudi-kafka-connect/demo/config-sink-hive.json
@@ -0,0 +1,31 @@
{
"name": "hudi-sink",
"config": {
"bootstrap.servers": "kafkabroker:9092",
"connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
"tasks.max": "4",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"topics": "hudi-test-topic",
"hoodie.table.name": "hudi-test-topic",
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.metadata.enable": "false",
"hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic",
"hoodie.datasource.write.recordkey.field": "volume",
"hoodie.datasource.write.partitionpath.field": "date",
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest",
"hoodie.kafka.commit.interval.secs": 60,
"hoodie.meta.sync.enable": "true",
"hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool",
"hoodie.datasource.hive_sync.table": "huditesttopic",
"hoodie.datasource.hive_sync.partition_fields": "date",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"dfs.client.use.datanode.hostname": "true",
"hive.metastore.uris": "thrift://hivemetastore:9083",
"hive.metastore.client.socket.timeout": "1500s"
}
}
14 changes: 2 additions & 12 deletions hudi-kafka-connect/demo/config-sink.json
@@ -11,21 +11,11 @@
"hoodie.table.name": "hudi-test-topic",
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.metadata.enable": "false",
"hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic",
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
"hoodie.datasource.write.recordkey.field": "volume",
"hoodie.datasource.write.partitionpath.field": "date",
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest",
"hoodie.kafka.commit.interval.secs": 60,
"hoodie.meta.sync.enable": "true",
"hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool",
"hoodie.datasource.hive_sync.table": "huditesttopic",
"hoodie.datasource.hive_sync.partition_fields": "date",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"dfs.client.use.datanode.hostname": "true",
"hive.metastore.uris": "thrift://hivemetastore:9083",
"hive.metastore.client.socket.timeout": "1500s"
"hoodie.kafka.commit.interval.secs": 60
}
}
2 changes: 1 addition & 1 deletion hudi-kafka-connect/demo/connect-distributed.properties
@@ -30,4 +30,4 @@ status.storage.replication.factor=1

offset.flush.interval.ms=60000
listeners=HTTP://:8083
plugin.path=/usr/local/share/java
plugin.path=/usr/local/share/kafka/plugins
@@ -18,23 +18,22 @@

package org.apache.hudi.connect;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.FileIdPrefixProvider;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Properties;

public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {

public static final String KAFKA_CONNECT_PARTITION_ID = "hudi.kafka.connect.partition";
private static final Logger LOG = LogManager.getLogger(KafkaConnectFileIdPrefixProvider.class);

private final String kafkaPartition;

public KafkaConnectFileIdPrefixProvider(Properties props) {
public KafkaConnectFileIdPrefixProvider(TypedProperties props) {
super(props);
if (!props.containsKey(KAFKA_CONNECT_PARTITION_ID)) {
LOG.error("Fatal error due to Kafka Connect Partition Id is not set");
1 change: 1 addition & 0 deletions packaging/hudi-kafka-connect-bundle/pom.xml
@@ -80,6 +80,7 @@
<include>org.apache.hudi:hudi-sync-common</include>
<include>org.apache.hudi:hudi-hadoop-mr</include>
<include>org.apache.hudi:hudi-timeline-service</include>
<include>org.apache.hudi:hudi-aws</include>

<!-- NOTE: This is temp (SchemaProvide dep) until PR3162 lands -->
<include>org.apache.hudi:hudi-flink_${scala.binary.version}</include>