From 2117915a16d90c430f7be5b6a87512800267c71a Mon Sep 17 00:00:00 2001
From: Rajesh Mahindra
Date: Thu, 25 Nov 2021 22:33:37 -0800
Subject: [PATCH 1/4] Fix README with current limitations of hive sync

---
 hudi-kafka-connect/README.md                  | 151 ++++++++++--------
 hudi-kafka-connect/demo/config-sink-hive.json |  31 ++++
 hudi-kafka-connect/demo/config-sink.json      |  14 +-
 .../demo/connect-distributed.properties       |   2 +-
 4 files changed, 117 insertions(+), 81 deletions(-)
 create mode 100644 hudi-kafka-connect/demo/config-sink-hive.json

diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index 33adb5b1d2270..5b7b9c579879b 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -29,34 +29,29 @@ The first thing you need to do to start using this connector is building it. In
- Install [kcat](https://github.com/edenhill/kcat)
- Install jq. `brew install jq`

-After installing these dependencies, execute the following commands. This will install all the Hudi dependency jars,
-including the fat packaged jar that contains all the dependencies required for a functional Hudi Kafka Connect Sink.
-
-
-```bash
-mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
-```
-
-Next, we need to make sure that the hudi sink connector bundle jar is in Kafka Connect classpath. Note that the connect
-classpath should be same as the one configured in the connector configuration file.
-
-```bash
-cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/java/hudi-kafka-connect/
-```
-
+
## Trying the connector

-After building the package, we need to install the Apache Kafka
+After installing these dependencies, follow the steps below based on your requirements.

### 1 - Starting the environment

For runtime dependencies, we encourage using the confluent HDFS connector jars. We have tested our setup with version
`10.1.0`.
-After downloading the connector, copy the jars from the lib folder to the Kafka Connect classpath.
+Either use confluent-hub to install the connector or download it from [here](https://tinyurl.com/yb472f79).
+
+Copy the entire folder to the plugin path that will be used by the Hudi Kafka Connector.

```bash
confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
+cp confluentinc-kafka-connect-hdfs-10.1.0/* /usr/local/share/kafka/plugins/
+```
+
+Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector.
+```bash
+cd ${HUDI_DIR}
+mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
+cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib/
```
-Add `confluentinc-kafka-connect-hdfs-10.1.0/lib` to the plugin.path (comma separated) in $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties

### 2 - Set up the docker containers

@@ -80,7 +75,7 @@ Essentially, follow the steps listed here:

Bring up the docker containers

```bash
-cd $HUDI_DIR/docker
+cd ${HUDI_DIR}/docker
./setup_demo.sh
```

@@ -93,7 +88,7 @@ registries, we use Confluent schema registry.

Download the latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the
schema registry service.

-NOTE: You might need to change the port from `8081` to `8082`.
+NOTE: You must change the port from `8081` (default) to `8082` to avoid a port conflict.
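+
+For example, with the default layout of the Confluent Platform download, the Schema Registry port is controlled by the `listeners`
+property (the file location below is an assumption about that download, not something introduced by this patch):
+
+```properties
+# etc/schema-registry/schema-registry.properties
+listeners=http://0.0.0.0:8082
+```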
```bash
cd $CONFLUENT_DIR
@@ -137,8 +132,8 @@ Note that if multiple workers need to be run, the webserver needs to be reconfig
successful running of the workers.

```bash
-cd $KAFKA_HOME
-./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
+cd ${KAFKA_HOME}
+./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties
```

### 7 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
@@ -191,52 +186,7 @@ total 5168
-rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
```

-### 8- Querying via Hive
-
-```bash
-docker exec -it adhoc-2 /bin/bash
-beeline -u jdbc:hive2://hiveserver:10000 \
-  --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
-  --hiveconf hive.stats.autogather=false
-
-
-# List Tables
-0: jdbc:hive2://hiveserver:10000> show tables;
-+---------------------+--+
-|      tab_name       |
-+---------------------+--+
-| huditesttopic_ro    |
-| huditesttopic_rt    |
-+---------------------+--+
-3 rows selected (1.199 seconds)
-0: jdbc:hive2://hiveserver:10000>
-
-
-# Look at partitions that were added
-0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_rt;
-+-------------------+--+
-|     partition     |
-+-------------------+--+
-| date=partition_0  |
-| date=partition_1  |
-| date=partition_2  |
-| date=partition_3  |
-| date=partition_4  |
-+-------------------+--+
-1 row selected (0.24 seconds)
-
-
-0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_rt;
-+----------------------+---------+----------------------+---------+------------+-----------+--+
-| _hoodie_commit_time  | symbol  |          ts          | volume  |    open    |   close   |
-+----------------------+---------+----------------------+---------+------------+-----------+--+
-| 20180924222155       | GOOG    | 2018-08-31 09:59:00  | 6330    | 1230.5     | 1230.02   |
-| 20180924222155       | GOOG    | 2018-08-31 10:29:00  | 3391    | 1230.1899  | 1230.085  |
-+----------------------+---------+----------------------+---------+------------+-----------+--+
-```
-
-
-### 9 - Run async compaction and clustering if scheduled
+### 8 - Run async compaction and clustering if scheduled

When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink is
running. Inline compaction and clustering are disabled by default due to performance reason. By default, async
@@ -365,3 +315,68 @@ hoodie.write.concurrency.mode=single_writer

Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled
clustering is going to be executed.
+
+### 9 - Querying via Hive
+
+In this section we explain how to test syncing of the Hudi table with the Hive server / Hive Metastore,
+which enables querying via Hive, Presto, etc.
+
+First, re-install the connector with a different configuration (`config-sink-hive.json`) that writes the Hudi table to HDFS instead of the local filesystem, as shown below.
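+
+Compared to `config-sink.json`, the main differences in `config-sink-hive.json` (both files are part of this patch) are the
+HDFS base path and the Hive sync settings; an excerpt of the relevant keys:
+
+```json
+{
+  "hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic",
+  "hoodie.meta.sync.enable": "true",
+  "hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool",
+  "hoodie.datasource.hive_sync.mode": "hms",
+  "hive.metastore.uris": "thrift://hivemetastore:9083"
+}
+```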
+
+```bash
+curl -X DELETE http://localhost:8083/connectors/hudi-sink
+curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink-hive.json http://localhost:8083/connectors
+```
+
+Once the connector is running, you can query the Hive server using the following steps:
+
+```bash
+docker exec -it adhoc-2 /bin/bash
+beeline -u jdbc:hive2://hiveserver:10000 \
+  --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
+  --hiveconf hive.stats.autogather=false
+
+
+# List Tables
+0: jdbc:hive2://hiveserver:10000> show tables;
++---------------------+--+
+|      tab_name       |
++---------------------+--+
+| huditesttopic_ro    |
+| huditesttopic_rt    |
++---------------------+--+
+3 rows selected (1.199 seconds)
+0: jdbc:hive2://hiveserver:10000>
+
+
+# Look at partitions that were added
+0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_ro;
++-------------------+--+
+|     partition     |
++-------------------+--+
+| date=partition_0  |
+| date=partition_1  |
+| date=partition_2  |
+| date=partition_3  |
+| date=partition_4  |
++-------------------+--+
+1 row selected (0.24 seconds)
+
+
+0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_ro;
++----------------------+---------+----------------------+---------+------------+-----------+--+
+| _hoodie_commit_time  | symbol  |          ts          | volume  |    open    |   close   |
++----------------------+---------+----------------------+---------+------------+-----------+--+
+| 20180924222155       | GOOG    | 2018-08-31 09:59:00  | 6330    | 1230.5     | 1230.02   |
+| 20180924222155       | GOOG    | 2018-08-31 10:29:00  | 3391    | 1230.1899  | 1230.085  |
++----------------------+---------+----------------------+---------+------------+-----------+--+
+```
+
+`Current Limitation:` The Hudi Kafka Connect sink uses the `Merge-On-Read` table type by default, and appends the incoming Kafka records
+directly to the log file(s). The compaction service can then be run asynchronously to merge the log files into base files (Parquet format).
+Hudi generally supports both Read-Optimized queries, which read only the Parquet base files, and Snapshot queries, which read and merge
+records across base and log files. However, there is currently a limitation that records cannot be read from log files alone.
+Hence, queries against a table written by the Hudi Kafka Connect sink will only return results after compaction has merged the records
+into base files. Alternatively, users have the option to reconfigure the table type to `COPY_ON_WRITE` in config-sink.json, as sketched below.
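+
+For example, a minimal sketch of that change: set the table type key in `config-sink.json` (only the changed entry is shown below)
+and then delete and re-create the connector as described above. Note that the table type is fixed when the table is first created,
+so this should be applied against a new (empty) base path.
+
+```json
+{
+  "hoodie.table.type": "COPY_ON_WRITE"
+}
+```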
+ + diff --git a/hudi-kafka-connect/demo/config-sink-hive.json b/hudi-kafka-connect/demo/config-sink-hive.json new file mode 100644 index 0000000000000..bf7e998331ca7 --- /dev/null +++ b/hudi-kafka-connect/demo/config-sink-hive.json @@ -0,0 +1,31 @@ +{ + "name": "hudi-sink", + "config": { + "bootstrap.servers": "kafkabroker:9092", + "connector.class": "org.apache.hudi.connect.HoodieSinkConnector", + "tasks.max": "4", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter.schemas.enable": "false", + "topics": "hudi-test-topic", + "hoodie.table.name": "hudi-test-topic", + "hoodie.table.type": "MERGE_ON_READ", + "hoodie.metadata.enable": "false", + "hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic", + "hoodie.datasource.write.recordkey.field": "volume", + "hoodie.datasource.write.partitionpath.field": "date", + "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", + "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest", + "hoodie.kafka.commit.interval.secs": 60, + "hoodie.meta.sync.enable": "true", + "hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool", + "hoodie.datasource.hive_sync.table": "huditesttopic", + "hoodie.datasource.hive_sync.partition_fields": "date", + "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", + "hoodie.datasource.hive_sync.use_jdbc": "false", + "hoodie.datasource.hive_sync.mode": "hms", + "dfs.client.use.datanode.hostname": "true", + "hive.metastore.uris": "thrift://hivemetastore:9083", + "hive.metastore.client.socket.timeout": "1500s" + } +} diff --git a/hudi-kafka-connect/demo/config-sink.json b/hudi-kafka-connect/demo/config-sink.json index bf7e998331ca7..9d1aedff647f2 100644 --- a/hudi-kafka-connect/demo/config-sink.json +++ b/hudi-kafka-connect/demo/config-sink.json @@ -11,21 +11,11 @@ "hoodie.table.name": "hudi-test-topic", "hoodie.table.type": "MERGE_ON_READ", "hoodie.metadata.enable": "false", - "hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic", + "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic", "hoodie.datasource.write.recordkey.field": "volume", "hoodie.datasource.write.partitionpath.field": "date", "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest", - "hoodie.kafka.commit.interval.secs": 60, - "hoodie.meta.sync.enable": "true", - "hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool", - "hoodie.datasource.hive_sync.table": "huditesttopic", - "hoodie.datasource.hive_sync.partition_fields": "date", - "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", - "hoodie.datasource.hive_sync.use_jdbc": "false", - "hoodie.datasource.hive_sync.mode": "hms", - "dfs.client.use.datanode.hostname": "true", - "hive.metastore.uris": "thrift://hivemetastore:9083", - "hive.metastore.client.socket.timeout": "1500s" + "hoodie.kafka.commit.interval.secs": 60 } } diff --git a/hudi-kafka-connect/demo/connect-distributed.properties b/hudi-kafka-connect/demo/connect-distributed.properties index 172e847897cc6..1c28bc60d52b3 100644 --- a/hudi-kafka-connect/demo/connect-distributed.properties +++ b/hudi-kafka-connect/demo/connect-distributed.properties @@ 
-30,4 +30,4 @@ status.storage.replication.factor=1 offset.flush.interval.ms=60000 listeners=HTTP://:8083 -plugin.path=/usr/local/share/java +plugin.path=/usr/local/share/kafka/plugins From cb6932255d5e9439f521362c343d0eee863c837d Mon Sep 17 00:00:00 2001 From: Rajesh Mahindra Date: Thu, 25 Nov 2021 22:42:39 -0800 Subject: [PATCH 2/4] Fix README with current limitations of hive sync --- hudi-kafka-connect/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md index 5b7b9c579879b..9708313a206c7 100644 --- a/hudi-kafka-connect/README.md +++ b/hudi-kafka-connect/README.md @@ -46,11 +46,12 @@ confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0 cp confluentinc-kafka-connect-hdfs-10.1.0/* /usr/local/share/kafka/plugins/ ``` -Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. +Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it +to the plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`) ```bash cd ${HUDI_DIR} mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am -cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib/ +cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib ``` ### 2 - Set up the docker containers From 987e69fa4cc0b82bab89a8b9d47c0a988bc75a41 Mon Sep 17 00:00:00 2001 From: Rajesh Mahindra Date: Fri, 26 Nov 2021 00:04:09 -0800 Subject: [PATCH 3/4] Fix dep issue --- packaging/hudi-kafka-connect-bundle/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index a828132d8b4c9..cd7b151ced553 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -80,6 +80,7 @@ org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr org.apache.hudi:hudi-timeline-service + org.apache.hudi:hudi-aws org.apache.hudi:hudi-flink_${scala.binary.version} From 2835de9db21c29b7b75c003e3d6acede6c7354c5 Mon Sep 17 00:00:00 2001 From: Rajesh Mahindra Date: Fri, 26 Nov 2021 13:41:05 -0800 Subject: [PATCH 4/4] Fix Copy on Write flow --- .../java/org/apache/hudi/table/FileIdPrefixProvider.java | 8 ++++---- .../org/apache/hudi/table/RandomFileIdPrefixProvider.java | 5 ++--- hudi-kafka-connect/README.md | 1 + .../hudi/connect/KafkaConnectFileIdPrefixProvider.java | 5 ++--- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java index d06da9b0d9b92..0fc0823184ea0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java @@ -18,17 +18,17 @@ package org.apache.hudi.table; -import java.util.Properties; +import org.apache.hudi.common.config.TypedProperties; public abstract class FileIdPrefixProvider { - private final Properties props; + private final TypedProperties props; - public FileIdPrefixProvider(Properties props) { + public FileIdPrefixProvider(TypedProperties props) { this.props = props; } - public Properties getProps() { + 
public TypedProperties getProps() { return props; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java index 89d993460e2ba..5ad3eedf437c2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java @@ -18,13 +18,12 @@ package org.apache.hudi.table; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; -import java.util.Properties; - public class RandomFileIdPrefixProvider extends FileIdPrefixProvider { - public RandomFileIdPrefixProvider(Properties props) { + public RandomFileIdPrefixProvider(TypedProperties props) { super(props); } diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md index 9708313a206c7..0bf53d6c41ca4 100644 --- a/hudi-kafka-connect/README.md +++ b/hudi-kafka-connect/README.md @@ -151,6 +151,7 @@ curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect Now, you should see that the connector is created and tasks are running. ```bash +mkdir /tmp/hoodie/ curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors ["hudi-sink"] curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java index 9c4674706a6a1..436366709d5be 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java @@ -18,6 +18,7 @@ package org.apache.hudi.connect; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.connect.utils.KafkaConnectUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.FileIdPrefixProvider; @@ -25,8 +26,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.util.Properties; - public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider { public static final String KAFKA_CONNECT_PARTITION_ID = "hudi.kafka.connect.partition"; @@ -34,7 +33,7 @@ public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider { private final String kafkaPartition; - public KafkaConnectFileIdPrefixProvider(Properties props) { + public KafkaConnectFileIdPrefixProvider(TypedProperties props) { super(props); if (!props.containsKey(KAFKA_CONNECT_PARTITION_ID)) { LOG.error("Fatal error due to Kafka Connect Partition Id is not set");