14 changes: 11 additions & 3 deletions hudi-kafka-connect/README.md
@@ -61,7 +61,7 @@ Once downloaded and built, run the Zookeeper server and Kafka server using the c

```bash
export KAFKA_HOME=/path/to/kafka_install_dir
-cd $KAFKA_KAFKA_HOME
+cd $KAFKA_HOME
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
```
@@ -71,8 +71,9 @@ Wait until the kafka cluster is up and running.
### 2 - Set up the schema registry

Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema
-registries, we use Confluent schema registry. Download the latest confluent platform and run the schema registry
-service.
+registries, we use Confluent schema registry. Download the
+latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema
+registry service.

```bash
cd $CONFLUENT_DIR
@@ -98,6 +99,13 @@ cd $HUDI_DIR/hudi-kafka-connect/demo/
bash setupKafka.sh -n <total_kafka_messages>
```

+To generate data for long-running tests, you can add the `-b` option to specify the number of batches of data
+to generate, with each batch containing a fixed number of messages and an idle period between batches, as follows:
+
+```bash
+bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
+```
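By way of illustration (the values below are made up, not script defaults), a long-running test that publishes ten batches of 1,000 messages each would be invoked as:

```bash
# 10 batches x 1000 messages = 10,000 records in total; the script pauses
# between batches, so publishing is spread over an extended period.
bash setupKafka.sh -n 1000 -b 10
```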

### 4 - Run the Sink connector worker (multiple workers can be run)

Kafka Connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks)
94 changes: 61 additions & 33 deletions hudi-kafka-connect/demo/setupKafka.sh
@@ -21,11 +21,14 @@
#########################
usage() {
echo "Usage: $0"
echo " -n |--num-kafka-records, (required) number of kafka records to generate"
echo " -n |--num-kafka-records, (required) number of kafka records to generate in a batch"
echo " -b |--num-batch, (optional) number of batches of records to generate (default is 1)"
echo " -t |--reuse-topic, (optional) reuses the Kafka topic (default deletes and recreate the topic)"
echo " -f |--raw-file, (optional) raw file for the kafka records"
echo " -k |--kafka-topic, (optional) Topic name for Kafka"
echo " -m |--num-kafka-partitions, (optional) number of kafka partitions"
echo " -r |--record-key, (optional) field to use as record key"
echo " -o |--record-key-offset, (optional) record key offset to start with (default is 0)"
echo " -l |--num-hudi-partitions, (optional) number of hudi partitions"
echo " -p |--partition-key, (optional) field to use as partition"
echo " -s |--schema-file, (optional) path of the file containing the schema of the records"
@@ -53,12 +56,23 @@ recordKey=volume
numHudiPartitions=5
partitionField=date
schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc
+numBatch=1
+recordValue=0
+recreateTopic="Y"

while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do
case $opt in
n)
num_records="$OPTARG"
printf "Argument num-kafka-records is %s\n" "$num_records"
numRecords="$OPTARG"
printf "Argument num-kafka-records is %s\n" "$numRecords"
;;
+b)
+numBatch="$OPTARG"
+printf "Argument num-batch is %s\n" "$numBatch"
+;;
+t)
+recreateTopic="N"
+printf "Argument recreate-topic is N (reuse Kafka topic) \n"
+;;
f)
rawDataFile="$OPTARG"
@@ -76,6 +90,10 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
recordKey="$OPTARG"
printf "Argument record-key is %s\n" "$recordKey"
;;
+o)
+recordValue="$OPTARG"
+printf "Argument record-key-offset is %s\n" "$recordValue"
+;;
l)
numHudiPartitions="$OPTARG"
printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions"
@@ -84,7 +102,7 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
partitionField="$OPTARG"
printf "Argument partition-key is %s\n" "$partitionField"
;;
-p)
+s)
schemaFile="$OPTARG"
printf "Argument schema-file is %s\n" "$schemaFile"
;;
@@ -94,11 +112,15 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
esac
done
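Putting the new flags together: a follow-up run can reuse the existing topic and offset its record keys so they continue where an earlier run stopped. A minimal sketch, with hypothetical counts:

```bash
# First run: recreate the topic, 5 batches x 1000 records, keys 0..4999.
bash setupKafka.sh -n 1000 -b 5
# Second run: -t reuses the topic, -o 5000 starts record keys at 5000 so
# they do not collide with the keys produced by the first run.
bash setupKafka.sh -n 1000 -b 5 -t -o 5000
```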

-# First delete the existing topic
-${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
-
-# Create the topic with $numKafkaPartitions partitions
-${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
+if [ $recreateTopic = "Y" ]; then
+  # First delete the existing topic
+  echo "Delete Kafka topic $kafkaTopicName ..."
+  ${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
+
+  # Create the topic with $numKafkaPartitions partitions
+  echo "Create Kafka topic $kafkaTopicName ..."
+  ${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
+fi
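A sanity check worth running after this step, though it is not part of the patch (the topic name shown is the script's hard-coded kcat target):

```bash
# Confirm the topic was recreated with the expected partition count.
${KAFKA_HOME}/bin/kafka-topics.sh --describe --topic hudi-test-topic --bootstrap-server localhost:9092
```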

# Setup the schema registry
export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
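The `SCHEMA` pipeline above strips C-style `/* ... */` block comments (e.g. a license header) from the Avro schema file, then compacts the remaining JSON into a single quoted string suitable for the registry payload. A minimal sketch of the effect, using a throwaway file:

```bash
# Demo file with a block comment, mimicking the header in schema.avsc.
printf '/* license header */\n{"type": "record", "name": "stock"}\n' > /tmp/demo.avsc
# Same pipeline: put comment delimiters on their own lines, delete the
# commented range, then emit the JSON as one compact string.
sed 's|/\*|\n&|g;s|*/|&\n|g' /tmp/demo.avsc | sed '/\/\*/,/*\//d' | jq tostring
# Prints: "{\"type\":\"record\",\"name\":\"stock\"}"
```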
@@ -115,32 +137,38 @@ done
events_file=/tmp/kcat-input.events
rm -f ${events_file}

-recordValue=0
-num_records=$((num_records + 0))
+totalNumRecords=$((numRecords + recordValue))

-for (( ; ; )); do
-  while IFS= read line; do
-    for partitionValue in "${partitions[@]}"; do
-      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
-      ((recordValue = recordValue + 1))
-
-      if [ $recordValue -gt $num_records ]; then
-        break
-      fi
-    done
-
-    if [ $recordValue -gt $num_records ]; then
-      break
-    fi
-
-    if [ $(($recordValue % 1000)) -eq 0 ]; then
-      sleep 1
-    fi
-  done <"$rawDataFile"
-
-  if [ $recordValue -gt $num_records ]; then
-    break
-  fi
-done
-
-grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
+for ((i = 1; i <= numBatch; i++)); do
+  rm -f ${events_file}
+  date
+  echo "Start batch $i ..."
+  batchRecordSeq=0
+  for (( ; ; )); do
+    while IFS= read line; do
+      for partitionValue in "${partitions[@]}"; do
+        echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
+        ((recordValue = recordValue + 1))
+        ((batchRecordSeq = batchRecordSeq + 1))
+
+        if [ $batchRecordSeq -eq $numRecords ]; then
+          break
+        fi
+      done
+
+      if [ $batchRecordSeq -eq $numRecords ]; then
+        break
+      fi
+    done <"$rawDataFile"
+
+    if [ $batchRecordSeq -eq $numRecords ]; then
+      date
+      echo " Record key until $recordValue"
+      sleep 20
+      break
+    fi
+  done
+
+  echo "publish to Kafka ..."
+  grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
+done
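Also outside the patch: a quick way to spot-check what was published is kcat's consumer mode (the message count is arbitrary):

```bash
# -C consume, -c 3 stop after three messages, -e exit at end of partition.
kcat -C -b localhost:9092 -t hudi-test-topic -c 3 -e
```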