
Conversation

@CodingCat
Contributor

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12991/

@rxin
Contributor

rxin commented Mar 4, 2014

Thanks. Merged.

@asfgit asfgit closed this in 1865dd6 Mar 4, 2014
@CodingCat CodingCat deleted the SPARK-1178 branch March 17, 2014 17:22
jhartlaub referenced this pull request in jhartlaub/spark May 27, 2014
Job cancellation via job group id.

This PR adds a simple API to group together a set of jobs belonging to a thread and threads spawned from it. It also allows the cancellation of all jobs in this group.

An example:

    sc.setJobGroup("this_is_the_group_id", "some job description")
    sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()

In a separate thread:

    sc.cancelJobGroup("this_is_the_group_id")
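
A minimal end-to-end sketch of the same pattern (assuming an existing `SparkContext` named `sc`; the thread wiring is illustrative and not part of the original change):

    // Run the grouped job in one thread and cancel it from another.
    val worker = new Thread {
      override def run(): Unit = {
        sc.setJobGroup("this_is_the_group_id", "some job description")
        try {
          sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
        } catch {
          case e: Exception => println(s"job was cancelled: ${e.getMessage}")
        }
      }
    }
    worker.start()
    Thread.sleep(1000)                        // let a few tasks start
    sc.cancelJobGroup("this_is_the_group_id") // cancels every job in the group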

(cherry picked from commit 599dcb0)
Signed-off-by: Reynold Xin <[email protected]>
clockfly added a commit to clockfly/spark that referenced this pull request Sep 22, 2016
…for implementing percentile_approx

This is a cherry-pick of the open source master branch (hash: cc33460).

## What changes were proposed in this pull request?

This is a sub-task of [SPARK-16283](https://issues.apache.org/jira/browse/SPARK-16283) (Implement percentile_approx SQL function), which moves the class QuantileSummaries to the catalyst project so that it can be reused when implementing the aggregation function `percentile_approx`.

This PR only relocates the class; the implementation is unchanged.
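
For context, a hedged sketch of how the relocated `QuantileSummaries` is already exercised via the DataFrame API (`approxQuantile` is an existing Spark API; the data and column name here are illustrative, not code from this PR):

```scala
// QuantileSummaries already backs DataFrameStatFunctions.approxQuantile;
// moving it into catalyst lets the planned percentile_approx aggregate
// reuse the same Greenwald-Khanna-style summary.
val df = spark.range(0, 1000).toDF("x")
// (column, probabilities, relativeError) -> approximate quantiles
val quantiles = df.stat.approxQuantile("x", Array(0.5, 0.99), 0.01)
println(quantiles.mkString(", "))  // approximate median and 99th percentile
```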

Author: Sean Zhong <seanzhong@databricks.com>

Author: Sean Zhong <[email protected]>

Closes apache#74 from clockfly/move_quantile_summaries.
robert3005 pushed a commit to robert3005/spark that referenced this pull request Jan 12, 2017
cenyuhai added a commit to cenyuhai/spark that referenced this pull request Oct 8, 2017
[SPARK-20865] Structured streaming dataframe cache/unpersist throws errors

Calling cache on a structured streaming dataset throws an error; operations such as cache and unpersist should instead be ignored with a warning log (a minimal repro follows).
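
A hedged sketch of the reported failure (the `rate` source here is only for illustration):

```scala
// Caching a streaming Dataset currently fails instead of being skipped
// with a warning, which is what this change proposes.
val stream = spark.readStream.format("rate").load()
stream.cache()  // throws AnalysisException: "Queries with streaming sources
                // must be executed with writeStream.start()"
```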
resolve apache#74 

See merge request !65
jlopezmalla pushed a commit to jlopezmalla/spark that referenced this pull request Nov 3, 2017
ashangit added a commit to ashangit/spark that referenced this pull request Jul 18, 2018
Bump spark criteo-2.2 to last branch-2.2 commits
cloud-fan pushed a commit that referenced this pull request Jan 14, 2021
…join can be planned as broadcast join

### What changes were proposed in this pull request?

In some cases, LeftSemi/LeftAnti joins should not be pushed down through an Aggregate.

```scala
spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1")
spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
```

Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
                  :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
                  :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
                              +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                                 +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```

After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
            :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
            :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
            :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
            +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```

### Why are the changes needed?

1. Pushing LeftSemi/LeftAnti down through an Aggregate can hurt performance.
2. It removes a user-added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test and benchmark test.

SQL | Before this PR (seconds) | After this PR (seconds)
-- | -- | --
q14a | 660 | 594
q14b | 660 | 600
q38 | 55 | 29
q87 | 66 | 35

Before this PR:
![image](https://user-images.githubusercontent.com/5399861/104452849-8789fc80-55de-11eb-88da-44059899f9a9.png)

After this PR:
![image](https://user-images.githubusercontent.com/5399861/104452899-9a043600-55de-11eb-9286-d8f3a23ca3b8.png)

Closes #31145 from wangyum/SPARK-34081.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…or on driver (apache#227)

* [HADP-43018] Disable rack resolve when registering executor on driver (apache#388) (apache#74)

Make `YarnClusterScheduler` extend `TaskSchedulerImpl` rather than `YarnScheduler` so that rack resolution is disabled.

We've seen the driver stuck in the following thread when a large number of executors register. Since we don't need rack info for locality, add a config to disable rack resolution by default, which could eliminate this bottleneck in the driver (see the sketch after the stack trace).

```
"dispatcher-event-loop-15" apache#50 daemon prio=5 os_prio=0 tid=0x00007f751a394000 nid=0x11953 runnable [0x00007f74c6290000]
   java.lang.Thread.State: RUNNABLE
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
	at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
	at java.net.InetAddress.getAllByName(InetAddress.java:1193)
	at java.net.InetAddress.getAllByName(InetAddress.java:1127)
	at java.net.InetAddress.getByName(InetAddress.java:1077)
	at org.apache.hadoop.net.NetUtils.normalizeHostName(NetUtils.java:563)
	at org.apache.hadoop.net.NetUtils.normalizeHostNames(NetUtils.java:580)
	at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:109)
	at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
	at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
	at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:37)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:329)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:318)
```
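
An illustrative sketch of the change (Spark 2.x class names; a sketch under those assumptions, not the exact patch):

```scala
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl

// By extending TaskSchedulerImpl instead of YarnScheduler, the scheduler
// inherits the default getRackForHost (which returns None), so registering
// executors no longer triggers DNS lookups through Hadoop's RackResolver
// on the dispatcher event loop.
private[spark] class YarnClusterScheduler(sc: SparkContext)
    extends TaskSchedulerImpl(sc) {
  // Locality computation now simply skips the rack level.
}
```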

No

Add UT.

I've run a test https://bdp.vip.ebay.com/job/detail/?cluster=apollorno&jobType=SPARK&jobId=application_1635906065713_321559&tab=0 on apollorno. The test succeeded with 16612 executors, though many executors failed to register. This patch could improve driver performance, but it will still hit a bottleneck when too many executors register at the same time.

```
21/11/08 07:40:19 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com:30201
21/11/08 07:42:19 ERROR TransportChannelHandler: Connection to hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com/10.78.173.174:30201 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
21/11/08 07:42:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com/10.78.173.174:30201 is closed
21/11/08 07:42:19 WARN NettyRpcEnv: Ignored failure: java.io.IOException: Connection from hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com/10.78.173.174:30201 closed
21/11/08 07:42:19 ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Driver hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com:30201 disassociated! Shutting down.
21/11/08 07:42:19 ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Cannot register with driver: spark://CoarseGrainedScheduler@hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com:30201
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com:30201 in 120 seconds. This timeout is controlled by spark.network.timeout
```

Co-authored-by: tianlzhang <[email protected]>

Co-authored-by: yujli <[email protected]>
Co-authored-by: tianlzhang <[email protected]>