71 changes: 39 additions & 32 deletions hudi-kafka-connect/README.md
@@ -54,35 +54,17 @@ mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
```

### 2 - Set up the docker containers

To run the connect locally, we need kafka, zookeeper, hdfs, hive etc. To make the setup easier, we use the docker
containers from the hudi docker demo. Refer to [this link for the setup](https://hudi.apache.org/docs/docker_demo)

Essentially, follow the steps listed here:

/etc/hosts : The demo references many services running in container by the hostname. Add the following settings to /etc/hosts
```bash
127.0.0.1 adhoc-1
127.0.0.1 adhoc-2
127.0.0.1 namenode
127.0.0.1 datanode1
127.0.0.1 hiveserver
127.0.0.1 hivemetastore
127.0.0.1 kafkabroker
127.0.0.1 sparkmaster
127.0.0.1 zookeeper
```

Bring up the docker containers
Set up a Kafka broker locally. Download the latest Apache Kafka from [here](https://kafka.apache.org/downloads).
Once downloaded and built, start the Zookeeper server and the Kafka server using the command-line tools.
```bash
cd ${HUDI_DIR}/docker
./setup_demo.sh
export KAFKA_HOME=/path/to/kafka_install_dir
cd $KAFKA_HOME
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
```
Wait until the Kafka cluster is up and running.
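
One way to confirm that the broker is ready is to list the topics against the local listener (a quick check; the port assumes the stock `server.properties`):
```bash
# An empty (or existing) topic list returned without errors indicates the broker is reachable.
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```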

The schema registry and the Kafka connector can be run directly from the host system (Mac/Linux).

### 3 - Set up the schema registry
### 2 - Set up the schema registry

Hudi leverages a schema registry to obtain the latest schema when writing records. While it supports most popular schema
registries, we use the Confluent schema registry. Download the
@@ -97,7 +79,7 @@ cd $CONFLUENT_DIR
./bin/schema-registry-start etc/schema-registry/schema-registry.properties
```
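
Once the registry is running, you can sanity-check it through its REST API (a quick check, assuming the default listener on port 8081; the subject list stays empty until data is produced in the later steps):
```bash
# List the subjects currently registered; an empty list ([]) is expected at this point.
curl http://localhost:8081/subjects
```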

### 4 - Create the Hudi Control Topic for Coordination of the transactions
### 3 - Create the Hudi Control Topic for Coordination of the transactions

The control topic should have only `1` partition, since it's used to coordinate the Hudi write transactions across the multiple Connect tasks.

@@ -107,7 +89,7 @@ cd $KAFKA_HOME
./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
```
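
To confirm that the control topic was created with a single partition, describe it (assuming the broker from the previous step is reachable at localhost:9092):
```bash
# Shows the partition count and replication factor of the control topic.
./bin/kafka-topics.sh --describe --topic hudi-control-topic --bootstrap-server localhost:9092
```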

### 5 - Create the Hudi Topic for the Sink and insert data into the topic
### 4 - Create the Hudi Topic for the Sink and insert data into the topic

Open a terminal to execute the following command:

@@ -123,7 +105,7 @@ to generate, with each batch containing a number of messages and idle time betwe
bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
```
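
For example, a small test run that produces two batches of 1000 messages each might look like this (the batch sizes are illustrative; `HUDI_DIR` must point to your Hudi checkout):
```bash
cd ${HUDI_DIR}/hudi-kafka-connect/demo/
bash setupKafka.sh -n 1000 -b 2
```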

### 6 - Run the Sink connector worker (multiple workers can be run)
### 5 - Run the Sink connector worker (multiple workers can be run)

Kafka Connect is a distributed platform that can run one or more workers (each running multiple tasks)
that process the records from the Kafka partitions of the same topic in parallel. We provide a properties file with
@@ -137,7 +119,7 @@ cd ${KAFKA_HOME}
./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties
```
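
Once the worker is up, the Connect REST API can be used to check on it (assuming the default REST port 8083; the connector list stays empty until the Hudi Sink is added in the next step):
```bash
# Lists the connectors currently registered with this Connect cluster.
curl http://localhost:8083/connectors
```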

### 7 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)

Once the Connect worker has started, it will not run the Sink until the Hudi Sink is added using the web API. The following
curl commands can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink,
@@ -188,7 +170,7 @@ total 5168
-rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
```
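
For reference, deleting and adding the sink follows the standard Kafka Connect REST pattern; a minimal sketch is shown below (the connector name `hudi-sink` and the config file path are assumptions based on the demo layout):
```bash
# Remove an existing Hudi Sink so it can be re-configured.
curl -X DELETE http://localhost:8083/connectors/hudi-sink

# Register the Hudi Sink using the sample configuration shipped with the demo.
curl -X POST -H "Content-Type: application/json" \
  -d @${HUDI_DIR}/hudi-kafka-connect/demo/config-sink.json \
  http://localhost:8083/connectors
```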

### 8 - Run async compaction and clustering if scheduled
### 7 - Run async compaction and clustering if scheduled

When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled while the Sink is
running. Inline compaction and clustering are disabled by default for performance reasons. By default, async
@@ -318,11 +300,36 @@ hoodie.write.concurrency.mode=single_writer
Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled
clustering instant will be executed.

### 9- Querying via Hive
### 8 - Querying via Hive

In this section, we explain how one can test syncing of the Hudi table with the Hive server/Hive Metastore,
which enables querying via Hive, Presto, etc.

To ease the deployment of HDFS, Hive Server, Hive Metastore, etc. for testing hive sync, we use the docker
containers from the Hudi docker demo. Refer to [this link for the setup](https://hudi.apache.org/docs/docker_demo).
Additionally, the docker demo also deploys Kafka and Zookeeper, so you do not need to run them separately in this setup.

Essentially, follow the steps listed here:

/etc/hosts: The demo references many services running in containers by hostname. Add the following entries to /etc/hosts:
```bash
127.0.0.1 adhoc-1
127.0.0.1 adhoc-2
127.0.0.1 namenode
127.0.0.1 datanode1
127.0.0.1 hiveserver
127.0.0.1 hivemetastore
127.0.0.1 kafkabroker
127.0.0.1 sparkmaster
127.0.0.1 zookeeper
```

Bring up the docker containers
```bash
cd ${HUDI_DIR}/docker
./setup_demo.sh
```
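
After the script completes, you can verify that the demo containers are running:
```bash
# Lists the names of the running demo containers (hiveserver, hivemetastore, namenode, etc.).
docker ps --format '{{.Names}}'
```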

First, (re)install a different connector that is configured to write the Hudi table to HDFS instead of the local filesystem.

```bash
2 changes: 1 addition & 1 deletion hudi-kafka-connect/demo/setupKafka.sh
@@ -50,7 +50,7 @@ fi

## defaults
rawDataFile=${HUDI_DIR}/docker/demo/data/batch_1.json
kafkaBrokerHostname=kafkabroker
kafkaBrokerHostname=localhost
kafkaTopicName=hudi-test-topic
numKafkaPartitions=4
recordKey=volume
@@ -280,26 +280,56 @@ private void endExistingCommit() {

private void onReceiveWriteStatus(ControlMessage message) {
ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo();
int partition = message.getSenderPartition();
partitionsWriteStatusReceived.put(partition, KafkaConnectUtils.getWriteStatuses(participantInfo));
currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaOffset());
int partitionId = message.getSenderPartition();
partitionsWriteStatusReceived.put(partitionId, KafkaConnectUtils.getWriteStatuses(participantInfo));
currentConsumedKafkaOffsets.put(partitionId, participantInfo.getKafkaOffset());
if (partitionsWriteStatusReceived.size() >= numPartitions
&& currentState.equals(State.ENDED_COMMIT)) {
// Commit the kafka offsets to the commit file
try {
List<WriteStatus> allWriteStatuses = new ArrayList<>();
partitionsWriteStatusReceived.forEach((key, value) -> allWriteStatuses.addAll(value));
// Commit the last write in Hudi, along with the latest kafka offset
if (!allWriteStatuses.isEmpty()) {
transactionServices.endCommit(currentCommitTime,

long totalErrorRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalErrorRecords).sum();
long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
boolean hasErrors = totalErrorRecords > 0;

if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) {
boolean success = transactionServices.endCommit(currentCommitTime,
allWriteStatuses,
transformKafkaOffsets(currentConsumedKafkaOffsets));

if (success) {
LOG.info("Commit " + currentCommitTime + " successful!");
currentState = State.WRITE_STATUS_RCVD;
globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
message.getTopicName(),
currentCommitTime));
return;
} else {
LOG.error("Commit " + currentCommitTime + " failed!");
}
} else if (hasErrors) {
LOG.error("Coordinator found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
LOG.error("Printing out the top 100 errors");
allWriteStatuses.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
LOG.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
}
});
} else {
LOG.warn("Empty write statuses were received from all Participants");
}
currentState = State.WRITE_STATUS_RCVD;
globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
message.getTopicName(),
currentCommitTime));

// Submit the next start commit, that will rollback the current commit.
currentState = State.FAILED_COMMIT;
LOG.warn("Current commit " + currentCommitTime + " failed, so starting a new commit after recovery delay");
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
partition.topic(),
StringUtils.EMPTY_STRING),
RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
} catch (Exception exception) {
LOG.error("Fatal error while committing file", exception);
}
@@ -310,7 +340,7 @@ private void handleWriteStatusTimeout() {
// If we are still stuck in ENDED_STATE
if (currentState.equals(State.ENDED_COMMIT)) {
currentState = State.WRITE_STATUS_TIMEDOUT;
LOG.warn("Did not receive the Write Status from all partitions");
LOG.warn("Current commit " + currentCommitTime + " failed after a write status timeout, so starting a new commit after recovery delay");
// Submit the next start commit
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
partition.topic(),
@@ -365,6 +395,7 @@ private enum State {
INIT,
STARTED_COMMIT,
ENDED_COMMIT,
FAILED_COMMIT,
WRITE_STATUS_RCVD,
WRITE_STATUS_TIMEDOUT,
ACKED_COMMIT,
@@ -32,7 +32,7 @@ public interface ConnectTransactionServices {

String startCommit();

void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata);
boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata);

Map<String, String> fetchLatestExtraCommitMetadata();
}
@@ -88,6 +88,11 @@ public class KafkaConnectConfigs extends HoodieConfig {
.defaultValue(HiveSyncTool.class.getName())
.withDocumentation("Meta sync client tool, using comma to separate multi tools");

public static final ConfigProperty<Boolean> ALLOW_COMMIT_ON_ERRORS = ConfigProperty
.key("hoodie.kafka.allow.commit.on.errors")
.defaultValue(true)
.withDocumentation("Commit even when some records failed to be written");

protected KafkaConnectConfigs() {
super();
}
@@ -136,6 +141,10 @@ public String getMetaSyncClasses() {
return getString(META_SYNC_CLASSES);
}

public Boolean allowCommitOnErrors() {
return getBoolean(ALLOW_COMMIT_ON_ERRORS);
}

public static class Builder {

protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
@@ -160,6 +169,11 @@ public Builder withCoordinatorWriteTimeoutSecs(Long coordinatorWriteTimeoutSecs)
return this;
}

public Builder withAllowCommitOnErrors(Boolean allowCommitOnErrors) {
connectConfigs.setValue(ALLOW_COMMIT_ON_ERRORS, String.valueOf(allowCommitOnErrors));
return this;
}

// Kafka connect task are passed with props with type Map<>
public Builder withProperties(Map<?, ?> properties) {
connectConfigs.getProps().putAll(properties);
@@ -77,6 +77,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic

public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
this.connectConfigs = connectConfigs;
// This is the writeConfig for the Transaction Coordinator
this.writeConfig = HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.JAVA)
.withProperties(connectConfigs.getProps())
@@ -122,20 +123,23 @@ public String startCommit() {
}

@Override
public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
LOG.info("Ending Hudi commit " + commitTime);

// Schedule clustering and compaction as needed.
if (writeConfig.isAsyncClusteringEnabled()) {
javaClient.scheduleClustering(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
}
if (isAsyncCompactionEnabled()) {
javaClient.scheduleCompaction(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
public boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
boolean success = javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
if (success) {
LOG.info("Ending Hudi commit " + commitTime);

// Schedule clustering and compaction as needed.
if (writeConfig.isAsyncClusteringEnabled()) {
javaClient.scheduleClustering(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
}
if (isAsyncCompactionEnabled()) {
javaClient.scheduleCompaction(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
}
syncMeta();
}
syncMeta();
return success;
}

@Override
@@ -27,6 +27,8 @@
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.KafkaConnectFileIdPrefixProvider;
@@ -73,7 +75,7 @@ public KafkaConnectWriterProvider(
this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
new TypedProperties(connectConfigs.getProps()));

// Create the write client to write some records in
// This is the writeConfig for the writers for the individual Transaction Coordinators
writeConfig = HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.JAVA)
.withProperties(connectConfigs.getProps())
@@ -84,6 +86,14 @@
.withSchema(schemaProvider.getSourceSchema().toString())
.withAutoCommit(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
// participants should not trigger table services, and leave it to the coordinator
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withAutoClean(false)
.withAutoArchive(false)
.withInlineCompaction(false).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClustering(false)
.build())
.build();

context = new HoodieJavaEngineContext(hadoopConf);