diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh index 5c618b2a70266..e4e8d2e382ed1 100755 --- a/hudi-kafka-connect/demo/setupKafka.sh +++ b/hudi-kafka-connect/demo/setupKafka.sh @@ -1,3 +1,4 @@ +#!/bin/bash # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,8 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -#!/bin/bash - ######################### # The command line help # ######################### @@ -79,11 +78,11 @@ while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do recreateTopic="N" printf "Argument recreate-topic is N (reuse Kafka topic) \n" ;; - k) + f) rawDataFile="$OPTARG" printf "Argument raw-file is %s\n" "$rawDataFile" ;; - f) + k) kafkaTopicName="$OPTARG" printf "Argument kafka-topic is %s\n" "$kafkaTopicName" ;; diff --git a/packaging/bundle-validation/Dockerfile-base b/packaging/bundle-validation/Dockerfile-base index 1e782e08d5292..81df6ce0c029d 100644 --- a/packaging/bundle-validation/Dockerfile-base +++ b/packaging/bundle-validation/Dockerfile-base @@ -16,7 +16,7 @@ # FROM adoptopenjdk/openjdk8:alpine -RUN apk add --no-cache --upgrade bash +RUN apk add --no-cache --upgrade bash curl jq RUN mkdir /opt/bundle-validation ENV WORKDIR=/opt/bundle-validation @@ -27,6 +27,8 @@ ARG HIVE_VERSION=3.1.3 ARG DERBY_VERSION=10.14.1.0 ARG SPARK_VERSION=3.1.3 ARG SPARK_HADOOP_VERSION=2.7 +ARG CONFLUENT_VERSION=5.5.12 +ARG KAFKA_CONNECT_HDFS_VERSION=10.1.13 RUN wget https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz -P "$WORKDIR" \ && tar -xf $WORKDIR/hadoop-$HADOOP_VERSION.tar.gz -C $WORKDIR/ \ @@ -47,3 +49,15 @@ RUN wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK && tar -xf $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz -C $WORKDIR/ \ && rm $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz ENV SPARK_HOME=$WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION + +RUN wget https://packages.confluent.io/archive/${CONFLUENT_VERSION%.*}/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -P "$WORKDIR" \ + && tar -xf $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -C $WORKDIR/ \ + && rm $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz +ENV CONFLUENT_HOME=$WORKDIR/confluent-$CONFLUENT_VERSION + +RUN wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-hdfs/versions/$KAFKA_CONNECT_HDFS_VERSION/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -P "$WORKDIR" \ + && mkdir $WORKDIR/kafka-connectors \ + && unzip $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -d $WORKDIR/kafka-connectors/ \ + && rm $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip \ + && printf "\nplugin.path=$WORKDIR/kafka-connectors\n" >> $CONFLUENT_HOME/etc/kafka/connect-distributed.properties +ENV KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH=$WORKDIR/kafka-connectors/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION/lib diff --git a/packaging/bundle-validation/ci_run.sh b/packaging/bundle-validation/ci_run.sh index 4cc795f405902..46ef80964bed2 100755 --- a/packaging/bundle-validation/ci_run.sh +++ b/packaging/bundle-validation/ci_run.sh @@ -36,6 +36,8 @@ if [[ ${SPARK_PROFILE} == 'spark2.4' ]]; then DERBY_VERSION=10.10.2.0 SPARK_VERSION=2.4.8 SPARK_HADOOP_VERSION=2.7 + CONFLUENT_VERSION=5.5.12 + KAFKA_CONNECT_HDFS_VERSION=10.1.13 IMAGE_TAG=spark248hive239 elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then HADOOP_VERSION=2.7.7 @@ -43,6 +45,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then DERBY_VERSION=10.14.1.0 SPARK_VERSION=3.1.3 SPARK_HADOOP_VERSION=2.7 + CONFLUENT_VERSION=5.5.12 + KAFKA_CONNECT_HDFS_VERSION=10.1.13 IMAGE_TAG=spark313hive313 elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then HADOOP_VERSION=2.7.7 @@ -50,6 +54,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then DERBY_VERSION=10.14.1.0 SPARK_VERSION=3.2.2 SPARK_HADOOP_VERSION=2.7 + CONFLUENT_VERSION=5.5.12 + KAFKA_CONNECT_HDFS_VERSION=10.1.13 IMAGE_TAG=spark322hive313 elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then HADOOP_VERSION=2.7.7 @@ -57,6 +63,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then DERBY_VERSION=10.14.1.0 SPARK_VERSION=3.3.0 SPARK_HADOOP_VERSION=2 + CONFLUENT_VERSION=5.5.12 + KAFKA_CONNECT_HDFS_VERSION=10.1.13 IMAGE_TAG=spark330hive313 fi @@ -67,6 +75,7 @@ cp ${GITHUB_WORKSPACE}/packaging/hudi-hadoop-mr-bundle/target/hudi-*-$HUDI_VERSI cp ${GITHUB_WORKSPACE}/packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-slim-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ +cp ${GITHUB_WORKSPACE}/packaging/hudi-kafka-connect-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ echo 'Validating jars below:' ls -l $TMP_JARS_DIR @@ -84,6 +93,8 @@ docker build \ --build-arg DERBY_VERSION=$DERBY_VERSION \ --build-arg SPARK_VERSION=$SPARK_VERSION \ --build-arg SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION \ +--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \ +--build-arg KAFKA_CONNECT_HDFS_VERSION=$KAFKA_CONNECT_HDFS_VERSION \ --build-arg IMAGE_TAG=$IMAGE_TAG \ -t hudi-ci-bundle-validation:$IMAGE_TAG \ . diff --git a/packaging/bundle-validation/kafka/config-sink.json b/packaging/bundle-validation/kafka/config-sink.json new file mode 100644 index 0000000000000..0318ec961a598 --- /dev/null +++ b/packaging/bundle-validation/kafka/config-sink.json @@ -0,0 +1,20 @@ +{ + "name": "hudi-sink", + "config": { + "bootstrap.servers": "localhost:9092", + "connector.class": "org.apache.hudi.connect.HoodieSinkConnector", + "tasks.max": "2", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter.schemas.enable": "false", + "topics": "hudi-test-topic", + "hoodie.table.name": "hudi-test-topic", + "hoodie.table.type": "COPY_ON_WRITE", + "hoodie.base.path": "file:///tmp/hudi-kafka-test", + "hoodie.datasource.write.recordkey.field": "volume", + "hoodie.datasource.write.partitionpath.field": "date", + "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", + "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest", + "hoodie.kafka.commit.interval.secs": 10 + } +} diff --git a/packaging/bundle-validation/kafka/consume.sh b/packaging/bundle-validation/kafka/consume.sh new file mode 100755 index 0000000000000..5ae67c3b61379 --- /dev/null +++ b/packaging/bundle-validation/kafka/consume.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties & +sleep 30 +curl -X POST -H "Content-Type:application/json" -d @/opt/bundle-validation/kafka/config-sink.json http://localhost:8083/connectors & +sleep 30 +curl -X DELETE http://localhost:8083/connectors/hudi-sink & +sleep 10 + +# validate +numCommits=$(ls /tmp/hudi-kafka-test/.hoodie/*.commit | wc -l) +if [ $numCommits -gt 0 ]; then + exit 0 +else + exit 1 +fi diff --git a/packaging/bundle-validation/kafka/produce.sh b/packaging/bundle-validation/kafka/produce.sh new file mode 100755 index 0000000000000..7f828b5d3b6f3 --- /dev/null +++ b/packaging/bundle-validation/kafka/produce.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +kafkaTopicName=hudi-test-topic + +# Setup the schema registry +SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' /opt/bundle-validation/data/stocks/schema.avsc | sed '/\/\*/,/*\//d' | jq tostring) +curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions + +# produce data +cat /opt/bundle-validation/data/stocks/data/batch_1.json /opt/bundle-validation/data/stocks/data/batch_2.json | $CONFLUENT_HOME/bin/kafka-console-producer \ +--bootstrap-server http://localhost:9092 \ +--topic ${kafkaTopicName} diff --git a/packaging/bundle-validation/validate.sh b/packaging/bundle-validation/validate.sh index 67abdd5d65658..7d40228651cb5 100755 --- a/packaging/bundle-validation/validate.sh +++ b/packaging/bundle-validation/validate.sh @@ -32,6 +32,7 @@ ln -sf $JARS_DIR/hudi-hadoop-mr*.jar $JARS_DIR/hadoop-mr.jar ln -sf $JARS_DIR/hudi-spark*.jar $JARS_DIR/spark.jar ln -sf $JARS_DIR/hudi-utilities-bundle*.jar $JARS_DIR/utilities.jar ln -sf $JARS_DIR/hudi-utilities-slim*.jar $JARS_DIR/utilities-slim.jar +ln -sf $JARS_DIR/hudi-kafka-connect-bundle*.jar $JARS_DIR/kafka-connect.jar ## @@ -131,26 +132,64 @@ test_utilities_bundle () { } +## +# Function to test the kafka-connect bundle. +# It runs zookeeper, kafka broker, schema registry, and connector worker. +# After producing and consuming data, it checks successful commit under `.hoodie/` +# +# 1st arg: path to the hudi-kafka-connect-bundle.jar (for writing data) +# +# env vars (defined in container): +# CONFLUENT_HOME: path to the confluent community directory +# KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH: path to install hudi-kafka-connect-bundle.jar +## +test_kafka_connect_bundle() { + KAFKA_CONNECT_JAR=$1 + cp $KAFKA_CONNECT_JAR $KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH + $CONFLUENT_HOME/bin/zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties & + $CONFLUENT_HOME/bin/kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties & + sleep 10 + $CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties & + sleep 10 + $CONFLUENT_HOME/bin/kafka-topics --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 + $WORKDIR/kafka/produce.sh + $WORKDIR/kafka/consume.sh +} + + +############################ +# Execute tests +############################ + +echo "::warning::validate.sh validating spark & hadoop-mr bundle" test_spark_hadoop_mr_bundles if [ "$?" -ne 0 ]; then exit 1 fi +echo "::warning::validate.sh done validating spark & hadoop-mr bundle" if [[ $SPARK_HOME == *"spark-2.4"* ]] || [[ $SPARK_HOME == *"spark-3.1"* ]] then - echo "::warning::validate.sh testing utilities bundle" + echo "::warning::validate.sh validating utilities bundle" test_utilities_bundle $JARS_DIR/utilities.jar if [ "$?" -ne 0 ]; then exit 1 fi - echo "::warning::validate.sh done testing utilities bundle" + echo "::warning::validate.sh done validating utilities bundle" else - echo "::warning::validate.sh skip testing utilities bundle for non-spark2.4 & non-spark3.1 build" + echo "::warning::validate.sh skip validating utilities bundle for non-spark2.4 & non-spark3.1 build" fi -echo "::warning::validate.sh testing utilities slim bundle" +echo "::warning::validate.sh validating utilities slim bundle" test_utilities_bundle $JARS_DIR/utilities-slim.jar $JARS_DIR/spark.jar if [ "$?" -ne 0 ]; then exit 1 fi -echo "::warning::validate.sh done testing utilities slim bundle" +echo "::warning::validate.sh done validating utilities slim bundle" + +echo "::warning::validate.sh validating kafka connect bundle" +test_kafka_connect_bundle $JARS_DIR/kafka-connect.jar +if [ "$?" -ne 0 ]; then + exit 1 +fi +echo "::warning::validate.sh done validating kafka connect bundle"