Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
omkreddy committed Nov 21, 2024
1 parent 1b09e9e commit 85dbf98
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ private AutoOffsetResetStrategy(String name) {
}

public static boolean isValid(String configValue) {
if (configValue == null) {
return false;
}
switch (configValue) {
case EARLIEST_STRATEGY_NAME:
case LATEST_STRATEGY_NAME:
Expand All @@ -47,7 +50,11 @@ public static boolean isValid(String configValue) {
}
}


public static AutoOffsetResetStrategy valueOf(String configValue) {
if (configValue == null) {
throw new IllegalArgumentException("auto offset reset strategy is null");
}
switch (configValue) {
case EARLIEST_STRATEGY_NAME:
return EARLIEST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void testIsValid() {
assertFalse(AutoOffsetResetStrategy.isValid("invalid"));
assertFalse(AutoOffsetResetStrategy.isValid("LATEST"));
assertFalse(AutoOffsetResetStrategy.isValid(""));
assertFalse(AutoOffsetResetStrategy.isValid(null));
}

@Test
Expand All @@ -47,6 +48,7 @@ public void testValueOf() {
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.valueOf("invalid"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.valueOf("LATEST"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.valueOf(""));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.valueOf(null));
}

@Test
Expand All @@ -58,6 +60,7 @@ public void testValidator() {
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "LATEST"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", ""));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", null));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1299,24 +1299,20 @@ private void resetOffsets(final Set<TopicPartition> partitions, final Exception
// This may be null if the task we are currently processing was apart of a named topology that was just removed.
// TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata view of named topologies in sync until final thread has acked
if (offsetResetStrategy != null) {
switch (offsetResetStrategy.name()) {
case AutoOffsetResetStrategy.EARLIEST_STRATEGY_NAME:
addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
break;
case AutoOffsetResetStrategy.LATEST_STRATEGY_NAME:
addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
break;
case AutoOffsetResetStrategy.NONE_STRATEGY_NAME:
if ("earliest".equals(originalReset)) {
addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
} else if ("latest".equals(originalReset)) {
addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
} else {
notReset.add(partition);
}
break;
default:
throw new IllegalStateException("Unable to locate topic " + partition.topic() + " in the topology");
if (offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST) {
addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
} else if (offsetResetStrategy == AutoOffsetResetStrategy.LATEST) {
addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
} else if (offsetResetStrategy == AutoOffsetResetStrategy.NONE) {
if (AutoOffsetResetStrategy.EARLIEST == AutoOffsetResetStrategy.valueOf(originalReset)) {
addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
} else if (AutoOffsetResetStrategy.LATEST == AutoOffsetResetStrategy.valueOf(originalReset)) {
addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
} else {
notReset.add(partition);
}
} else {
throw new IllegalStateException("Unable to locate topic " + partition.topic() + " in the topology");
}
}
}
Expand Down

0 comments on commit 85dbf98

Please sign in to comment.