Commits (20)
5721b88  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 8, 2016)
4ae92d8  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 9, 2016)
75bca1b  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 9, 2016)
97d21f6  Merge branch 'master' of https://github.com/apache/spark into SPARK-1…  (rbalamohan, Aug 22, 2016)
4004c0a  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 22, 2016)
9a8838a  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 23, 2016)
eb8a955  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 23, 2016)
70cf84d  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 24, 2016)
c9d677b  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 24, 2016)
7385a06  Revert "[SPARK-16948][SQL] Querying empty partitioned orc tables thro…  (rbalamohan, Aug 24, 2016)
1746853  Revert "Revert "[SPARK-16948][SQL] Querying empty partitioned orc tab…  (rbalamohan, Aug 24, 2016)
5f7e5ad  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 24, 2016)
e8e2d70  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
0f901aa  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
0772a96  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
6ff7e5d  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
fc14e2d  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
9ecb2ed  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 26, 2016)
fa71370  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Sep 22, 2016)
e39715e  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Sep 22, 2016)
@@ -237,21 +237,24 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         new Path(metastoreRelation.catalogTable.storage.locationUri.get),
         partitionSpec)
 
-      val inferredSchema = if (fileType.equals("parquet")) {
-        val inferredSchema =
-          defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
-        inferredSchema.map { inferred =>
-          ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
+      val inferredSchema =
+        defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())

[Contributor] can we be more specific here? e.g. doing it only if it is parquet or orc.

+      val schema = if (fileType == "parquet") {
+        // For Parquet, get correct schema by merging Metastore schema data types
+        // and Parquet schema field names.
+        inferredSchema.map { schema =>
+          ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, schema)
         }.getOrElse(metastoreSchema)
       } else {
-        defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
+        // For others (e.g orc), fall back to metastore schema if needed.

[Member] If the fileType is not parquet, the only possible type is orc. To ensure this is always true, we should check fileType == "orc".
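
A minimal sketch of the stricter dispatch being suggested, with an explicit orc case that fails fast on any other file type. Names mirror the diff above, but resolveSchema and the mergeParquet parameter are hypothetical stand-ins, not the committed code:

    import org.apache.spark.sql.types.StructType

    def resolveSchema(
        fileType: String,
        inferredSchema: Option[StructType],
        metastoreSchema: StructType,
        mergeParquet: (StructType, StructType) => StructType): StructType = fileType match {
      case "parquet" =>
        // Merge metastore data types with Parquet field names.
        inferredSchema.map(mergeParquet(metastoreSchema, _)).getOrElse(metastoreSchema)
      case "orc" =>
        // Fall back to the metastore schema when nothing could be inferred.
        inferredSchema.getOrElse(metastoreSchema)
      case other =>
        // Fail fast instead of silently treating unknown formats like ORC.
        throw new IllegalArgumentException(s"Unexpected file type: $other")
    }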

+        inferredSchema.getOrElse(metastoreSchema)
       }

       val relation = HadoopFsRelation(
         sparkSession = sparkSession,
         location = fileCatalog,
         partitionSchema = partitionSchema,
-        dataSchema = inferredSchema,
+        dataSchema = schema,
         bucketSpec = bucketSpec,
         fileFormat = defaultSource,
         options = options)
@@ -17,9 +17,12 @@

 package org.apache.spark.sql.hive.orc
 
+import java.io.FileNotFoundException
 import java.net.URI
 import java.util.Properties
 
+import scala.util.Try
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -54,10 +57,12 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    OrcFileOperator.readSchema(
-      files.map(_.getPath.toUri.toString),
-      Some(sparkSession.sessionState.newHadoopConf())
-    )
+    // Safe to ignore FileNotFoundException in case no files are found.
+    val schema = Try(OrcFileOperator.readSchema(

[Member] Do we still need this?

[Author] Yes, in case this is referred to anytime later.

[Member] You know, the changes could impact the behaviors of the callers of inferSchema. Could you write a test case to cover this scenario?

[Contributor] @rajeshbalamohan is this change unnecessary for this PR? If so, I'd like to revert it to make the PR as small as possible.

+      files.map(_.getPath.toUri.toString),
+      Some(sparkSession.sessionState.newHadoopConf())))
+      .recover { case _: FileNotFoundException => None }

[Contributor] why are we ignoring file not found exception here?
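
A minimal, self-contained sketch of the Try(...).recover pattern used above: a FileNotFoundException raised while reading (e.g. an empty partitioned table whose directories hold no ORC files yet) becomes None instead of failing the query. readSchemaOrThrow is a hypothetical stand-in for OrcFileOperator.readSchema, not a real API:

    import java.io.FileNotFoundException
    import scala.util.Try

    // Stand-in for the schema reader: throws when there is nothing to read.
    def readSchemaOrThrow(paths: Seq[String]): Option[String] =
      if (paths.isEmpty) throw new FileNotFoundException("no ORC files to read")
      else Some("struct<key:int,value:string>")

    // Try captures the exception; recover maps the FileNotFoundException case to None.
    val schema: Option[String] =
      Try(readSchemaOrThrow(Seq.empty))
        .recover { case _: FileNotFoundException => None }
        .get

    assert(schema.isEmpty) // empty table: no inferred schema, caller falls back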

+    schema.get
   }
 
   override def prepareWrite(
@@ -372,6 +372,48 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }

test("SPARK-16948. Check empty orc partitioned tables in ORC") {
withSQLConf((HiveUtils.CONVERT_METASTORE_ORC.key, "true")) {
withTempPath { dir =>

[Member] Could you remove this line?

[Member] Do we still need this?

+        withTable("empty_orc_partitioned") {
+          spark.sql(
+            s"""CREATE TABLE empty_orc_partitioned(key INT, value STRING)
+               | PARTITIONED BY (p INT) STORED AS ORC
+             """.stripMargin)

[Member] A comment about the style:

          sql(
            """
              |CREATE TABLE empty_orc_partitioned(key INT, value STRING)
              |PARTITIONED BY (p INT) STORED AS ORC
            """.stripMargin)

[Member] Nit:

          sql(
            """
              |CREATE TABLE empty_orc_partitioned(key INT, value STRING)
              |PARTITIONED BY (p INT) STORED AS ORC
            """.stripMargin)

+          val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+          emptyDF.createOrReplaceTempView("empty")

[Member] Could you remove this line?

+          // Query empty table
+          val df = spark.sql(
+            s"""SELECT key, value FROM empty_orc_partitioned
+               | WHERE key > 10
+             """.stripMargin)
+          checkAnswer(df, emptyDF)

[Member] A comment about the style:

          checkAnswer(
            sql("SELECT key, value FROM empty_orc_partitioned WHERE key > 10"),
            emptyDF)

+        }
+      }
+
+      withTempPath { dir =>
+        withTable("empty_text_partitioned") {
+          spark.sql(
+            s"""CREATE TABLE empty_text_partitioned(key INT, value STRING)
+               | PARTITIONED BY (p INT) STORED AS TEXTFILE

[Member] Testing the textfile format sounds useless. We do not convert it to LogicalRelation.
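
For context on that remark: only Parquet and ORC metastore tables are converted to a LogicalRelation, so a TEXTFILE table never reaches the inferSchema path this PR changes. A rough sketch of that gating, assuming conversion is keyed off the table's serde and the convertMetastoreParquet/convertMetastoreOrc flags; shouldConvert is illustrative, not the actual HiveMetastoreCatalog check:

    // Illustrative only: mirrors the spirit of the conversion check, not its exact code.
    def shouldConvert(serde: String, convertParquet: Boolean, convertOrc: Boolean): Boolean =
      (serde.toLowerCase.contains("parquet") && convertParquet) ||
        (serde.toLowerCase.contains("orc") && convertOrc)

    // A TEXTFILE table uses LazySimpleSerDe, so it fails both checks and stays on
    // the plain Hive read path.
    assert(!shouldConvert("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", true, true))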

""".stripMargin)

[Member @gatorsmile, Aug 25, 2016] Nit:

          sql(
            """
              |CREATE TABLE empty_orc(key INT, value STRING)
              |STORED AS ORC
            """.stripMargin)

+          val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+          emptyDF.createOrReplaceTempView("empty_text")
+
+          // Query empty table
+          val df = spark.sql(
+            s"""SELECT key, value FROM empty_text_partitioned
+               | WHERE key > 10
+             """.stripMargin)
+          checkAnswer(df, emptyDF)
+        }
+      }
+    }
+  }

test("SPARK-10623 Enable ORC PPD") {
withTempPath { dir =>
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {