Conversation

@viirya
Member

@viirya viirya commented Jul 15, 2016

What changes were proposed in this pull request?

KeyValueGroupedDataset.reduceGroups is currently implemented via flatMapGroups, which does not support partial aggregation, so every value for a key has to be shuffled before the reduce function runs. This is very inefficient.

KeyValueGroupedDataset.reduceGroups should support partial aggregation. This PR implements it with Aggregator.
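
To make the difference concrete, here is a minimal sketch in plain Scala, with ordinary collections standing in for partitions. It is not Spark code and not the implementation in this PR; `PartialAggregationSketch`, `reduceAfterShuffle`, and `reduceWithPartials` are hypothetical names used only for illustration.

```scala
// Illustrative only: plain Scala collections stand in for Spark partitions.
// No Spark APIs are used; all names here are hypothetical.
object PartialAggregationSketch {
  type Partition[K, V] = Seq[(K, V)]

  // Without partial aggregation: every (key, value) pair is gathered first,
  // and the reduce function only runs once all values for a key are together.
  def reduceAfterShuffle[K, V](parts: Seq[Partition[K, V]])(f: (V, V) => V): Map[K, V] =
    parts.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // With partial aggregation: each partition is reduced locally first, so at most
  // one record per key per partition remains before the final merge.
  def reduceWithPartials[K, V](parts: Seq[Partition[K, V]])(f: (V, V) => V): Map[K, V] = {
    val partials: Seq[Map[K, V]] = parts.map { p =>
      p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    }
    partials.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
  }
}
```

Both functions return the same result as long as `f` is associative and commutative; the sketch only differs in how many records would have to move between the two stages.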

How was this patch tested?

Existing tests.


implicit val resultEncoder = ExpressionEncoder.tuple(kExprEnc, vExprEnc)
flatMapGroups(func)
def zero: (Int, V) = (0, null.asInstanceOf[V])
Member Author

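One problem with using Aggregator here is the zero value: a reduce function has no natural initial value to start from. This PR pairs the value with an Int flag (a Boolean would work too) to indicate whether the buffer has been initialized.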
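
As a rough sketch of the flag idea, the class below mirrors the method names of Spark's Aggregator (zero, reduce, merge, finish) but deliberately does not extend it and leaves out encoders, so it runs without Spark. `ReduceBufferSketch` and `ReduceBuffer` are hypothetical names, and this is not the code from the PR.

```scala
// Illustrative only: a dependency-free stand-in for an Aggregator-style reducer.
// It does not extend org.apache.spark.sql.expressions.Aggregator and has no encoders.
object ReduceBufferSketch {
  // The buffer is (flag, value). Flag 0 means "no input seen yet", matching the
  // zero definition quoted above; the value slot is a placeholder until the flag is 1.
  final class ReduceBuffer[V](func: (V, V) => V) {
    def zero: (Int, V) = (0, null.asInstanceOf[V])

    // Fold one input into a buffer: the first input initializes it.
    def reduce(b: (Int, V), a: V): (Int, V) =
      if (b._1 == 0) (1, a) else (1, func(b._2, a))

    // Combine two partial buffers: an uninitialized side is simply ignored.
    def merge(b1: (Int, V), b2: (Int, V)): (Int, V) =
      if (b1._1 == 0) b2
      else if (b2._1 == 0) b1
      else (1, func(b1._2, b2._2))

    // Returns the placeholder if no input was ever seen.
    def finish(b: (Int, V)): V = b._2
  }
}
```

Under these assumptions, each partition would fold its inputs with `reduce` starting from `zero`, and the partial buffers would then be combined with `merge`; a partition that saw no input contributes nothing.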

Contributor

Could you pull this out into a generic ReduceAggregator and add a unit test for it?

Member Author

ok. I will update this.

@SparkQA

SparkQA commented Jul 15, 2016

Test build #62375 has finished for PR 14222 at commit 1135773.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koertkuipers
Contributor

This ReduceAggregator is useful beyond .reduceGroups. Basically, you can take any Aggregator without a zero and turn it into a valid Aggregator, with the caveat that the result is nullable and will be null if no inputs are provided to the Aggregator.
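
One way to picture this caveat, as a sketch only: lift a zero-less combine function to Option, so that None plays the role of the missing zero and also of the null result for empty input. `NullableReduceSketch`, `lift`, and `reduceAll` are hypothetical names, and using Option instead of null is an assumption made here for readability, not something proposed in this thread.

```scala
// Illustrative only: Option stands in for the nullable result discussed above.
object NullableReduceSketch {
  // Lifts a combine function with no zero into one that has None as its zero.
  def lift[A](f: (A, A) => A): (Option[A], Option[A]) => Option[A] = {
    case (Some(x), Some(y)) => Some(f(x, y))
    case (x, None)          => x
    case (None, y)          => y
  }

  // Reduces all inputs; yields None when there are no inputs at all.
  def reduceAll[A](inputs: Seq[A])(f: (A, A) => A): Option[A] = {
    val combine = lift(f)
    inputs.foldLeft(Option.empty[A])((acc, a) => combine(acc, Some(a)))
  }
}
```

With this shape, an empty input yields None rather than an error, which corresponds to the "null if no inputs are provided" behavior described in the comment.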

@viirya viirya force-pushed the improve-reducegroups branch from 4ba124c to 7e8d8c1 Compare July 18, 2016 08:55
@SparkQA

SparkQA commented Jul 18, 2016

Test build #62454 has finished for PR 14222 at commit 4ba124c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 18, 2016

Test build #62456 has finished for PR 14222 at commit 7e8d8c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jul 19, 2016

ping @rxin Does this change look OK to you? Please review it. Thanks.

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.ReduceAggregator

class ReduceAggregatorSuite extends SparkFunSuite {
Contributor

just put this in DatasetAggregatorSuite

Member Author

Do I still need to update this, given that you want to take it over?

Member Author

I updated it.

@rxin
Contributor

rxin commented Jul 19, 2016

@viirya I'm going to take over the PR and play with the API a little bit.

@viirya
Member Author

viirya commented Jul 19, 2016

Ok.

@SparkQA

SparkQA commented Jul 20, 2016

Test build #62594 has finished for PR 14222 at commit 6032325.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jul 21, 2016

@rxin Is there anything I need to update for this? Thanks.

@viirya
Member Author

viirya commented Jul 25, 2016

ping @rxin

@viirya
Member Author

viirya commented Aug 9, 2016

ping @rxin Any thoughts on this? It has been waiting for a while. Thanks.

rxin added a commit to rxin/spark that referenced this pull request Aug 10, 2016
[SPARK-16391][SQL] KeyValueGroupedDataset.reduceGroups should support partial aggregation
@rxin
Contributor

rxin commented Aug 10, 2016

I've created a PR for discussion, based on my experiments with the API: #14576

@viirya
Member Author

viirya commented Aug 19, 2016

Closing this now since PR #14576 has been merged.

@viirya viirya closed this Aug 19, 2016
@viirya viirya deleted the improve-reducegroups branch December 27, 2023 18:33

4 participants