diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
index 3d7e3a7941daa..2389aa7fc1e02 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
@@ -157,6 +157,33 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
       .withDocumentation("When set to true, the rollback method will delete the last failed task index. "
           + "The default value is false. Because deleting the index will add extra load on the Hbase cluster for each rollback");
 
+  public static final ConfigProperty<String> SECURITY_AUTHENTICATION = ConfigProperty
+      .key("hoodie.index.hbase.security.authentication")
+      .defaultValue("simple")
+      .withDocumentation("Authentication mode to use when connecting to the HBase cluster. "
+          + "Possible values are 'simple' (no authentication) and 'kerberos'.");
+
+  public static final ConfigProperty<String> KERBEROS_USER_KEYTAB = ConfigProperty
+      .key("hoodie.index.hbase.kerberos.user.keytab")
+      .noDefaultValue()
+      .withDocumentation("File name of the Kerberos keytab file to use when connecting to the HBase cluster.");
+
+  public static final ConfigProperty<String> KERBEROS_USER_PRINCIPAL = ConfigProperty
+      .key("hoodie.index.hbase.kerberos.user.principal")
+      .noDefaultValue()
+      .withDocumentation("The Kerberos principal name to use when connecting to the HBase cluster.");
+
+  public static final ConfigProperty<String> REGIONSERVER_PRINCIPAL = ConfigProperty
+      .key("hoodie.index.hbase.regionserver.kerberos.principal")
+      .noDefaultValue()
+      .withDocumentation("The value of hbase.regionserver.kerberos.principal configured on the HBase cluster.");
+
+  public static final ConfigProperty<String> MASTER_PRINCIPAL = ConfigProperty
+      .key("hoodie.index.hbase.master.kerberos.principal")
+      .noDefaultValue()
+      .withDocumentation("The value of hbase.master.kerberos.principal configured on the HBase cluster.");
+
+
   /**
    * @deprecated Use {@link #ZKQUORUM} and its methods instead
    */
@@ -444,6 +471,31 @@ public Builder hbaseZkZnodeParent(String zkZnodeParent) {
       return this;
     }
 
+    public Builder hbaseSecurityAuthentication(String authentication) {
+      hBaseIndexConfig.setValue(SECURITY_AUTHENTICATION, authentication);
+      return this;
+    }
+
+    public Builder hbaseKerberosUserKeytab(String keytab) {
+      hBaseIndexConfig.setValue(KERBEROS_USER_KEYTAB, keytab);
+      return this;
+    }
+
+    public Builder hbaseKerberosUserPrincipal(String principal) {
+      hBaseIndexConfig.setValue(KERBEROS_USER_PRINCIPAL, principal);
+      return this;
+    }
+
+    public Builder hbaseKerberosRegionserverPrincipal(String principal) {
+      hBaseIndexConfig.setValue(REGIONSERVER_PRINCIPAL, principal);
+      return this;
+    }
+
+    public Builder hbaseKerberosMasterPrincipal(String principal) {
+      hBaseIndexConfig.setValue(MASTER_PRINCIPAL, principal);
+      return this;
+    }
+
     /**
      *
      * Method to set maximum QPS allowed per Region Server. This should be same across various jobs. This is intended to
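
For reference, a minimal usage sketch of the new builder methods (Java). Everything below is illustrative: the quorum, table name, principals, and keytab file name are placeholders, and newBuilder(), hbaseZkQuorum(), and hbaseTableName() are assumed to be the pre-existing entry points of this builder:

// Hypothetical wiring of the new Kerberos settings; all values are placeholders.
HoodieHBaseIndexConfig hbaseIndexConfig = HoodieHBaseIndexConfig.newBuilder()
    .hbaseZkQuorum("zk1:2181,zk2:2181,zk3:2181")
    .hbaseTableName("hudi_record_index")
    .hbaseSecurityAuthentication("kerberos")
    .hbaseKerberosUserPrincipal("hudi_user@EXAMPLE.COM")
    // File name only: SparkHoodieHBaseIndex resolves it through SparkFiles.get(...).
    .hbaseKerberosUserKeytab("hudi_user.keytab")
    .hbaseKerberosRegionserverPrincipal("hbase/_HOST@EXAMPLE.COM")
    .hbaseKerberosMasterPrincipal("hbase/_HOST@EXAMPLE.COM")
    .build();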

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7b49a7a466785..322c2e84e7e89 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1488,6 +1488,26 @@ public boolean getHBaseIndexShouldComputeQPSDynamically() {
     return getBoolean(HoodieHBaseIndexConfig.COMPUTE_QPS_DYNAMICALLY);
   }
 
+  public String getHBaseIndexSecurityAuthentication() {
+    return getString(HoodieHBaseIndexConfig.SECURITY_AUTHENTICATION);
+  }
+
+  public String getHBaseIndexKerberosUserKeytab() {
+    return getString(HoodieHBaseIndexConfig.KERBEROS_USER_KEYTAB);
+  }
+
+  public String getHBaseIndexKerberosUserPrincipal() {
+    return getString(HoodieHBaseIndexConfig.KERBEROS_USER_PRINCIPAL);
+  }
+
+  public String getHBaseIndexRegionserverPrincipal() {
+    return getString(HoodieHBaseIndexConfig.REGIONSERVER_PRINCIPAL);
+  }
+
+  public String getHBaseIndexMasterPrincipal() {
+    return getString(HoodieHBaseIndexConfig.MASTER_PRINCIPAL);
+  }
+
   public int getHBaseIndexDesiredPutsTime() {
     return getInt(HoodieHBaseIndexConfig.DESIRED_PUTS_TIME_IN_SECONDS);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index fc73a0aed7d70..f841117d5c3a1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -42,7 +42,6 @@
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -60,10 +59,12 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -72,6 +73,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -150,9 +152,28 @@ private Connection getHBaseConnection() {
     }
     String port = String.valueOf(config.getHbaseZkPort());
     hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
+
     try {
-      return ConnectionFactory.createConnection(hbaseConfig);
-    } catch (IOException e) {
+      String authentication = config.getHBaseIndexSecurityAuthentication();
+      if (authentication.equals("kerberos")) {
+        hbaseConfig.set("hbase.security.authentication", "kerberos");
+        hbaseConfig.set("hadoop.security.authentication", "kerberos");
+        hbaseConfig.set("hbase.security.authorization", "true");
+        hbaseConfig.set("hbase.regionserver.kerberos.principal", config.getHBaseIndexRegionserverPrincipal());
+        hbaseConfig.set("hbase.master.kerberos.principal", config.getHBaseIndexMasterPrincipal());
+
+        String principal = config.getHBaseIndexKerberosUserPrincipal();
+        String keytab = SparkFiles.get(config.getHBaseIndexKerberosUserKeytab());
+
+        UserGroupInformation.setConfiguration(hbaseConfig);
+        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+        return ugi.doAs((PrivilegedExceptionAction<Connection>) () ->
+            ConnectionFactory.createConnection(hbaseConfig)
+        );
+      } else {
+        return ConnectionFactory.createConnection(hbaseConfig);
+      }
+    } catch (IOException | InterruptedException e) {
       throw new HoodieDependentSystemUnavailableException(HoodieDependentSystemUnavailableException.HBASE,
           quorum + ":" + port, e);
     }
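
Note that the keytab is looked up via SparkFiles.get(...), so the file must be shipped to the driver and executors under the configured name, e.g. with spark-submit --files or JavaSparkContext.addFile. A hypothetical end-to-end sketch (class name, paths, principals, and the base path are placeholders, not part of this change):

import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;

// Illustrative helper only; assumes the standard HoodieWriteConfig builder API.
final class HBaseIndexKerberosExample {

  static HoodieWriteConfig kerberosWriteConfig(JavaSparkContext jsc, String basePath) {
    // Ship the keytab so SparkFiles.get("hudi_user.keytab") resolves on every node
    // (equivalent to spark-submit --files /etc/security/keytabs/hudi_user.keytab).
    jsc.addFile("/etc/security/keytabs/hudi_user.keytab");

    Properties props = new Properties();
    props.setProperty("hoodie.index.hbase.security.authentication", "kerberos");
    props.setProperty("hoodie.index.hbase.kerberos.user.principal", "hudi_user@EXAMPLE.COM");
    // File name only; SparkHoodieHBaseIndex resolves the local path at runtime.
    props.setProperty("hoodie.index.hbase.kerberos.user.keytab", "hudi_user.keytab");
    props.setProperty("hoodie.index.hbase.regionserver.kerberos.principal", "hbase/_HOST@EXAMPLE.COM");
    props.setProperty("hoodie.index.hbase.master.kerberos.principal", "hbase/_HOST@EXAMPLE.COM");

    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withProperties(props)
        .build();
  }
}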