Skip to content

Commit

Permalink
KAFKA-18028: the effective kraft version of --no-initial-controllers …
Browse files Browse the repository at this point in the history
…should be 1 rather than 0

Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Nov 17, 2024
1 parent dfa5aa5 commit 4c03c5f
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 6 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ object StorageTool extends Logging {
if (namespace.getBoolean("standalone")) {
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
}
if (namespace.getBoolean("no_initial_controllers")) {
formatter.setNoInitialControllers(true)
}
if (!namespace.getBoolean("no_initial_controllers")) {
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
if (config.quorumConfig.voters().isEmpty) {
Expand Down
32 changes: 28 additions & 4 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -483,20 +483,44 @@ Found problem:
Seq("--release-version", "3.9-IV0"))).getMessage)
}

@Test
def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties,
Seq("--no-initial-controllers", "--release-version", "3.9-IV0")))
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
if (setKraftVersionFeature) {
arguments += "--feature"
arguments += "kraft.version=1"
}
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
assertTrue(stream.toString().
contains("Formatting metadata directory %s".format(availableDirs.head)),
"Failed to find content in output: " + stream.toString())
}

@Test
def testFormatWithNoInitialControllersFlagAndStandaloneFlagFails(): Unit = {
val arguments = ListBuffer[String](
"--release-version", "3.9-IV0",
"--no-initial-controllers", "--standalone")
assertThrows(classOf[ArgumentParserException], () => StorageTool.parseArguments(arguments.toArray))
}

@Test
def testFormatWithNoInitialControllersFlagAndInitialControllersFlagFails(): Unit = {
val arguments = ListBuffer[String](
"--release-version", "3.9-IV0",
"--no-initial-controllers", "--initial-controllers",
"0@localhost:8020:K90IZ-0DRNazJ49kCZ1EMQ," +
"1@localhost:8030:aUARLskQTCW4qCZDtS_cwA," +
"2@localhost:8040:2ggvsS4kQb-fSJ_-zC_Ang")
assertThrows(classOf[ArgumentParserException], () => StorageTool.parseArguments(arguments.toArray))
}

@Test
def testFormatWithoutStaticQuorumSucceedsWithoutInitialControllersOnBroker(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ public class Formatter {

/**
* Maps feature names to the level they will start off with.
*
* Visible for testing.
*/
private Map<String, Short> featureLevels = new TreeMap<>();
protected Map<String, Short> featureLevels = new TreeMap<>();

/**
* The bootstrap metadata used to format the cluster.
Expand Down Expand Up @@ -130,6 +132,7 @@ public class Formatter {
* The initial KIP-853 voters.
*/
private Optional<DynamicVoters> initialControllers = Optional.empty();
private boolean noInitialControllers = false;

public Formatter setPrintStream(PrintStream printStream) {
this.printStream = printStream;
Expand Down Expand Up @@ -215,12 +218,17 @@ public Formatter setInitialControllers(DynamicVoters initialControllers) {
return this;
}

public Formatter setNoInitialControllers(boolean noInitialControllers) {
this.noInitialControllers = noInitialControllers;
return this;
}

public Optional<DynamicVoters> initialVoters() {
return initialControllers;
}

boolean hasDynamicQuorum() {
return initialControllers.isPresent();
return initialControllers.isPresent() || noInitialControllers;
}

public BootstrapMetadata bootstrapMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.run();
assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
assertEquals(Arrays.asList(
String.format("Formatting data directory %s with %s %s.",
testEnv.directory(1),
Expand Down Expand Up @@ -446,4 +447,68 @@ public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex
() -> formatter1.formatter.run()).getMessage());
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
if (specifyKRaftVersion) {
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
}
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setNoInitialControllers(true);
assertTrue(formatter1.formatter.hasDynamicQuorum());

formatter1.formatter.run();
assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
assertEquals(Arrays.asList(
String.format("Formatting data directory %s with %s %s.",
testEnv.directory(1),
MetadataVersion.FEATURE_NAME,
MetadataVersion.latestTesting()),
String.format("Formatting metadata directory %s with %s %s.",
testEnv.directory(0),
MetadataVersion.FEATURE_NAME,
MetadataVersion.latestTesting())),
formatter1.outputLines().stream().sorted().collect(Collectors.toList()));
MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
addLogDirs(testEnv.directories).
load();
MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0));
assertNotNull(logDirProps0);
MetaProperties logDirProps1 = ensemble.logDirProps().get(testEnv.directory(1));
assertNotNull(logDirProps1);
}
}

@Test
public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setNoInitialControllers(false);
assertFalse(formatter1.formatter.hasDynamicQuorum());
assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " +
"Try removing the --feature flag for kraft.version.",
assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage());
}
}

@Test
public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setNoInitialControllers(true);
assertTrue(formatter1.formatter.hasDynamicQuorum());
assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " +
"Try removing the --feature flag for kraft.version.",
assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage());
}
}
}

0 comments on commit 4c03c5f

Please sign in to comment.