From 7009ef0510dae444c72e7513357e681b08379603 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Sat, 5 Nov 2022 21:29:02 +0900 Subject: [PATCH] [SPARK-32380][SQL] Fixing access of HBase table via Hive from Spark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? This is an update of https://github.com/apache/spark/pull/29178 which was closed because the root cause of the error was just vaguely defined there but here I will give an explanation why `HiveHBaseTableInputFormat` does not work well with the `NewHadoopRDD` (see in the next section). The PR modify `TableReader.scala` to create `OldHadoopRDD` when input format is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'. - environments (Cloudera distribution 7.1.7.SP1): hadoop 3.1.1 hive 3.1.300 spark 3.2.1 hbase 2.2.3 ### Why are the changes needed? With the `NewHadoopRDD` the following exception is raised: ``` java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details. at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704) at org.apache.spark.sql.Dataset.head(Dataset.scala:2728) at org.apache.spark.sql.Dataset.take(Dataset.scala:2935) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287) at org.apache.spark.sql.Dataset.showString(Dataset.scala:326) at org.apache.spark.sql.Dataset.show(Dataset.scala:806) at org.apache.spark.sql.Dataset.show(Dataset.scala:765) at org.apache.spark.sql.Dataset.show(Dataset.scala:774) ... 47 elided Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557) at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248) ... 86 more ``` ### Short summary of the root cause There are two interfaces: - the new `org.apache.hadoop.mapreduce.InputFormat`: providing a one arg method `getSplits(JobContext context)` (returning `List`) - the old `org.apache.hadoop.mapred.InputFormat`: providing a two arg method `getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`) And in Hive both are implemented by `HiveHBaseTableInputFormat` but only the old method leads to required initialisation and this why `NewHadoopRDD` fails here. ### Detailed analyses Here all the link refers latest commits of the master branches for the mentioned components at the time of writing this description (to get the right line numbers in the future too as `master` itself is a moving target). Spark in `NewHadoopRDD` uses the new interface providing the one arg method: https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136 Hive on the other hand binds the initialisation to the two args method coming from the old interface. See [Hive#getSplits](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L268): ``` Override public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException { ``` This calls `getSplitsInternal` which contains the [initialisation](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L299) too: ``` initializeTable(conn, tableName); ``` Interesting that Hive also uses the one arg method internally within the `getSplitsInternal` [here](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L356) but the initialisation done earlier. By calling the new interface method (what `NewHadoopRDD` does) the call goes straight to the HBase method: [org.apache.hadoop.hbase.mapreduce.TableInputFormatBase#getSplits](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L230). Where there would be some `JobContext` based [initialisation](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L234-L237) which by default is an [empty method](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L628-L640): ```java /** * Handle subclass specific set up. Each of the entry points used by the MapReduce framework, * {link #createRecordReader(InputSplit, TaskAttemptContext)} and {link #getSplits(JobContext)}, * will call {link #initialize(JobContext)} as a convenient centralized location to handle * retrieving the necessary configuration information and calling * {link #initializeTable(Connection, TableName)}. Subclasses should implement their initialize * call such that it is safe to call multiple times. The current TableInputFormatBase * implementation relies on a non-null table reference to decide if an initialize call is needed, * but this behavior may change in the future. In particular, it is critical that initializeTable * not be called multiple times since this will leak Connection instances. */ protected void initialize(JobContext context) throws IOException { } ``` This is not overridden by Hive and hard to reason why we need that (its an internal Hive class) so it is easier to fix this in Spark. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? 1) create hbase table ``` hbase(main):001:0>create 'hbase_test1', 'cf1' hbase(main):001:0> put 'hbase_test', 'r1', 'cf1:c1', '123' ``` 2) create hive table related to hbase table hive> ``` CREATE EXTERNAL TABLE `hivetest.hbase_test`( `key` string COMMENT '', `value` string COMMENT '') ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe' STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ( 'hbase.columns.mapping'=':key,cf1:v1', 'serialization.format'='1') TBLPROPERTIES ( 'hbase.table.name'='hbase_test') ```   3): spark-shell query hive table while data in HBase ``` scala> spark.sql("select * from hivetest.hbase_test").show() 22/11/05 01:14:16 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist 22/11/05 01:14:16 WARN client.HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic Hive Session ID = f05b6866-86df-4d88-9eea-f1c45043bb5f +---+-----+ |key|value| +---+-----+ | r1| 123| +---+-----+ ``` Closes #38516 from attilapiros/SPARK-32380. Authored-by: attilapiros Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/hive/TableReader.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 6a6ff6ca948cf..c5e8a9f17640c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -299,6 +299,16 @@ class HadoopTableReader( } } + /** + * True if the new org.apache.hadoop.mapreduce.InputFormat is implemented (except + * HiveHBaseTableInputFormat where although the new interface is implemented by base HBase class + * the table inicialization in the Hive layer only happens via the old interface methods - + * for more details see SPARK-32380). + */ + private def compatibleWithNewHadoopRDD(inputClass: Class[_ <: oldInputClass[_, _]]): Boolean = + classOf[newInputClass[_, _]].isAssignableFrom(inputClass) && + !inputClass.getName.equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat") + /** * The entry of creating a RDD. * [SPARK-26630] Using which HadoopRDD will be decided by the input format of tables. @@ -307,7 +317,7 @@ class HadoopTableReader( */ private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = { val inputFormatClazz = localTableDesc.getInputFileFormatClass - if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) { + if (compatibleWithNewHadoopRDD(inputFormatClazz)) { createNewHadoopRDD(localTableDesc, inputPathStr) } else { createOldHadoopRDD(localTableDesc, inputPathStr) @@ -316,7 +326,7 @@ class HadoopTableReader( private def createHadoopRDD(partitionDesc: PartitionDesc, inputPathStr: String): RDD[Writable] = { val inputFormatClazz = partitionDesc.getInputFileFormatClass - if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) { + if (compatibleWithNewHadoopRDD(inputFormatClazz)) { createNewHadoopRDD(partitionDesc, inputPathStr) } else { createOldHadoopRDD(partitionDesc, inputPathStr)