diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index e2abab1ccbabd..f5087ce0f9b6d 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -61,7 +61,7 @@ Once downloaded and built, run the Zookeeper server and Kafka server using the c
 
 ```bash
 export KAFKA_HOME=/path/to/kafka_install_dir
-cd $KAFKA_KAFKA_HOME
+cd $KAFKA_HOME
 ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
 ./bin/kafka-server-start.sh ./config/server.properties
 ```
@@ -71,8 +71,9 @@ Wait until the kafka cluster is up and running.
 ### 2 - Set up the schema registry
 
 Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema
-registries, we use Confluent schema registry. Download the latest confluent platform and run the schema registry
-service.
+registries, we use Confluent schema registry. Download the
+latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema
+registry service.
 
 ```bash
 cd $CONFLUENT_DIR
@@ -98,6 +99,13 @@ cd $HUDI_DIR/hudi-kafka-connect/demo/
 bash setupKafka.sh -n <total_kafka_messages>
 ```
 
+To generate data for long-running tests, you can add the `-b` option to specify the number of batches of data
+to generate; each batch contains a fixed number of messages, with idle time between batches, as follows:
+
+```bash
+bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
+```
+
 ### 4 - Run the Sink connector worker (multiple workers can be run)
 
 The Kafka connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks)
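For reference, the new options combine naturally for long-running tests. The invocation below is a sketch, not part of the patch: the numeric values are arbitrary, and the `-t` and `-o` flags come from the setupKafka.sh changes that follow.

```bash
# Produce 10 batches of 1000 records each, reuse the existing Kafka topic (-t),
# and start record keys at offset 5000 (-o), e.g. to resume an earlier run.
bash setupKafka.sh -n 1000 -b 10 -t -o 5000
```
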
diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh
index 20edb1ceb2eac..f4a88ee91e651 100755
--- a/hudi-kafka-connect/demo/setupKafka.sh
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -21,11 +21,14 @@
 #########################
 usage() {
   echo "Usage: $0"
-  echo "  -n |--num-kafka-records, (required) number of kafka records to generate"
+  echo "  -n |--num-kafka-records, (required) number of kafka records to generate in a batch"
+  echo "  -b |--num-batch, (optional) number of batches of records to generate (default is 1)"
+  echo "  -t |--reuse-topic, (optional) reuses the Kafka topic (default deletes and recreates the topic)"
   echo "  -f |--raw-file, (optional) raw file for the kafka records"
   echo "  -k |--kafka-topic, (optional) Topic name for Kafka"
   echo "  -m |--num-kafka-partitions, (optional) number of kafka partitions"
   echo "  -r |--record-key, (optional) field to use as record key"
+  echo "  -o |--record-key-offset, (optional) record key offset to start with (default is 0)"
   echo "  -l |--num-hudi-partitions, (optional) number of hudi partitions"
   echo "  -p |--partition-key, (optional) field to use as partition"
   echo "  -s |--schema-file, (optional) path of the file containing the schema of the records"
@@ -53,12 +56,23 @@ recordKey=volume
 numHudiPartitions=5
 partitionField=date
 schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc
+numBatch=1
+recordValue=0
+recreateTopic="Y"
 
-while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
+while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do
   case $opt in
   n)
-    num_records="$OPTARG"
-    printf "Argument num-kafka-records is %s\n" "$num_records"
+    numRecords="$OPTARG"
+    printf "Argument num-kafka-records is %s\n" "$numRecords"
+    ;;
+  b)
+    numBatch="$OPTARG"
+    printf "Argument num-batch is %s\n" "$numBatch"
+    ;;
+  t)
+    recreateTopic="N"
+    printf "Argument recreate-topic is N (reuse Kafka topic) \n"
     ;;
   k)
     rawDataFile="$OPTARG"
@@ -76,6 +90,10 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
     recordKey="$OPTARG"
     printf "Argument record-key is %s\n" "$recordKey"
     ;;
+  o)
+    recordValue="$OPTARG"
+    printf "Argument record-key-offset is %s\n" "$recordValue"
+    ;;
   l)
     numHudiPartitions="$OPTARG"
     printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions"
@@ -84,7 +102,7 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
     partitionField="$OPTARG"
     printf "Argument partition-key is %s\n" "$partitionField"
     ;;
-  p)
+  s)
     schemaFile="$OPTARG"
     printf "Argument schema-file is %s\n" "$schemaFile"
     ;;
@@ -94,11 +112,15 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
   esac
 done
 
-# First delete the existing topic
-${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
+if [ $recreateTopic = "Y" ]; then
+  # First delete the existing topic
+  echo "Delete Kafka topic $kafkaTopicName ..."
+  ${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
 
-# Create the topic with 4 partitions
-${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
+  # Create the topic with $numKafkaPartitions partitions
+  echo "Create Kafka topic $kafkaTopicName ..."
+  ${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
+fi
 
 # Setup the schema registry
 export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
@@ -115,32 +137,38 @@ done
 
 events_file=/tmp/kcat-input.events
 rm -f ${events_file}
-recordValue=0
-num_records=$((num_records + 0))
-
-for (( ; ; )); do
-  while IFS= read line; do
-    for partitionValue in "${partitions[@]}"; do
-      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
-      ((recordValue = recordValue + 1))
-
-      if [ $recordValue -gt $num_records ]; then
+totalNumRecords=$((numRecords + recordValue))
+
+for ((i = 1; i <= numBatch; i++)); do
+  rm -f ${events_file}
+  date
+  echo "Start batch $i ..."
+  batchRecordSeq=0
+  for (( ; ; )); do
+    while IFS= read line; do
+      for partitionValue in "${partitions[@]}"; do
+        echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
+        ((recordValue = recordValue + 1))
+        ((batchRecordSeq = batchRecordSeq + 1))
+
+        if [ $batchRecordSeq -eq $numRecords ]; then
+          break
+        fi
+      done
+
+      if [ $batchRecordSeq -eq $numRecords ]; then
         break
       fi
-    done
-
-    if [ $recordValue -gt $num_records ]; then
-      break
-    fi
+    done <"$rawDataFile"
 
-    if [ $(($recordValue % 1000)) -eq 0 ]; then
-      sleep 1
-    fi
-  done <"$rawDataFile"
+    if [ $batchRecordSeq -eq $numRecords ]; then
+      date
+      echo "  Record key until $recordValue"
+      sleep 20
+      break
+    fi
+  done
 
-  if [ $recordValue -gt $num_records ]; then
-    break
-  fi
+  echo "publish to Kafka ..."
+  grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
 done
-
-grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
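After a run, a quick way to confirm how many records reached Kafka is to count them with kcat. This is a minimal sketch, assuming the default `hudi-test-topic` topic and a local broker on port 9092:

```bash
# Consume from the beginning and exit at end of topic (-e); each message is one
# line, so the count should equal num-kafka-records x num-batch on a fresh topic.
kcat -C -b localhost:9092 -t hudi-test-topic -o beginning -e | wc -l
```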