diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 29734bd8d8d20..b3ff5321625a4 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -143,14 +143,14 @@ object StorageTool extends Logging { if (namespace.getBoolean("standalone")) { formatter.setInitialControllers(createStandaloneDynamicVoters(config)) } - if (!namespace.getBoolean("no_initial_controllers")) { + if (namespace.getBoolean("no_initial_controllers")) { + formatter.setNoInitialControllersFlag(true) + } else { if (config.processRoles.contains(ProcessRole.ControllerRole)) { - if (config.quorumConfig.voters().isEmpty) { - if (formatter.initialVoters().isEmpty()) { - throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + - " is not set on this controller, you must specify one of the following: " + - "--standalone, --initial-controllers, or --no-initial-controllers."); - } + if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) { + throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + + " is not set on this controller, you must specify one of the following: " + + "--standalone, --initial-controllers, or --no-initial-controllers."); } } } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index beff77cf52377..92b08890bb029 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -483,20 +483,44 @@ Found problem: Seq("--release-version", "3.9-IV0"))).getMessage) } - @Test - def testFormatWithNoInitialControllersSucceedsOnController(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(false, true)) + def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() properties.putAll(defaultDynamicQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() - assertEquals(0, runFormatCommand(stream, properties, - Seq("--no-initial-controllers", "--release-version", "3.9-IV0"))) + val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers") + if (setKraftVersionFeature) { + arguments += "--feature" + arguments += "kraft.version=1" + } + assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq)) assertTrue(stream.toString(). contains("Formatting metadata directory %s".format(availableDirs.head)), "Failed to find content in output: " + stream.toString()) } + @Test + def testFormatWithNoInitialControllersFlagAndStandaloneFlagFails(): Unit = { + val arguments = ListBuffer[String]( + "--release-version", "3.9-IV0", + "--no-initial-controllers", "--standalone") + assertThrows(classOf[ArgumentParserException], () => StorageTool.parseArguments(arguments.toArray)) + } + + @Test + def testFormatWithNoInitialControllersFlagAndInitialControllersFlagFails(): Unit = { + val arguments = ListBuffer[String]( + "--release-version", "3.9-IV0", + "--no-initial-controllers", "--initial-controllers", + "0@localhost:8020:K90IZ-0DRNazJ49kCZ1EMQ," + + "1@localhost:8030:aUARLskQTCW4qCZDtS_cwA," + + "2@localhost:8040:2ggvsS4kQb-fSJ_-zC_Ang") + assertThrows(classOf[ArgumentParserException], () => StorageTool.parseArguments(arguments.toArray)) + } + @Test def testFormatWithoutStaticQuorumSucceedsWithoutInitialControllersOnBroker(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java index 53013307149f2..d512545384a40 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java @@ -93,8 +93,10 @@ public class Formatter { /** * Maps feature names to the level they will start off with. + * + * Visible for testing. */ - private Map featureLevels = new TreeMap<>(); + protected Map featureLevels = new TreeMap<>(); /** * The bootstrap metadata used to format the cluster. @@ -130,6 +132,7 @@ public class Formatter { * The initial KIP-853 voters. */ private Optional initialControllers = Optional.empty(); + private boolean noInitialControllersFlag = false; public Formatter setPrintStream(PrintStream printStream) { this.printStream = printStream; @@ -215,12 +218,17 @@ public Formatter setInitialControllers(DynamicVoters initialControllers) { return this; } + public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) { + this.noInitialControllersFlag = noInitialControllersFlag; + return this; + } + public Optional initialVoters() { return initialControllers; } boolean hasDynamicQuorum() { - return initialControllers.isPresent(); + return initialControllers.isPresent() || noInitialControllersFlag; } public BootstrapMetadata bootstrapMetadata() { diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 45a896c47c44c..c0d9cd4ee95f9 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -379,6 +379,7 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); formatter1.formatter.run(); + assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); assertEquals(Arrays.asList( String.format("Formatting data directory %s with %s %s.", testEnv.directory(1), @@ -446,4 +447,68 @@ public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex () -> formatter1.formatter.run()).getMessage()); } } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + if (specifyKRaftVersion) { + formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + } + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.setNoInitialControllersFlag(true); + assertTrue(formatter1.formatter.hasDynamicQuorum()); + + formatter1.formatter.run(); + assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); + assertEquals(Arrays.asList( + String.format("Formatting data directory %s with %s %s.", + testEnv.directory(1), + MetadataVersion.FEATURE_NAME, + MetadataVersion.latestTesting()), + String.format("Formatting metadata directory %s with %s %s.", + testEnv.directory(0), + MetadataVersion.FEATURE_NAME, + MetadataVersion.latestTesting())), + formatter1.outputLines().stream().sorted().collect(Collectors.toList())); + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). + load(); + MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0)); + assertNotNull(logDirProps0); + MetaProperties logDirProps1 = ensemble.logDirProps().get(testEnv.directory(1)); + assertNotNull(logDirProps1); + } + } + + @Test + public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.setNoInitialControllersFlag(false); + assertFalse(formatter1.formatter.hasDynamicQuorum()); + assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " + + "Try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, + formatter1.formatter::run).getMessage()); + } + } + + @Test + public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.setNoInitialControllersFlag(true); + assertTrue(formatter1.formatter.hasDynamicQuorum()); + assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + + "Try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, + formatter1.formatter::run).getMessage()); + } + } }