@@ -149,7 +149,7 @@ private void handleStartCommit(ControlMessage message) {
LOG.info("Started a new transaction after receiving START_COMMIT for commit " + currentCommitTime);
try {
ongoingTransactionInfo = new TransactionInfo<>(currentCommitTime, writerProvider.getWriter(currentCommitTime));
-      ongoingTransactionInfo.setLastWrittenKafkaOffset(committedKafkaOffset);
+      ongoingTransactionInfo.setExpectedKafkaOffset(committedKafkaOffset);
} catch (Exception exception) {
LOG.warn("Error received while starting a new transaction", exception);
}
@@ -188,7 +188,7 @@ private void handleEndCommit(ControlMessage message) {
.setParticipantInfo(
ControlMessage.ParticipantInfo.newBuilder()
.setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatuses))
-              .setKafkaOffset(ongoingTransactionInfo.getLastWrittenKafkaOffset())
+              .setKafkaOffset(ongoingTransactionInfo.getExpectedKafkaOffset())
.build()
).build();

@@ -201,9 +201,9 @@ private void handleEndCommit(ControlMessage message) {
}

private void handleAckCommit(ControlMessage message) {
-    // Update lastKafkCommitedOffset locally.
-    if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
-      committedKafkaOffset = ongoingTransactionInfo.getLastWrittenKafkaOffset();
+    // Update committedKafkaOffset that tracks the last committed kafka offset locally.
+    if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getExpectedKafkaOffset()) {
+      committedKafkaOffset = ongoingTransactionInfo.getExpectedKafkaOffset();
}
syncKafkaOffsetWithLeader(message);
cleanupOngoingTransaction();
@@ -215,12 +215,22 @@ private void writeRecords() {
try {
SinkRecord record = buffer.peek();
if (record != null
-          && record.kafkaOffset() >= ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
+          && record.kafkaOffset() == ongoingTransactionInfo.getExpectedKafkaOffset()) {
Contributor:

This assumes that the records come in order in terms of the Kafka offset from the buffer. Is it possible that the records may come out of order?

Contributor (Author):

They will always come in order. But it's possible that Kafka may send them out of order if we do not commit an offset for some time. For instance, it will send 12,13,14,15 ... but then realize that the consumer has not committed an offset (and the last committed offset was 5), so it will start sending 5,6,7 ... and henceforth it will be in order. That's why we have an expected offset, and if we do not receive the expected offset, we force an offset reset and Kafka will start sending messages from before. This ensures no data loss/duplication; there may be scope for some optimization, but we are trying to be conservative for now.

Contributor:

Got it. Basically, the offsets of the records are something like 12,13,14,15,5,6,7,8,9,10,11,12,13,14,15 after the reset, so this logic skips the first group of records.
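
To make that concrete, here is a minimal standalone sketch of the gating (illustrative only — the class name, `expectedOffset`, and the record stream below are made up for this example, not the actual Hudi code):

```java
import java.util.List;

public class OffsetGatingSketch {
  public static void main(String[] args) {
    // After an uncommitted stretch, Kafka may replay from the last committed
    // offset: 12..15 arrive first, then the broker rewinds to 5. The trailing
    // 7 is a hypothetical stray redelivery to exercise the "behind" branch.
    List<Long> incoming = List.of(12L, 13L, 14L, 15L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 7L);
    long expectedOffset = 5L; // next offset the participant is willing to write

    for (long offset : incoming) {
      if (offset == expectedOffset) {
        System.out.println("WRITE offset " + offset);
        expectedOffset = offset + 1; // advance only on an exact match
      } else if (offset > expectedOffset) {
        // Ahead of the expected offset: the real code would force a consumer
        // reset here via context.offset(partition, expectedOffset).
        System.out.println("SKIP  offset " + offset + " (ahead of expected " + expectedOffset + ")");
      } else {
        // Behind the expected offset: already written, safe to ignore.
        System.out.println("SKIP  offset " + offset + " (already written)");
      }
    }
  }
}
```

Each of 5..15 is written exactly once: the leading 12,13,14,15 are skipped as ahead-of-expected, and the stray trailing 7 is skipped as already written, so there is no loss or duplication.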

ongoingTransactionInfo.getWriter().writeRecord(record);
-        ongoingTransactionInfo.setLastWrittenKafkaOffset(record.kafkaOffset() + 1);
-      } else if (record != null && record.kafkaOffset() < committedKafkaOffset) {
-        LOG.warn(String.format("Received a kafka record with offset %s prior to last committed offset %s for partition %s",
-            record.kafkaOffset(), ongoingTransactionInfo.getLastWrittenKafkaOffset(),
+        ongoingTransactionInfo.setExpectedKafkaOffset(record.kafkaOffset() + 1);
+      } else if (record != null && record.kafkaOffset() > ongoingTransactionInfo.getExpectedKafkaOffset()) {
+        LOG.warn(String.format("Received a kafka record with offset %s above the next expected kafka offset %s for partition %s, "
+            + "hence resetting the kafka offset to %s",
+            record.kafkaOffset(),
+            ongoingTransactionInfo.getExpectedKafkaOffset(),
+            partition,
+            ongoingTransactionInfo.getExpectedKafkaOffset()));
+        context.offset(partition, ongoingTransactionInfo.getExpectedKafkaOffset());
+      } else if (record != null && record.kafkaOffset() < ongoingTransactionInfo.getExpectedKafkaOffset()) {
+        LOG.warn(String.format("Received a kafka record with offset %s below the next expected kafka offset %s for partition %s, "
+            + "no action will be taken but this record will be ignored since its already written",
+            record.kafkaOffset(),
+            ongoingTransactionInfo.getExpectedKafkaOffset(),
+            partition));
}
Contributor:

Shall we still log a warning message if the Kafka record offset is prior to the expected offset?

Contributor (Author):

Good point, I also added some more log messages.

buffer.poll();
@@ -250,13 +260,24 @@ private void syncKafkaOffsetWithLeader(ControlMessage message) {
// Recover kafka committed offsets, treating the commit offset from the coordinator
// as the source of truth
if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) {
-      // Debug only messages
      if (coordinatorCommittedKafkaOffset != committedKafkaOffset) {
-        LOG.warn(String.format("Recovering the kafka offset for partition %s to offset %s instead of local offset %s",
-            partition.partition(), coordinatorCommittedKafkaOffset, committedKafkaOffset));
-        context.offset(partition, coordinatorCommittedKafkaOffset);
+        LOG.warn(String.format("The coordinator offset for kafka partition %s is %d while the locally committed offset is %d, "
+            + "hence resetting the local committed offset to the coordinator provided one to ensure consistency",
+            partition,
+            coordinatorCommittedKafkaOffset,
+            committedKafkaOffset));
      }
      committedKafkaOffset = coordinatorCommittedKafkaOffset;
      return;
-    }
+    } else {
+      LOG.warn(String.format("The coordinator offset for kafka partition %s is not present while the locally committed offset is %d, "
+          + "hence resetting the local committed offset to 0 to avoid data loss",
+          partition,
+          committedKafkaOffset));
+    }
+    // If the coordinator does not have a committed offset for this partition, reset to zero offset.
+    committedKafkaOffset = 0;
}
}
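
The recovery rule in syncKafkaOffsetWithLeader above boils down to: trust the coordinator's committed offset when it exists, otherwise rewind to 0. A minimal sketch of just that rule (illustrative names, not the actual Hudi API):

```java
public class OffsetRecoverySketch {
  // The coordinator's committed offset is the source of truth when present
  // and valid; otherwise rewind to offset 0 so no records can be lost.
  static long recoverCommittedOffset(Long coordinatorOffset) {
    if (coordinatorOffset != null && coordinatorOffset >= 0) {
      return coordinatorOffset;
    }
    return 0L;
  }

  public static void main(String[] args) {
    System.out.println(recoverCommittedOffset(42L));  // 42: coordinator wins over any local value
    System.out.println(recoverCommittedOffset(null)); // 0: coordinator has no record
    System.out.println(recoverCommittedOffset(-1L));  // 0: invalid offset treated as absent
  }
}
```

Rewinding to 0 is the conservative choice: re-reading is preferred over any risk of data loss, and the expected-offset gating above keeps the re-read from causing duplicate writes.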
@@ -29,13 +29,13 @@ public class TransactionInfo<T> {

private final String commitTime;
private final ConnectWriter<T> writer;
-  private long lastWrittenKafkaOffset;
+  private long expectedKafkaOffset;
private boolean commitInitiated;

public TransactionInfo(String commitTime, ConnectWriter<T> writer) {
this.commitTime = commitTime;
this.writer = writer;
-    this.lastWrittenKafkaOffset = 0;
+    this.expectedKafkaOffset = 0;
this.commitInitiated = false;
}

@@ -47,16 +47,16 @@ public ConnectWriter<T> getWriter() {
return writer;
}

-  public long getLastWrittenKafkaOffset() {
-    return lastWrittenKafkaOffset;
+  public long getExpectedKafkaOffset() {
+    return expectedKafkaOffset;
}

public boolean isCommitInitiated() {
return commitInitiated;
}

-  public void setLastWrittenKafkaOffset(long lastWrittenKafkaOffset) {
-    this.lastWrittenKafkaOffset = lastWrittenKafkaOffset;
+  public void setExpectedKafkaOffset(long expectedKafkaOffset) {
+    this.expectedKafkaOffset = expectedKafkaOffset;
}

public void commitInitiated() {
@@ -26,7 +26,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockKafkaControlAgent;
import org.apache.hudi.helper.TestHudiWriterProvider;
-import org.apache.hudi.helper.TestKafkaConnect;
+import org.apache.hudi.helper.MockKafkaConnect;

import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach;
@@ -41,23 +41,24 @@
public class TestConnectTransactionParticipant {

private static final String TOPIC_NAME = "kafka-connect-test-topic";
+  private static final int NUM_RECORDS_BATCH = 5;
private static final int PARTITION_NUMBER = 4;

private ConnectTransactionParticipant participant;
-  private MockCoordinator coordinator;
+  private MockCoordinator mockCoordinator;
private TopicPartition partition;
private KafkaConnectConfigs configs;
private KafkaControlAgent kafkaControlAgent;
private TestHudiWriterProvider testHudiWriterProvider;
-  private TestKafkaConnect testKafkaConnect;
+  private MockKafkaConnect mockKafkaConnect;

@BeforeEach
public void setUp() throws Exception {
partition = new TopicPartition(TOPIC_NAME, PARTITION_NUMBER);
kafkaControlAgent = new MockKafkaControlAgent();
-    testKafkaConnect = new TestKafkaConnect(partition);
-    coordinator = new MockCoordinator(kafkaControlAgent);
-    coordinator.start();
+    mockKafkaConnect = new MockKafkaConnect(partition);
+    mockCoordinator = new MockCoordinator(kafkaControlAgent);
+    mockCoordinator.start();
configs = KafkaConnectConfigs.newBuilder()
.build();
initializeParticipant();
@@ -66,45 +67,28 @@ public void setUp() throws Exception {
@ParameterizedTest
@EnumSource(value = CoordinatorFailureTestScenarios.class)
public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) {
-    int expectedRecordsWritten = 0;
    try {
+      assertTrue(mockKafkaConnect.isPaused());
      switch (testScenario) {
        case REGULAR_SCENARIO:
-          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isPaused());
          break;
        case COORDINATOR_FAILED_AFTER_START_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
+          triggerAndProcessStartCommit();
          // Coordinator Failed
          initializeCoordinator();
          break;
        case COORDINATOR_FAILED_AFTER_END_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+          triggerAndProcessStartCommit();
+          triggerAndProcessEndCommit();
          // Coordinator Failed
          initializeCoordinator();
          break;
        default:
          throw new HoodieException("Unknown test scenario " + testScenario);
      }

-      // Regular Case or Coordinator Recovery Case
-      coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-      expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-      assertTrue(testKafkaConnect.isResumed());
-      coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-      testKafkaConnect.putRecordsToParticipant();
-      assertTrue(testKafkaConnect.isPaused());
-      coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
-      testKafkaConnect.putRecordsToParticipant();
-      assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-      // Ensure Coordinator and participant are in sync in the kafka offsets
-      assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+      // Despite failures in the previous commit, a fresh 2-phase commit should PASS.
+      testTwoPhaseCommit(0);
} catch (Exception exception) {
throw new HoodieException("Unexpected test failure ", exception);
}
@@ -114,62 +98,38 @@ public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) {
@ParameterizedTest
@EnumSource(value = ParticipantFailureTestScenarios.class)
public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) {
-    int expectedRecordsWritten = 0;
    try {
+      int currentKafkaOffset = 0;
      switch (testScenario) {
        case FAILURE_BEFORE_START_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          // Participant fails
+          // Participant failing after START_COMMIT will not write any data in this commit cycle.
          initializeParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isResumed());
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isPaused());
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-          // Ensure Coordinator and participant are in sync in the kafka offsets
-          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
          break;
        case FAILURE_AFTER_START_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          // Participant fails
+          triggerAndProcessStartCommit();
+          // Participant failing after START_COMMIT will not write any data in this commit cycle.
          initializeParticipant();
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isPaused());
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-          // Ensure Coordinator and participant are in sync in the kafka offsets
-          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+          triggerAndProcessEndCommit();
+          triggerAndProcessAckCommit();
          break;
        case FAILURE_AFTER_END_COMMIT:
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          // Participant fails
+          // Regular Case or Coordinator Recovery Case
+          triggerAndProcessStartCommit();
+          triggerAndProcessEndCommit();
          initializeParticipant();
-          testKafkaConnect.putRecordsToParticipant();
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertTrue(testKafkaConnect.isPaused());
-          coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
-          testKafkaConnect.putRecordsToParticipant();
-          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-          // Ensure Coordinator and participant are in sync in the kafka offsets
-          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+          triggerAndProcessAckCommit();

+          // Participant failing after and END_COMMIT should not cause issues with the present commit,
+          // since the data would have been written by previous participant before failing
+          // and hence moved the kafka offset.
+          currentKafkaOffset = NUM_RECORDS_BATCH;
          break;
        default:
          throw new HoodieException("Unknown test scenario " + testScenario);
      }

+      // Despite failures in the previous commit, a fresh 2-phase commit should PASS.
+      testTwoPhaseCommit(currentKafkaOffset);
} catch (Exception exception) {
throw new HoodieException("Unexpected test failure ", exception);
}
@@ -180,15 +140,49 @@ private void initializeParticipant() {
participant = new ConnectTransactionParticipant(
partition,
kafkaControlAgent,
-        testKafkaConnect,
+        mockKafkaConnect,
        testHudiWriterProvider);
-    testKafkaConnect.setParticipant(participant);
+    mockKafkaConnect.setParticipant(participant);
participant.start();
}

private void initializeCoordinator() {
-    coordinator = new MockCoordinator(kafkaControlAgent);
-    coordinator.start();
+    mockCoordinator = new MockCoordinator(kafkaControlAgent);
+    mockCoordinator.start();
}

+  // Test and validate result of a single 2 Phase commit from START_COMMIT to ACK_COMMIT.
+  // Validates that NUM_RECORDS_BATCH number of kafka records are written,
+  // and the kafka offset only increments by NUM_RECORDS_BATCH.
+  private void testTwoPhaseCommit(long currentKafkaOffset) {
+    triggerAndProcessStartCommit();
+    triggerAndProcessEndCommit();
+    triggerAndProcessAckCommit();
+
+    // Validate records written, current kafka offset and kafka offsets committed across
+    // coordinator and participant are in sync despite failure scenarios.
+    assertEquals(NUM_RECORDS_BATCH, testHudiWriterProvider.getLatestNumberWrites());
+    assertEquals((currentKafkaOffset + NUM_RECORDS_BATCH), mockKafkaConnect.getCurrentKafkaOffset());
+    // Ensure Coordinator and participant are in sync in the kafka offsets
+    assertEquals(participant.getLastKafkaCommittedOffset(), mockCoordinator.getCommittedKafkaOffset());
+  }
+
+  private void triggerAndProcessStartCommit() {
+    mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
+    mockKafkaConnect.publishBatchRecordsToParticipant(NUM_RECORDS_BATCH);
+    assertTrue(mockKafkaConnect.isResumed());
+  }
+
+  private void triggerAndProcessEndCommit() {
+    mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
+    mockKafkaConnect.publishBatchRecordsToParticipant(0);
+    assertTrue(mockKafkaConnect.isPaused());
+  }
+
+  private void triggerAndProcessAckCommit() {
+    mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
+    mockKafkaConnect.publishBatchRecordsToParticipant(0);
+    assertTrue(mockKafkaConnect.isPaused());
+  }

private static class MockCoordinator implements TransactionCoordinator {
@@ -279,5 +273,4 @@ private enum ParticipantFailureTestScenarios {
FAILURE_AFTER_START_COMMIT,
FAILURE_AFTER_END_COMMIT,
}

}