@squito squito commented May 20, 2016

This code probably doesn't belong in Spark (at least, it doesn't have a nice home there presently), so for now just opening this as a central place to discuss.

Some crude performance tests for the scheduler using apache#8559

In these tests, you set up a mock backend and run as many simulated jobs as possible in 10 seconds. This lets you compare the performance of the scheduler with different simulated backends. E.g., what happens when there are 1000 nodes? What about when one executor is bad?

I've tried to design things so that the mocking doesn't get in the way of what's actually being measured, while still allowing enough flexibility. Would definitely appreciate a second pair of eyes, in any case.
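For a rough flavor of the harness, here is a minimal sketch of the timed loop, assuming a hypothetical runOneJob() hook that pushes one simulated job through the mock backend -- these names are illustrative, not the actual suite's API:

```
import java.util.concurrent.TimeUnit

// Minimal sketch of the benchmark loop: run as many simulated jobs as
// possible in 10 seconds and report the per-iteration cost.
// runOneJob() is a hypothetical hook; the real suite submits a job and
// lets the mock backend "complete" every task.
def benchmark(runOneJob: () => Unit): Unit = {
  val start = System.nanoTime()
  val deadline = start + TimeUnit.SECONDS.toNanos(10)
  var iterations = 0
  while (System.nanoTime() < deadline) {
    runOneJob()
    iterations += 1
  }
  val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
  println(f"ran $iterations%d iterations in ${elapsedMs / 1000.0}%.1f s " +
    s"(${elapsedMs / math.max(iterations, 1)} ms per itr)")
}
```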

So far I've just been running this manually in sbt, using the "-z" option to limit the run to certain comparisons, e.g.:

build/sbt -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0
> project core
> test-only org.apache.spark.scheduler.SchedulerPerformanceSuite -- -z "COMPARE D bad execs"
...
[info] SchedulerPerformanceSuite:
ran 57 iterations in 10.3 s (179 ms per itr)
[info] - COMPARE D bad execs with advanced blacklist (11 seconds, 201 milliseconds)
ran 102 iterations in 10.1 s (98 ms per itr)
[info] - COMPARE D bad execs with simple blacklist (10 seconds, 126 milliseconds)

Some observations so far:

  • Scheduling is sloooooooooow when there are lots of executors. EDIT: see the note below; this is now explained, but I'm leaving the discussion here. (This is true even before any of the blacklist changes are in place.) On my laptop, anyway, it starts to drop off dramatically when there are around 200 nodes (800 executors in my setup). Almost all the time is spent in taskScheduler.resourceOffers. I am pretty certain the slowdown is not coming from the mock setup -- it doesn't care how many nodes there are. The only part of it which does scale is that tasks get queued up for however many slots there are -- but scheduling is fast even if I have one "super executor" with 20K cores. The code shown here includes the new blacklist mechanism, but the numbers are basically the same either way.
    To be clear -- the super node has about 5x as many cores as the 550-node cluster, yet scheduling is > 30x slower on the 550-node cluster.
> test-only org.apache.spark.scheduler.SchedulerPerformanceSuite -- -z "COMPARE A"
...
ran 3 iterations in 12.7 s (4.2 s per itr)
[info] - COMPARE A Scheduling speed -- large job on a super node (13 seconds, 707 milliseconds)
ran 3 iterations in 12.5 s (4.2 s per itr)
[info] - COMPARE A Scheduling speed -- large job on 50 node cluster (12 seconds, 550 milliseconds)
ran 2 iterations in 10.1 s (5.0 s per itr)
[info] - COMPARE A Scheduling speed -- large job on 100 node cluster (10 seconds, 96 milliseconds)
ran 1 iterations in 10.5 s (10.5 s per itr)
[info] - COMPARE A: Scheduling speed -- large job on 200 node cluster (10 seconds, 531 milliseconds)
ran 1 iterations in 22.4 s (22.4 s per itr)
[info] - COMPARE A: Scheduling speed -- large job on 300 node cluster (22 seconds, 460 milliseconds)
ran 1 iterations in 46.9 s (46.9 s per itr)
[info] - COMPARE A: Scheduling speed -- large job on 400 node cluster (46 seconds, 954 milliseconds)
ran 1 iterations in 1.1 m (1.1 m per itr)
[info] - COMPARE A: Scheduling speed -- large job on 450 node cluster (1 minute, 4 seconds)
ran 1 iterations in 1.6 m (1.6 m per itr)
[info] - COMPARE A: Scheduling speed -- large job on 500 node cluster (1 minute, 33 seconds)
ran 1 iterations in 2.3 m (2.3 m per itr)
[info] - COMPARE A: Scheduling speed -- large job on 550 node cluster (2 minutes, 20 seconds)
  • EDIT: The performance issues were mostly just a wrinkle of how I was doing the tests, and the fact that I was stressing the new version more. After making the tests consistent, it's no slower than before. In fact, after discovering one bottleneck even in the old blacklist code, and fixing it, the new version is 20x faster when there is a bad node and you use the node-blacklist strategy. (My original observation, kept for context:) I don't fully understand why this is yet, but when there are failures, the advanced blacklist strategy is significantly slower than the "single task" strategy. (Note that before there are any failures, it is just as fast.) Here it's 2x slower on a small example, but with more tasks it can easily be > 5x slower.
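For context on the terminology, here is a toy sketch of the two strategies, with invented names and thresholds -- purely to illustrate the idea, not the SPARK-8426 implementation:

```
import scala.collection.mutable

// Illustrative only -- names and thresholds are invented, not the real code.
// "Simple"/"single task" strategy: a task avoids only executors where that
// exact task already failed. Node strategy: additionally skip a whole host
// once it accumulates enough failures.
class ToyBlacklist(maxFailuresPerHost: Int) {
  private val failedExecsByTask = mutable.Map.empty[Int, mutable.Set[String]]
  private val failuresByHost = mutable.Map.empty[String, Int]

  def taskFailed(taskIndex: Int, execId: String, host: String): Unit = {
    failedExecsByTask.getOrElseUpdate(taskIndex, mutable.Set.empty) += execId
    failuresByHost(host) = failuresByHost.getOrElse(host, 0) + 1
  }

  def execBlacklistedForTask(taskIndex: Int, execId: String): Boolean =
    failedExecsByTask.get(taskIndex).exists(_.contains(execId))

  def hostBlacklisted(host: String): Boolean =
    failuresByHost.getOrElse(host, 0) >= maxFailuresPerHost
}
```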

Results from running on my laptop using 456f578.
In particular, note how much faster the run is when (1) we use the advanced blacklist, (2) all the bad executors are on one node, and (3) there are no nodes with just one bad executor (the run named "COMPARE D bad host with advanced blacklist (12 seconds, 608 milliseconds)").

Iteration 0 finished in 1.1 m
ran 1 iterations in 1.1 m (1.1 m per itr)
[info] - COMPARE D bad exec with simple blacklist (1 minute, 4 seconds)
Iteration 0 finished in 2.4 m
ran 1 iterations in 2.4 m (2.4 m per itr)
[info] - COMPARE D two bad execs with simple blacklist (2 minutes, 21 seconds)
Iteration 0 finished in 1.3 m
ran 1 iterations in 1.3 m (1.3 m per itr)
[info] - COMPARE D bad exec with advanced blacklist (1 minute, 17 seconds)
Iteration 0 finished in 4.2 s
Iteration 1 finished in 4.2 s
Iteration 2 finished in 4.2 s
ran 3 iterations in 12.6 s (4.2 s per itr)
[info] - COMPARE D bad host with advanced blacklist (12 seconds, 608 milliseconds)
Iteration 0 finished in 2.2 m
ran 1 iterations in 2.2 m (2.2 m per itr)
[info] - COMPARE D bad exec and host with advanced blacklist (2 minutes, 10 seconds)
  • When there are no failures, the blacklist doesn't seem to significantly slow things down. And on a large cluster, scheduling is already really slow in taskScheduler.resourceOffers, so it doesn't make any real difference.

squito added 29 commits May 17, 2016 10:56
…n in SingleCoreMockBackend when killTask is unsupported
@squito squito merged commit 456f578 into blacklist-SPARK-8426 May 26, 2016

squito commented May 27, 2016

as of 10a83cc, on my laptop:

ran 111 iterations in 10.0 s (90 ms per itr)
[info] - Scheduling speed -- small job on a small cluster (11 seconds, 49 milliseconds)
ran 3 iterations in 12.7 s (4.2 s per itr)
[info] - COMPARE C Scheduling speed -- large job on a small cluster (12 seconds, 760 milliseconds)
ran 3 iterations in 13.0 s (4.3 s per itr)
[info] - COMPARE C Scheduling speed -- large job on a small cluster with advanced blacklist (13 seconds, 71 milliseconds)
ran 2 iterations in 10.6 s (5.3 s per itr)
[info] - COMPARE A Scheduling speed -- large job on a super node (10 seconds, 617 milliseconds)
ran 3 iterations in 13.2 s (4.4 s per itr)
[info] - COMPARE A Scheduling speed -- large job on 50 node cluster (13 seconds, 262 milliseconds)
ran 2 iterations in 11.1 s (5.6 s per itr)
[info] - COMPARE A Scheduling speed -- large job on 100 node cluster (11 seconds, 142 milliseconds)
ran 1 iterations in 10.7 s (10.7 s per itr)
[info] - COMPARE A: Scheduling speed -- large job on 200 node cluster (10 seconds, 676 milliseconds)
ran 1 iterations in 25.8 s (25.8 s per itr)
[info] - COMPARE A: Scheduling speed -- large job on 300 node cluster (25 seconds, 787 milliseconds)
ran 1 iterations in 34.2 s (34.2 s per itr)
[info] - COMPARE A: Scheduling speed -- large job on 400 node cluster (34 seconds, 218 milliseconds)
ran 1 iterations in 1.1 m (1.1 m per itr)
[info] - COMPARE A: Scheduling speed -- large job on 450 node cluster (1 minute, 3 seconds)
ran 1 iterations in 1.5 m (1.5 m per itr)
[info] - COMPARE A: Scheduling speed -- large job on 500 node cluster (1 minute, 28 seconds)
ran 1 iterations in 2.4 m (2.4 m per itr)
[info] - COMPARE A: Scheduling speed -- large job on 550 node cluster (2 minutes, 26 seconds)
ran 2 iterations in 10.6 s (5.3 s per itr)
[info] - COMPARE B: Lots of nodes (10 seconds, 637 milliseconds)
ran 2 iterations in 11.0 s (5.5 s per itr)
[info] - COMPARE B: Lots of executors, one node (11 seconds, 81 milliseconds)
ran 3 iterations in 13.0 s (4.3 s per itr)
[info] - COMPARE B: Super executor (13 seconds, 7 milliseconds)
ran 1 iterations in 58.4 s (58.4 s per itr)
[info] - COMPARE D bad exec with simple blacklist (58 seconds, 433 milliseconds)
ran 1 iterations in 2.2 m (2.2 m per itr)
[info] - COMPARE D two bad execs with simple blacklist (2 minutes, 14 seconds)
ran 1 iterations in 1.2 m (1.2 m per itr)
[info] - COMPARE D bad exec with advanced blacklist (1 minute, 11 seconds)
ran 3 iterations in 12.7 s (4.2 s per itr)
[info] - COMPARE D bad host with advanced blacklist (12 seconds, 756 milliseconds)
ran 1 iterations in 2.1 m (2.1 m per itr)
[info] - COMPARE D bad exec and host with advanced blacklist (2 minutes, 3 seconds)


squito commented Jun 6, 2016

Some updates on the slow scheduling with lots of nodes:

  1. This was largely an artifact of the testing framework -- when a task completed, it revived offers on all executors, not just the executor that completed. Fixed here: 8ce8596. Now there is no noticeable difference in performance.

  2. But there was another performance issue this ended up unearthing. TaskSchedulerImpl.resourceOfferSingleTaskSet indexes into shuffledOffers by position, but that can be really slow if you've got a linked-list-style structure (which is all that Seq guarantees, and indeed the structure you'll normally get from, e.g., where it's called in CoarseGrainedSchedulerBackend). This only matters when you first submit tasks to a taskset, though, or when you add a new executor. With a large cluster, adding and removing executors can happen quite frequently with dynamic allocation, so this can be a notable slowdown. (Though perhaps that call should also use makeOffers(executorId) instead? Not sure why it doesn't already.)
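To illustrate the pitfall (a toy sketch, not the actual Spark fix): apply(i) on a List walks from the head, so indexing through a List-backed Seq in a loop is O(n^2) overall, while converting to an IndexedSeq once up front makes each access O(1):

```
// Toy illustration of the Seq-indexing pitfall; not the Spark code itself.
val offers: Seq[String] = List.tabulate(2000)(i => s"exec-$i") // List-backed Seq

// O(n^2) overall: each offers(i) traverses the list from the head.
for (i <- offers.indices) { val offer = offers(i) /* ...match tasks to offer... */ }

// One O(n) conversion up front, then O(1) indexed access.
val indexed: IndexedSeq[String] = offers.toIndexedSeq
for (i <- indexed.indices) { val offer = indexed(i) /* ...match tasks to offer... */ }
```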

squito pushed a commit that referenced this pull request Aug 18, 2016
## What changes were proposed in this pull request?

Implements the `eval()` method for expression `AssertNotNull` so that we can convert a local projection on a LocalRelation to another LocalRelation.

### Before change:
```
scala> import org.apache.spark.sql.catalyst.dsl.expressions._
scala> import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
scala> import org.apache.spark.sql.Column
scala> case class A(a: Int)
scala> Seq((A(1),2)).toDS().select(new Column(AssertNotNull("_1".attr, Nil))).explain

java.lang.UnsupportedOperationException: Only code-generated evaluation is supported.
  at org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull.eval(objects.scala:850)
  ...
```

### After the change:
```
scala> Seq((A(1),2)).toDS().select(new Column(AssertNotNull("_1".attr, Nil))).explain(true)

== Parsed Logical Plan ==
'Project [assertnotnull('_1) AS assertnotnull(_1)#5]
+- LocalRelation [_1#2, _2#3]

== Analyzed Logical Plan ==
assertnotnull(_1): struct<a:int>
Project [assertnotnull(_1#2) AS assertnotnull(_1)#5]
+- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
LocalRelation [assertnotnull(_1)#5]

== Physical Plan ==
LocalTableScan [assertnotnull(_1)#5]
```
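For intuition, the interpreted eval being added is roughly of this shape (a simplified sketch of the method on the AssertNotNull expression class; the exact error handling in the patch may differ):

```
import org.apache.spark.sql.catalyst.InternalRow

// Simplified sketch; lives inside the AssertNotNull expression class, where
// `child` is the wrapped expression. The message text here is illustrative.
override def eval(input: InternalRow): Any = {
  val result = child.eval(input)
  if (result == null) {
    throw new NullPointerException("Null value appeared in non-nullable field")
  }
  result
}
```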

## How was this patch tested?

Unit test.

Author: Sean Zhong <[email protected]>

Closes apache#14486 from clockfly/assertnotnull_eval.
squito pushed a commit that referenced this pull request Oct 29, 2018
## What changes were proposed in this pull request?

Implements the Every, Some, and Any aggregates in SQL. These new aggregate expressions are analyzed in the normal way and rewritten to equivalent existing aggregate expressions in the optimizer.

Every(x) => Min(x)  where x is boolean.
Some(x) => Max(x) where x is boolean.

Any is a synonym for Some.
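The rewrite is sound because booleans order with false < true, so Min over a boolean column is logical AND (every) and Max is logical OR (some/any). A quick plain-Scala check of the same equivalence:

```
// Under the default Boolean ordering, false < true, so:
val vs = Seq(true, true, false)
assert(vs.min == vs.forall(identity)) // Min == Every (logical AND)
assert(vs.max == vs.exists(identity)) // Max == Some/Any (logical OR)
```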
SQL:
```
explain extended select every(v) from test_agg group by k;
```
Plan :
```
== Parsed Logical Plan ==
'Aggregate ['k], [unresolvedalias('every('v), None)]
+- 'UnresolvedRelation `test_agg`

== Analyzed Logical Plan ==
every(v): boolean
Aggregate [k#0], [every(v#1) AS every(v)#5]
+- SubqueryAlias `test_agg`
   +- Project [k#0, v#1]
      +- SubqueryAlias `test_agg`
         +- LocalRelation [k#0, v#1]

== Optimized Logical Plan ==
Aggregate [k#0], [min(v#1) AS every(v)#5]
+- LocalRelation [k#0, v#1]

== Physical Plan ==
*(2) HashAggregate(keys=[k#0], functions=[min(v#1)], output=[every(v)#5])
+- Exchange hashpartitioning(k#0, 200)
   +- *(1) HashAggregate(keys=[k#0], functions=[partial_min(v#1)], output=[k#0, min#7])
      +- LocalTableScan [k#0, v#1]
Time taken: 0.512 seconds, Fetched 1 row(s)
```

## How was this patch tested?
Added tests in SQLQueryTestSuite, DataframeAggregateSuite

Closes apache#22809 from dilipbiswal/SPARK-19851-specific-rewrite.

Authored-by: Dilip Biswal <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
squito pushed a commit that referenced this pull request Jul 18, 2019
Introduces the new Shuffle Writer API. Ported from bloomberg#5.
squito pushed a commit that referenced this pull request Sep 27, 2019
## What changes were proposed in this pull request?
This PR aims at improving the way physical plans are explained in Spark.

Currently, the explain output for a physical plan can look very cluttered, and each operator's
string representation can be very wide and wrap around in the display, making it a little
hard to follow. This especially happens when explaining a query that 1) operates on wide tables
or 2) has complex expressions, etc.

This PR attempts to split the output into two sections. In the header section, we display
the basic operator tree with a number associated with each operator. In this section, we strictly
control what we output for each operator. In the footer section, each operator is verbosely
displayed. Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not included in the main plan. They are printed separately after the main plan and can be
correlated by the originating expression id from its parent plan.

To illustrate, here is a simple plan displayed in old vs new way.

Example query1 :
```
EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0
```

Old :
```
*(2) Project [key#2, max(val)#15]
+- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0))
   +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18])
      +- Exchange hashpartitioning(key#2, 200)
         +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21])
            +- *(1) Project [key#2, val#3]
               +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0))
                  +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>
```
New :
```
Project (8)
+- Filter (7)
   +- HashAggregate (6)
      +- Exchange (5)
         +- HashAggregate (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (isnotnull(key#2) AND (key#2 > 0))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]

(4) HashAggregate [codegen id : 1]
Input: [key#2, val#3]

(5) Exchange
Input: [key#2, max#11]

(6) HashAggregate [codegen id : 2]
Input: [key#2, max#11]

(7) Filter [codegen id : 2]
Input     : [key#2, max(val)#5, max(val#3)#8]
Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0))

(8) Project [codegen id : 2]
Output    : [key#2, max(val)#5]
Input     : [key#2, max(val)#5, max(val#3)#8]
```

Example Query2 (subquery):
```
SELECT * FROM   explain_temp1 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp2 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp3 WHERE  val > 0) AND val = 2) AND val > 3
```
Old:
```
*(1) Project [key#2, val#3]
+- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3))
   :  +- Subquery scalar-subquery#39
   :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)#45])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47])
   :              +- *(1) Project [key#26]
   :                 +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2))
   :                    :  +- Subquery scalar-subquery#38
   :                    :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)#43])
   :                    :        +- Exchange SinglePartition
   :                    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49])
   :                    :              +- *(1) Project [key#28]
   :                    :                 +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0))
   :                    :                    +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int>
   :                    +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int>
   +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (3)
+- Filter (2)
   +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]
===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23
HashAggregate (9)
+- Exchange (8)
   +- HashAggregate (7)
      +- Project (6)
         +- Filter (5)
            +- Scan parquet default.explain_temp2 (4)

(4) Scan parquet default.explain_temp2 [codegen id : 1]
Output: [key#26, val#27]

(5) Filter [codegen id : 1]
Input     : [key#26, val#27]
Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2))

(6) Project [codegen id : 1]
Output    : [key#26]
Input     : [key#26, val#27]

(7) HashAggregate [codegen id : 1]
Input: [key#26]

(8) Exchange
Input: [max#35]

(9) HashAggregate [codegen id : 2]
Input: [max#35]

Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22
HashAggregate (15)
+- Exchange (14)
   +- HashAggregate (13)
      +- Project (12)
         +- Filter (11)
            +- Scan parquet default.explain_temp3 (10)

(10) Scan parquet default.explain_temp3 [codegen id : 1]
Output: [key#28, val#29]

(11) Filter [codegen id : 1]
Input     : [key#28, val#29]
Condition : (isnotnull(val#29) AND (val#29 > 0))

(12) Project [codegen id : 1]
Output    : [key#28]
Input     : [key#28, val#29]

(13) HashAggregate [codegen id : 1]
Input: [key#28]

(14) Exchange
Input: [max#37]

(15) HashAggregate [codegen id : 2]
Input: [max#37]
```

Note:
I opened this PR as a WIP to start getting feedback. I will be on vacation starting tomorrow
and would not be able to immediately incorporate the feedback. I will start to
work on it as soon as I can. Also, currently this PR provides a basic infrastructure
for the explain enhancement. The details about individual operators will be implemented
in follow-up PRs.
## How was this patch tested?
Added a new test `explain.sql` that tests basic scenarios. Need to add more tests.

Closes apache#24759 from dilipbiswal/explain_feature.

Authored-by: Dilip Biswal <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>