38 changes: 23 additions & 15 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -114,22 +114,30 @@ class HadoopTableReader(
val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)

// logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)

val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))

val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHadoopConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}
val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
Member: How about replacing sparkSession.sessionState.newHadoopConf() with broadcastedHadoopConf.value.value?

Contributor Author: ok~
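A minimal sketch of the reviewer's suggestion, reusing the Hadoop configuration already broadcast to the executors instead of building a fresh one on the driver (broadcastedHadoopConf is the reader's existing Broadcast[SerializableConfiguration] field, used in the mapPartitions block above):

```scala
// Obtain the FileSystem from the broadcast configuration instead of
// sparkSession.sessionState.newHadoopConf().
val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value)
```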


// if the table location is not exists, return an empty RDD
Member: is not exists -> does not exist

if (!fs.exists(locationPath)) {
Contributor: Hmm, what about Hive serde tables with a storage handler? Their data is stored somewhere else (maybe in HBase), and the table path is always non-existent.

Contributor Author: Good catch, thanks! I tested this in Hive: for a table created with STORED BY (e.g. the HBase handler), a table path is created under the warehouse path when the table is created, but no data files exist there after inserting into the table, and selecting data still works after the table path is deleted.
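For illustration, the kind of table this thread describes would be created in Hive itself roughly as follows (hypothetical HiveQL held in a Scala string; the table and column names are made up, and per the author the behavior was verified in Hive, not through spark.sql):

```scala
// Hypothetical DDL for a storage-handler table: its rows live in HBase, so
// the directory under the warehouse path can be absent without affecting reads.
val hiveDdl =
  """CREATE TABLE hbase_backed(key INT, value STRING)
    |STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    |WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:value')""".stripMargin
```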

new EmptyRDD[InternalRow](sparkSession.sparkContext)
} else {
// logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)

deserializedHadoopRDD
val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))

val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHadoopConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}

deserializedHadoopRDD
}
}
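As an aside, EmptyRDD is private[spark], so the constructor call above only works inside Spark itself; a minimal sketch of the same pattern through the public API:

```scala
// An empty RDD has zero partitions, so any job over it finishes immediately.
val empty = spark.sparkContext.emptyRDD[String]
assert(empty.partitions.isEmpty)
assert(empty.count() == 0)
```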

override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow] = {
@@ -20,13 +20,14 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils

import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{EmptyRDD, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -35,6 +35,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1494,4 +1495,148 @@ class HiveDDLSuite
}
}
}

test("insert data to a hive serde table which has a not existed location should succeed") {
Contributor: nit: non-existing location

withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getCanonicalPath}")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)
Contributor: convert to Path and compare, please.
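Applied to the assertion above, the suggestion would read roughly like this (assuming org.apache.hadoop.fs.Path is imported):

```scala
// Comparing Path values normalizes trailing slashes, avoiding the
// stripSuffix("/") string manipulation on both sides.
assert(new Path(table.location) == new Path(expectedPath))
```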


val tableLocFile = new File(table.location.stripPrefix("file:"))
Contributor: new File(new URI(table.location))? Please avoid .stripPrefix("file:"), which looks very hacky.
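That suggestion in code, assuming java.net.URI is imported alongside java.io.File:

```scala
// Resolve the location through a URI instead of stripping the scheme by hand.
val tableLocFile = new File(new URI(table.location))
```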

tableLocFile.delete()
assert(!tableLocFile.exists())
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

Utils.deleteRecursively(dir)
assert(!tableLocFile.exists())
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t SET LOCATION '${newDirFile.getAbsolutePath}'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location.stripSuffix("/") == newDirFile.getAbsolutePath.stripSuffix("/"))
assert(!newDirFile.exists())

spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
assert(newDirFile.exists())
}
}
}

test("insert into a hive serde table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
Utils.deleteRecursively(partLoc)
assert(!partLoc.exists())
// insert overwrite into a partition whose location has been deleted.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
assert(partLoc.exists())
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)

val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " +
s"'${newDirFile.getAbsolutePath}'")
Member: shorten it to a single line?

assert(!newDirFile.exists())

// insert into a partition whose location does not exist.
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 9, 10")
assert(newDirFile.exists())
checkAnswer(spark.table("t"), Row(9, 10, 1, 2) :: Nil)
}
}
}

test("read data from a hive serde table which has a not existed location should succeed") {
Member: This is the only test case that failed without this fix, right?

Contributor Author: Yes, it is~

withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getAbsolutePath}")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t SET LOCATION '${newDirFile.getAbsolutePath}'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location.stripSuffix("/") == newDirFile.getAbsolutePath.stripSuffix("/"))
assert(!newDirFile.exists())
checkAnswer(spark.table("t"), Nil)
}
}
}

test("read data from a hive serde table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
Member: useless?


spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " +
s"'${newDirFile.getAbsolutePath}'")
Member: shorten it to a single line?

Contributor Author: 101 characters... let me modify some code.

Member: Actually, 101 is still ok.

Contributor Author: Er... isn't the limit 100? Let me test it... I have modified some code to make it clearer.
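For reference, the single-line form under discussion, which the author measures at 101 characters with its indentation, one over Spark's 100-character Scala line limit:

```scala
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '${newDirFile.getAbsolutePath}'")
```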

assert(!newDirFile.exists())
// select from a partition whose location has been changed to a non-existent location
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
Contributor: why set this conf?

Contributor Author: If we don't set it, the query throws an exception; if we do, Spark checks whether the partition path exists and, rather than throwing, returns an empty RDD even when the path does not exist.

Contributor: Is this expected? I think Hive will always return an empty result, right?

Contributor: BTW this conf will be removed soon, as it has bugs.

Contributor Author: ok~ thanks~ Then do we also need to modify something here?

Contributor Author: Yes, Hive returns empty. If there is a bug here (could you describe what the bug is?), we could remove the conf and always return a result.
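A hedged sketch of the two behaviors the author describes, assuming the conf still exists and using the suite's ScalaTest helpers:

```scala
// With verification disabled (the default), scanning a partition whose
// directory is missing fails while resolving the input paths.
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false") {
  intercept[Exception](spark.sql("SELECT * FROM t WHERE a = 1 AND b = 2").collect())
}
// With verification enabled, missing partition paths are skipped and the
// query just returns an empty result.
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
  checkAnswer(spark.sql("SELECT * FROM t WHERE a = 1 AND b = 2"), Nil)
}
```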

checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6")
checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil)
assert(newDirFile.exists())

// select from a partition whose location has been deleted.
Utils.deleteRecursively(newDirFile)
assert(!newDirFile.exists())
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}
}
}
}
}