diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
index 19556dca45ead..1d32d03c371bd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
@@ -149,7 +149,7 @@ private void handleStartCommit(ControlMessage message) {
     LOG.info("Started a new transaction after receiving START_COMMIT for commit " + currentCommitTime);
     try {
       ongoingTransactionInfo = new TransactionInfo<>(currentCommitTime, writerProvider.getWriter(currentCommitTime));
-      ongoingTransactionInfo.setLastWrittenKafkaOffset(committedKafkaOffset);
+      ongoingTransactionInfo.setExpectedKafkaOffset(committedKafkaOffset);
     } catch (Exception exception) {
       LOG.warn("Error received while starting a new transaction", exception);
     }
@@ -188,7 +188,7 @@ private void handleEndCommit(ControlMessage message) {
           .setParticipantInfo(
               ControlMessage.ParticipantInfo.newBuilder()
                   .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatuses))
-                  .setKafkaOffset(ongoingTransactionInfo.getLastWrittenKafkaOffset())
+                  .setKafkaOffset(ongoingTransactionInfo.getExpectedKafkaOffset())
                   .build()
           ).build();
@@ -201,9 +201,9 @@
   }
 
   private void handleAckCommit(ControlMessage message) {
-    // Update lastKafkCommitedOffset locally.
-    if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
-      committedKafkaOffset = ongoingTransactionInfo.getLastWrittenKafkaOffset();
+    // Update committedKafkaOffset that tracks the last committed kafka offset locally.
+    if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getExpectedKafkaOffset()) {
+      committedKafkaOffset = ongoingTransactionInfo.getExpectedKafkaOffset();
     }
     syncKafkaOffsetWithLeader(message);
     cleanupOngoingTransaction();
@@ -215,12 +215,22 @@ private void writeRecords() {
       try {
         SinkRecord record = buffer.peek();
         if (record != null
-            && record.kafkaOffset() >= ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
+            && record.kafkaOffset() == ongoingTransactionInfo.getExpectedKafkaOffset()) {
           ongoingTransactionInfo.getWriter().writeRecord(record);
-          ongoingTransactionInfo.setLastWrittenKafkaOffset(record.kafkaOffset() + 1);
-        } else if (record != null && record.kafkaOffset() < committedKafkaOffset) {
-          LOG.warn(String.format("Received a kafka record with offset %s prior to last committed offset %s for partition %s",
-              record.kafkaOffset(), ongoingTransactionInfo.getLastWrittenKafkaOffset(),
+          ongoingTransactionInfo.setExpectedKafkaOffset(record.kafkaOffset() + 1);
+        } else if (record != null && record.kafkaOffset() > ongoingTransactionInfo.getExpectedKafkaOffset()) {
+          LOG.warn(String.format("Received a kafka record with offset %s above the next expected kafka offset %s for partition %s, "
+                  + "hence resetting the kafka offset to %s",
+              record.kafkaOffset(),
+              ongoingTransactionInfo.getExpectedKafkaOffset(),
+              partition,
+              ongoingTransactionInfo.getExpectedKafkaOffset()));
+          context.offset(partition, ongoingTransactionInfo.getExpectedKafkaOffset());
+        } else if (record != null && record.kafkaOffset() < ongoingTransactionInfo.getExpectedKafkaOffset()) {
+          LOG.warn(String.format("Received a kafka record with offset %s below the next expected kafka offset %s for partition %s, "
+                  + "no action will be taken but this record will be ignored since it's already written",
+              record.kafkaOffset(),
+              ongoingTransactionInfo.getExpectedKafkaOffset(),
               partition));
         }
         buffer.poll();
@@ -250,13 +260,24 @@ private void syncKafkaOffsetWithLeader(ControlMessage message) {
       // Recover kafka committed offsets, treating the commit offset from the coordinator
       // as the source of truth
       if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) {
+        // Debug only messages
         if (coordinatorCommittedKafkaOffset != committedKafkaOffset) {
-          LOG.warn(String.format("Recovering the kafka offset for partition %s to offset %s instead of local offset %s",
-              partition.partition(), coordinatorCommittedKafkaOffset, committedKafkaOffset));
-          context.offset(partition, coordinatorCommittedKafkaOffset);
+          LOG.warn(String.format("The coordinator offset for kafka partition %s is %d while the locally committed offset is %d, "
+                  + "hence resetting the local committed offset to the coordinator-provided one to ensure consistency",
+              partition,
+              coordinatorCommittedKafkaOffset,
+              committedKafkaOffset));
         }
         committedKafkaOffset = coordinatorCommittedKafkaOffset;
+        return;
       }
+    } else {
+      LOG.warn(String.format("The coordinator offset for kafka partition %s is not present while the locally committed offset is %d, "
+              + "hence resetting the local committed offset to 0 to avoid data loss",
+          partition,
+          committedKafkaOffset));
     }
+    // If the coordinator does not have a committed offset for this partition, reset to zero offset.
+    committedKafkaOffset = 0;
   }
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
index 9c7bbf1e83657..7c1852e5fa5c6 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
@@ -29,13 +29,13 @@ public class TransactionInfo {
   private final String commitTime;
   private final ConnectWriter writer;
-  private long lastWrittenKafkaOffset;
+  private long expectedKafkaOffset;
   private boolean commitInitiated;
 
   public TransactionInfo(String commitTime, ConnectWriter writer) {
     this.commitTime = commitTime;
     this.writer = writer;
-    this.lastWrittenKafkaOffset = 0;
+    this.expectedKafkaOffset = 0;
     this.commitInitiated = false;
   }
@@ -47,16 +47,16 @@ public ConnectWriter getWriter() {
     return writer;
   }
 
-  public long getLastWrittenKafkaOffset() {
-    return lastWrittenKafkaOffset;
+  public long getExpectedKafkaOffset() {
+    return expectedKafkaOffset;
   }
 
   public boolean isCommitInitiated() {
     return commitInitiated;
   }
 
-  public void setLastWrittenKafkaOffset(long lastWrittenKafkaOffset) {
-    this.lastWrittenKafkaOffset = lastWrittenKafkaOffset;
+  public void setExpectedKafkaOffset(long expectedKafkaOffset) {
+    this.expectedKafkaOffset = expectedKafkaOffset;
   }
 
   public void commitInitiated() {
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
index 5d551a79fa03d..36da6ac32e01e 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
@@ -26,7 +26,7 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.helper.MockKafkaControlAgent;
 import org.apache.hudi.helper.TestHudiWriterProvider;
-import org.apache.hudi.helper.TestKafkaConnect;
+import org.apache.hudi.helper.MockKafkaConnect;
 
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.BeforeEach;
@@ -41,23 +41,24 @@ public class TestConnectTransactionParticipant {
   private static final String TOPIC_NAME = "kafka-connect-test-topic";
+  private static final int NUM_RECORDS_BATCH = 5;
   private static final int PARTITION_NUMBER = 4;
 
   private ConnectTransactionParticipant participant;
-  private MockCoordinator coordinator;
+  private MockCoordinator mockCoordinator;
   private TopicPartition partition;
   private KafkaConnectConfigs configs;
   private KafkaControlAgent kafkaControlAgent;
   private TestHudiWriterProvider testHudiWriterProvider;
-  private TestKafkaConnect testKafkaConnect;
+  private MockKafkaConnect mockKafkaConnect;
 
   @BeforeEach
   public void setUp() throws Exception {
     partition = new TopicPartition(TOPIC_NAME, PARTITION_NUMBER);
     kafkaControlAgent = new MockKafkaControlAgent();
-    testKafkaConnect = new TestKafkaConnect(partition);
-    coordinator = new MockCoordinator(kafkaControlAgent);
-    coordinator.start();
+    mockKafkaConnect = new MockKafkaConnect(partition);
+    mockCoordinator = new MockCoordinator(kafkaControlAgent);
+    mockCoordinator.start();
     configs = KafkaConnectConfigs.newBuilder()
         .build();
     initializeParticipant();
@@ -66,26 +67,19 @@ public void setUp() throws Exception {
   @ParameterizedTest
   @EnumSource(value = CoordinatorFailureTestScenarios.class)
   public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) {
-    int expectedRecordsWritten = 0;
     try {
+      assertTrue(mockKafkaConnect.isPaused());
       switch (testScenario) {
         case REGULAR_SCENARIO:
-          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isPaused());
           break;
         case COORDINATOR_FAILED_AFTER_START_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
+          triggerAndProcessStartCommit();
           // Coordinator Failed
           initializeCoordinator();
           break;
         case COORDINATOR_FAILED_AFTER_END_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+          triggerAndProcessStartCommit();
+          triggerAndProcessEndCommit();
           // Coordinator Failed
           initializeCoordinator();
           break;
@@ -93,18 +87,8 @@ public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios t
           throw new HoodieException("Unknown test scenario " + testScenario);
       }
 
-      // Regular Case or Coordinator Recovery Case
-      coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-      expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-      assertTrue(testKafkaConnect.isResumed());
-      coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-      testKafkaConnect.putRecordsToParticipant();
-      assertTrue(testKafkaConnect.isPaused());
-      coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
-      testKafkaConnect.putRecordsToParticipant();
-      assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-      // Ensure Coordinator and participant are in sync in the kafka offsets
-      assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+      // Despite failures in the previous commit, a fresh 2-phase commit should PASS.
+      testTwoPhaseCommit(0);
     } catch (Exception exception) {
       throw new HoodieException("Unexpected test failure ", exception);
     }
@@ -114,62 +98,38 @@ public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios t
   @ParameterizedTest
   @EnumSource(value = ParticipantFailureTestScenarios.class)
   public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) {
-    int expectedRecordsWritten = 0;
     try {
+      int currentKafkaOffset = 0;
       switch (testScenario) {
         case FAILURE_BEFORE_START_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          // Participant fails
+          // Participant failing before START_COMMIT will not write any data in this commit cycle.
           initializeParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isResumed());
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isPaused());
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-          // Ensure Coordinator and participant are in sync in the kafka offsets
-          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
           break;
         case FAILURE_AFTER_START_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          // Participant fails
+          triggerAndProcessStartCommit();
+          // Participant failing after START_COMMIT will not write any data in this commit cycle.
           initializeParticipant();
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isPaused());
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-          // Ensure Coordinator and participant are in sync in the kafka offsets
-          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+          triggerAndProcessEndCommit();
+          triggerAndProcessAckCommit();
           break;
         case FAILURE_AFTER_END_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          // Participant fails
+          // Run the commit cycle through END_COMMIT before the participant fails.
+          triggerAndProcessStartCommit();
+          triggerAndProcessEndCommit();
           initializeParticipant();
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isPaused());
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-          // Ensure Coordinator and participant are in sync in the kafka offsets
-          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+          triggerAndProcessAckCommit();
+
+          // Participant failing after an END_COMMIT should not cause issues with the present commit,
+          // since the data would have been written by the previous participant before failing
+          // and hence moved the kafka offset forward.
+          currentKafkaOffset = NUM_RECORDS_BATCH;
           break;
         default:
           throw new HoodieException("Unknown test scenario " + testScenario);
       }
+
+      // Despite failures in the previous commit, a fresh 2-phase commit should PASS.
+      testTwoPhaseCommit(currentKafkaOffset);
     } catch (Exception exception) {
       throw new HoodieException("Unexpected test failure ", exception);
     }
@@ -180,15 +140,49 @@ private void initializeParticipant() {
     participant = new ConnectTransactionParticipant(
         partition,
         kafkaControlAgent,
-        testKafkaConnect,
+        mockKafkaConnect,
         testHudiWriterProvider);
-    testKafkaConnect.setParticipant(participant);
+    mockKafkaConnect.setParticipant(participant);
     participant.start();
   }
 
   private void initializeCoordinator() {
-    coordinator = new MockCoordinator(kafkaControlAgent);
-    coordinator.start();
+    mockCoordinator = new MockCoordinator(kafkaControlAgent);
+    mockCoordinator.start();
+  }
+
+  // Test and validate result of a single 2 Phase commit from START_COMMIT to ACK_COMMIT.
+  // Validates that NUM_RECORDS_BATCH number of kafka records are written,
+  // and the kafka offset only increments by NUM_RECORDS_BATCH.
+  private void testTwoPhaseCommit(long currentKafkaOffset) {
+    triggerAndProcessStartCommit();
+    triggerAndProcessEndCommit();
+    triggerAndProcessAckCommit();
+
+    // Validate records written, current kafka offset and kafka offsets committed across
+    // coordinator and participant are in sync despite failure scenarios.
+    assertEquals(NUM_RECORDS_BATCH, testHudiWriterProvider.getLatestNumberWrites());
+    assertEquals((currentKafkaOffset + NUM_RECORDS_BATCH), mockKafkaConnect.getCurrentKafkaOffset());
+    // Ensure Coordinator and participant are in sync in the kafka offsets
+    assertEquals(participant.getLastKafkaCommittedOffset(), mockCoordinator.getCommittedKafkaOffset());
+  }
+
+  private void triggerAndProcessStartCommit() {
+    mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
+    mockKafkaConnect.publishBatchRecordsToParticipant(NUM_RECORDS_BATCH);
+    assertTrue(mockKafkaConnect.isResumed());
+  }
+
+  private void triggerAndProcessEndCommit() {
+    mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
+    mockKafkaConnect.publishBatchRecordsToParticipant(0);
+    assertTrue(mockKafkaConnect.isPaused());
+  }
+
+  private void triggerAndProcessAckCommit() {
+    mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
+    mockKafkaConnect.publishBatchRecordsToParticipant(0);
+    assertTrue(mockKafkaConnect.isPaused());
   }
 
   private static class MockCoordinator implements TransactionCoordinator {
@@ -279,5 +273,4 @@ private enum ParticipantFailureTestScenarios {
     FAILURE_AFTER_START_COMMIT,
     FAILURE_AFTER_END_COMMIT,
   }
-
 }
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
similarity index 73%
rename from hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java
rename to hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
index 6e947de072211..113b93ef87123 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
@@ -33,20 +32,26 @@
 /**
  * Helper class that emulates the Kafka Connect f/w and additionally
  * implements {@link SinkTaskContext} for testing purposes.
+ *
+ * Every time the consumer (Participant) calls resume, a fixed
+ * batch of kafka records from the current offset is pushed. If
+ * the consumer resets the offsets, then a fresh batch of records
+ * is sent from the new offset.
  */
-public class TestKafkaConnect implements SinkTaskContext {
+public class MockKafkaConnect implements SinkTaskContext {
 
-  private static final int NUM_RECORDS_BATCH = 5;
   private final TopicPartition testPartition;
   private TransactionParticipant participant;
   private long currentKafkaOffset;
   private boolean isPaused;
+  private boolean isResetOffset;
 
-  public TestKafkaConnect(TopicPartition testPartition) {
+  public MockKafkaConnect(TopicPartition testPartition) {
     this.testPartition = testPartition;
     isPaused = false;
     currentKafkaOffset = 0L;
+    isResetOffset = false;
   }
 
   public void setParticipant(TransactionParticipant participant) {
@@ -61,23 +66,6 @@ public boolean isResumed() {
     return !isPaused;
   }
 
-  public int putRecordsToParticipant() throws IOException {
-    for (int i = 1; i <= NUM_RECORDS_BATCH; i++) {
-      participant.buffer(getNextKafkaRecord());
-    }
-    participant.processRecords();
-    return NUM_RECORDS_BATCH;
-  }
-
-  public SinkRecord getNextKafkaRecord() {
-    return new SinkRecord(testPartition.topic(),
-        testPartition.partition(),
-        Schema.OPTIONAL_BYTES_SCHEMA,
-        ("key-" + currentKafkaOffset).getBytes(),
-        Schema.OPTIONAL_BYTES_SCHEMA,
-        "value".getBytes(), currentKafkaOffset++);
-  }
-
   public long getCurrentKafkaOffset() {
     return currentKafkaOffset;
   }
@@ -100,7 +88,7 @@ public void resume(TopicPartition... partitions) {
   public void offset(Map offsets) {
     for (TopicPartition tp : offsets.keySet()) {
       if (tp.equals(testPartition)) {
-        currentKafkaOffset = offsets.get(tp);
+        resetOffset(offsets.get(tp));
       }
     }
   }
@@ -108,7 +96,7 @@ public void offset(Map offsets) {
   @Override
   public void offset(TopicPartition tp, long offset) {
     if (tp.equals(testPartition)) {
-      currentKafkaOffset = offset;
+      resetOffset(offset);
     }
   }
 
@@ -129,6 +117,33 @@ public Set assignment() {
   @Override
   public void requestCommit() {
 
+  }
+
+  public int publishBatchRecordsToParticipant(int numRecords) {
+    // Send numRecords records to the participant.
+    // If the client resets the offset, send another batch starting
+    // from the new offset value.
+    do {
+      isResetOffset = false;
+      for (int i = 1; i <= numRecords; i++) {
+        participant.buffer(getNextKafkaRecord());
+      }
+      participant.processRecords();
+    } while (isResetOffset);
+    return numRecords;
+  }
+
+  private SinkRecord getNextKafkaRecord() {
+    return new SinkRecord(testPartition.topic(),
+        testPartition.partition(),
+        Schema.OPTIONAL_BYTES_SCHEMA,
+        ("key-" + currentKafkaOffset).getBytes(),
+        Schema.OPTIONAL_BYTES_SCHEMA,
+        "value".getBytes(), currentKafkaOffset++);
+  }
+
+  private void resetOffset(long newOffset) {
+    currentKafkaOffset = newOffset;
+    isResetOffset = true;
   }
 }
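To summarize the offset handling this patch introduces, here is a minimal, self-contained sketch of how the reworked writeRecords() reconciles incoming kafka offsets against the next expected offset: the record at the expected offset is written, records below it are ignored as already written, and a record above it triggers a seek back to the expected offset via the SinkTaskContext. All names below are illustrative only and are not part of the Hudi Kafka Connect API.

import java.util.ArrayDeque;
import java.util.Queue;

// Standalone model of the expected-offset reconciliation; class, field and method
// names are illustrative, not the actual participant implementation.
public class ExpectedOffsetSketch {

  private long expectedKafkaOffset = 0;

  // Processes a batch of buffered offsets the way writeRecords() handles SinkRecords:
  // write the record at the expected offset, ignore anything below it,
  // and request a seek back to the expected offset when a gap appears.
  // Returns the offset to seek to, or -1 if no reset is needed.
  long process(Queue<Long> bufferedOffsets) {
    long resetTo = -1;
    while (!bufferedOffsets.isEmpty()) {
      long offset = bufferedOffsets.poll();
      if (offset == expectedKafkaOffset) {
        // "write" the record, then advance the expected offset by one
        expectedKafkaOffset = offset + 1;
      } else if (offset > expectedKafkaOffset) {
        // gap detected: the connector would call context.offset(partition, expectedKafkaOffset)
        resetTo = expectedKafkaOffset;
      }
      // offset < expectedKafkaOffset: already written in this transaction, ignore it
    }
    return resetTo;
  }

  public static void main(String[] args) {
    ExpectedOffsetSketch sketch = new ExpectedOffsetSketch();
    Queue<Long> batch = new ArrayDeque<>(java.util.List.of(0L, 1L, 3L));
    // Offsets 0 and 1 are written; 3 is past the expected offset 2, so a reset to 2 is requested.
    System.out.println(sketch.process(batch)); // prints 2
  }
}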