
@hvanhovell
Contributor

This PR adds support for multiple distinct columns to the new aggregation code path.

The implementation uses the OpenHashSet class and set expressions. As a result we can only use the slower sort-based aggregation code path, which also means the code will probably be slower than the old hash aggregation.
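To give a feel for what the set-based approach does, here is a minimal, self-contained sketch in plain Scala (not the actual PR code): a per-group distinct count that collects values into a set and takes its size at the end. scala.collection.mutable.HashSet stands in for Spark's package-private OpenHashSet, and countDistinctPerGroup is a made-up name used only for this illustration.

import scala.collection.mutable

def countDistinctPerGroup[K, V](rows: Seq[(K, V)]): Map[K, Long] =
  rows.groupBy(_._1).map { case (key, grouped) =>
    val seen = mutable.HashSet.empty[V]   // the per-group aggregation buffer
    grouped.foreach { case (_, value) => seen += value }
    key -> seen.size.toLong               // distinct count for this group
  }

// countDistinctPerGroup(Seq(1 -> "M", 1 -> "F", 1 -> "M", 2 -> "F"))
// => Map(1 -> 2, 2 -> 1)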

The PR is currently in the proof-of-concept phase; I have submitted it to get some feedback and to see if I am headed in the right direction. I'll add more tests if this is considered the way to go.

An example using the new code path:

// Needed for the $-column syntax and the rand/when functions below.
import org.apache.spark.sql.functions._
import sqlContext.implicits._

val df = sqlContext
  .range(1 << 25)
  .select(
    $"id".as("employee_id"),
    (rand(6321782L) * 4 + 1).cast("int").as("department_id"),
    when(rand(981293L) >= 0.5, "M").otherwise("F").as("gender"),
    (rand(7123L) * 3 + 1).cast("int").as("education_level")
  )

df.registerTempTable("employee")

// Regular query.
sql("""
select   department_id as d,
         count(distinct gender, education_level) as c0,
         count(distinct gender) as c1,
         count(distinct education_level) as c2
from     employee
group by department_id
""").show()

cc @yhuai

@JoshRosen
Contributor

Jenkins, this is ok to test.

@SparkQA

SparkQA commented Oct 26, 2015

Test build #44363 has finished for PR 9280 at commit 3bd6db5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DistinctAggregateFallback(function: AggregateFunction2) extends DeclarativeAggregate
    • case class ReduceSetUsingImperativeAggregate(left: Expression, right: ImperativeAggregate)
    • case class ReduceSetUsingDeclarativeAggregate(left: Expression, right: DeclarativeAggregate)

@SparkQA

SparkQA commented Oct 26, 2015

Test build #44376 has finished for PR 9280 at commit b76b83b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DistinctAggregateFallback(function: AggregateFunction2) extends DeclarativeAggregate
    • case class ReduceSetUsingImperativeAggregate(left: Expression, right: ImperativeAggregate)
    • case class ReduceSetUsingDeclarativeAggregate(left: Expression, right: DeclarativeAggregate)
    • case class DropAnyNull(child: Expression) extends UnaryExpression

@hvanhovell
Contributor Author

I'll remove this in the next iteration...

@hvanhovell
Contributor Author

Removed....

@SparkQA

SparkQA commented Oct 27, 2015

Test build #44430 has finished for PR 9280 at commit bfbf829.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DistinctAggregateFallback(function: AggregateFunction2) extends DeclarativeAggregate
    • case class ReduceSetUsingImperativeAggregate(left: Expression, right: ImperativeAggregate)
    • case class ReduceSetUsingDeclarativeAggregate(left: Expression, right: DeclarativeAggregate)
    • case class DropAnyNull(child: Expression) extends UnaryExpression

@rxin
Contributor

rxin commented Nov 5, 2015

@hvanhovell I think the approach we want to take is either the aggregate expansion (#9406) or joins, not this one. Do you mind closing this one? (We already have a record of it in JIRA in case we need to reference it.)

@hvanhovell
Contributor Author

Closing PR. Some of the code in here will probably re-emerge if we ever want distincts in window functions.

@hvanhovell closed this Nov 6, 2015
asfgit pushed a commit that referenced this pull request Nov 7, 2015
…g Rule

The second PR for SPARK-9241, this adds support for multiple distinct columns to the new aggregation code path.

This PR solves the multiple DISTINCT column problem by rewriting these Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-9241) for some information on this. The advantages over the competing [first PR](#9280) are:
- This can use the faster TungstenAggregate code path.
- It is impossible to OOM due to an ```OpenHashSet``` allocating too much memory. However, this will multiply the number of input rows by the number of distinct clauses (plus one), and it puts a lot more memory pressure on the aggregation code path itself.

The location of this Rule is a bit funny, and should probably change when the old aggregation path is changed.

cc yhuai - Could you also tell me where to add tests for this?

Author: Herman van Hovell <[email protected]>

Closes #9406 from hvanhovell/SPARK-9241-rewriter.

(cherry picked from commit 6d0ead3)
Signed-off-by: Michael Armbrust <[email protected]>
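
For illustration, here is a rough DataFrame-level sketch of the Expand-Aggregate-Aggregate rewrite described in the commit message above, applied to the example query from this PR (it assumes the df and imports from the top of this thread). This is only an approximation, not the plan the rule actually produces: the real rule inserts an Expand operator rather than a unionAll of projections, and the gid, genderE and eduE names are invented for the sketch.

// Expand: one projection per DISTINCT clause, tagged with a group id (gid).
// This is where the input is multiplied by the number of distinct clauses.
val expanded = df
  .select($"department_id", lit(1).as("gid"), $"gender".as("genderE"), $"education_level".as("eduE"))
  .unionAll(df.select($"department_id", lit(2).as("gid"), $"gender".as("genderE"), lit(null).cast("int").as("eduE")))
  .unionAll(df.select($"department_id", lit(3).as("gid"), lit(null).cast("string").as("genderE"), $"education_level".as("eduE")))

// First aggregate: deduplicate on (grouping key, gid, distinct columns).
val deduped = expanded.distinct()

// Second aggregate: count the surviving rows per gid; count() skips the nulls
// produced when the when() condition does not match.
val result = deduped
  .groupBy($"department_id")
  .agg(
    count(when($"gid" === 1, 1)).as("c0"),  // count(distinct gender, education_level)
    count(when($"gid" === 2, 1)).as("c1"),  // count(distinct gender)
    count(when($"gid" === 3, 1)).as("c2")   // count(distinct education_level)
  )
result.show()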
asfgit pushed a commit that referenced this pull request Nov 7, 2015
kiszk pushed a commit to kiszk/spark-gpu that referenced this pull request Dec 26, 2015