Changes from all commits (19 commits)
49a2350  add the code change to fix the SQL query (SaurabhChawla100, Jul 8, 2020)
195bc23  handle the scenarios for ORC files created by Hive (SaurabhChawla100, Jul 9, 2020)
9b51d67  Refactor the unit test as per the review comments (SaurabhChawla100, Jul 10, 2020)
b9282d0  Remove drop table from the test as it's already handled in withTable (SaurabhChawla100, Jul 11, 2020)
ff938a5  Refactor the code change as per the review comments (SaurabhChawla100, Jul 11, 2020)
45be048  add a unit test for ORC_VECTORIZED_READER_ENABLED (SaurabhChawla100, Jul 11, 2020)
c2ca484  add the code change to handle the schema sent through requestedColumnIds (SaurabhChawla100, Jul 12, 2020)
f8f7aff  removed the extra import from OrcFileFormat.scala (SaurabhChawla100, Jul 12, 2020)
c2d7a21  refactor the code using a separate variable for sendActualSchema (SaurabhChawla100, Jul 13, 2020)
4469e6d  Merge the ORC impl and vectorized tests for complete code coverage (SaurabhChawla100, Jul 13, 2020)
743ffe3  Merge remote-tracking branch 'os/master' into SPARK-32234 (SaurabhChawla100, Jul 13, 2020)
9de3516  refactor the code and add similar logic in the file source v2 code path (SaurabhChawla100, Jul 14, 2020)
f8ece1f  refactor the code (SaurabhChawla100, Jul 14, 2020)
5b8715e  Refactor some method return types and use them in the code (SaurabhChawla100, Jul 15, 2020)
d0f6b9b  Removed the table location from CREATE TABLE in the unit test (SaurabhChawla100, Jul 15, 2020)
c79794f  fix the Scala style in the unit test (SaurabhChawla100, Jul 15, 2020)
cf68729  created a new method for the duplicated code (SaurabhChawla100, Jul 15, 2020)
6150b08  switch the names of requestedColIds and requestedDataColIds (SaurabhChawla100, Jul 15, 2020)
c0f6209  fix the name of the unit test (SaurabhChawla100, Jul 16, 2020)
OrcFileFormat.scala

@@ -164,8 +164,6 @@ class OrcFileFormat
     val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
     val capacity = sqlConf.orcVectorizedReaderBatchSize

-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(hadoopConf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)

     val broadcastedConf =
@@ -179,16 +177,18 @@ class OrcFileFormat

       val fs = filePath.getFileSystem(conf)
       val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-      val requestedColIdsOrEmptyFile =
+      val resultedColPruneInfo =
         Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
           OrcUtils.requestedColumnIds(
             isCaseSensitive, dataSchema, requiredSchema, reader, conf)
         }

-      if (requestedColIdsOrEmptyFile.isEmpty) {
+      if (resultedColPruneInfo.isEmpty) {
         Iterator.empty
       } else {
-        val requestedColIds = requestedColIdsOrEmptyFile.get
+        val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+        val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+          dataSchema, resultSchema, partitionSchema, conf)
         assert(requestedColIds.length == requiredSchema.length,
           "[BUG] requested column IDs do not match required schema")
         val taskConf = new Configuration(conf)
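For readers skimming the diff, a minimal sketch (not part of the PR) of how the reworked read path consumes the new return value. `dataSchema`, `resultSchema`, `partitionSchema`, and `conf` are assumed to be in scope, and `buildOrcIterator` is a hypothetical stand-in for the reader construction that the real method performs after this point.

```scala
// Sketch only: buildOrcIterator is hypothetical; the real code builds a
// vectorized or row-based ORC reader at this point.
resultedColPruneInfo match {
  case None =>
    // Empty ORC file (see SPARK-8501): produce no rows at all.
    Iterator.empty
  case Some((requestedColIds, canPruneCols)) =>
    // The schema string pushed to the ORC reader now depends on canPruneCols:
    // the pruned resultSchema when true, dataSchema ++ partitionSchema when false.
    val resultSchemaString = OrcUtils.orcResultSchemaString(
      canPruneCols, dataSchema, resultSchema, partitionSchema, conf)
    buildOrcIterator(requestedColIds, resultSchemaString)
}
```

The key behavioral change: MAPRED_INPUT_SCHEMA is no longer set once per query from `resultSchema`; it is now derived per file, after inspecting that file's physical schema.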
OrcUtils.scala

@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
+import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer}

 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -116,15 +116,17 @@ object OrcUtils extends Logging {
   }

   /**
-   * Returns the requested column ids from the given ORC file. Column id can be -1, which means the
-   * requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty.
+   * @return Returns the combination of requested column ids from the given ORC file and
+   *         boolean flag to find if the pruneCols is allowed or not. Requested Column id can be
+   *         -1, which means the requested column doesn't exist in the ORC file. Returns None
+   *         if the given ORC file is empty.
    */
   def requestedColumnIds(
       isCaseSensitive: Boolean,
       dataSchema: StructType,
       requiredSchema: StructType,
       reader: Reader,
-      conf: Configuration): Option[Array[Int]] = {
+      conf: Configuration): Option[(Array[Int], Boolean)] = {
     val orcFieldNames = reader.getSchema.getFieldNames.asScala
     if (orcFieldNames.isEmpty) {
       // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
@@ -136,14 +138,18 @@ object OrcUtils extends Logging {
       assert(orcFieldNames.length <= dataSchema.length, "The given data schema " +
         s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " +
         "no idea which columns were dropped, fail to read.")
+      // for ORC file written by Hive, no field names
+      // in the physical schema, there is a need to send the
+      // entire dataSchema instead of required schema.
+      // So pruneCols is not done in this case
       Some(requiredSchema.fieldNames.map { name =>
         val index = dataSchema.fieldIndex(name)
         if (index < orcFieldNames.length) {
           index
         } else {
           -1
         }
-      })
+      }, false)
     } else {
       if (isCaseSensitive) {
         Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) =>
@@ -152,7 +158,7 @@ object OrcUtils extends Logging {
           } else {
             -1
           }
-        })
+        }, true)
       } else {
         // Do case-insensitive resolution only if in case-insensitive mode
         val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT))
@@ -170,7 +176,7 @@ object OrcUtils extends Logging {
               idx
             }
           }.getOrElse(-1)
-        })
+        }, true)
       }
     }
   }
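To make the fallback branch concrete, here is a hedged, self-contained rendering of the positional mapping above, with example schemas assumed for illustration (the `_col*` names mirror the SPARK-32234 scenario):

```scala
import org.apache.spark.sql.types.StructType

// Hive-written ORC files may carry only synthetic physical field names, so the
// required columns are resolved by their position in dataSchema rather than by
// name, and column pruning is disabled (canPruneCols = false).
val dataSchema = StructType.fromDDL("_col1 INT, _col2 STRING, _col3 INT")
val requiredSchema = StructType.fromDDL("_col2 STRING")
val orcPhysicalFieldCount = 3  // fields actually present in the ORC file

val requestedColIds = requiredSchema.fieldNames.map { name =>
  val index = dataSchema.fieldIndex(name)           // position in the table schema
  if (index < orcPhysicalFieldCount) index else -1  // -1: absent from the file
}
// requestedColIds == Array(1); since pruning is off in this branch, the full
// dataSchema (plus partition columns) is what gets sent to the ORC reader.
```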
@@ -199,4 +205,25 @@ object OrcUtils extends Logging {
       s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
     case _ => dt.catalogString
   }
+
+  /**
+   * @return Returns the result schema string based on the canPruneCols flag.
+   *         resultSchemaString will be created using resultsSchema in case of
+   *         canPruneCols is true and for canPruneCols as false value
+   *         resultSchemaString will be created using the actual dataSchema.
+   */
+  def orcResultSchemaString(
+      canPruneCols: Boolean,
+      dataSchema: StructType,
+      resultSchema: StructType,
+      partitionSchema: StructType,
+      conf: Configuration): String = {
+    val resultSchemaString = if (canPruneCols) {
+      OrcUtils.orcTypeDescriptionString(resultSchema)
+    } else {
+      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+    }
+    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
+    resultSchemaString
+  }
 }

Inline review thread on the new doc comment:

gatorsmile (Member): This description is not clear enough. This utility function also changes the value of conf. We need to document it. @SaurabhChawla100 Could you submit a follow-up PR to improve the description?

SaurabhChawla100 (Contributor, Author), Jul 25, 2020: @gatorsmile - This is the new helper method that we added as part of this PR. Sure, I will update the description in the follow-up PR. Should I raise the PR against a new Jira or this same Jira, since this Jira is already resolved?

gatorsmile (Member):
> Should I raise the PR against a new Jira or this same Jira

It's okay to refer to this JIRA ticket. Then, please add [FOLLOWUP] in the PR title.
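On gatorsmile's point about the undocumented side effect, a hedged usage sketch (example schemas, not taken from the PR): the helper both returns the schema string and writes it into the passed-in Hadoop conf.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.orc.OrcConf
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.types.StructType

// Example inputs, assumed for illustration.
val dataSchema = StructType.fromDDL("_col1 INT, _col2 STRING, _col3 INT")
val resultSchema = StructType.fromDDL("_col2 STRING")
val conf = new Configuration()

val schemaString = OrcUtils.orcResultSchemaString(
  false,             // canPruneCols: e.g. a Hive-written file, so no pruning
  dataSchema,
  resultSchema,
  new StructType(),  // partitionSchema: none in this example
  conf)

// Side effect: the same string is now also set on the conf for the ORC reader.
assert(OrcConf.MAPRED_INPUT_SCHEMA.getString(conf) == schemaString)
```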
OrcPartitionReaderFactory.scala

@@ -66,24 +66,24 @@ case class OrcPartitionReaderFactory(
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value

-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)

     val filePath = new Path(new URI(file.filePath))

     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val requestedColIdsOrEmptyFile =
+    val resultedColPruneInfo =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
         OrcUtils.requestedColumnIds(
           isCaseSensitive, dataSchema, readDataSchema, reader, conf)
       }

-    if (requestedColIdsOrEmptyFile.isEmpty) {
+    if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader[InternalRow]
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get
+      val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+      val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+        dataSchema, resultSchema, partitionSchema, conf)
       assert(requestedColIds.length == readDataSchema.length,
         "[BUG] requested column IDs do not match required schema")

@@ -112,24 +112,25 @@ case class OrcPartitionReaderFactory(
   override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = {
     val conf = broadcastedConf.value.value

-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)

     val filePath = new Path(new URI(file.filePath))

     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val requestedColIdsOrEmptyFile =
+    val resultedColPruneInfo =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
         OrcUtils.requestedColumnIds(
           isCaseSensitive, dataSchema, readDataSchema, reader, conf)
       }

-    if (requestedColIdsOrEmptyFile.isEmpty) {
+    if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
+      val (requestedDataColIds, canPruneCols) = resultedColPruneInfo.get
+      val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+        dataSchema, resultSchema, partitionSchema, conf)
+      val requestedColIds = requestedDataColIds ++ Array.fill(partitionSchema.length)(-1)
       assert(requestedColIds.length == resultSchema.length,
         "[BUG] requested column IDs do not match required schema")
       val taskConf = new Configuration(conf)
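A small illustration (array sizes assumed) of why the columnar path pads the requested ids: the output batch schema is the read data schema plus the partition schema, but partition values come from the directory layout, not from the ORC file, so they have no physical column id.

```scala
// Hypothetical example values, chosen only to show the padding.
val requestedDataColIds = Array(0, 2)  // two data columns resolved in the file
val partitionColumnCount = 1           // one partition column appended to the batch

// Partition columns get the placeholder id -1, matching the diff above.
val requestedColIds = requestedDataColIds ++ Array.fill(partitionColumnCount)(-1)
assert(requestedColIds.sameElements(Array(0, 2, -1)))
```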
HiveOrcQuerySuite.scala

@@ -288,4 +288,32 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-32234 read ORC table with column names all starting with '_col'") {
+    Seq("native", "hive").foreach { orcImpl =>
+      Seq("false", "true").foreach { vectorized =>
+        withSQLConf(
+          SQLConf.ORC_IMPLEMENTATION.key -> orcImpl,
+          SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) {
+          withTable("test_hive_orc_impl") {
+            spark.sql(
+              s"""
+                 | CREATE TABLE test_hive_orc_impl
+                 | (_col1 INT, _col2 STRING, _col3 INT)
+                 | STORED AS ORC
+               """.stripMargin)
+            spark.sql(
+              s"""
+                 | INSERT INTO
+                 | test_hive_orc_impl
+                 | VALUES(9, '12', 2020)
+               """.stripMargin)
+
+            val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl")
+            checkAnswer(df, Row("12"))
+          }
+        }
+      }
+    }
+  }
 }
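For completeness, a hedged spark-shell repro distilled from the test above. Before this change, selecting from an ORC table whose columns all start with "_col" could fail on the read path; with the fix, the query returns the inserted row under both ORC implementations, vectorized or not.

```scala
// Run in spark-shell with Hive support; table and column names come from the test.
spark.sql("CREATE TABLE test_hive_orc_impl (_col1 INT, _col2 STRING, _col3 INT) STORED AS ORC")
spark.sql("INSERT INTO test_hive_orc_impl VALUES (9, '12', 2020)")
spark.sql("SELECT _col2 FROM test_hive_orc_impl").show()
// Expected output with the fix (shape approximate):
// +-----+
// |_col2|
// +-----+
// |   12|
// +-----+
```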