[SPARK-18471][CORE] New treeAggregate overload for big aggregators #10
Conversation
The zero for the aggregation used to be shipped into a closure, which is highly problematic when this zero is big (hundreds of MB is typical for ML). This change introduces a new overload of treeAggregate which ships only a function able to generate this zero.
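For illustration, here is a minimal sketch of the difference between the two call styles. The name and parameter shape of `treeAggregateWithZeroGenerator` are taken from the diff quoted later in this conversation; the RDD, buffer size, and operators are made up for the example.

```scala
import org.apache.spark.rdd.RDD

object TreeAggregateCallStyles {
  // Hypothetical aggregation of an RDD[Int] into a large dense buffer
  // (a stand-in for typical ML state of hundreds of MB).
  def oldStyle(rdd: RDD[Int]): Array[Double] = {
    val zero = new Array[Double](50 * 1000 * 1000)   // built on the driver...
    rdd.treeAggregate(zero)(                          // ...and serialized into the task closure
      (acc, x) => { acc(x % acc.length) += x; acc },
      (a, b) => { var i = 0; while (i < a.length) { a(i) += b(i); i += 1 }; a })
  }

  // With the new overload, only the small generator function is shipped;
  // each task builds its own zero locally.
  def newStyle(rdd: RDD[Int]): Array[Double] = {
    rdd.treeAggregateWithZeroGenerator(() => new Array[Double](50 * 1000 * 1000))(
      (acc, x) => { acc(x % acc.length) += x; acc },
      (a, b) => { var i = 0; while (i < a.length) { a(i) += b(i); i += 1 }; a })
  }
}
```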
tibidoh left a comment:
I think this change is too application-specific to be present at the core API level.
Though, as we verbally agreed that this change will be rolled back if it is not accepted upstream, I'm OK with it.
```scala
   *
   * @param depth suggested depth of the tree (default: 2)
   * @see [[org.apache.spark.rdd.RDD#aggregate]]
   */
```
OPT: Please keep BFS order of method declarations
BFS? What do you mean?
Breadth first (search) order
done
```scala
   * Aggregates the elements of this RDD in a multi-level tree pattern.
   *
   * This variant, which takes a function generating the zero, provides for efficiently
   * running on big aggregation structures like large dense vectors.
```
I think there should be no mention of vector manipulations in core APIs.
done
```scala
   * @see [[org.apache.spark.rdd.RDD#aggregate]]
   */
  def treeAggregate[U: ClassTag](zeroValue: U)(
  def treeAggregateWithZeroGenerator[U: ClassTag](zeroValueGenerator: () => U)(
```
You could use a lazy parameter here instead of an explicit closure, see e.g. this blog post.
Great suggestion! I'm not sure, however, how lazy val A and/or => A behave when serialized to an executor... Any idea?
According to the blog post, it should be exactly what we want. But yeah, we need to test
Let's discuss/test this live because I reckon that the value will be instantiated when the serializer is called, which is NOT what we want.
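One possible way to settle the question offline, sketched here under the assumption that plain Java serialization is a reasonable stand-in for shipping the closure to an executor (all names below are made up for the illustration): serialize a generator that counts its own invocations, then check whether serialization alone materialized the zero.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ZeroGeneratorSerializationProbe {
  def main(args: Array[String]): Unit = {
    var instantiations = 0 // counts how many times the big zero is materialized

    val zeroGen: () => Array[Double] = () => {
      instantiations += 1
      new Array[Double](1 << 20) // stand-in for a big aggregation buffer
    }

    // Simulate shipping the generator to an executor with Java serialization.
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(zeroGen)
    oos.close()

    // If this prints 0, serializing the closure did not build the zero on the
    // driver, which is the behaviour the new overload relies on. Repeating the
    // experiment with a by-name (=> U) parameter captured in a closure would
    // answer the lazy-parameter question raised in this thread.
    println(s"instantiations after serialization: $instantiations")
  }
}
```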
This PR is now to be discussed directly here: apache#16038
What changes were proposed in this pull request?
The zero for the aggregation used to be shipped into a closure, which is highly problematic when this zero is big (hundreds of MB is typical for ML). This change introduces a new overload of treeAggregate which ships only a function able to generate this zero.
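A rough sketch of the general idea follows; it is not the PR's actual implementation (see apache#16038 for that), only an illustration of why shipping a generator instead of the zero avoids the large closure: the zero is built inside each partition task, on the executor.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object ZeroGeneratorSketch {
  // Sketch only: ship just the generator and build the zero inside each task.
  def aggregateWithZeroGenerator[T: ClassTag, U: ClassTag](rdd: RDD[T])(
      zeroValueGenerator: () => U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    // Per-partition aggregation: the large zero is created on the executor,
    // so only the tiny `zeroValueGenerator` closure travels with the task.
    val partials = rdd.mapPartitions { iter =>
      Iterator.single(iter.foldLeft(zeroValueGenerator())(seqOp))
    }
    // The real overload keeps treeAggregate's tree-shaped combine; a plain
    // reduce is used here only to keep the sketch short.
    partials.reduce(combOp)
  }
}
```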
How was this patch tested?
Unit tests for the core module, launched locally.
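As a rough idea of what such a local check could look like (a sketch only, not the PR's actual test; `treeAggregateWithZeroGenerator` and its parameter list are assumed to mirror `treeAggregate`, per the diff above):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TreeAggregateZeroGeneratorCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("zero-gen-check"))
    try {
      val rdd = sc.parallelize(1 to 1000, numSlices = 8)
      val seqOp = (acc: Array[Long], x: Int) => { acc(x % 4) += x; acc }
      val combOp = (a: Array[Long], b: Array[Long]) => {
        var i = 0; while (i < a.length) { a(i) += b(i); i += 1 }; a
      }

      // Both overloads should agree; only how the zero reaches the tasks differs.
      val expected = rdd.treeAggregate(new Array[Long](4))(seqOp, combOp, depth = 2)
      val actual = rdd.treeAggregateWithZeroGenerator(() => new Array[Long](4))(seqOp, combOp, depth = 2)

      assert(expected.toSeq == actual.toSeq, "both overloads should produce the same result")
    } finally {
      sc.stop()
    }
  }
}
```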