@@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{StructField, StructType}

/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -97,7 +99,15 @@ object FileSourceStrategy extends Strategy with Logging {
dataColumns
.filter(requiredAttributes.contains)
.filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
val outputSchema = if (fsRelation.sqlContext.conf.isParquetNestColumnPruning
&& fsRelation.fileFormat.isInstanceOf[ParquetFileFormat]) {
val totalSchema = readDataColumns.toStructType
Contributor: Maybe fullSchema?

Member Author: fix done

val prunedSchema = StructType(
generateStructFieldsContainsNesting(projects, totalSchema))
// Merge schema in same StructType and merge with filterAttributes
prunedSchema.fields.map(f => StructType(Array(f))).reduceLeft(_ merge _)
.merge(filterAttributes.toSeq.toStructType)
} else readDataColumns.toStructType
Contributor: Please re-format the above change to the following format:

if (
  ... &&
  ...
) {
  ...
} else {
  ...
}

Member Author: fix done

logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
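To illustrate what the merge step above computes, here is a minimal, self-contained sketch. The names PrunedSchemaMergeSketch and mergeStructTypes are hypothetical: StructType.merge is private[sql] in this Spark version, so the helper re-implements the idea with the public types API only.

import org.apache.spark.sql.types._

object PrunedSchemaMergeSketch {
  // Merge two StructTypes field by field, recursing into nested structs,
  // so per-leaf pruned schemas combine into one schema per top-level column.
  def mergeStructTypes(left: StructType, right: StructType): StructType = {
    val rightByName = right.fields.map(f => f.name -> f).toMap
    val merged = left.fields.map { lf =>
      rightByName.get(lf.name) match {
        case Some(rf) =>
          (lf.dataType, rf.dataType) match {
            case (ls: StructType, rs: StructType) =>
              lf.copy(dataType = mergeStructTypes(ls, rs))
            case _ => lf
          }
        case None => lf
      }
    }
    val extras = right.fields.filterNot(f => left.fieldNames.contains(f.name))
    StructType(merged ++ extras)
  }

  def main(args: Array[String]): Unit = {
    // col.s1.s1_1 pruned down to a single leaf...
    val a = StructType(Seq(
      StructField("col", StructType(Seq(
        StructField("s1", StructType(Seq(StructField("s1_1", LongType)))))))))
    // ...and col.str pruned down to another leaf.
    val b = StructType(Seq(
      StructField("col", StructType(Seq(StructField("str", StringType))))))
    // Prints col: struct<s1:struct<s1_1:bigint>,str:string>, i.e. the
    // col.[s1.s1_1, str] shape that reduceLeft(_ merge _) produces above.
    println(mergeStructTypes(a, b).treeString)
  }
}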
@@ -126,4 +136,52 @@

case _ => Nil
}

private def generateStructFieldsContainsNesting(projects: Seq[Expression],
totalSchema: StructType) : Seq[StructField] = {
Contributor: Please check the Spark code style guide and re-format this one.

Contributor: Would you please add comments and test cases for testing this method, which is basically the essential part of this PR?

Member Author: Code style fixes done.
No problem, I'll add tests for the private function generateStructFieldsContainsNesting in the next patch; this patch fixes all the code style and naming problems.

def generateStructField(curField: List[String],
node: Expression) : Seq[StructField] = {
Contributor: And this one.

Member Author: fix done

node match {
case ai: GetArrayItem =>
// Here we drop the accumulated field path to simplify array and map support.
// The same strategy applies in GetArrayStructFields and GetMapValue.
generateStructField(List.empty[String], ai.child)
case asf: GetArrayStructFields =>
generateStructField(List.empty[String], asf.child)
case mv: GetMapValue =>
generateStructField(List.empty[String], mv.child)
case attr: AttributeReference =>
Seq(getFieldRecursively(totalSchema, attr.name :: curField))
case sf: GetStructField =>
generateStructField(sf.name.get :: curField, sf.child)
Contributor: This name is optional and might not be set. We should retrieve the actual field name using the ordinal of sf.

Member Author: Fix done.
A little question here: all projects parsed from SQL must have the name, while projects from the DataFrame API may not, right?

case _ =>
if (node.children.nonEmpty) {
node.children.flatMap(child => generateStructField(curField, child))
} else {
Seq.empty[StructField]
}
}
}
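To see the shape of the expressions this match walks, here is a hedged mini sketch. Attr and Field are hypothetical stand-ins for Catalyst's AttributeReference and GetStructField, not the real classes; the point is the leaf-to-root traversal that prepends names exactly like sf.name :: curField above.

sealed trait Expr
case class Attr(name: String) extends Expr               // stand-in for AttributeReference
case class Field(child: Expr, name: String) extends Expr // stand-in for GetStructField

// Walk from the outermost field access down to the root attribute,
// accumulating the field path front-to-back.
def collectPath(e: Expr, path: List[String] = Nil): List[String] = e match {
  case Field(child, name) => collectPath(child, name :: path)
  case Attr(name)         => name :: path
}

// col.s1.s1_1 is (conceptually) Field(Field(Attr("col"), "s1"), "s1_1").
assert(collectPath(Field(Field(Attr("col"), "s1"), "s1_1")) == List("col", "s1", "s1_1"))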

def getFieldRecursively(totalSchema: StructType,
name: List[String]): StructField = {
Contributor: And this.

Member Author: fix done

if (name.length > 1) {
val curField = name.head
val curFieldType = totalSchema(curField)
curFieldType.dataType match {
case st: StructType =>
val newField = getFieldRecursively(StructType(st.fields), name.drop(1))
StructField(curFieldType.name, StructType(Seq(newField)),
curFieldType.nullable, curFieldType.metadata)
case _ =>
throw new IllegalArgumentException(s"""Field "$curField" is not struct field.""")
}
} else {
totalSchema(name.head)
}
}
Contributor @liancheng (Oct 21, 2016): Actually this function can be simplified to:

def getNestedField(schema: StructType, path: Seq[String]): StructField = {
  require(path.nonEmpty, "<error message>")

  path.tail.foldLeft(schema(path.head)) { (field, name) =>
    field.dataType match {
      case t: StructType => t(name)
      case _ => ??? // Throw exception here
    }
  }
}

Member Author @xuanyuanking (Oct 23, 2016): The function getFieldRecursively here needs to return a StructField that contains the whole nested relation along the path. For example, the fullSchema is:

root
 |-- col: struct (nullable = true)
 |    |-- s1: struct (nullable = true)
 |    |    |-- s1_1: long (nullable = true)
 |    |    |-- s1_2: long (nullable = true)
 |    |-- str: string (nullable = true)
 |-- num: long (nullable = true)
 |-- str: string (nullable = true)

and when we want to get col.s1.s1_1, the function should return:

StructField(col,StructType(StructField(s1,StructType(StructField(s1_1,LongType,true)),true)),true)

So maybe I can't use the simplified getNestedField, because it returns only the last StructField:

StructField(s1_1,LongType,true)


projects.flatMap(p => generateStructField(List.empty[String], p))
}

}
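The nesting-preserving lookup the author describes can be sketched stand-alone. nestedField below is a hypothetical rewrite of getFieldRecursively using only the public org.apache.spark.sql.types API, and the schema mirrors the fullSchema from the comment above.

import org.apache.spark.sql.types._

def nestedField(schema: StructType, path: List[String]): StructField = path match {
  case leaf :: Nil => schema(leaf)
  case head :: tail =>
    val f = schema(head)
    f.dataType match {
      // Wrap the recursive result so the whole chain of structs is kept.
      case st: StructType => f.copy(dataType = StructType(Seq(nestedField(st, tail))))
      case _ => throw new IllegalArgumentException(s"""Field "$head" is not a struct field.""")
    }
  case Nil => throw new IllegalArgumentException("path must be non-empty")
}

val fullSchema = StructType(Seq(
  StructField("col", StructType(Seq(
    StructField("s1", StructType(Seq(
      StructField("s1_1", LongType),
      StructField("s1_2", LongType)))),
    StructField("str", StringType)))),
  StructField("num", LongType),
  StructField("str", StringType)))

// Prints the whole chain, not just the leaf:
// StructField(col,StructType(StructField(s1,StructType(StructField(s1_1,LongType,true)),true)),true)
println(nestedField(fullSchema, List("col", "s1", "s1_1")))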
@@ -212,6 +212,11 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_NEST_COLUMN_PRUNING = SQLConfigBuilder("spark.sql.parquet.nestColumnPruning")
Contributor: Please rename to PARQUET_NESTED_COLUMN_PRUNING and spark.sql.parquet.nestedColumnPruning respectively.

Member Author: rename done

.doc("When set this to true, we will tell parquet only read the nest column`s leaf fields ")
Contributor: Please reword this doc string to:

When true, Parquet column pruning also works for nested fields.

Member Author: reword done

.booleanConf
.createWithDefault(false)

val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
.doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
.booleanConf
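For reference, a hedged usage sketch of the new flag. It uses the pre-rename key spark.sql.parquet.nestColumnPruning exactly as added in this diff (the reviewers asked for spark.sql.parquet.nestedColumnPruning), and the input path is hypothetical.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("nested-pruning-demo")
  .getOrCreate()
// Off by default (createWithDefault(false) above); opt in per session.
spark.conf.set("spark.sql.parquet.nestColumnPruning", "true")
val df = spark.read.parquet("/tmp/nested-struct.parquet") // hypothetical path
// With pruning enabled, only num and the col.s1.s1_1 leaf should be
// requested from Parquet rather than the whole col struct.
df.selectExpr("num", "col.s1.s1_1").show()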
@@ -661,6 +666,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

def isParquetNestColumnPruning: Boolean = getConf(PARQUET_NEST_COLUMN_PRUNING)
Contributor: parquetNestedColumnPruningEnabled

Member Author: rename done


def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)

def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
Binary file not shown.
@@ -571,6 +571,37 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

test("SPARK-4502 parquet nested fields pruning") {
// Schema of "test-data/nested-struct.snappy.parquet":
// root
// |-- col: struct (nullable = true)
// | |-- s1: struct (nullable = true)
// | | |-- s1_1: long (nullable = true)
// | | |-- s1_2: long (nullable = true)
// | |-- str: string (nullable = true)
// |-- num: long (nullable = true)
// |-- str: string (nullable = true)
val df = readResourceParquetFile("test-data/nested-struct.snappy.parquet")
df.createOrReplaceTempView("tmp_table")
Contributor: You may use SQLTestUtils.withTempView to wrap this test so that you don't need to drop the temporary view manually.

Member Author: fix done

// normal test
val query1 = "select num,col.s1.s1_1 from tmp_table"
val result1 = sql(query1)
withSQLConf(SQLConf.PARQUET_NEST_COLUMN_PRUNING.key -> "true") {
checkAnswer(sql(query1), result1)
Member: Does this really test if the nested fields are pruned? I think this test will pass regardless of the newly added option.

}
// test for merging fields of the same struct:
// col.s1.s1_1 and col.str should merge into
// col.[s1.s1_1, str] before being passed to Parquet
val query2 = "select col.s1.s1_1,col.str from tmp_table"
val result2 = sql(query2)
withSQLConf(SQLConf.PARQUET_NEST_COLUMN_PRUNING.key -> "true") {
checkAnswer(sql(query2), result2)
}

spark.sessionState.catalog.dropTable(
TableIdentifier("tmp_table"), ignoreIfNotExists = true, purge = false)
}
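One possible way to strengthen the test per the pruning-verification concern above is to inspect the scan's read schema rather than only comparing answers. A sketch, assuming FileSourceScanExec publishes a ReadSchema entry in its plan metadata (as Spark 2.x plan strings suggest):

import org.apache.spark.sql.execution.FileSourceScanExec

withSQLConf(SQLConf.PARQUET_NEST_COLUMN_PRUNING.key -> "true") {
  val pruned = sql("select num, col.s1.s1_1 from tmp_table")
  // Collect the ReadSchema string from every file scan in the physical plan.
  val readSchemas = pruned.queryExecution.executedPlan.collect {
    case scan: FileSourceScanExec => scan.metadata.get("ReadSchema")
  }.flatten
  // With pruning on, the scan should request s1_1 but not the sibling s1_2.
  assert(readSchemas.exists(s => s.contains("s1_1") && !s.contains("s1_2")))
}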

test("expand UDT in StructType") {
val schema = new StructType().add("n", new NestedStructUDT, nullable = true)
val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true)