From 455e6911deb1b475b3d69a4fd0cc8b9e4aa34042 Mon Sep 17 00:00:00 2001
From: Rajesh Mahindra
Date: Fri, 3 Dec 2021 12:36:19 -0800
Subject: [PATCH 1/4] Fix kafka connect readme

---
 hudi-kafka-connect/README.md                  | 71 ++++++++++---------
 hudi-kafka-connect/demo/setupKafka.sh         |  2 +-
 .../KafkaConnectTransactionServices.java      |  9 +++
 .../writers/KafkaConnectWriterProvider.java   |  2 +-
 4 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index 0bf53d6c41ca..9d63f5a546e5 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -54,35 +54,17 @@ mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
 cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
 ```
 
-### 2 - Set up the docker containers
-
-To run the connect locally, we need kafka, zookeeper, hdfs, hive etc. To make the setup easier, we use the docker
-containers from the hudi docker demo. Refer to [this link for the setup](https://hudi.apache.org/docs/docker_demo)
-
-Essentially, follow the steps listed here:
-
-/etc/hosts : The demo references many services running in container by the hostname. Add the following settings to /etc/hosts
-```bash
-127.0.0.1 adhoc-1
-127.0.0.1 adhoc-2
-127.0.0.1 namenode
-127.0.0.1 datanode1
-127.0.0.1 hiveserver
-127.0.0.1 hivemetastore
-127.0.0.1 kafkabroker
-127.0.0.1 sparkmaster
-127.0.0.1 zookeeper
-```
-
-Bring up the docker containers
+Set up a Kafka broker locally. Download the latest Apache Kafka release from [here](https://kafka.apache.org/downloads).
+Once downloaded and built, run the Zookeeper server and the Kafka server using the command line tools.
 ```bash
-cd ${HUDI_DIR}/docker
-./setup_demo.sh
+export KAFKA_HOME=/path/to/kafka_install_dir
+cd $KAFKA_HOME
+./bin/zookeeper-server-start.sh ./config/zookeeper.properties
+./bin/kafka-server-start.sh ./config/server.properties
 ```
+Wait until the Kafka cluster is up and running.
 
-The schema registry and kafka connector can be run from host system directly (mac/ linux).
-
-### 3 - Set up the schema registry
+### 2 - Set up the schema registry
 
 Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema
 registries, we use Confluent schema registry. Download the
@@ -97,7 +79,7 @@ cd $CONFLUENT_DIR
 ./bin/schema-registry-start etc/schema-registry/schema-registry.properties
 ```
 
-### 4 - Create the Hudi Control Topic for Coordination of the transactions
+### 3 - Create the Hudi Control Topic for Coordination of the transactions
 
 The control topic should only have `1` partition, since it's used to coordinate the Hudi write transactions across the multiple Connect tasks.
@@ -107,7 +89,7 @@ cd $KAFKA_HOME
 ./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
 ```
 
-### 5 - Create the Hudi Topic for the Sink and insert data into the topic
+### 4 - Create the Hudi Topic for the Sink and insert data into the topic
 
 Open a terminal to execute the following command:
 
@@ -123,7 +105,7 @@ to generate, with each batch containing a number of messages and idle time betwe
 bash setupKafka.sh -n -b
 ```
 
-### 6 - Run the Sink connector worker (multiple workers can be run)
+### 5 - Run the Sink connector worker (multiple workers can be run)
 
 Kafka Connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks)
 that process the records in parallel from the Kafka partitions for the same topic. We provide a properties file with
@@ -137,7 +119,7 @@ cd ${KAFKA_HOME}
 ./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties
 ```
 
-### 7 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
+### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
 
 Once the Connector has started, it will not run the Sink until the Hudi Sink is added using the web API. The
 following curl APIs can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink,
@@ -188,7 +170,7 @@ total 5168
 -rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
 ```
 
-### 8 - Run async compaction and clustering if scheduled
+### 7 - Run async compaction and clustering if scheduled
 
 When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink
 is running. Inline compaction and clustering are disabled by default due to performance reasons. By default, async
@@ -318,11 +300,36 @@ hoodie.write.concurrency.mode=single_writer
 Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled
 clustering is going to be executed.
 
-### 9- Querying via Hive
+### 8 - Querying via Hive
 
 In this section we explain how one can test syncing of the Hudi table with the Hive server/ Hive Metastore, which enables
 querying via Hive, Presto etc.
 
+To ease the deployment of HDFS, Hive Server, Hive Metastore etc. for testing Hive sync, we use the docker
+containers from the hudi docker demo. Refer to [this link for the setup](https://hudi.apache.org/docs/docker_demo).
+Additionally, the docker demo deploys Kafka and Zookeeper too, so you do not need to run them explicitly in this setup.
+
+Essentially, follow the steps listed here:
+
+/etc/hosts : The demo references many services running in containers by hostname. Add the following settings to /etc/hosts
+```bash
+127.0.0.1 adhoc-1
+127.0.0.1 adhoc-2
+127.0.0.1 namenode
+127.0.0.1 datanode1
+127.0.0.1 hiveserver
+127.0.0.1 hivemetastore
+127.0.0.1 kafkabroker
+127.0.0.1 sparkmaster
+127.0.0.1 zookeeper
+```
+
+Bring up the docker containers
+```bash
+cd ${HUDI_DIR}/docker
+./setup_demo.sh
+```
+
 Firstly, (re)-install a different connector that is configured to write the Hudi table to HDFS instead of the local filesystem.
```bash diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh index bc615a567151..49155649cea3 100755 --- a/hudi-kafka-connect/demo/setupKafka.sh +++ b/hudi-kafka-connect/demo/setupKafka.sh @@ -50,7 +50,7 @@ fi ## defaults rawDataFile=${HUDI_DIR}/docker/demo/data/batch_1.json -kafkaBrokerHostname=kafkabroker +kafkaBrokerHostname=localhost kafkaTopicName=hudi-test-topic numKafkaPartitions=4 recordKey=volume diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java index b18e7c695f9c..957bd4659500 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java @@ -27,11 +27,14 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.connect.transaction.TransactionCoordinator; import org.apache.hudi.connect.utils.KafkaConnectUtils; @@ -77,9 +80,15 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException { this.connectConfigs = connectConfigs; + // This is the writeConfig for the Transaction Coordinator this.writeConfig = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.JAVA) .withProperties(connectConfigs.getProps()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + // we will trigger cleaning manually, to control the instant times + .withAutoClean(false) + // we will trigger compaction manually, to control the instant times + .withInlineCompaction(false).build()) .build(); tableBasePath = writeConfig.getBasePath(); diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java index 5bac1f0772bc..ec1e4a6f55b0 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java @@ -73,7 +73,7 @@ public KafkaConnectWriterProvider( this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator( new TypedProperties(connectConfigs.getProps())); - // Create the write client to write some records in + // This is the writeConfig for the writers for the individual Transaction Coordinators writeConfig = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.JAVA) .withProperties(connectConfigs.getProps()) From 41126e2834bfc7721b05032da711f4e626c2f2f8 Mon Sep 17 00:00:00 2001 From: Rajesh Mahindra Date: Fri, 3 Dec 2021 15:03:04 -0800 Subject: [PATCH 2/4] Fix handling of errors in write records for kafka 
connect --- .../ConnectTransactionCoordinator.java | 55 ++++++++--- .../writers/ConnectTransactionServices.java | 2 +- .../KafkaConnectTransactionServices.java | 37 ++++--- .../writers/KafkaConnectWriterProvider.java | 10 ++ .../TestConnectTransactionCoordinator.java | 96 +++++++++++++------ .../MockConnectTransactionServices.java | 3 +- .../resources/log4j-surefire-quiet.properties | 30 ++++++ .../test/resources/log4j-surefire.properties | 31 ++++++ 8 files changed, 198 insertions(+), 66 deletions(-) create mode 100644 hudi-kafka-connect/src/test/resources/log4j-surefire-quiet.properties create mode 100644 hudi-kafka-connect/src/test/resources/log4j-surefire.properties diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java index 7acd875b6bee..f4068355e6c7 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java @@ -280,26 +280,56 @@ private void endExistingCommit() { private void onReceiveWriteStatus(ControlMessage message) { ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo(); - int partition = message.getSenderPartition(); - partitionsWriteStatusReceived.put(partition, KafkaConnectUtils.getWriteStatuses(participantInfo)); - currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaOffset()); + int partitionId = message.getSenderPartition(); + partitionsWriteStatusReceived.put(partitionId, KafkaConnectUtils.getWriteStatuses(participantInfo)); + currentConsumedKafkaOffsets.put(partitionId, participantInfo.getKafkaOffset()); if (partitionsWriteStatusReceived.size() >= numPartitions && currentState.equals(State.ENDED_COMMIT)) { // Commit the kafka offsets to the commit file try { List allWriteStatuses = new ArrayList<>(); partitionsWriteStatusReceived.forEach((key, value) -> allWriteStatuses.addAll(value)); - // Commit the last write in Hudi, along with the latest kafka offset - if (!allWriteStatuses.isEmpty()) { - transactionServices.endCommit(currentCommitTime, + + long totalErrorRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalErrorRecords).sum(); + long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum(); + boolean hasErrors = totalErrorRecords > 0; + + if (!hasErrors && !allWriteStatuses.isEmpty()) { + boolean success = transactionServices.endCommit(currentCommitTime, allWriteStatuses, transformKafkaOffsets(currentConsumedKafkaOffsets)); + + if (success) { + LOG.info("Commit " + currentCommitTime + " successful!"); + currentState = State.WRITE_STATUS_RCVD; + globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets); + submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT, + message.getTopicName(), + currentCommitTime)); + return; + } else { + LOG.error("Commit " + currentCommitTime + " failed!"); + } + } else if (hasErrors) { + LOG.error("Coordinator found errors when writing. 
Errors/Total=" + totalErrorRecords + "/" + totalRecords); + LOG.error("Printing out the top 100 errors"); + allWriteStatuses.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> { + LOG.error("Global error :", ws.getGlobalError()); + if (ws.getErrors().size() > 0) { + ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value)); + } + }); + } else { + LOG.warn("Empty write statuses were received from all Participants"); } - currentState = State.WRITE_STATUS_RCVD; - globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets); - submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT, - message.getTopicName(), - currentCommitTime)); + + // Submit the next start commit, that will rollback the current commit. + currentState = State.FAILED_COMMIT; + LOG.warn("Current commit " + currentCommitTime + " failed, so starting a new commit after recovery delay"); + submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, + partition.topic(), + StringUtils.EMPTY_STRING), + RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS); } catch (Exception exception) { LOG.error("Fatal error while committing file", exception); } @@ -310,7 +340,7 @@ private void handleWriteStatusTimeout() { // If we are still stuck in ENDED_STATE if (currentState.equals(State.ENDED_COMMIT)) { currentState = State.WRITE_STATUS_TIMEDOUT; - LOG.warn("Did not receive the Write Status from all partitions"); + LOG.warn("Current commit " + currentCommitTime + " failed after a write status timeout, so starting a new commit after recovery delay"); // Submit the next start commit submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, partition.topic(), @@ -365,6 +395,7 @@ private enum State { INIT, STARTED_COMMIT, ENDED_COMMIT, + FAILED_COMMIT, WRITE_STATUS_RCVD, WRITE_STATUS_TIMEDOUT, ACKED_COMMIT, diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java index b36e1f1c7ab5..2ce44ff802bc 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java @@ -32,7 +32,7 @@ public interface ConnectTransactionServices { String startCommit(); - void endCommit(String commitTime, List writeStatuses, Map extraMetadata); + boolean endCommit(String commitTime, List writeStatuses, Map extraMetadata); Map fetchLatestExtraCommitMetadata(); } diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java index 957bd4659500..cca738a70eba 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java @@ -27,14 +27,11 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroPayload; -import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; import 
org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.connect.transaction.TransactionCoordinator; import org.apache.hudi.connect.utils.KafkaConnectUtils; @@ -84,11 +81,6 @@ public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throw this.writeConfig = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.JAVA) .withProperties(connectConfigs.getProps()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - // we will trigger cleaning manually, to control the instant times - .withAutoClean(false) - // we will trigger compaction manually, to control the instant times - .withInlineCompaction(false).build()) .build(); tableBasePath = writeConfig.getBasePath(); @@ -131,20 +123,23 @@ public String startCommit() { } @Override - public void endCommit(String commitTime, List writeStatuses, Map extraMetadata) { - javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata)); - LOG.info("Ending Hudi commit " + commitTime); - - // Schedule clustering and compaction as needed. - if (writeConfig.isAsyncClusteringEnabled()) { - javaClient.scheduleClustering(Option.empty()).ifPresent( - instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs)); - } - if (isAsyncCompactionEnabled()) { - javaClient.scheduleCompaction(Option.empty()).ifPresent( - instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs)); + public boolean endCommit(String commitTime, List writeStatuses, Map extraMetadata) { + boolean success = javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata)); + if (success) { + LOG.info("Ending Hudi commit " + commitTime); + + // Schedule clustering and compaction as needed. 
+ if (writeConfig.isAsyncClusteringEnabled()) { + javaClient.scheduleClustering(Option.empty()).ifPresent( + instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs)); + } + if (isAsyncCompactionEnabled()) { + javaClient.scheduleCompaction(Option.empty()).ifPresent( + instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs)); + } + syncMeta(); } - syncMeta(); + return success; } @Override diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java index ec1e4a6f55b0..6ab0469738df 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java @@ -27,6 +27,8 @@ import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.connect.KafkaConnectFileIdPrefixProvider; @@ -84,6 +86,14 @@ public KafkaConnectWriterProvider( .withSchema(schemaProvider.getSourceSchema().toString()) .withAutoCommit(false) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + // participants should not trigger table services, and leave it to the coordinator + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withAutoClean(false) + .withAutoArchive(false) + .withInlineCompaction(false).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withInlineClustering(false) + .build()) .build(); context = new HoodieJavaEngineContext(hadoopConf); diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java index 6e049c6118b0..48a9386cb267 100644 --- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java @@ -50,7 +50,7 @@ public class TestConnectTransactionCoordinator { private static final String TOPIC_NAME = "kafka-connect-test-topic"; - private static final int NUM_PARTITIONS = 4; + private static final int TOTAL_KAFKA_PARTITIONS = 4; private static final int MAX_COMMIT_ROUNDS = 5; private static final int TEST_TIMEOUT_SECS = 60; @@ -83,7 +83,7 @@ public void testSingleCommitScenario(MockParticipant.TestScenarios scenario) thr new TopicPartition(TOPIC_NAME, 0), kafkaControlAgent, transactionServices, - (bootstrapServers, topicName) -> NUM_PARTITIONS); + (bootstrapServers, topicName) -> TOTAL_KAFKA_PARTITIONS); coordinator.start(); latch.await(TEST_TIMEOUT_SECS, TimeUnit.SECONDS); @@ -119,7 +119,7 @@ public MockParticipant(MockKafkaControlAgent kafkaControlAgent, this.latch = latch; this.testScenario = testScenario; this.maxNumberCommitRounds = maxNumberCommitRounds; - this.partition = new TopicPartition(TOPIC_NAME, (NUM_PARTITIONS - 1)); + this.partition = new TopicPartition(TOPIC_NAME, (TOTAL_KAFKA_PARTITIONS - 1)); this.kafkaOffsetsCommitted = new HashMap<>(); expectedMsgType = ControlMessage.EventType.START_COMMIT; 
numberCommitRounds = 0; @@ -162,39 +162,33 @@ public long getLastKafkaCommittedOffset() { private void testScenarios(ControlMessage message) { assertEquals(expectedMsgType, message.getType()); - switch (message.getType()) { case START_COMMIT: expectedMsgType = ControlMessage.EventType.END_COMMIT; break; case END_COMMIT: assertEquals(kafkaOffsetsCommitted, message.getCoordinatorInfo().getGlobalKafkaCommitOffsets()); - int numSuccessPartitions; + int numPartitionsThatReportWriteStatus; Map kafkaOffsets = new HashMap<>(); List controlEvents = new ArrayList<>(); - // Prepare the WriteStatuses for all partitions - for (int i = 1; i <= NUM_PARTITIONS; i++) { - try { - long kafkaOffset = (long) (Math.random() * 10000); - kafkaOffsets.put(i, kafkaOffset); - ControlMessage event = successWriteStatus( - message.getCommitTime(), - new TopicPartition(TOPIC_NAME, i), - kafkaOffset); - controlEvents.add(event); - } catch (Exception exception) { - throw new HoodieException("Fatal error sending control event to Coordinator"); - } - } - switch (testScenario) { case ALL_CONNECT_TASKS_SUCCESS: - numSuccessPartitions = NUM_PARTITIONS; + composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents); + numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; + // This commit round should succeed, and the kafka offsets getting committed kafkaOffsetsCommitted.putAll(kafkaOffsets); expectedMsgType = ControlMessage.EventType.ACK_COMMIT; break; + case SUBSET_WRITE_STATUS_FAILED: + composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents); + numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; + // This commit round should fail, and a new commit round should start without kafka offsets getting committed + expectedMsgType = ControlMessage.EventType.START_COMMIT; + break; case SUBSET_CONNECT_TASKS_FAILED: - numSuccessPartitions = NUM_PARTITIONS / 2; + composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents); + numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2; + // This commit round should fail, and a new commit round should start without kafka offsets getting committed expectedMsgType = ControlMessage.EventType.START_COMMIT; break; default: @@ -202,7 +196,7 @@ private void testScenarios(ControlMessage message) { } // Send events based on test scenario - for (int i = 0; i < numSuccessPartitions; i++) { + for (int i = 0; i < numPartitionsThatReportWriteStatus; i++) { kafkaControlAgent.publishMessage(controlEvents.get(i)); } break; @@ -227,18 +221,35 @@ private void testScenarios(ControlMessage message) { public enum TestScenarios { SUBSET_CONNECT_TASKS_FAILED, + SUBSET_WRITE_STATUS_FAILED, ALL_CONNECT_TASKS_SUCCESS } - private static ControlMessage successWriteStatus(String commitTime, - TopicPartition partition, - long kafkaOffset) throws Exception { - // send WS - WriteStatus writeStatus = new WriteStatus(); - WriteStatus status = new WriteStatus(false, 1.0); - for (int i = 0; i < 1000; i++) { - status.markSuccess(mock(HoodieRecord.class), Option.empty()); + private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, Map kafkaOffsets, List controlEvents) { + // Prepare the WriteStatuses for all partitions + for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) { + try { + long kafkaOffset = (long) (Math.random() * 10000); + kafkaOffsets.put(i, kafkaOffset); + ControlMessage event = composeWriteStatusResponse( + commitTime, + new TopicPartition(TOPIC_NAME, i), + kafkaOffset, + 
shouldIncludeFailedRecords); + controlEvents.add(event); + } catch (Exception exception) { + throw new HoodieException("Fatal error sending control event to Coordinator"); + } } + } + + private static ControlMessage composeWriteStatusResponse(String commitTime, + TopicPartition partition, + long kafkaOffset, + boolean includeFailedRecords) throws Exception { + // send WS + WriteStatus writeStatus = includeFailedRecords ? getSubsetFailedRecordsWriteStatus() : getAllSuccessfulRecordsWriteStatus(); + return ControlMessage.newBuilder() .setType(ControlMessage.EventType.WRITE_STATUS) .setTopicName(partition.topic()) @@ -255,4 +266,27 @@ private static ControlMessage successWriteStatus(String commitTime, ).build(); } } + + private static WriteStatus getAllSuccessfulRecordsWriteStatus() { + // send WS + WriteStatus status = new WriteStatus(false, 0.0); + for (int i = 0; i < 1000; i++) { + status.markSuccess(mock(HoodieRecord.class), Option.empty()); + } + return status; + } + + private static WriteStatus getSubsetFailedRecordsWriteStatus() { + // send WS + WriteStatus status = new WriteStatus(false, 0.0); + for (int i = 0; i < 1000; i++) { + if (i % 10 == 0) { + status.markFailure(mock(HoodieRecord.class), new Throwable("Error writing record on disk"), Option.empty()); + } else { + status.markSuccess(mock(HoodieRecord.class), Option.empty()); + } + } + status.setGlobalError(new Throwable("More than one records failed to be written to storage")); + return status; + } } diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java index 6994c6554d9a..b3314ade3d5b 100644 --- a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java +++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java @@ -46,8 +46,9 @@ public String startCommit() { } @Override - public void endCommit(String commitTime, List writeStatuses, Map extraMetadata) { + public boolean endCommit(String commitTime, List writeStatuses, Map extraMetadata) { assertEquals(String.valueOf(this.commitTime), commitTime); + return true; } @Override diff --git a/hudi-kafka-connect/src/test/resources/log4j-surefire-quiet.properties b/hudi-kafka-connect/src/test/resources/log4j-surefire-quiet.properties new file mode 100644 index 000000000000..ca0a50c84270 --- /dev/null +++ b/hudi-kafka-connect/src/test/resources/log4j-surefire-quiet.properties @@ -0,0 +1,30 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# CONSOLE is set to be a ConsoleAppender. 
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# CONSOLE uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-kafka-connect/src/test/resources/log4j-surefire.properties b/hudi-kafka-connect/src/test/resources/log4j-surefire.properties new file mode 100644 index 000000000000..c5bdf75ae2ae --- /dev/null +++ b/hudi-kafka-connect/src/test/resources/log4j-surefire.properties @@ -0,0 +1,31 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache=INFO +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# A1 is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. 
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL \ No newline at end of file From f2394d571aa179f3532ab8fc772235131ed4cb0c Mon Sep 17 00:00:00 2001 From: Rajesh Mahindra Date: Fri, 3 Dec 2021 16:15:43 -0800 Subject: [PATCH 3/4] By default, ensure we skip error records and keep the pipeline alive --- .../ConnectTransactionCoordinator.java | 2 +- .../connect/writers/KafkaConnectConfigs.java | 14 +++++++++++++ .../TestConnectTransactionCoordinator.java | 21 +++++++++++++++---- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java index f4068355e6c7..14fd880b1c05 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java @@ -294,7 +294,7 @@ private void onReceiveWriteStatus(ControlMessage message) { long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum(); boolean hasErrors = totalErrorRecords > 0; - if (!hasErrors && !allWriteStatuses.isEmpty()) { + if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) { boolean success = transactionServices.endCommit(currentCommitTime, allWriteStatuses, transformKafkaOffsets(currentConsumedKafkaOffsets)); diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java index 692edf75a2a9..ec0345104680 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java @@ -88,6 +88,11 @@ public class KafkaConnectConfigs extends HoodieConfig { .defaultValue(HiveSyncTool.class.getName()) .withDocumentation("Meta sync client tool, using comma to separate multi tools"); + public static final ConfigProperty ALLOW_COMMIT_ON_ERRORS = ConfigProperty + .key("hoodie.kafka.allow.commit.on.errors") + .defaultValue(true) + .withDocumentation("Commit even when some records failed to be written"); + protected KafkaConnectConfigs() { super(); } @@ -136,6 +141,10 @@ public String getMetaSyncClasses() { return getString(META_SYNC_CLASSES); } + public Boolean allowCommitOnErrors() { + return getBoolean(ALLOW_COMMIT_ON_ERRORS); + } + public static class Builder { protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs(); @@ -160,6 +169,11 @@ public Builder withCoordinatorWriteTimeoutSecs(Long coordinatorWriteTimeoutSecs) return this; } + public Builder withAllowCommitOnErrors(Boolean allowCommitOnErrors) { + connectConfigs.setValue(ALLOW_COMMIT_ON_ERRORS, String.valueOf(allowCommitOnErrors)); + return this; + } + // Kafka connect task are passed with props with type Map<> public Builder withProperties(Map properties) { connectConfigs.getProps().putAll(properties); diff --git 
a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java index 48a9386cb267..f003fe9fa645 100644 --- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java @@ -63,10 +63,6 @@ public class TestConnectTransactionCoordinator { @BeforeEach public void setUp() throws Exception { transactionServices = new MockConnectTransactionServices(); - configs = KafkaConnectConfigs.newBuilder() - .withCommitIntervalSecs(1L) - .withCoordinatorWriteTimeoutSecs(1L) - .build(); latch = new CountDownLatch(1); } @@ -77,6 +73,15 @@ public void testSingleCommitScenario(MockParticipant.TestScenarios scenario) thr participant = new MockParticipant(kafkaControlAgent, latch, scenario, MAX_COMMIT_ROUNDS); participant.start(); + KafkaConnectConfigs.Builder configBuilder = KafkaConnectConfigs.newBuilder() + .withCommitIntervalSecs(1L) + .withCoordinatorWriteTimeoutSecs(1L); + + if (scenario.equals(MockParticipant.TestScenarios.SUBSET_WRITE_STATUS_FAILED)) { + configBuilder.withAllowCommitOnErrors(false); + } + configs = configBuilder.build(); + // Test the coordinator using the mock participant TransactionCoordinator coordinator = new ConnectTransactionCoordinator( configs, @@ -179,6 +184,13 @@ private void testScenarios(ControlMessage message) { kafkaOffsetsCommitted.putAll(kafkaOffsets); expectedMsgType = ControlMessage.EventType.ACK_COMMIT; break; + case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED: + composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents); + numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; + // Despite error records, this commit round should succeed, and the kafka offsets getting committed + kafkaOffsetsCommitted.putAll(kafkaOffsets); + expectedMsgType = ControlMessage.EventType.ACK_COMMIT; + break; case SUBSET_WRITE_STATUS_FAILED: composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; @@ -222,6 +234,7 @@ private void testScenarios(ControlMessage message) { public enum TestScenarios { SUBSET_CONNECT_TASKS_FAILED, SUBSET_WRITE_STATUS_FAILED, + SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED, ALL_CONNECT_TASKS_SUCCESS } From 3927bde1edd015f0df9ad44202d8c3ddfc2ad0c0 Mon Sep 17 00:00:00 2001 From: rmahindra123 <76502047+rmahindra123@users.noreply.github.com> Date: Fri, 3 Dec 2021 16:23:13 -0800 Subject: [PATCH 4/4] Fix indentation --- hudi-kafka-connect/src/test/resources/log4j-surefire.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-kafka-connect/src/test/resources/log4j-surefire.properties b/hudi-kafka-connect/src/test/resources/log4j-surefire.properties index c5bdf75ae2ae..32af462093ae 100644 --- a/hudi-kafka-connect/src/test/resources/log4j-surefire.properties +++ b/hudi-kafka-connect/src/test/resources/log4j-surefire.properties @@ -28,4 +28,4 @@ log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true log4j.appender.CONSOLE.filter.a.LevelMin=WARN -log4j.appender.CONSOLE.filter.a.LevelMax=FATAL \ No newline at end of file +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
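For readers trying out the error-handling change in PATCH 3/4, the sketch below shows how the new `hoodie.kafka.allow.commit.on.errors` option can be set through the `KafkaConnectConfigs` builder added in that patch. The builder methods and the config key come from the patch itself; the wrapping class, the `main` method, and the printed output are illustrative assumptions only.

```java
import org.apache.hudi.connect.writers.KafkaConnectConfigs;

public class AllowCommitOnErrorsExample {
  public static void main(String[] args) {
    // Build the connector configs the same way the tests in PATCH 3/4 do,
    // but explicitly disable committing when some records fail to be written.
    KafkaConnectConfigs configs = KafkaConnectConfigs.newBuilder()
        .withCommitIntervalSecs(1L)
        .withCoordinatorWriteTimeoutSecs(1L)
        .withAllowCommitOnErrors(false)
        .build();

    // hoodie.kafka.allow.commit.on.errors defaults to true; when set to false,
    // the coordinator fails the commit round on error records instead of
    // committing partial results.
    System.out.println("Allow commit on errors: " + configs.allowCommitOnErrors());
  }
}
```

With the flag set to `false` and error records present, the coordinator moves to `FAILED_COMMIT` and schedules a fresh `START_COMMIT` after the recovery delay, as implemented in `onReceiveWriteStatus` in PATCH 2/4 and 3/4.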