Skip to content

Commit

Permalink
KAFKA-17921: Support SASL_PLAINTEXT protocol with java.security.auth.…
Browse files Browse the repository at this point in the history
…login.config

Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Nov 26, 2024
1 parent 55577e7 commit 535eedb
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 12 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control-test-common.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@
<allow pkg="org" />
<allow pkg="kafka" />
<allow pkg="scala.jdk.javaapi" />
<allow pkg="javax.security" />
</import-control>
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.test;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class JaasModule {

public static JaasModule plainLoginModule(String username, String password, boolean debug, Map<String, String> validUsers) {
String name = "org.apache.kafka.common.security.plain.PlainLoginModule";

Map<String, String> entries = new HashMap<>();
entries.put("username", username);
entries.put("password", password);
validUsers.forEach((user, pass) -> entries.put("user_" + user, pass));

return new JaasModule(
name,
debug,
entries
);
}

private final String name;

private final boolean debug;

private final Map<String, String> entries;

private JaasModule(String name, boolean debug, Map<String, String> entries) {
this.name = name;
this.debug = debug;
this.entries = entries;
}

public String name() {
return name;
}

public boolean debug() {
return debug;
}

@Override
public String toString() {
return String.format("%s required%n debug=%b%n %s;%n", name, debug, entries.entrySet().stream()
.map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
.collect(Collectors.joining("\n ")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.test;

import org.apache.kafka.common.security.JaasUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

import javax.security.auth.login.Configuration;

public class JaasTestUtils {
public static class JaasSection {
private final String contextName;
private final List<JaasModule> modules;

public JaasSection(String contextName, List<JaasModule> modules) {
this.contextName = contextName;
this.modules = modules;
}

public List<JaasModule> getModules() {
return modules;
}

public String getContextName() {
return contextName;
}

@Override
public String toString() {
return String.format("%s {%n %s%n};%n",
contextName,
modules.stream().map(Object::toString).collect(Collectors.joining("\n ")));
}
}

public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer";

public static final String KAFKA_PLAIN_USER = "plain-user";
public static final String KAFKA_PLAIN_PASSWORD = "plain-user-secret";
public static final String KAFKA_PLAIN_ADMIN = "plain-admin";
public static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret";

public static File writeJaasContextsToFile(List<JaasSection> jaasSections) throws IOException {
File jaasFile = TestUtils.tempFile();
try (FileOutputStream fileStream = new FileOutputStream(jaasFile);
OutputStreamWriter writer = new OutputStreamWriter(fileStream, StandardCharsets.UTF_8);) {
writer.write(String.join("", jaasSections.stream().map(Object::toString).toArray(String[]::new)));
}
return jaasFile;
}

public static void refreshJavaLoginConfigParam(File file) {
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath());
// This will cause a reload of the Configuration singleton when `getConfiguration` is called
Configuration.setConfiguration(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.network.SocketServerConfigs;
Expand Down Expand Up @@ -176,6 +179,15 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
// reduce log cleaner offset map memory usage
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");

if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN");
props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN");
props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN");
props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, StandardAuthorizer.class.getName());
props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, "User:" + JaasTestUtils.KAFKA_PLAIN_ADMIN);
}

// Add associated broker node property overrides
if (brokerNode != null) {
props.putAll(brokerNode.propertyOverrides());
Expand All @@ -194,6 +206,18 @@ public KafkaClusterTestKit build() throws Exception {
Map<Integer, BrokerServer> brokers = new HashMap<>();
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
File jaasFile = null;

if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
jaasFile = JaasTestUtils.writeJaasContextsToFile(List.of(
new JaasTestUtils.JaasSection(JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME,
List.of(JaasModule.plainLoginModule(
JaasTestUtils.KAFKA_PLAIN_ADMIN, JaasTestUtils.KAFKA_PLAIN_ADMIN_PASSWORD, true,
Map.of(JaasTestUtils.KAFKA_PLAIN_USER, JaasTestUtils.KAFKA_PLAIN_PASSWORD,
JaasTestUtils.KAFKA_PLAIN_ADMIN, JaasTestUtils.KAFKA_PLAIN_ADMIN_PASSWORD))))
));
JaasTestUtils.refreshJavaLoginConfigParam(jaasFile);
}

try {
baseDirectory = new File(nodes.baseDirectory());
Expand Down Expand Up @@ -272,7 +296,8 @@ public KafkaClusterTestKit build() throws Exception {
brokers,
baseDirectory,
faultHandlerFactory,
socketFactoryManager);
socketFactoryManager,
jaasFile == null ? Optional.empty() : Optional.of(jaasFile));
}

private String listeners(int node) {
Expand Down Expand Up @@ -316,14 +341,16 @@ private static void setupNodeDirectories(File baseDirectory,
private final SimpleFaultHandlerFactory faultHandlerFactory;
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
private final Optional<File> jaasFile;

private KafkaClusterTestKit(
TestKitNodes nodes,
Map<Integer, ControllerServer> controllers,
Map<Integer, BrokerServer> brokers,
File baseDirectory,
SimpleFaultHandlerFactory faultHandlerFactory,
PreboundSocketFactoryManager socketFactoryManager
PreboundSocketFactoryManager socketFactoryManager,
Optional<File> jaasFile
) {
/*
Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
Expand All @@ -339,6 +366,7 @@ private KafkaClusterTestKit(
this.faultHandlerFactory = faultHandlerFactory;
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
this.jaasFile = jaasFile;
}

public void format() throws Exception {
Expand Down Expand Up @@ -603,6 +631,9 @@ public void close() throws Exception {
waitForAllFutures(futureEntries);
futureEntries.clear();
Utils.delete(baseDirectory);
if (jaasFile.isPresent()) {
Utils.delete(jaasFile.get());
}
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ public TestKitNodes build() {
throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
}
// TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT || controllerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT) ||
(controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT)) {
throw new IllegalArgumentException("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol");
}
if (baseDirectory == null) {
this.baseDirectory = TestUtils.tempDirectory().toPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public class TestKitNodeTest {
@ParameterizedTest
@EnumSource(SecurityProtocol.class)
public void testSecurityProtocol(SecurityProtocol securityProtocol) {
if (securityProtocol != SecurityProtocol.PLAINTEXT) {
assertEquals("Currently only support PLAINTEXT security protocol",
if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol != SecurityProtocol.SASL_PLAINTEXT) {
assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
assertEquals("Currently only support PLAINTEXT security protocol",
assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.common.test.api;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
Expand All @@ -28,9 +29,15 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.JaasTestUtils;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
Expand Down Expand Up @@ -272,12 +279,12 @@ public void testCreateProducerAndConsumer(ClusterInstance cluster) throws Interr
String value = "value";
try (Admin adminClient = cluster.admin();
Producer<String, String> producer = cluster.producer(Map.of(
ACKS_CONFIG, "all",
KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));
ACKS_CONFIG, "all",
KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));
Consumer<String, String> consumer = cluster.consumer(Map.of(
KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))
KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))
) {
adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1)));
assertNotNull(producer);
Expand Down Expand Up @@ -330,4 +337,42 @@ public void testControllerListenerName(ClusterInstance cluster) throws Execution
assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
}
}

@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT},
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
public void testSaslPlaintext(ClusterInstance clusterInstance) {
Assertions.assertEquals(SecurityProtocol.SASL_PLAINTEXT, clusterInstance.config().brokerSecurityProtocol());

Map<String, Object> configs = new HashMap<>();
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

// client with admin credentials
configs.put(SaslConfigs.SASL_JAAS_CONFIG,
String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
JaasTestUtils.KAFKA_PLAIN_ADMIN, JaasTestUtils.KAFKA_PLAIN_ADMIN_PASSWORD));
try (Admin admin = clusterInstance.admin(configs)) {
Assertions.assertDoesNotThrow(() -> admin.describeAcls(AclBindingFilter.ANY).values().get());
}

// client with non-admin credentials
configs.put(SaslConfigs.SASL_JAAS_CONFIG,
String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
JaasTestUtils.KAFKA_PLAIN_USER, JaasTestUtils.KAFKA_PLAIN_PASSWORD));
try (Admin admin = clusterInstance.admin(configs)) {
ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> admin.describeAcls(AclBindingFilter.ANY).values().get());
Assertions.assertInstanceOf(ClusterAuthorizationException.class, exception.getCause());
}

// client wit unknown credentials
configs.put(SaslConfigs.SASL_JAAS_CONFIG,
String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
"unknown", "unknown"));
try (Admin admin = clusterInstance.admin(configs)) {
ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> admin.describeAcls(AclBindingFilter.ANY).values().get());
Assertions.assertInstanceOf(SaslAuthenticationException.class, exception.getCause());
}
}
}

0 comments on commit 535eedb

Please sign in to comment.