diff --git a/presto-kudu/pom.xml b/presto-kudu/pom.xml
index f5207ceed9c7d..2c50a8c4bafc4 100644
--- a/presto-kudu/pom.xml
+++ b/presto-kudu/pom.xml
@@ -23,6 +23,86 @@
${kudu.version}
+
+ org.apache.hadoop
+ hadoop-common
+ 2.6.0
+
+
+ jaxb-impl
+ com.sun.xml.bind
+
+
+ commons-beanutils-core
+ commons-beanutils
+
+
+ commons-codec
+ commons-codec
+
+
+ commons-lang
+ commons-lang
+
+
+ commons-logging
+ commons-logging
+
+
+ netty
+ io.netty
+
+
+ servlet-api
+ javax.servlet
+
+
+ httpclient
+ org.apache.httpcomponents
+
+
+ httpcore
+ org.apache.httpcomponents
+
+
+ jackson-core-asl
+ org.codehaus.jackson
+
+
+ jackson-mapper-asl
+ org.codehaus.jackson
+
+
+ jdk.tools
+ jdk.tools
+
+
+ jasper-compiler
+ tomcat
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ log4j
+ log4j
+
+
+ jersey-server
+ com.sun.jersey
+
+
+ jersey-core
+ com.sun.jersey
+
+
+ asm
+ asm
+
+
+
+
com.facebook.airlift
bootstrap
diff --git a/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduClientConfig.java b/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduClientConfig.java
index c451eefb47051..887a18705c8ca 100755
--- a/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduClientConfig.java
+++ b/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduClientConfig.java
@@ -40,6 +40,10 @@ public class KuduClientConfig
private boolean disableStatistics;
private boolean schemaEmulationEnabled;
private String schemaEmulationPrefix = "presto::";
+ private boolean kerberosAuthEnabled;
+ private String kerberosPrincipal;
+ private String kerberosKeytab;
+ private boolean kerberosAuthDebugEnabled;
@NotNull
@Size(min = 1)
@@ -138,4 +142,52 @@ public KuduClientConfig setSchemaEmulationEnabled(boolean enabled)
this.schemaEmulationEnabled = enabled;
return this;
}
+
+ public boolean isKerberosAuthEnabled()
+ {
+ return kerberosAuthEnabled;
+ }
+
+ @Config("kudu.kerberos-auth.enabled")
+ public KuduClientConfig setKerberosAuthEnabled(boolean enabled)
+ {
+ this.kerberosAuthEnabled = enabled;
+ return this;
+ }
+
+ public String getKerberosPrincipal()
+ {
+ return kerberosPrincipal;
+ }
+
+ @Config("kudu.kerberos-auth.principal")
+ public KuduClientConfig setKerberosPrincipal(String principal)
+ {
+ this.kerberosPrincipal = principal;
+ return this;
+ }
+
+ public String getKerberosKeytab()
+ {
+ return kerberosKeytab;
+ }
+
+ @Config("kudu.kerberos-auth.keytab")
+ public KuduClientConfig setKerberosKeytab(String keytab)
+ {
+ this.kerberosKeytab = keytab;
+ return this;
+ }
+
+ public boolean isKerberosAuthDebugEnabled()
+ {
+ return kerberosAuthDebugEnabled;
+ }
+
+ @Config("kudu.kerberos-auth.debug.enabled")
+ public KuduClientConfig setKerberosAuthDebugEnabled(boolean enabled)
+ {
+ this.kerberosAuthDebugEnabled = enabled;
+ return this;
+ }
}
diff --git a/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduClientSession.java b/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduClientSession.java
index ff841c9d2e876..bced945efefce 100644
--- a/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduClientSession.java
+++ b/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduClientSession.java
@@ -64,6 +64,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static com.facebook.presto.kudu.KuduUtil.reTryKerberos;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -75,16 +76,19 @@ public class KuduClientSession
private final KuduConnectorId connectorId;
private final KuduClient client;
private final SchemaEmulation schemaEmulation;
+ private final boolean kerberosAuthEnabled;
- public KuduClientSession(KuduConnectorId connectorId, KuduClient client, SchemaEmulation schemaEmulation)
+ public KuduClientSession(KuduConnectorId connectorId, KuduClient client, SchemaEmulation schemaEmulation, boolean kerberosAuthEnabled)
{
this.connectorId = connectorId;
this.client = client;
this.schemaEmulation = schemaEmulation;
+ this.kerberosAuthEnabled = kerberosAuthEnabled;
}
public List listSchemaNames()
{
+ reTryKerberos(kerberosAuthEnabled);
return schemaEmulation.listSchemaNames(client);
}
@@ -105,6 +109,7 @@ private List internalListTables(String prefix)
public List listTables(Optional optSchemaName)
{
+ reTryKerberos(kerberosAuthEnabled);
if (optSchemaName.isPresent()) {
return listTablesSingleSchema(optSchemaName.get());
}
@@ -130,18 +135,21 @@ private List listTablesSingleSchema(String schemaName)
public Schema getTableSchema(KuduTableHandle tableHandle)
{
+ reTryKerberos(kerberosAuthEnabled);
KuduTable table = tableHandle.getTable(this);
return table.getSchema();
}
public Map getTableProperties(KuduTableHandle tableHandle)
{
+ reTryKerberos(kerberosAuthEnabled);
KuduTable table = tableHandle.getTable(this);
return KuduTableProperties.toMap(table);
}
public List buildKuduSplits(KuduTableLayoutHandle layoutHandle)
{
+ reTryKerberos(kerberosAuthEnabled);
KuduTableHandle tableHandle = layoutHandle.getTableHandle();
KuduTable table = tableHandle.getTable(this);
final int primaryKeyColumnCount = table.getSchema().getPrimaryKeyColumnCount();
@@ -185,6 +193,7 @@ public List buildKuduSplits(KuduTableLayoutHandle layoutHandle)
public KuduScanner createScanner(KuduSplit kuduSplit)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
return KuduScanToken.deserializeIntoScanner(kuduSplit.getSerializedScanToken(), client);
}
@@ -195,6 +204,7 @@ public KuduScanner createScanner(KuduSplit kuduSplit)
public KuduTable openTable(SchemaTableName schemaTableName)
{
+ reTryKerberos(kerberosAuthEnabled);
String rawName = schemaEmulation.toRawName(schemaTableName);
try {
return client.openTable(rawName);
@@ -210,21 +220,25 @@ public KuduTable openTable(SchemaTableName schemaTableName)
public KuduSession newSession()
{
+ reTryKerberos(kerberosAuthEnabled);
return client.newSession();
}
public void createSchema(String schemaName)
{
+ reTryKerberos(kerberosAuthEnabled);
schemaEmulation.createSchema(client, schemaName);
}
public void dropSchema(String schemaName)
{
+ reTryKerberos(kerberosAuthEnabled);
schemaEmulation.dropSchema(client, schemaName);
}
public void dropTable(SchemaTableName schemaTableName)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
client.deleteTable(rawName);
@@ -236,6 +250,7 @@ public void dropTable(SchemaTableName schemaTableName)
public void renameTable(SchemaTableName schemaTableName, SchemaTableName newSchemaTableName)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
String newRawName = schemaEmulation.toRawName(newSchemaTableName);
@@ -250,6 +265,7 @@ public void renameTable(SchemaTableName schemaTableName, SchemaTableName newSche
public KuduTable createTable(ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(tableMetadata.getTable());
if (ignoreExisting) {
@@ -276,6 +292,7 @@ public KuduTable createTable(ConnectorTableMetadata tableMetadata, boolean ignor
public void addColumn(SchemaTableName schemaTableName, ColumnMetadata column)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
AlterTableOptions alterOptions = new AlterTableOptions();
@@ -290,6 +307,7 @@ public void addColumn(SchemaTableName schemaTableName, ColumnMetadata column)
public void dropColumn(SchemaTableName schemaTableName, String name)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
AlterTableOptions alterOptions = new AlterTableOptions();
@@ -303,6 +321,7 @@ public void dropColumn(SchemaTableName schemaTableName, String name)
public void renameColumn(SchemaTableName schemaTableName, String oldName, String newName)
{
+ reTryKerberos(kerberosAuthEnabled);
try {
String rawName = schemaEmulation.toRawName(schemaTableName);
AlterTableOptions alterOptions = new AlterTableOptions();
@@ -316,11 +335,13 @@ public void renameColumn(SchemaTableName schemaTableName, String oldName, String
public void addRangePartition(SchemaTableName schemaTableName, RangePartition rangePartition)
{
+ reTryKerberos(kerberosAuthEnabled);
changeRangePartition(schemaTableName, rangePartition, RangePartitionChange.ADD);
}
public void dropRangePartition(SchemaTableName schemaTableName, RangePartition rangePartition)
{
+ reTryKerberos(kerberosAuthEnabled);
changeRangePartition(schemaTableName, rangePartition, RangePartitionChange.DROP);
}
diff --git a/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduModule.java b/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduModule.java
index 4cd0530bf4f63..10ab42d0664d4 100755
--- a/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduModule.java
+++ b/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduModule.java
@@ -91,14 +91,14 @@ KuduClientSession createKuduClientSession(
{
requireNonNull(config, "config is null");
- KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder(config.getMasterAddresses());
- builder.defaultAdminOperationTimeoutMs(config.getDefaultAdminOperationTimeout().toMillis());
- builder.defaultOperationTimeoutMs(config.getDefaultOperationTimeout().toMillis());
- builder.defaultSocketReadTimeoutMs(config.getDefaultSocketReadTimeout().toMillis());
- if (config.isDisableStatistics()) {
- builder.disableStatistics();
+ KuduClient client;
+ if (!config.isKerberosAuthEnabled()) {
+ client = KuduUtil.getKuduClient(config);
+ }
+ else {
+ KuduUtil.initKerberosENV(config.getKerberosPrincipal(), config.getKerberosKeytab(), config.isKerberosAuthDebugEnabled());
+ client = KuduUtil.getKuduKerberosClient(config);
}
- KuduClient client = builder.build();
SchemaEmulation strategy;
if (config.isSchemaEmulationEnabled()) {
@@ -107,6 +107,6 @@ KuduClientSession createKuduClientSession(
else {
strategy = new NoSchemaEmulation();
}
- return new KuduClientSession(connectorId, client, strategy);
+ return new KuduClientSession(connectorId, client, strategy, config.isKerberosAuthEnabled());
}
}
diff --git a/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduUtil.java b/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduUtil.java
new file mode 100644
index 0000000000000..cf0dc76220ff6
--- /dev/null
+++ b/presto-kudu/src/main/java/com/facebook/presto/kudu/KuduUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kudu;
+
+import com.facebook.airlift.log.Logger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kudu.client.KuduClient;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+public class KuduUtil
+{
+ private static final Logger log = Logger.get(KuduUtil.class);
+
+ private KuduUtil()
+ {
+ //not allowed to be called to initialize instance
+ }
+
+ /**
+ * Initialize kerberos authentication
+ */
+ static void initKerberosENV(String principal, String keytab, boolean debugEnabled)
+ {
+ try {
+ Configuration conf = new Configuration();
+ conf.set("hadoop.security.authentication", "kerberos");
+ if (debugEnabled) {
+ System.setProperty("sun.security.krb5.debug", "true");
+ }
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.loginUserFromKeytab(principal, keytab);
+ log.warn("getting connection from kudu with kerberos");
+ log.warn("----------current user: " + UserGroupInformation.getCurrentUser() + "----------");
+ log.warn("----------login user: " + UserGroupInformation.getLoginUser() + "----------");
+ }
+ catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ }
+
+ static KuduClient getKuduKerberosClient(KuduClientConfig config)
+ {
+ KuduClient client = null;
+ try {
+ reTryKerberos(true);
+ client = UserGroupInformation.getLoginUser().doAs(
+ (PrivilegedExceptionAction) () -> getKuduClient(config));
+ }
+ catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ return client;
+ }
+
+ static KuduClient getKuduClient(KuduClientConfig config)
+ {
+ KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder(config.getMasterAddresses());
+ builder.defaultAdminOperationTimeoutMs(config.getDefaultAdminOperationTimeout().toMillis());
+ builder.defaultOperationTimeoutMs(config.getDefaultOperationTimeout().toMillis());
+ builder.defaultSocketReadTimeoutMs(config.getDefaultSocketReadTimeout().toMillis());
+ if (config.isDisableStatistics()) {
+ builder.disableStatistics();
+ }
+ return builder.build();
+ }
+
+ static void reTryKerberos(boolean enabled)
+ {
+ if (enabled) {
+ log.warn("Try relogin kerberos at first!");
+ try {
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ }
+ else if (UserGroupInformation.isLoginTicketBased()) {
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ }
+ catch (IOException e) {
+ log.error("Try relogin kerberos failed!");
+ log.error(e.getMessage());
+ }
+ }
+ }
+}