@dbtsai (Member) commented Sep 27, 2018

What changes were proposed in this pull request?

This PR allows Spark to translate a Catalyst Expression on a nested field into a data source Filter. It is a building block for letting Parquet, ORC, and other data sources support nested predicate pushdown.
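
For illustration, here is a self-contained sketch (toy types, not Spark's actual classes) of the translation this enables: a nested field reference is flattened into a dot-separated column name inside a source Filter.

sealed trait Expr
case class AttrRef(name: String) extends Expr                  // top-level column
case class GetField(child: Expr, field: String) extends Expr   // nested struct access
case class EqExpr(left: Expr, value: Any) extends Expr         // predicate: left = value

// Stand-in for org.apache.spark.sql.sources.EqualTo.
case class EqualTo(attribute: String, value: Any)

// Flatten a (possibly nested) column reference into a dot-separated name,
// giving up (None) when any component itself contains a dot.
def attrName(e: Expr): Option[String] = e match {
  case AttrRef(name) if !name.contains(".") => Some(name)
  case GetField(child, field) if !field.contains(".") =>
    attrName(child).map(_ + "." + field)
  case _ => None
}

def translateFilter(e: Expr): Option[EqualTo] = e match {
  case EqExpr(left, v) => attrName(left).map(EqualTo(_, v))
  case _ => None
}

// translateFilter(EqExpr(GetField(AttrRef("address"), "city"), "SF"))
//   == Some(EqualTo("address.city", "SF"))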

How was this patch tested?

Tests added

@dbtsai force-pushed the dataSourcePredicate branch 2 times, most recently from 53165b8 to d59cb55 on September 27, 2018 20:17
@SparkQA commented Sep 27, 2018

Test build #96709 has finished for PR 22573 at commit 2f21842.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 28, 2018

Test build #96710 has finished for PR 22573 at commit 53165b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 28, 2018

Test build #96711 has finished for PR 22573 at commit d59cb55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member:

Will this cause a regression for data sources that support dots in column names?

Member Author:

Do we have any data source that currently supports dots in column names with pushdown? The worst case would be no pushdown for those data sources.

I know ORC doesn't work for now. We can have a follow-up PR to address this.

Member:

The JDBC data source doesn't seem to have such a restriction, so I worry that this change could cause regressions.

Member:

Yes, @dbtsai, this PR has a regression on ORC at least. The following is the ORC result in Spark 2.3.2; with this change it would slow down at least 5x, like Parquet.

I know ORC doesn't work for now. We can have a follow-up PR to address this.

scala> val df = spark.range(Int.MaxValue).sample(0.2).toDF("col.with.dot")
scala> df.write.mode("overwrite").orc("/tmp/orc")
scala> df.write.mode("overwrite").parquet("/tmp/parquet")
scala> spark.sql("set spark.sql.orc.impl=native")
scala> spark.sql("set spark.sql.orc.filterPushdown=true")
scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` = 50000").count)
Time taken: 803 ms

scala> spark.time(spark.read.parquet("/tmp/parquet").where("`col.with.dot` = 50000").count)
Time taken: 5573 ms

scala> spark.version
res6: String = 2.3.2

Member:

Apache Spark 2.4.0 RC2 already has a regression on this case, so, for now, this PR doesn't introduce a regression against the master branch.

scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` = 50000").count)
Time taken: 2405 ms

Contributor:

Probably a dumb question: is it possible to store a column with a . in its name in Parquet?

More holistically, I think it would be better to create an abstraction for a multipart identifier in a filter as opposed to encoding it using a '.'.

@cloud-fan (Contributor):

I think the problem is that the current public Filter API uses a string as the attribute type, which makes nested fields hard to represent.

Ideally we would extend the API and create a new interface for columns and nested columns instead of a string, but Filter is a public API, so this is hard to do.

This PR proposes encoding nested columns as strings. That works, but we should think carefully about the encoding so that column names containing dots are still supported.

@dbtsai (Member Author) commented Sep 28, 2018

I was thinking of changing the APIs in Filter so we could represent nested fields more easily, but I also realized that it's a stable public interface.

Without changing the interface of Filter, we have the following options (a minimal sketch of both follows the list):

  1. Use backticks to wrap any column name or struct field name that contains dots, for example `column.1`.`attribute.b`. This also makes the pushdown plans easier to read when printed in text format.

  2. Use ASCII-delimited text to avoid delimiter collision; for example, \31 (the ASCII unit separator) is conventionally used between fields of a record or members of a row. This simplifies parsing significantly, but the downside is that it isn't human-readable, so when we print the plan we would need to add the backticks for visualization.
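
A minimal sketch of both encodings (illustrative only; the helper names here are made up):

// Option 1: backtick-quote every path component, so dots inside a
// component stay unambiguous.
def quoted(parts: Seq[String]): String =
  parts.map(p => "`" + p + "`").mkString(".")

// Option 2: join components with the ASCII unit separator (0x1F), which is
// very unlikely to appear in a real column name, so splitting is trivial.
val UnitSep = "\u001F"
def encode(parts: Seq[String]): String = parts.mkString(UnitSep)
def decode(s: String): Seq[String] = s.split(UnitSep).toSeq

// quoted(Seq("column.1", "attribute.b"))         == "`column.1`.`attribute.b`"
// decode(encode(Seq("column.1", "attribute.b"))) == Seq("column.1", "attribute.b")
// For display, re-quote the decoded parts: quoted(decode(...)).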

What do you think?

@dongjoon-hyun (Member):

Can we update the public Filter API in Spark 3.0.0? @cloud-fan and @gatorsmile.

@gatorsmile (Member):

Updating Filter APIs sounds reasonable to me. This should be part of our data source API v2. cc @cloud-fan @rxin @rdblue

@dongjoon-hyun (Member):

That's great!

@rdblue (Contributor) commented Oct 1, 2018

The approach we've taken in Iceberg is to allow . in names by using an index in the top-level schema. The full path of every leaf in the schema is produced and added to a map from the full field name to the field's ID.

We do this to avoid two problem areas:

  • Parsing the name using . as a delimiter
  • Traversing the schema structure

For example, the schema 0: a struct<2: x int, 3: y int>, 1: a.z int produces this index: Map("a" -> 0, "a.x" -> 2, "a.y" -> 3, "a.z" -> 1).

Binding filters like a.x > 3 or a.z < 5 is done using the index instead of parsing the field name and traversing, so you get the right result without needing to decide whether "a.x" is nested or if it is the actual name. So the lookup is quick and correctly produces id(2) > 3 and id(1) < 5. This is also used for projection because users want to be able to select nested columns by name using dotted field names.

The only drawback to this approach is that you can't have duplicates in the index: each full field name must be unique. In the example above, the top-level a.z field could not be named a.x or else it would collide with x nested in a.
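
A rough sketch of such an index (hypothetical types, not Iceberg's actual classes):

// Every field carries a stable ID; nested fields are children.
case class Field(id: Int, name: String, children: Seq[Field] = Nil)

// Map every full dotted path to its field ID, so binding a filter or a
// projection is a single lookup instead of name parsing plus traversal.
def buildIndex(fields: Seq[Field], prefix: String = ""): Map[String, Int] =
  fields.flatMap { f =>
    val path = if (prefix.isEmpty) f.name else prefix + "." + f.name
    (path -> f.id) +: buildIndex(f.children, path).toSeq
  }.toMap

// The schema from the example: 0: a struct<2: x int, 3: y int>, 1: `a.z` int
val schema = Seq(
  Field(0, "a", Seq(Field(2, "x"), Field(3, "y"))),
  Field(1, "a.z"))

buildIndex(schema)  // Map("a" -> 0, "a.x" -> 2, "a.y" -> 3, "a.z" -> 1)
// Binding a.x > 3 looks up ID 2; a.z < 5 looks up ID 1 -- no parsing needed.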

@dongjoon-hyun (Member):

Thank you, @rdblue. BTW, in general, indexing might be unsafe in Apache Spark when the metastore schema differs from the file schema. Does this assume Iceberg's schema evolution feature?

@rdblue (Contributor) commented Oct 1, 2018

@dongjoon-hyun, Iceberg schema evolution is based on the field IDs, not on names. The current table schema's names are the runtime names for columns in that table, and all reads happen by first translating those names to IDs and projecting the IDs from the data files. That way, renames can never cause you to get incorrect data.

You're mostly right that Spark has a problem with schema evolution for HadoopFS tables. That wouldn't affect my suggestion here, though. If you're filtering or projecting field m.n, then Spark currently handles that by matching columns by name. If you're matching by name, then m.n can't change across versions, or at least you can always project m.n from the data (in the case of Avro).
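
A tiny illustration of that ID-based resolution (hypothetical code, not Iceberg's): reads resolve the current column name to its ID, then find that ID in the data file, so a rename between writes cannot select the wrong column.

// The current table schema maps names to stable field IDs...
val currentSchema = Map("renamed_col" -> 7)   // column renamed after the file was written
// ...while each data file records the name it used for each ID.
val fileNamesById = Map(7 -> "orig_col")

def resolve(currentName: String): Option[String] =
  currentSchema.get(currentName).flatMap(fileNamesById.get)

// resolve("renamed_col") == Some("orig_col"): the read follows the ID,
// not the name, and still reaches the right data.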

@SparkQA commented Oct 22, 2018

Test build #97748 has finished for PR 22573 at commit d59cb55.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 22, 2018

Test build #97765 has finished for PR 22573 at commit d59cb55.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 22, 2018

Test build #97805 has finished for PR 22573 at commit d59cb55.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 14, 2019

Test build #101202 has finished for PR 22573 at commit a996547.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@prodeezy commented Feb 28, 2019

@dbtsai Thanks for this PR! As I understand it, this effort adds support for struct-type columns. Is there another effort to support maps and arrays?

@dbtsai (Member Author) commented Mar 1, 2019

@prodeezy This is for struct types; @dongjoon-hyun and I are working on extending it to maps and arrays.

@prodeezy commented Mar 7, 2019

@dbtsai Can you point me to the JIRA that tracks the maps/arrays support? Thanks!

@dbtsai force-pushed the dataSourcePredicate branch from a996547 to ae88eeb on April 5, 2019 05:36
@SparkQA commented Apr 5, 2019

Test build #104311 has finished for PR 22573 at commit ae88eeb.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Apr 5, 2019

retest this please.

@SparkQA commented Apr 5, 2019

Test build #104315 has finished for PR 22573 at commit ae88eeb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell (Contributor):

@dbtsai what is the status of this PR?

// attrName builds a dot-separated path for a (possibly nested) column,
// returning None when any name component itself contains a dot.
case a: Attribute if !a.name.contains(".") =>
  Some(a.name)
case s: GetStructField if !s.childSchema(s.ordinal).name.contains(".") =>
  attrName(s.child).map(_ + s".${s.childSchema(s.ordinal).name}")
Contributor:

_ + "." + s.childSchema(s.ordinal).name?

@prodeezy:

@dbtsai Thanks again for your work on this feature. We've recently merged a PR to support struct filtering in Iceberg [1]. This still requires Spark to push down the filters to the data source. It would be great to have this work merged as well so that we can leverage it downstream. Can you tell us what's currently blocking this PR (if anything)?

[1] - apache/iceberg#123

@dbtsai (Member Author) commented Jun 12, 2019

@prodeezy I am working on making it happen in Spark 3.0. The challenge is that in the DSv1 filter API there is no easy way to express a nested column, so we just put dots in the string. The DSv2 API has a better design for handling nested columns, but unfortunately it still uses the DSv1 filter module. We would like to propose a new set of filter APIs in v2 to handle this situation.
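
To make that concrete, here is a purely hypothetical sketch of what a v2 filter reference could look like (none of these names exist yet; this is just the shape under discussion):

// A column is an explicit multipart name instead of a dot-encoded string, so
// nested `a`.`b` and a top-level column literally named "a.b" stay distinct.
case class NamedReference(parts: Seq[String]) {
  override def toString: String = parts.map("`" + _ + "`").mkString(".")
}

sealed trait FilterV2
case class EqualToV2(ref: NamedReference, value: Any) extends FilterV2

val nested = EqualToV2(NamedReference(Seq("a", "b")), 1)   // `a`.`b` = 1
val dotted = EqualToV2(NamedReference(Seq("a.b")), 1)      // `a.b` = 1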

@rdblue (Contributor) commented Jun 12, 2019

@dbtsai, +1 for a better public filter API! Let me know what you need and we can work toward getting it in.

@github-actions bot commented Jan 6, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions bot added the Stale label Jan 6, 2020
@prodeezy commented Jan 6, 2020

@dbtsai Are we targeting a new API to handle nested filtering as part of a different PR, or would that be done here? If so, can you point me to it?

@github-actions bot closed this Jan 7, 2020