diff --git a/presto-docs/src/main/sphinx/connector/kudu.rst b/presto-docs/src/main/sphinx/connector/kudu.rst
index a5cc7e2073d3..53822edb5e79 100644
--- a/presto-docs/src/main/sphinx/connector/kudu.rst
+++ b/presto-docs/src/main/sphinx/connector/kudu.rst
@@ -64,6 +64,22 @@ replacing the properties as appropriate:
## Disable Kudu client's collection of statistics.
#kudu.client.disable-statistics = false
+ #######################
+ ### Advanced Kudu Kerberos authentication configuration
+ #######################
+
+ ## Whether to enable kerberos authentication, default is false
+ #kudu.kerberos-auth.enabled=true
+
+ # whether to output kerberos debug information, default is false
+ #kudu.kerberos-auth.debug.enabled=true
+
+ # The Kerberos principal that Presto will use when connecting to Kudu
+ #kudu.kerberos-auth.principal=xxx
+
+ # Kudu client keytab location
+ #kudu.kerberos-auth.keytab=xxx.keytab
+
Querying Data
-------------
@@ -625,4 +641,3 @@ Known limitations
-----------------
- Only lower case table and column names in Kudu are supported.
-- Using a secured Kudu cluster has not been tested.
diff --git a/presto-kudu/pom.xml b/presto-kudu/pom.xml
index 70e232a589bb..36d6bd8e939c 100644
--- a/presto-kudu/pom.xml
+++ b/presto-kudu/pom.xml
@@ -168,6 +168,11 @@
testcontainers
test
+
+
+ io.prestosql.hadoop
+ hadoop-apache
+
diff --git a/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduClientConfig.java b/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduClientConfig.java
index 5bcbceefc75b..a787e5c15262 100755
--- a/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduClientConfig.java
+++ b/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduClientConfig.java
@@ -40,6 +40,10 @@ public class KuduClientConfig
private boolean disableStatistics;
private boolean schemaEmulationEnabled;
private String schemaEmulationPrefix = "presto::";
+ private boolean kerberosAuthEnabled;
+ private String kerberosPrincipal;
+ private String kerberosKeytab;
+ private boolean kerberosAuthDebugEnabled;
@NotNull
@Size(min = 1)
@@ -138,4 +142,52 @@ public KuduClientConfig setSchemaEmulationEnabled(boolean enabled)
this.schemaEmulationEnabled = enabled;
return this;
}
+
+ public boolean isKerberosAuthEnabled()
+ {
+ return kerberosAuthEnabled;
+ }
+
+ @Config("kudu.kerberos-auth.enabled")
+ public KuduClientConfig setKerberosAuthEnabled(boolean enabled)
+ {
+ this.kerberosAuthEnabled = enabled;
+ return this;
+ }
+
+ public String getKerberosPrincipal()
+ {
+ return kerberosPrincipal;
+ }
+
+ @Config("kudu.kerberos-auth.principal")
+ public KuduClientConfig setKerberosPrincipal(String principal)
+ {
+ this.kerberosPrincipal = principal;
+ return this;
+ }
+
+ public String getKerberosKeytab()
+ {
+ return kerberosKeytab;
+ }
+
+ @Config("kudu.kerberos-auth.keytab")
+ public KuduClientConfig setKerberosKeytab(String keytab)
+ {
+ this.kerberosKeytab = keytab;
+ return this;
+ }
+
+ public boolean isKerberosAuthDebugEnabled()
+ {
+ return kerberosAuthDebugEnabled;
+ }
+
+ @Config("kudu.kerberos-auth.debug.enabled")
+ public KuduClientConfig setKerberosAuthDebugEnabled(boolean enabled)
+ {
+ this.kerberosAuthDebugEnabled = enabled;
+ return this;
+ }
}
diff --git a/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduClientSession.java b/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduClientSession.java
index 486b1f2e3552..2c8b2b7233c0 100644
--- a/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduClientSession.java
+++ b/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduClientSession.java
@@ -65,6 +65,7 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.kudu.KuduUtil.reTryKerberos;
import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
import static io.prestosql.spi.predicate.Marker.Bound.ABOVE;
@@ -81,15 +82,18 @@ public class KuduClientSession
public static final String DEFAULT_SCHEMA = "default";
private final KuduClient client;
private final SchemaEmulation schemaEmulation;
+ private final boolean kerberosAuthEnabled;
- public KuduClientSession(KuduClient client, SchemaEmulation schemaEmulation)
+ public KuduClientSession(KuduClient client, SchemaEmulation schemaEmulation, boolean kerberosAuthEnabled)
{
this.client = client;
this.schemaEmulation = schemaEmulation;
+ this.kerberosAuthEnabled = kerberosAuthEnabled;
}
public List listSchemaNames()
{
+ reTryKerberos(kerberosAuthEnabled);
return schemaEmulation.listSchemaNames(client);
}
@@ -108,6 +112,7 @@ private List internalListTables(String prefix)
public List listTables(Optional optSchemaName)
{
+ reTryKerberos(kerberosAuthEnabled);
if (optSchemaName.isPresent()) {
return listTablesSingleSchema(optSchemaName.get());
}
@@ -136,18 +141,21 @@ private List listTablesSingleSchema(String schemaName)
public Schema getTableSchema(KuduTableHandle tableHandle)
{
+ reTryKerberos(kerberosAuthEnabled);
KuduTable table = tableHandle.getTable(this);
return table.getSchema();
}
public Map getTableProperties(KuduTableHandle tableHandle)
{
+ reTryKerberos(kerberosAuthEnabled);
KuduTable table = tableHandle.getTable(this);
return KuduTableProperties.toMap(table);
}
public List buildKuduSplits(KuduTableHandle tableHandle)
{
+ reTryKerberos(kerberosAuthEnabled);
KuduTable table = tableHandle.getTable(this);
final int primaryKeyColumnCount = table.getSchema().getPrimaryKeyColumnCount();
KuduScanToken.KuduScanTokenBuilder builder = client.newScanTokenBuilder(table);
@@ -210,6 +218,7 @@ public List buildKuduSplits(KuduTableHandle tableHandle)
public KuduScanner createScanner(KuduSplit kuduSplit)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
return KuduScanToken.deserializeIntoScanner(kuduSplit.getSerializedScanToken(), client);
}
@@ -220,6 +229,7 @@ public KuduScanner createScanner(KuduSplit kuduSplit)
public KuduTable openTable(SchemaTableName schemaTableName)
{
+ reTryKerberos(kerberosAuthEnabled);
String rawName = schemaEmulation.toRawName(schemaTableName);
try {
return client.openTable(rawName);
@@ -235,21 +245,25 @@ public KuduTable openTable(SchemaTableName schemaTableName)
public KuduSession newSession()
{
+ reTryKerberos(kerberosAuthEnabled);
return client.newSession();
}
public void createSchema(String schemaName)
{
+ reTryKerberos(kerberosAuthEnabled);
schemaEmulation.createSchema(client, schemaName);
}
public void dropSchema(String schemaName)
{
+ reTryKerberos(kerberosAuthEnabled);
schemaEmulation.dropSchema(client, schemaName);
}
public void dropTable(SchemaTableName schemaTableName)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
client.deleteTable(rawName);
@@ -261,6 +275,7 @@ public void dropTable(SchemaTableName schemaTableName)
public void renameTable(SchemaTableName schemaTableName, SchemaTableName newSchemaTableName)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
String newRawName = schemaEmulation.toRawName(newSchemaTableName);
@@ -275,6 +290,7 @@ public void renameTable(SchemaTableName schemaTableName, SchemaTableName newSche
public KuduTable createTable(ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(tableMetadata.getTable());
if (ignoreExisting) {
@@ -301,6 +317,7 @@ public KuduTable createTable(ConnectorTableMetadata tableMetadata, boolean ignor
public void addColumn(SchemaTableName schemaTableName, ColumnMetadata column)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
AlterTableOptions alterOptions = new AlterTableOptions();
@@ -315,6 +332,7 @@ public void addColumn(SchemaTableName schemaTableName, ColumnMetadata column)
public void dropColumn(SchemaTableName schemaTableName, String name)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
AlterTableOptions alterOptions = new AlterTableOptions();
@@ -328,6 +346,7 @@ public void dropColumn(SchemaTableName schemaTableName, String name)
public void renameColumn(SchemaTableName schemaTableName, String oldName, String newName)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
AlterTableOptions alterOptions = new AlterTableOptions();
@@ -341,11 +360,13 @@ public void renameColumn(SchemaTableName schemaTableName, String oldName, String
public void addRangePartition(SchemaTableName schemaTableName, RangePartition rangePartition)
{
+ reTryKerberos(kerberosAuthEnabled);
changeRangePartition(schemaTableName, rangePartition, RangePartitionChange.ADD);
}
public void dropRangePartition(SchemaTableName schemaTableName, RangePartition rangePartition)
{
+ reTryKerberos(kerberosAuthEnabled);
changeRangePartition(schemaTableName, rangePartition, RangePartitionChange.DROP);
}
diff --git a/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduModule.java b/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduModule.java
index afa06a4e7deb..c6a6dffbd8e9 100755
--- a/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduModule.java
+++ b/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduModule.java
@@ -83,14 +83,14 @@ KuduClientSession createKuduClientSession(KuduClientConfig config)
{
requireNonNull(config, "config is null");
- KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder(config.getMasterAddresses());
- builder.defaultAdminOperationTimeoutMs(config.getDefaultAdminOperationTimeout().toMillis());
- builder.defaultOperationTimeoutMs(config.getDefaultOperationTimeout().toMillis());
- builder.defaultSocketReadTimeoutMs(config.getDefaultSocketReadTimeout().toMillis());
- if (config.isDisableStatistics()) {
- builder.disableStatistics();
+ KuduClient client;
+ if (!config.isKerberosAuthEnabled()) {
+ client = KuduUtil.createKuduClient(config);
+ }
+ else {
+ KuduUtil.initKerberosENV(config.getKerberosPrincipal(), config.getKerberosKeytab(), config.isKerberosAuthDebugEnabled());
+ client = KuduUtil.createKuduKerberosClient(config);
}
- KuduClient client = builder.build();
SchemaEmulation strategy;
if (config.isSchemaEmulationEnabled()) {
@@ -99,6 +99,6 @@ KuduClientSession createKuduClientSession(KuduClientConfig config)
else {
strategy = new NoSchemaEmulation();
}
- return new KuduClientSession(client, strategy);
+ return new KuduClientSession(client, strategy, config.isKerberosAuthEnabled());
}
}
diff --git a/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduUtil.java b/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduUtil.java
new file mode 100644
index 000000000000..65f9f08ed9bb
--- /dev/null
+++ b/presto-kudu/src/main/java/io/prestosql/plugin/kudu/KuduUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.kudu;
+
+import io.airlift.log.Logger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kudu.client.KuduClient;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+public class KuduUtil
+{
+ private static final Logger log = Logger.get(KuduUtil.class);
+
+ private KuduUtil()
+ {
+ // not allowed to be called to initialize instance
+ }
+
+ /**
+ * Initialize kerberos authentication
+ */
+ static void initKerberosENV(String principal, String keytab, boolean debugEnabled)
+ {
+ try {
+ Configuration conf = new Configuration(false);
+ conf.set("hadoop.security.authentication", "kerberos");
+ if (debugEnabled) {
+ System.setProperty("sun.security.krb5.debug", "true");
+ }
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.loginUserFromKeytab(principal, keytab);
+ log.info("Connecting to kudu with kerberos authentication");
+ log.info("Current user: " + UserGroupInformation.getCurrentUser());
+ log.info("Login user: " + UserGroupInformation.getLoginUser());
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static KuduClient createKuduKerberosClient(KuduClientConfig config)
+ {
+ KuduClient client = null;
+ try {
+ reTryKerberos(true);
+ client = UserGroupInformation.getLoginUser().doAs(
+ (PrivilegedExceptionAction) () -> createKuduClient(config));
+ return client;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static KuduClient createKuduClient(KuduClientConfig config)
+ {
+ KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder(config.getMasterAddresses());
+ builder.defaultAdminOperationTimeoutMs(config.getDefaultAdminOperationTimeout().toMillis());
+ builder.defaultOperationTimeoutMs(config.getDefaultOperationTimeout().toMillis());
+ builder.defaultSocketReadTimeoutMs(config.getDefaultSocketReadTimeout().toMillis());
+ if (config.isDisableStatistics()) {
+ builder.disableStatistics();
+ }
+ return builder.build();
+ }
+
+ static void reTryKerberos(boolean enabled)
+ {
+ if (enabled) {
+ log.debug("Try relogin kerberos at first!");
+ try {
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ }
+ else if (UserGroupInformation.isLoginTicketBased()) {
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}