From dacb0662dd37247aa7776a998bbbdd7e1d4a3edb Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 30 Dec 2024 22:51:15 +0800 Subject: [PATCH] KAFKA-17921: Support SASL_PLAINTEXT protocol with java.security.auth.login.config Signed-off-by: PoAn Yang --- checkstyle/import-control-test-common.xml | 1 + .../apache/kafka/common/test/JaasModule.java | 46 +++++++ .../apache/kafka/common/test/JaasUtils.java | 63 +++++++++ .../common/test/KafkaClusterTestKit.java | 46 ++++++- .../kafka/common/test/TestKitNodes.java | 5 +- .../kafka/common/test/TestKitNodeTest.java | 6 +- .../common/test/api/ClusterInstance.java | 25 +++- .../test/api/ClusterTestExtensionsTest.java | 130 +++++++++++++++++- 8 files changed, 307 insertions(+), 15 deletions(-) create mode 100644 test-common/src/main/java/org/apache/kafka/common/test/JaasModule.java create mode 100644 test-common/src/main/java/org/apache/kafka/common/test/JaasUtils.java diff --git a/checkstyle/import-control-test-common.xml b/checkstyle/import-control-test-common.xml index 9fe7f4d4844da..9520c0b21b86e 100644 --- a/checkstyle/import-control-test-common.xml +++ b/checkstyle/import-control-test-common.xml @@ -24,4 +24,5 @@ + diff --git a/test-common/src/main/java/org/apache/kafka/common/test/JaasModule.java b/test-common/src/main/java/org/apache/kafka/common/test/JaasModule.java new file mode 100644 index 0000000000000..8d70b36dc23fa --- /dev/null +++ b/test-common/src/main/java/org/apache/kafka/common/test/JaasModule.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.test; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public record JaasModule(String name, boolean debug, Map entries) { + + public static JaasModule plainLoginModule(String username, String password, boolean debug, Map validUsers) { + String name = "org.apache.kafka.common.security.plain.PlainLoginModule"; + + Map entries = new HashMap<>(); + entries.put("username", username); + entries.put("password", password); + validUsers.forEach((user, pass) -> entries.put("user_" + user, pass)); + + return new JaasModule( + name, + debug, + entries + ); + } + + @Override + public String toString() { + return String.format("%s required%n debug=%b%n %s;%n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .collect(Collectors.joining("\n "))); + } +} diff --git a/test-common/src/main/java/org/apache/kafka/common/test/JaasUtils.java b/test-common/src/main/java/org/apache/kafka/common/test/JaasUtils.java new file mode 100644 index 0000000000000..471d26ffa1a67 --- /dev/null +++ b/test-common/src/main/java/org/apache/kafka/common/test/JaasUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.security.auth.login.Configuration; + +public class JaasUtils { + public record JaasSection(String contextName, List modules) { + @Override + public String toString() { + return String.format( + "%s {%n %s%n};%n", + contextName, + modules.stream().map(Object::toString).collect(Collectors.joining("\n ")) + ); + } + } + + public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer"; + + public static final String KAFKA_PLAIN_USER1 = "plain-user1"; + public static final String KAFKA_PLAIN_USER1_PASSWORD = "plain-user1-secret"; + public static final String KAFKA_PLAIN_ADMIN = "plain-admin"; + public static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret"; + + public static File writeJaasContextsToFile(Map jaasSections) throws IOException { + File jaasFile = TestUtils.tempFile(); + try (FileOutputStream fileStream = new FileOutputStream(jaasFile); + OutputStreamWriter writer = new OutputStreamWriter(fileStream, StandardCharsets.UTF_8);) { + writer.write(String.join("", jaasSections.values().stream().map(Object::toString).toArray(String[]::new))); + } + return jaasFile; + } + + public static void refreshJavaLoginConfigParam(File file) { + System.setProperty(org.apache.kafka.common.security.JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath()); + // This will cause a reload of the Configuration singleton when `getConfiguration` is called + Configuration.setConfiguration(null); + } +} diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index 2face50ca2fc3..555d3a4db336f 100644 --- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -27,12 +27,15 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.controller.Controller; +import org.apache.kafka.metadata.authorizer.StandardAuthorizer; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.metadata.storage.Formatter; import org.apache.kafka.network.SocketServerConfigs; @@ -176,6 +179,15 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException { // reduce log cleaner offset map memory usage props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152"); + if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) { + props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN"); + props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN"); + props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN"); + props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, StandardAuthorizer.class.getName()); + props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false"); + props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, "User:" + JaasUtils.KAFKA_PLAIN_ADMIN); + } + // Add associated broker node property overrides if (brokerNode != null) { props.putAll(brokerNode.propertyOverrides()); @@ -194,6 +206,25 @@ public KafkaClusterTestKit build() throws Exception { Map brokers = new HashMap<>(); Map jointServers = new HashMap<>(); File baseDirectory = null; + File jaasFile = null; + + if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) { + jaasFile = JaasUtils.writeJaasContextsToFile(Map.of( + JaasUtils.KAFKA_SERVER_CONTEXT_NAME, + new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME, + List.of( + JaasModule.plainLoginModule( + JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD, + true, + Map.of( + JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD, + JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD) + ) + ) + ) + )); + JaasUtils.refreshJavaLoginConfigParam(jaasFile); + } try { baseDirectory = new File(nodes.baseDirectory()); @@ -272,7 +303,8 @@ public KafkaClusterTestKit build() throws Exception { brokers, baseDirectory, faultHandlerFactory, - socketFactoryManager); + socketFactoryManager, + jaasFile == null ? Optional.empty() : Optional.of(jaasFile)); } private String listeners(int node) { @@ -316,6 +348,7 @@ private static void setupNodeDirectories(File baseDirectory, private final SimpleFaultHandlerFactory faultHandlerFactory; private final PreboundSocketFactoryManager socketFactoryManager; private final String controllerListenerName; + private final Optional jaasFile; private KafkaClusterTestKit( TestKitNodes nodes, @@ -323,7 +356,8 @@ private KafkaClusterTestKit( Map brokers, File baseDirectory, SimpleFaultHandlerFactory faultHandlerFactory, - PreboundSocketFactoryManager socketFactoryManager + PreboundSocketFactoryManager socketFactoryManager, + Optional jaasFile ) { /* Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers @@ -339,6 +373,7 @@ private KafkaClusterTestKit( this.faultHandlerFactory = faultHandlerFactory; this.socketFactoryManager = socketFactoryManager; this.controllerListenerName = nodes.controllerListenerName().value(); + this.jaasFile = jaasFile; } public void format() throws Exception { @@ -602,6 +637,13 @@ public void close() throws Exception { waitForAllFutures(futureEntries); futureEntries.clear(); Utils.delete(baseDirectory); + jaasFile.ifPresent(f -> { + try { + Utils.delete(f); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } catch (Exception e) { for (Entry> entry : futureEntries) { entry.getValue().cancel(true); diff --git a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java index dfab866d3a6f6..24aa7680e320b 100644 --- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java +++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java @@ -159,8 +159,9 @@ public TestKitNodes build() { throw new IllegalArgumentException("Invalid value for numDisksPerBroker"); } // TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished - if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT || controllerSecurityProtocol != SecurityProtocol.PLAINTEXT) { - throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol"); + if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT) || + (controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT)) { + throw new IllegalArgumentException("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol"); } if (baseDirectory == null) { this.baseDirectory = TestUtils.tempDirectory().toPath(); diff --git a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java index c9adbe3431b29..b0bb8afa22cb3 100644 --- a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java +++ b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java @@ -32,11 +32,11 @@ public class TestKitNodeTest { @ParameterizedTest @EnumSource(SecurityProtocol.class) public void testSecurityProtocol(SecurityProtocol securityProtocol) { - if (securityProtocol != SecurityProtocol.PLAINTEXT) { - assertEquals("Currently only support PLAINTEXT security protocol", + if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol != SecurityProtocol.SASL_PLAINTEXT) { + assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol", assertThrows(IllegalArgumentException.class, () -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage()); - assertEquals("Currently only support PLAINTEXT security protocol", + assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol", assertThrows(IllegalArgumentException.class, () -> new TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage()); } diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java index b6954e4492395..29819b2b0ea41 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java @@ -37,9 +37,12 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.test.JaasUtils; import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.fault.FaultHandlerException; @@ -162,7 +165,7 @@ default Producer producer(Map configs) { props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); - return new KafkaProducer<>(props); + return new KafkaProducer<>(setClientSaslConfig(props)); } default Producer producer() { @@ -176,7 +179,7 @@ default Consumer consumer(Map configs) { props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5)); props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); - return new KafkaConsumer<>(props); + return new KafkaConsumer<>(setClientSaslConfig(props)); } default Consumer consumer() { @@ -192,7 +195,23 @@ default Admin admin(Map configs, boolean usingBootstrapControlle props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG); } - return Admin.create(props); + return Admin.create(setClientSaslConfig(props)); + } + + default Map setClientSaslConfig(Map configs) { + Map props = new HashMap<>(configs); + if (config().brokerSecurityProtocol() == SecurityProtocol.SASL_PLAINTEXT) { + props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); + props.putIfAbsent(SaslConfigs.SASL_MECHANISM, "PLAIN"); + props.putIfAbsent( + SaslConfigs.SASL_JAAS_CONFIG, + String.format( + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", + JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD + ) + ); + } + return props; } default Admin admin(Map configs) { diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java index 2a08d4e58ebb2..49695d2894427 100644 --- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java +++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java @@ -24,14 +24,23 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.SaslAuthenticationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.JaasUtils; import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.server.common.MetadataVersion; @@ -48,7 +57,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; @@ -63,7 +74,9 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; @ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = { @ClusterConfigProperty(key = "default.key", value = "default.value"), @@ -273,12 +286,12 @@ public void testCreateProducerAndConsumer(ClusterInstance cluster) throws Interr String value = "value"; try (Admin adminClient = cluster.admin(); Producer producer = cluster.producer(Map.of( - ACKS_CONFIG, "all", - KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), - VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + ACKS_CONFIG, "all", + KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); Consumer consumer = cluster.consumer(Map.of( - KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), - VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())) + KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())) ) { adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1))); assertNotNull(producer); @@ -331,4 +344,111 @@ public void testControllerListenerName(ClusterInstance cluster) throws Execution assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size()); } } + + @ClusterTest( + types = {Type.KRAFT, Type.CO_KRAFT}, + brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT, + controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT, + serverProperties = { + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + } + ) + public void testSaslPlaintext(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException { + Assertions.assertEquals(SecurityProtocol.SASL_PLAINTEXT, clusterInstance.config().brokerSecurityProtocol()); + + // default ClusterInstance#admin helper with admin credentials + try (Admin admin = clusterInstance.admin()) { + admin.describeAcls(AclBindingFilter.ANY).values().get(); + } + String topic = "sasl-plaintext-topic"; + clusterInstance.createTopic(topic, 1, (short) 1); + try (Producer producer = clusterInstance.producer()) { + producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get(); + producer.flush(); + } + try (Consumer consumer = clusterInstance.consumer()) { + consumer.subscribe(List.of(topic)); + TestUtils.waitForCondition(() -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + return records.count() == 1; + }, "Failed to receive message"); + } + + // client with non-admin credentials + Map nonAdminConfig = Map.of( + SaslConfigs.SASL_JAAS_CONFIG, + String.format( + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", + JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD + ) + ); + try (Admin admin = clusterInstance.admin(nonAdminConfig)) { + ExecutionException exception = assertThrows( + ExecutionException.class, + () -> admin.describeAcls(AclBindingFilter.ANY).values().get() + ); + assertInstanceOf(ClusterAuthorizationException.class, exception.getCause()); + } + try (Producer producer = clusterInstance.producer(nonAdminConfig)) { + ExecutionException exception = assertThrows( + ExecutionException.class, + () -> producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get() + ); + assertInstanceOf(TopicAuthorizationException.class, exception.getCause()); + } + try (Consumer consumer = clusterInstance.consumer(nonAdminConfig)) { + consumer.subscribe(List.of(topic)); + AtomicBoolean hasException = new AtomicBoolean(false); + TestUtils.waitForCondition(() -> { + if (hasException.get()) { + return true; + } + try { + consumer.poll(Duration.ofMillis(100)); + } catch (TopicAuthorizationException e) { + hasException.set(true); + } + return false; + }, "Failed to get exception"); + } + + // client with unknown credentials + Map unknownUserConfig = Map.of( + SaslConfigs.SASL_JAAS_CONFIG, + String.format( + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", + "unknown", "unknown" + ) + ); + try (Admin admin = clusterInstance.admin(unknownUserConfig)) { + ExecutionException exception = assertThrows( + ExecutionException.class, + () -> admin.describeAcls(AclBindingFilter.ANY).values().get() + ); + assertInstanceOf(SaslAuthenticationException.class, exception.getCause()); + } + try (Producer producer = clusterInstance.producer(unknownUserConfig)) { + ExecutionException exception = assertThrows( + ExecutionException.class, + () -> producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get() + ); + assertInstanceOf(SaslAuthenticationException.class, exception.getCause()); + } + try (Consumer consumer = clusterInstance.consumer(unknownUserConfig)) { + consumer.subscribe(List.of(topic)); + AtomicBoolean hasException = new AtomicBoolean(false); + TestUtils.waitForCondition(() -> { + if (hasException.get()) { + return true; + } + try { + consumer.poll(Duration.ofMillis(100)); + } catch (SaslAuthenticationException e) { + hasException.set(true); + } + return false; + }, "Failed to get exception"); + } + } }