@@ -259,8 +259,23 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
* @throws IllegalArgumentException if a field with the given name does not exist
*/
def apply(name: String): StructField = {
nameToField.getOrElse(name,
throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
if (name.contains('.')) {
@HyukjinKwon (Member) commented on Sep 6, 2016:

IIUC, this will drop support for accessing a field whose name itself contains a . (e.g. a field literally named "a.b"), which can currently be accessed via "a.b". Could you confirm this, please?

@xuanyuanking (Member, Author) replied:

@HyukjinKwon Thanks for your review. Mixing the recursive lookup into the default apply has this problem; I fixed it in the next patch by using ',' as the separator instead, since it is an invalid character in a Parquet schema.

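A toy illustration of the ambiguity discussed above (example schemas, not part of the patch): a dotted lookup cannot distinguish a nested path from a field whose name literally contains a dot.

import org.apache.spark.sql.types._

val nested = StructType(Seq(
  StructField("a", StructType(Seq(StructField("b", IntegerType))))))
val flat = StructType(Seq(StructField("a.b", IntegerType)))

nested("a.b") // with the patch: StructField("a", StructType(Seq(StructField("b", IntegerType))), ...)
flat("a.b")   // previously returned the literal "a.b" field; splitting on '.'
              // now throws, since there is no top-level field named "a"
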
val Array(curFieldStr, nextFieldStr) = name.split("\\.", 2)
val curField = nameToField.getOrElse(curFieldStr,
throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
curField.dataType match {
case st: StructType =>
val newField = StructType(st.fields).apply(nextFieldStr)
StructField(curField.name, StructType(Seq(newField)),
curField.nullable, curField.metadata)
case _ =>
throw new IllegalArgumentException(s"""Field "$curFieldStr" is not a struct field.""")
}
} else {
nameToField.getOrElse(name,
throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
}
}

/**
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.StructType

/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -97,7 +98,15 @@ object FileSourceStrategy extends Strategy with Logging {
dataColumns
.filter(requiredAttributes.contains)
.filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
val outputSchema = if (fsRelation.sqlContext.conf.isParquetNestColumnPruning) {
@HyukjinKwon (Member) commented on Sep 7, 2016:

This will affect all the other data sources. I am pretty sure any tests related to this will not pass.

@xuanyuanking (Member, Author) replied on Sep 7, 2016:

I ran all the data source tests with
./build/sbt "test-only org.apache.spark.sql.execution.datasources.*"
Three tests failed, but when I ran the failed suites separately, they all passed.

@xuanyuanking (Member, Author) replied:

All three tests failed because of a datetime check error: the correct answer is like '2015-05-23' but the Spark answer is '2015-05-22'. I don't think this error is caused by my patch.
Has anybody hit the same problem before? It really confuses me that the suites pass when run separately!

@HyukjinKwon (Member) commented on Sep 7, 2016:

Oh, I meant that enabling/disabling it affects the other data sources. I see it's disabled by default. I ran, for example, JsonSuite after manually enabling this option (after leaving the comment above) and saw some failures related to nested structures.

A Member commented:

Also, do you mind if I ask which tests failed? I will try to reproduce them myself.

@xuanyuanking (Member, Author) replied:

It's my mistake here. I can only make sure this patch works for Parquet, so I should check the fileFormat here; like the config namespace (spark.sql.parquet.nestColumnPruning), it can only work for Parquet. I added a patch to fix this.

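A minimal sketch of the guard being described, assuming the surrounding FileSourceStrategy names (fsRelation and the pre-rename conf accessor); this is illustrative, not the final patch:

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

// Only prune nested columns when the relation is Parquet-backed and the
// Parquet-specific flag is enabled.
val shouldPruneNestedColumns =
  fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] &&
    fsRelation.sqlContext.conf.isParquetNestColumnPruning
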
@xuanyuanking (Member, Author) replied:

I ran the command

./build/sbt "test-only org.apache.spark.sql.execution.datasources.*"

locally and three test suites failed:

[error] Failed tests:
[error]     org.apache.spark.sql.execution.datasources.csv.CSVSuite
[error]     org.apache.spark.sql.execution.datasources.json.JsonSuite
[error]     org.apache.spark.sql.execution.datasources.parquet.ParquetPartitionDiscoverySuite

But when I ran them separately, all three passed. I also ran

./build/sbt "test-only org.apache.spark.sql.execution.csv.*"
./build/sbt "test-only org.apache.spark.sql.execution.json.*"

and all tests passed.

val requiredColumnsWithNesting = generateRequiredColumnsContainsNesting(
projects, readDataColumns.attrs.map(_.name).toArray)
val totalSchema = readDataColumns.toStructType
A Contributor commented:

Maybe fullSchema?

@xuanyuanking (Member, Author) replied:

Fix done.

val prunedSchema = StructType(requiredColumnsWithNesting.map(totalSchema(_)))
// Merge fields belonging to the same StructType, then merge in filterAttributes
prunedSchema.fields.map(f => StructType(Array(f))).reduceLeft(_ merge _)
.merge(filterAttributes.toSeq.toStructType)
} else readDataColumns.toStructType
A Contributor commented:

Please re-format the above change to the following format:

if (
  ... &&
  ...
) {
  ...
} else {
  ...
}

@xuanyuanking (Member, Author) replied:

Fix done.

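For reference, a toy illustration of what the reduceLeft(_ merge _) above produces, simplified to one nesting level (note StructType.merge is private[sql], so this only compiles inside that package):

import org.apache.spark.sql.types._

// Two single-leaf schemas for the same top-level struct column...
val left = StructType(Seq(StructField("myComplex",
  StructType(Seq(StructField("id", IntegerType))))))
val right = StructType(Seq(StructField("myComplex",
  StructType(Seq(StructField("someId", IntegerType))))))

// ...merge into one struct containing both leaves:
// left.merge(right) =>
//   StructType(Seq(StructField("myComplex",
//     StructType(Seq(StructField("id", IntegerType),
//                    StructField("someId", IntegerType))))))
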
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
@@ -126,4 +135,56 @@ object FileSourceStrategy extends Strategy with Logging {

case _ => Nil
}

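// Rewrites the required top-level column names into dotted nested-field paths
// (e.g. "myComplex" -> "myComplex.id") based on how the project expressions
// actually access them; columns without nested accesses pass through unchanged.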
private def generateRequiredColumnsContainsNesting(projects: Seq[Expression],
columns: Array[String]): Array[String] = {
def generateAttributeMap(nestFieldMap: scala.collection.mutable.Map[String, Seq[String]],
isNestField: Boolean, curString: Option[String],
node: Expression) {
node match {
case ai: GetArrayItem =>
// Here we drop curString to simplify array and map support.
// The same strategy applies to GetArrayStructFields and GetMapValue below.
generateAttributeMap(nestFieldMap, isNestField = true, None, ai.child)

case asf: GetArrayStructFields =>
generateAttributeMap(nestFieldMap, isNestField = true, None, asf.child)

case mv: GetMapValue =>
generateAttributeMap(nestFieldMap, isNestField = true, None, mv.child)

case attr: AttributeReference =>
if (isNestField && curString.isDefined) {
val attrStr = attr.name
if (nestFieldMap.contains(attrStr)) {
nestFieldMap(attrStr) = nestFieldMap(attrStr) ++ Seq(attrStr + "." + curString.get)
} else {
nestFieldMap += (attrStr -> Seq(attrStr + "." + curString.get))
}
}
case sf: GetStructField =>
val str = if (curString.isDefined) {
sf.name.get + "." + curString.get
} else sf.name.get
generateAttributeMap(nestFieldMap, isNestField = true, Option(str), sf.child)
case _ =>
if (node.children.nonEmpty) {
node.children.foreach(child => generateAttributeMap(nestFieldMap,
isNestField, curString, child))
}
}
}

val nestFieldMap = scala.collection.mutable.Map.empty[String, Seq[String]]
projects.foreach(p => generateAttributeMap(nestFieldMap, isNestField = false, None, p))
val colList = columns.toList.flatMap { col =>
if (nestFieldMap.contains(col)) {
nestFieldMap(col).toList
} else {
List(col)
}
}
colList.toArray
}

}
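To make the traversal above concrete, a usage sketch assuming Spark 2.0-era Catalyst constructors (GetStructField takes the field ordinal plus an optional name); since the helper is private, the call is shown in a comment:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

val complexType = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))
val myComplex = AttributeReference("myComplex", complexType)()
val primitive = AttributeReference("primitive", IntegerType)()

// SELECT primitive, myComplex.id produces projects roughly like:
val projects: Seq[Expression] =
  Seq(primitive, Alias(GetStructField(myComplex, 0, Some("id")), "id")())

// generateRequiredColumnsContainsNesting(projects, Array("primitive", "myComplex"))
// returns Array("primitive", "myComplex.id"): plain columns pass through,
// struct accesses are rewritten to dotted leaf paths.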
@@ -212,6 +212,11 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_NEST_COLUMN_PRUNING = SQLConfigBuilder("spark.sql.parquet.nestColumnPruning")
A Contributor commented:

Please rename to PARQUET_NESTED_COLUMN_PRUNING and spark.sql.parquet.nestedColumnPruning respectively.

@xuanyuanking (Member, Author) replied:

Rename done.

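For context, a hypothetical usage sketch (the key below is the pre-rename one used elsewhere in this patch; the path is illustrative):

// The flag is Parquet-specific and off by default.
spark.conf.set("spark.sql.parquet.nestColumnPruning", "true")
spark.read.parquet("/path/to/data").selectExpr("myComplex[0].id")
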
.doc("When set this to true, we will tell parquet only read the nest column`s leaf fields ")
A Contributor commented:

Please reword this doc string to:

When true, Parquet column pruning also works for nested fields.

@xuanyuanking (Member, Author) replied:

Reword done.

.booleanConf
.createWithDefault(false)

val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
.doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
.booleanConf
@@ -661,6 +666,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

def isParquetNestColumnPruning: Boolean = getConf(PARQUET_NEST_COLUMN_PRUNING)
A Contributor commented:

parquetNestedColumnPruningEnabled

@xuanyuanking (Member, Author) replied:

Rename done.


def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)

def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
@@ -571,6 +571,44 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

test("SPARK-4502 parquet nested fields pruning") {
// Schema of "test-data/nested-array-struct.parquet":
// root
// |-- primitive: integer (nullable = true)
// |-- myComplex: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- id: integer (nullable = true)
// | | |-- repeatedMessage: array (nullable = true)
// | | | |-- element: struct (containsNull = true)
// | | | | |-- someId: integer (nullable = true)
val df = readResourceParquetFile("test-data/nested-array-struct.parquet")
A Member commented:

It seems we don't have this file in this PR. So running tests will fail.

@xuanyuanking (Member, Author) replied:

https://github.com/apache/spark/blob/master/sql/core/src/test/resources/test-data/nested-array-struct.parquet
I reuse this file to test nested structs in Parquet; it is already in sql/core/src/test/resources/test-data/.

A Member replied:

Ah, I missed it. Sorry.

df.createOrReplaceTempView("tmp_table")
A Contributor commented:

You may use SQLTestUtils.withTempView to wrap this test so that you don't need to drop the temporary view manually.

@xuanyuanking (Member, Author) replied:

Fix done.

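A sketch of the suggested cleanup, assuming SQLTestUtils is in scope (ParquetTest extends it):

withTempView("tmp_table") {
  df.createOrReplaceTempView("tmp_table")
  // ... run the queries below ...
} // the view is dropped automatically, even if an assertion fails
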
// normal test
val query1 = "select primitive,myComplex[0].id from tmp_table"
val result1 = sql(query1)
withSQLConf(SQLConf.PARQUET_NEST_COLUMN_PRUNING.key -> "true") {
checkAnswer(sql(query1), result1)
A Member commented:

Does this really test if the nested fields are pruned? I think this test will pass regardless of the newly added option.

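// A stronger check (sketch; the plan-string format is an assumption): verify
// the scan's ReadSchema no longer mentions the unread nested field, e.g.
//   val planString = sql(query1).queryExecution.executedPlan.toString
//   assert(!planString.contains("repeatedMessage"))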
}
// test for array in struct
val query2 = "select primitive,myComplex[0].repeatedMessage[0].someId from tmp_table"
val result2 = sql(query2)
withSQLConf(SQLConf.PARQUET_NEST_COLUMN_PRUNING.key -> "true") {
checkAnswer(sql(query2), result2)
}
// test for merging field paths under the same struct:
// myComplex.id and myComplex.repeatedMessage.someId should be merged
// into myComplex.[id, repeatedMessage.someId] before being passed to Parquet
val query3 = "select myComplex[0].id, myComplex[0].repeatedMessage[0].someId" +
" from tmp_table"
val result3 = sql(query3)
withSQLConf(SQLConf.PARQUET_NEST_COLUMN_PRUNING.key -> "true") {
checkAnswer(sql(query3), result3)
}

spark.sessionState.catalog.dropTable(
TableIdentifier("tmp_table"), ignoreIfNotExists = true, purge = false)
}

test("expand UDT in StructType") {
val schema = new StructType().add("n", new NestedStructUDT, nullable = true)
val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true)