@@ -294,7 +294,7 @@ private void onReceiveWriteStatus(ControlMessage message) {
     long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
     boolean hasErrors = totalErrorRecords > 0;
 
-    if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) {
+    if (!hasErrors || configs.allowCommitOnErrors()) {
       boolean success = transactionServices.endCommit(currentCommitTime,
           allWriteStatuses,
           transformKafkaOffsets(currentConsumedKafkaOffsets));
@@ -319,8 +319,6 @@ private void onReceiveWriteStatus(ControlMessage message) {
           ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
         }
       });
-    } else {
-      LOG.warn("Empty write statuses were received from all Participants");
     }
 
     // Submit the next start commit, that will rollback the current commit.
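In effect, the coordinator no longer rolls back a round solely because every participant reported an empty write-status list: as long as there are no error records (or committing on errors is allowed), the commit proceeds and the consumed Kafka offsets are persisted. A minimal standalone sketch of the before/after gate, using hypothetical helper names that are not part of the coordinator:

    // Illustrative sketch only; shouldCommitOld/shouldCommitNew are hypothetical
    // helpers mirroring the condition changed in the diff above.
    import java.util.Collections;
    import java.util.List;

    class CommitGateSketch {
      // Old gate: an empty write-status list blocked the commit outright.
      static boolean shouldCommitOld(List<?> statuses, boolean hasErrors, boolean allowErrors) {
        return (!hasErrors || allowErrors) && !statuses.isEmpty();
      }

      // New gate: an empty round still commits, so consumed offsets move forward.
      static boolean shouldCommitNew(List<?> statuses, boolean hasErrors, boolean allowErrors) {
        return !hasErrors || allowErrors;
      }

      public static void main(String[] args) {
        List<?> empty = Collections.emptyList();
        System.out.println(shouldCommitOld(empty, false, false)); // false -> round rolled back
        System.out.println(shouldCommitNew(empty, false, false)); // true  -> offsets committed
      }
    }

The hunks that follow update the corresponding test scenarios.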
@@ -178,27 +178,39 @@ private void testScenarios(ControlMessage message) {
     List<ControlMessage> controlEvents = new ArrayList<>();
     switch (testScenario) {
       case ALL_CONNECT_TASKS_SUCCESS:
-        composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
+        composeControlEvent(
+            message.getCommitTime(), false, false, kafkaOffsets, controlEvents);
         numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
         // This commit round should succeed, and the kafka offsets getting committed
         kafkaOffsetsCommitted.putAll(kafkaOffsets);
         expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
         break;
+      case ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS:
+        composeControlEvent(
+            message.getCommitTime(), false, true, kafkaOffsets, controlEvents);
+        numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
+        // This commit round should succeed, and the kafka offsets getting committed
+        kafkaOffsetsCommitted.putAll(kafkaOffsets);
+        expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
+        break;
       case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED:
-        composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
+        composeControlEvent(
+            message.getCommitTime(), true, false, kafkaOffsets, controlEvents);
         numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
         // Despite error records, this commit round should succeed, and the kafka offsets getting committed
         kafkaOffsetsCommitted.putAll(kafkaOffsets);
         expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
         break;
       case SUBSET_WRITE_STATUS_FAILED:
-        composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
+        composeControlEvent(
+            message.getCommitTime(), true, false, kafkaOffsets, controlEvents);
         numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
         // This commit round should fail, and a new commit round should start without kafka offsets getting committed
         expectedMsgType = ControlMessage.EventType.START_COMMIT;
         break;
       case SUBSET_CONNECT_TASKS_FAILED:
-        composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
+        composeControlEvent(
+            message.getCommitTime(), false, false, kafkaOffsets, controlEvents);
         numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2;
         // This commit round should fail, and a new commit round should start without kafka offsets getting committed
         expectedMsgType = ControlMessage.EventType.START_COMMIT;
@@ -235,10 +247,13 @@ public enum TestScenarios {
     SUBSET_CONNECT_TASKS_FAILED,
     SUBSET_WRITE_STATUS_FAILED,
     SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED,
-    ALL_CONNECT_TASKS_SUCCESS
+    ALL_CONNECT_TASKS_SUCCESS,
+    ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS
   }
 
-  private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
+  private static void composeControlEvent(
+      String commitTime, boolean shouldIncludeFailedRecords, boolean useEmptyWriteStatus,
+      Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
     // Prepare the WriteStatuses for all partitions
     for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) {
       try {
@@ -248,7 +263,8 @@ private static void composeControlEvent(String commitTime, boolean shouldInclude
             commitTime,
             new TopicPartition(TOPIC_NAME, i),
             kafkaOffset,
-            shouldIncludeFailedRecords);
+            shouldIncludeFailedRecords,
+            useEmptyWriteStatus);
         controlEvents.add(event);
       } catch (Exception exception) {
         throw new HoodieException("Fatal error sending control event to Coordinator");
@@ -259,9 +275,13 @@ private static void composeControlEvent(String commitTime, boolean shouldInclude
   private static ControlMessage composeWriteStatusResponse(String commitTime,
                                                            TopicPartition partition,
                                                            long kafkaOffset,
-                                                           boolean includeFailedRecords) throws Exception {
-    // send WS
-    WriteStatus writeStatus = includeFailedRecords ? getSubsetFailedRecordsWriteStatus() : getAllSuccessfulRecordsWriteStatus();
+                                                           boolean includeFailedRecords,
+                                                           boolean useEmptyWriteStatus) throws Exception {
+    List<WriteStatus> writeStatusList = useEmptyWriteStatus ? Collections.emptyList()
+        : Collections.singletonList(
+            includeFailedRecords
+                ? getSubsetFailedRecordsWriteStatus()
+                : getAllSuccessfulRecordsWriteStatus());
 
     return ControlMessage.newBuilder()
         .setType(ControlMessage.EventType.WRITE_STATUS)
@@ -273,7 +293,7 @@ private static ControlMessage composeWriteStatusResponse(String commitTime,
         .setCommitTime(commitTime)
         .setParticipantInfo(
             ControlMessage.ParticipantInfo.newBuilder()
-                .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatusList))
+                .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatusList))
                 .setKafkaOffset(kafkaOffset)
                 .build()
         ).build();
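Taken together, the test matrix now covers an all-empty round explicitly. A compact, illustrative summary of the expected coordinator outcome per scenario (scenario names come from the enum above; the map itself is only a sketch, not part of the PR):

    import java.util.LinkedHashMap;
    import java.util.Map;

    class ExpectedOutcomesSketch {
      public static void main(String[] args) {
        // Expected coordinator response per scenario, per the comments in the diff above.
        Map<String, String> expected = new LinkedHashMap<>();
        expected.put("ALL_CONNECT_TASKS_SUCCESS", "ACK_COMMIT, kafka offsets committed");
        expected.put("ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS", "ACK_COMMIT, kafka offsets committed");
        expected.put("SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED", "ACK_COMMIT, kafka offsets committed");
        expected.put("SUBSET_WRITE_STATUS_FAILED", "START_COMMIT, offsets not committed");
        expected.put("SUBSET_CONNECT_TASKS_FAILED", "START_COMMIT, offsets not committed");
        expected.forEach((scenario, outcome) -> System.out.println(scenario + " -> " + outcome));
      }
    }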