Skip to content

Conversation

@mccheah
Copy link

@mccheah mccheah commented Jul 31, 2015

This takes the Python implementation of group-by-key and brings Scala up to parity, making Scala's group-by-key fault-tolerant to a single large group. It does so via using an ExternalList data structure to combine the groups, where ExternalList can spill if it gets too large.

First, the performance testing:

I tried a few versions of this. In my first implementation, I wrote the ExternalList class, and simply replaced the CompactBuffer usage in PairRDDFunctions.groupByKey with external lists. I then ran the ExternalListSuite that did the group by key operation. With 7200000 unique integers, bucketed into 5 buckets, the trials yielded:

89.333 ms
89.992 ms
104.026 ms

I then switched to the current implementation. It matches exactly what Python does with an ExternalSorter. So in essence, this is a sort-based group by. It wasn't clear to me why sort-based group by is better... until I saw the numbers, for the same RDD and buckets:

49632 ms
53615 ms
54340 ms

Therefore I went with the current implementation.

Some caveats that I am concerned about:

  1. ExternalList never cleans up the files it creates for spilling. This is noticeably different from ExternalAppendOnlyMap, which expects to only be iterated over once and then the map is discarded. We can't make that assumption for ExternalList, since the user could wish to iterate over the list a few times in any given RDD operation, meaning we have to keep the files around. I think that Spark itself will clean up its shuffle directories so we should be ok with not leaking files, but I'm still a bit paranoid.
  2. Serialization is a bit hacky in ExternalList - it took a bit of work to get ExternalList to be both serializable and yet still be able to be spillable. For example, I use an extra companion object to hold references to "constants" that I don't want to be wiped out upon serialization. Due to the way Java serialization works, the code blocks in a Scala class will not be invoked again, leaving all vals as null. The time where the val makes sense to be instantiated only upon creating the class, I either have to make it a var and instantiate it in the serialization process, or mark the val as lazy so it is re-initialized upon use post-serialization.
  3. ExternalList's spilling behavior is tied to shuffle parameters, which is a bit unintuitive. This is what Python does as well. I guess the alternative would be other Spark configurations, or parameters to group-by-key that have reasonable defaults. Pretty ugly either way.

cc @mingyukim , @zls88 , and @muyouzhi for review.

@JoshRosen
Copy link

ExternalList never cleans up the files it creates for spilling. This is noticeably different from ExternalAppendOnlyMap, which expects to only be iterated over once and then the map is discarded.

You might be able to use TaskContext's task completion listener mechanism to automatically perform cleanup at the end of a stage.

ExternalList's spilling behavior is tied to shuffle parameters, which is a bit unintuitive. This is what Python does as well.

It's quite likely that Tungsten will end up unifying / cleaning up our memory management interfaces such that we no longer have this weird scenario of anything that's spillable going through a ShuffleMemoryManager even in non-shuffle contexts.

@mccheah
Copy link
Author

mccheah commented Jul 31, 2015

@JoshRosen , what will happen if the RDD with the external list is cached and then the task completion listener decides to clean it? Actually I didn't think about caching nearly enough but I think these ExternalList objects as it currently stands should be able to be cached as the backing files are still around.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After having done some more reading on lazy val, it's become more clear that this should really be a var and instantiated on serialization because lazy val is definitely not "free"

@JoshRosen
Copy link

@mccheah Ah, right: caching definitely is a concern here. Creating the files within the shuffle spill directory should be sufficient to ensure that they're cleaned up once the application exits, but I think the concern with file leaks is that super-long-lived SparkContexts that perform tons and tons of queries might run out of disk space. I guess that we can't safely delete these files until the grouped RDD becomes unreachable and is garbage collected. Maybe there's a way to have the ContextCleaner handle cleanup in those cases, similar to how it cleans up shuffle files once ShuffleDependencies are garbage-collected.

@mccheah
Copy link
Author

mccheah commented Jul 31, 2015

I think Python has cleanup logic in the list's "destructor" because I believe destructors are a thing in Python. Of course, no analogous thing exists in Scala / Java.

I can't think of a great way to make ContextCleaner be able to handle this situation. The hard part is getting ExternalList to hook itself into ContextCleaner and for the ExternalList to associate itself with a cached RDD. And also if the RDD isn't cached then the cleanup should happen on the task completion listener side. The interaction is a bit complex.

@mccheah
Copy link
Author

mccheah commented Jul 31, 2015

I also thought a bit more about if this could be ignored for a first iteration but it's a pretty serious problem. If the list is spilled to disk, that means it couldn't fit in an executor's RAM, which means the list will probably be pretty big. So I could see the disk space being a legitimate concern.

@mccheah
Copy link
Author

mccheah commented Jul 31, 2015

@JoshRosen the problem with using ContextCleaner is that it's a Spark driver side construct, right? However it is the executors that know explicitly when the ExternalLists are being constructed and spilled. It would seem more intuitive to tie the cleanup to the Executor side.

It might be necessary, but a little hacky, to create an equivalent executor-side context cleaner. It's either going to be that or the executors informing the driver to record all the external lists that are written, which is even less than ideal.

I'm thinking of creating something generic like "ExecutorCleaner" and put it in the SparkEnv for the executor side and start it with an API to clean up ExternalLists. What do you think?

@mccheah
Copy link
Author

mccheah commented Jul 31, 2015

Ok I created an ExecutorCleaner class (not the best named class...). ExternalList objects register themselves for cleanup, but only after the task finishes because the task needs to build up the entire ExternalList first before registering the files that are to be purged. Probably still ugly parts of the code style-wise but feel free to look over the logic.

cloud-fan and others added 22 commits August 2, 2015 23:41
This PR adds a UnsafeArrayData, current we encode it in this way:

first 4 bytes is the # elements
then each 4 byte is the start offset of the element, unless it is negative, in which case the element is null.
followed by the elements themselves

an example:  [10, 11, 12, 13, null, 14] will be encoded as:
5, 28, 32, 36, 40, -44, 44, 10, 11, 12, 13, 14

Note that, when we read a UnsafeArrayData from bytes, we can read the first 4 bytes as numElements and take the rest(first 4 bytes skipped) as value region.

unsafe map data just use 2 unsafe array data, first 4 bytes is # of elements, second 4 bytes is numBytes of key array, the follows key array data and value array data.

Author: Wenchen Fan <[email protected]>

Closes apache#7752 from cloud-fan/unsafe-array and squashes the following commits:

3269bd7 [Wenchen Fan] fix a bug
6445289 [Wenchen Fan] add unit tests
49adf26 [Wenchen Fan] add unsafe map
20d1039 [Wenchen Fan] add comments and unsafe converter
821b8db [Wenchen Fan] add unsafe array
JIRA: https://issues.apache.org/jira/browse/SPARK-9549

This PR fix the following bugs:
1.  `UnaryMinus`'s codegen version would fail to compile when the input is `Long.MinValue`
2.  `BinaryComparison` would fail to compile in codegen mode when comparing Boolean types.
3.  `AddMonth` would fail if passed a huge negative month, which would lead accessing negative index of `monthDays` array.
4.  `Nanvl` with different type operands.

Author: Yijie Shen <[email protected]>

Closes apache#7882 from yjshen/minor_bug_fix and squashes the following commits:

41bbd2c [Yijie Shen] fix bug in Nanvl type coercion
3dee204 [Yijie Shen] address comments
4fa5de0 [Yijie Shen] fix bugs in expressions
This PR adds a base aggregation iterator `AggregationIterator`, which is used to create `SortBasedAggregationIterator` (for sort-based aggregation) and `UnsafeHybridAggregationIterator` (first it tries hash-based aggregation and falls back to the sort-based aggregation (using external sorter) if we cannot allocate memory for the map). With these two iterators, we will not need existing iterators and I am removing those. Also, we can use a single physical `Aggregate` operator and it internally determines what iterators to used.

https://issues.apache.org/jira/browse/SPARK-9240

Author: Yin Huai <[email protected]>

Closes apache#7813 from yhuai/AggregateOperator and squashes the following commits:

e317e2b [Yin Huai] Remove unnecessary change.
74d93c5 [Yin Huai] Merge remote-tracking branch 'upstream/master' into AggregateOperator
ba6afbc [Yin Huai] Add a little bit more comments.
c9cf3b6 [Yin Huai] update
0f1b06f [Yin Huai] Remove unnecessary code.
21fd15f [Yin Huai] Remove unnecessary change.
964f88b [Yin Huai] Implement fallback strategy.
b1ea5cf [Yin Huai] wip
7fcbd87 [Yin Huai] Add a flag to control what iterator to use.
533d5b2 [Yin Huai] Prepare for fallback!
33b7022 [Yin Huai] wip
bd9282b [Yin Huai] UDAFs now supports UnsafeRow.
f52ee53 [Yin Huai] wip
3171f44 [Yin Huai] wip
d2c45a0 [Yin Huai] wip
f60cc83 [Yin Huai] Also check input schema.
af32210 [Yin Huai] Check iter.hasNext before we create an iterator because the constructor of the iterato will read at least one row from a non-empty input iter.
299008c [Yin Huai] First round cleanup.
3915bac [Yin Huai] Create a base iterator class for aggregation iterators and add the initial version of the hybrid iterator.
…ce is used

This patch builds directly on apache#7820, which is largely written by tnachen. The only addition is one commit for cleaning up the code. There should be no functional differences between this and apache#7820.

Author: Timothy Chen <[email protected]>
Author: Andrew Or <[email protected]>

Closes apache#7881 from andrewor14/tim-cleanup-mesos-shuffle and squashes the following commits:

8894f7d [Andrew Or] Clean up code
2a5fa10 [Andrew Or] Merge branch 'mesos_shuffle_clean' of github.com:tnachen/spark into tim-cleanup-mesos-shuffle
fadff89 [Timothy Chen] Address comments.
e4d0f1d [Timothy Chen] Clean up external shuffle data on driver exit with Mesos.
…a copy buffer

Author: Wenchen Fan <[email protected]>

Closes apache#7885 from cloud-fan/cheap-copy and squashes the following commits:

0900ca1 [Wenchen Fan] replace == with ===
73f4ada [Wenchen Fan] add tests
07b865a [Wenchen Fan] add a cheap version of copy
Currently, when copy the bitsets, we didn't consider that the row1 may not sit in the beginning of byte array.

cc rxin

Author: Davies Liu <[email protected]>

Closes apache#7892 from davies/clean_join and squashes the following commits:

14cce9e [Davies Liu] cleanup generated UnsafeRowJoiner and fix bug
…ticClassifier

RandomForestClassifier now outputs rawPrediction based on tree probabilities, plus probability column computed from normalized rawPrediction.

CC: holdenk

Author: Joseph K. Bradley <[email protected]>

Closes apache#7859 from jkbradley/rf-prob and squashes the following commits:

6c28f51 [Joseph K. Bradley] Changed RandomForestClassifier to extend ProbabilisticClassifier
Certain use cases of Spark involve RDDs with long lineages that must be truncated periodically (e.g. GraphX). The existing way of doing it is through `rdd.checkpoint()`, which is expensive because it writes to HDFS. This patch provides an alternative to truncate lineages cheaply *without providing the same level of fault tolerance*.

**Local checkpointing** writes checkpointed data to the local file system through the block manager. It is much faster than replicating to a reliable storage and provides the same semantics as long as executors do not fail. It is accessible through a new operator `rdd.localCheckpoint()` and leaves the old one unchanged. Users may even decide to combine the two and call the reliable one less frequently.

The bulk of this patch involves refactoring the checkpointing interface to accept custom implementations of checkpointing. [Design doc](https://issues.apache.org/jira/secure/attachment/12741708/SPARK-7292-design.pdf).

Author: Andrew Or <[email protected]>

Closes apache#7279 from andrewor14/local-checkpoint and squashes the following commits:

729600f [Andrew Or] Oops, fix tests
34bc059 [Andrew Or] Avoid computing all partitions in local checkpoint
e43bbb6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
3be5aea [Andrew Or] Address comments
bf846a6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
ab003a3 [Andrew Or] Fix compile
c2e111b [Andrew Or] Address comments
33f167a [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
e908a42 [Andrew Or] Fix tests
f5be0f3 [Andrew Or] Use MEMORY_AND_DISK as the default local checkpoint level
a92657d [Andrew Or] Update a few comments
e58e3e3 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
4eb6eb1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
1bbe154 [Andrew Or] Simplify LocalCheckpointRDD
48a9996 [Andrew Or] Avoid traversing dependency tree + rewrite tests
62aba3f [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
db70dc2 [Andrew Or] Express local checkpointing through caching the original RDD
87d43c6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
c449b38 [Andrew Or] Fix style
4a182f3 [Andrew Or] Add fine-grained tests for local checkpointing
53b363b [Andrew Or] Rename a few more awkwardly named methods (minor)
e4cf071 [Andrew Or] Simplify LocalCheckpointRDD + docs + clean ups
4880deb [Andrew Or] Fix style
d096c67 [Andrew Or] Fix mima
172cb66 [Andrew Or] Fix mima?
e53d964 [Andrew Or] Fix style
56831c5 [Andrew Or] Add a few warnings and clear exception messages
2e59646 [Andrew Or] Add local checkpoint clean up tests
4dbbab1 [Andrew Or] Refactor CheckpointSuite to test local checkpointing
4514dc9 [Andrew Or] Clean local checkpoint files through RDD cleanups
0477eec [Andrew Or] Rename a few methods with awkward names (minor)
2e902e5 [Andrew Or] First implementation of local checkpointing
8447454 [Andrew Or] Fix tests
4ac1896 [Andrew Or] Refactor checkpoint interface for modularity
The issue was that the tokenizer was parsing "1one" into the numeric 1 using the code on line 110. I added another case to accept strings that start with a number and then have a letter somewhere else in it as well.

Author: Joseph Batchik <[email protected]>

Closes apache#7844 from JDrit/parse_error and squashes the following commits:

b8ca12f [Joseph Batchik] fixed parsing issue by adding another case
Author: Reynold Xin <[email protected]>

Closes apache#7897 from rxin/calculateBitSetWidthInBytes and squashes the following commits:

2e73b3a [Reynold Xin] [SQL][minor] Simplify UnsafeRow.calculateBitSetWidthInBytes.
Author: Cheng Lian <[email protected]>

Closes apache#7895 from liancheng/spark-9554/enable-in-memory-partition-pruning and squashes the following commits:

67c403e [Cheng Lian] Enables in-memory partition pruning by default
…nd Regressor

Added featureImportance to RandomForestClassifier and Regressor.

This follows the scikit-learn implementation here: [https://github.com/scikit-learn/scikit-learn/blob/a95203b249c1cf392f86d001ad999e29b2392739/sklearn/tree/_tree.pyx#L3341]

CC: yanboliang  Would you mind taking a look?  Thanks!

Author: Joseph K. Bradley <[email protected]>
Author: Feynman Liang <[email protected]>

Closes apache#7838 from jkbradley/dt-feature-importance and squashes the following commits:

72a167a [Joseph K. Bradley] fixed unit test
86cea5f [Joseph K. Bradley] Modified RF featuresImportances to return Vector instead of Map
5aa74f0 [Joseph K. Bradley] finally fixed unit test for real
33df5db [Joseph K. Bradley] fix unit test
42a2d3b [Joseph K. Bradley] fix unit test
fe94e72 [Joseph K. Bradley] modified feature importance unit tests
cc693ee [Feynman Liang] Add classifier tests
79a6f87 [Feynman Liang] Compare dense vectors in test
21d01fc [Feynman Liang] Added failing SKLearn test
ac0b254 [Joseph K. Bradley] Added featureImportance to RandomForestClassifier/Regressor.  Need to add unit tests
Now the memory defaults of master and slave in Standalone mode and History Server is 1g, not 512m. So let's update docs.

Author: Kousuke Saruta <[email protected]>

Closes apache#7896 from sarutak/update-doc-for-daemon-memory and squashes the following commits:

a77626c [Kousuke Saruta] Fix docs to follow the update of increase of memory defaults
Add ml.PCA user guide document and code examples for Scala/Java/Python.

Author: Yanbo Liang <[email protected]>

Closes apache#7522 from yanboliang/ml-pca-md and squashes the following commits:

60dec05 [Yanbo Liang] address comments
f992abe [Yanbo Liang] Add ml.PCA doc and examples
Add Python API for RFormula. Similar to other feature transformers in Python. This is just a thin wrapper over the Scala implementation. ericl MechCoder

Author: Xiangrui Meng <[email protected]>

Closes apache#7879 from mengxr/SPARK-9544 and squashes the following commits:

3d5ff03 [Xiangrui Meng] add an doctest for . and -
5e969a5 [Xiangrui Meng] fix pydoc
1cd41f8 [Xiangrui Meng] organize imports
3c18b10 [Xiangrui Meng] add Python API for RFormula
…ations

This patch exposes the memory used by internal data structures on the SparkUI. This tracks memory used by all spilling operations and SQL operators backed by Tungsten, e.g. `BroadcastHashJoin`, `ExternalSort`, `GeneratedAggregate` etc. The metric exposed is "peak execution memory", which broadly refers to the peak in-memory sizes of each of these data structure.

A separate patch will extend this by linking the new information to the SQL operators themselves.

<img width="950" alt="screen shot 2015-07-29 at 7 43 17 pm" src="https://cloud.githubusercontent.com/assets/2133137/8974776/b90fc980-362a-11e5-9e2b-842da75b1641.png">
<img width="802" alt="screen shot 2015-07-29 at 7 43 05 pm" src="https://cloud.githubusercontent.com/assets/2133137/8974777/baa76492-362a-11e5-9b77-e364a6a6b64e.png">

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7770)
<!-- Reviewable:end -->

Author: Andrew Or <[email protected]>

Closes apache#7770 from andrewor14/expose-memory-metrics and squashes the following commits:

9abecb9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
f5b0d68 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
d7df332 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
8eefbc5 [Andrew Or] Fix non-failing tests
9de2a12 [Andrew Or] Fix tests due to another logical merge conflict
876bfa4 [Andrew Or] Fix failing test after logical merge conflict
361a359 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
40b4802 [Andrew Or] Fix style?
d0fef87 [Andrew Or] Fix tests?
b3b92f6 [Andrew Or] Address comments
0625d73 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
c00a197 [Andrew Or] Fix potential NPEs
10da1cd [Andrew Or] Fix compile
17f4c2d [Andrew Or] Fix compile?
a87b4d0 [Andrew Or] Fix compile?
d70874d [Andrew Or] Fix test compile + address comments
2840b7d [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
6aa2f7a [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
b889a68 [Andrew Or] Minor changes: comments, spacing, style
663a303 [Andrew Or] UnsafeShuffleWriter: update peak memory before close
d090a94 [Andrew Or] Fix style
2480d84 [Andrew Or] Expand test coverage
5f1235b [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
1ecf678 [Andrew Or] Minor changes: comments, style, unused imports
0b6926c [Andrew Or] Oops
111a05e [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
a7a39a5 [Andrew Or] Strengthen presence check for accumulator
a919eb7 [Andrew Or] Add tests for unsafe shuffle writer
23c845d [Andrew Or] Add tests for SQL operators
a757550 [Andrew Or] Address comments
b5c51c1 [Andrew Or] Re-enable test in JavaAPISuite
5107691 [Andrew Or] Add tests for internal accumulators
59231e4 [Andrew Or] Fix tests
9528d09 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
5b5e6f3 [Andrew Or] Add peak execution memory to summary table + tooltip
92b4b6b [Andrew Or] Display peak execution memory on the UI
eee5437 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
d9b9015 [Andrew Or] Track execution memory in unsafe shuffles
770ee54 [Andrew Or] Track execution memory in broadcast joins
9c605a4 [Andrew Or] Track execution memory in GeneratedAggregate
9e824f2 [Andrew Or] Add back execution memory tracking for *ExternalSort
4ef4cb1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
e6c3e2f [Andrew Or] Move internal accumulators creation to Stage
a417592 [Andrew Or] Expose memory metrics in UnsafeExternalSorter
3c4f042 [Andrew Or] Track memory usage in ExternalAppendOnlyMap / ExternalSorter
bd7ab3f [Andrew Or] Add internal accumulators to TaskContext
Cherry picked the parts of the initial SPARK-8064 WiP branch needed to get sql/hive to compile against hive 1.2.1. That's the ASF release packaged under org.apache.hive, not any fork.

Tests not run yet: that's what the machines are for

Author: Steve Loughran <[email protected]>
Author: Cheng Lian <[email protected]>
Author: Michael Armbrust <[email protected]>
Author: Patrick Wendell <[email protected]>

Closes apache#7191 from steveloughran/stevel/feature/SPARK-8064-hive-1.2-002 and squashes the following commits:

7556d85 [Cheng Lian] Updates .q files and corresponding golden files
ef4af62 [Steve Loughran] Merge commit '6a92bb09f46a04d6cd8c41bdba3ecb727ebb9030' into stevel/feature/SPARK-8064-hive-1.2-002
6a92bb0 [Cheng Lian] Overrides HiveConf time vars
dcbb391 [Cheng Lian] Adds com.twitter:parquet-hadoop-bundle:1.6.0 for Hive Parquet SerDe
0bbe475 [Steve Loughran] SPARK-8064 scalastyle rejects the standard Hadoop ASF license header...
fdf759b [Steve Loughran] SPARK-8064 classpath dependency suite to be in sync with shading in final (?) hive-exec spark
7a6c727 [Steve Loughran] SPARK-8064 switch to second staging repo of the spark-hive artifacts. This one has the protobuf-shaded hive-exec jar
376c003 [Steve Loughran] SPARK-8064 purge duplicate protobuf declaration
2c74697 [Steve Loughran] SPARK-8064 switch to the protobuf shaded hive-exec jar with tests to chase it down
cc44020 [Steve Loughran] SPARK-8064 remove hadoop.version from runtest.py, as profile will fix that automatically.
6901fa9 [Steve Loughran] SPARK-8064 explicit protobuf import
da310dc [Michael Armbrust] Fixes for Hive tests.
a775a75 [Steve Loughran] SPARK-8064 cherry-pick-incomplete
7404f34 [Patrick Wendell] Add spark-hive staging repo
832c164 [Steve Loughran] SPARK-8064 try to supress compiler warnings on Complex.java pasted-thrift-code
312c0d4 [Steve Loughran] SPARK-8064  maven/ivy dependency purge; calcite declaration needed
fa5ae7b [Steve Loughran] HIVE-8064 fix up hive-thriftserver dependencies and cut back on evicted references in the hive- packages; this keeps mvn and ivy resolution compatible, as the reconciliation policy is "by hand"
c188048 [Steve Loughran] SPARK-8064 manage the Hive depencencies to that -things that aren't needed are excluded -sql/hive built with ivy is in sync with the maven reconciliation policy, rather than latest-first
4c8be8d [Cheng Lian] WIP: Partial fix for Thrift server and CLI tests
314eb3c [Steve Loughran] SPARK-8064 deprecation warning  noise in one of the tests
17b0341 [Steve Loughran] SPARK-8064 IDE-hinted cleanups of Complex.java to reduce compiler warnings. It's all autogenerated code, so still ugly.
d029b92 [Steve Loughran] SPARK-8064 rely on unescaping to have already taken place, so go straight to map of serde options
23eca7e [Steve Loughran] HIVE-8064 handle raw and escaped property tokens
54d9b06 [Steve Loughran] SPARK-8064 fix compilation regression surfacing from rebase
0b12d5f [Steve Loughran] HIVE-8064 use subset of hive complex type whose types deserialize
fce73b6 [Steve Loughran] SPARK-8064 poms rely implicitly on the version of kryo chill provides
fd3aa5d [Steve Loughran] SPARK-8064 version of hive to d/l from ivy is 1.2.1
dc73ece [Steve Loughran] SPARK-8064 revert to master's determinstic pushdown strategy
d3c1e4a [Steve Loughran] SPARK-8064 purge UnionType
051cc21 [Steve Loughran] SPARK-8064 switch to an unshaded version of hive-exec-core, which must have been built with Kryo 2.21. This currently looks for a (locally built) version 1.2.1.spark
6684c60 [Steve Loughran] SPARK-8064 ignore RTE raised in blocking process.exitValue() call
e6121e5 [Steve Loughran] SPARK-8064 address review comments
aa43dc6 [Steve Loughran] SPARK-8064  more robust teardown on JavaMetastoreDatasourcesSuite
f2bff01 [Steve Loughran] SPARK-8064 better takeup of asynchronously caught error text
8b1ef38 [Steve Loughran] SPARK-8064: on failures executing spark-submit in HiveSparkSubmitSuite, print command line and all logged output.
5a9ce6b [Steve Loughran] SPARK-8064 add explicit reason for kv split failure, rather than array OOB. *does not address the issue*
642b63a [Steve Loughran] SPARK-8064 reinstate something cut briefly during rebasing
97194dc [Steve Loughran] SPARK-8064 add extra logging to the YarnClusterSuite classpath test. There should be no reason why this is failing on jenkins, but as it is (and presumably its CP-related), improve the logging including any exception raised.
335357f [Steve Loughran] SPARK-8064 fail fast on thrive process spawning tests on exit codes and/or error string patterns seen in log.
3ed872f [Steve Loughran] SPARK-8064 rename field double to  dbl
bca55e5 [Steve Loughran] SPARK-8064 missed one of the `date` escapes
41d6479 [Steve Loughran] SPARK-8064 wrap tests with withTable() calls to avoid table-exists exceptions
2bc29a4 [Steve Loughran] SPARK-8064 ParquetSuites to escape `date` field name
1ab9bc4 [Steve Loughran] SPARK-8064 TestHive to use sered2.thrift.test.Complex
bf3a249 [Steve Loughran] SPARK-8064: more resubmit than fix; tighten startup timeout to 60s. Still no obvious reason why jersey server code in spark-assembly isn't being picked up -it hasn't been shaded
c829b8f [Steve Loughran] SPARK-8064: reinstate yarn-rm-server dependencies to hive-exec to ensure that jersey server is on classpath on hadoop versions < 2.6
0b0f738 [Steve Loughran] SPARK-8064: thrift server startup to fail fast on any exception in the main thread
13abaf1 [Steve Loughran] SPARK-8064 Hive compatibilty tests sin sync with explain/show output from Hive 1.2.1
d14d5ea [Steve Loughran] SPARK-8064: DATE is now a predicate; you can't use it as a field in select ops
26eef1c [Steve Loughran] SPARK-8064: HIVE-9039 renamed TOK_UNION => TOK_UNIONALL while adding TOK_UNIONDISTINCT
3d64523 [Steve Loughran] SPARK-8064 improve diagns on uknown token; fix scalastyle failure
d0360f6 [Steve Loughran] SPARK-8064: delicate merge in of the branch vanzin/hive-1.1
1126e5a [Steve Loughran] SPARK-8064: name of unrecognized file format wasn't appearing in error text
8cb09c4 [Steve Loughran] SPARK-8064: test resilience/assertion improvements. Independent of the rest of the work; can be backported to earlier versions
dec12cb [Steve Loughran] SPARK-8064: when a CLI suite test fails include the full output text in the raised exception; this ensures that the stdout/stderr is included in jenkins reports, so it becomes possible to diagnose the cause.
463a670 [Steve Loughran] SPARK-8064 run-tests.py adds a hadoop-2.6 profile, and changes info messages to say "w/Hive 1.2.1" in console output
2531099 [Steve Loughran] SPARK-8064 successful attempt to get rid of pentaho as a transitive dependency of hive-exec
1d59100 [Steve Loughran] SPARK-8064 (unsuccessful) attempt to get rid of pentaho as a transitive dependency of hive-exec
75733fc [Steve Loughran] SPARK-8064 change thrift binary startup message to "Starting ThriftBinaryCLIService on port"
3ebc279 [Steve Loughran] SPARK-8064 move strings used to check for http/bin thrift services up into constants
c80979d [Steve Loughran] SPARK-8064: SparkSQLCLIDriver drops remote mode support. CLISuite Tests pass instead of timing out: undetected regression?
27e8370 [Steve Loughran] SPARK-8064 fix some style & IDE warnings
00e50d6 [Steve Loughran] SPARK-8064 stop excluding hive shims from dependency (commented out , for now)
cb4f142 [Steve Loughran] SPARK-8054 cut pentaho dependency from calcite
f7aa9cb [Steve Loughran] SPARK-8064 everything compiles with some commenting and moving of classes into a hive package
6c310b4 [Steve Loughran] SPARK-8064 subclass  Hive ServerOptionsProcessor to make it public again
f61a675 [Steve Loughran] SPARK-8064 thrift server switched to Hive 1.2.1, though it doesn't compile everywhere
4890b9d [Steve Loughran] SPARK-8064, build against Hive 1.2.1
Add missing methods

1. getVectors
2. findSynonyms

to W2Vec scala and python API

mengxr

Author: MechCoder <[email protected]>

Closes apache#7263 from MechCoder/missing_methods_w2vec and squashes the following commits:

149d5ca [MechCoder] minor doc
69d91b7 [MechCoder] [SPARK-8874] [ML] Add missing methods in Word2Vec
This puts all the install commands that need to be run in one section instead of being spread over many paragraphs

cc rxin

Author: Shivaram Venkataraman <[email protected]>

Closes apache#7912 from shivaram/docs-setup-readme and squashes the following commits:

cf7a204 [Shivaram Venkataraman] Add a prerequisites section for building docs
Previous code assumed little-endian.

Author: Matthew Brandyberry <[email protected]>

Closes apache#7902 from mtbrandy/SPARK-9483 and squashes the following commits:

ec31df8 [Matthew Brandyberry] [SPARK-9483] Changes from review comments.
17d54c6 [Matthew Brandyberry] [SPARK-9483] Fix UTF8String.getPrefix for big-endian.
yhuai and others added 11 commits August 13, 2015 15:08
…olatedClientLoader when hiveMetastoreJars is set to maven.

https://issues.apache.org/jira/browse/SPARK-9885

cc marmbrus liancheng

Author: Yin Huai <[email protected]>

Closes apache#8158 from yhuai/classloaderMaven.
… column

PR apache#7967 enables us to save data source relations to metastore in Hive compatible format when possible. But it fails to persist Parquet relations with decimal column(s) to Hive metastore of versions lower than 1.2.0. This is because `ParquetHiveSerDe` in Hive versions prior to 1.2.0 doesn't support decimal. This PR checks for this case and falls back to Spark SQL specific metastore table format.

Author: Yin Huai <[email protected]>
Author: Cheng Lian <[email protected]>

Closes apache#8130 from liancheng/spark-9757/old-hive-parquet-decimal.
…here is a parent

Copied ML models must have the same parent of original ones

Author: lewuathe <[email protected]>
Author: Lewuathe <[email protected]>

Closes apache#7447 from Lewuathe/SPARK-9073.
…er, and Param

Added ml-guide Python Example: Estimator, Transformer, and Param
/docs/_site/ml-guide.html

Author: Rosstin <[email protected]>

Closes apache#8081 from Rosstin/SPARK-8965.
…PerceptronClassificationModel

To follow the naming rule of ML, change `MultilayerPerceptronClassifierModel` to `MultilayerPerceptronClassificationModel` like `DecisionTreeClassificationModel`, `GBTClassificationModel` and so on.

Author: Yanbo Liang <[email protected]>

Closes apache#8164 from yanboliang/mlp-name.
Switch to correct Sphinx syntax. MechCoder

Author: Xiangrui Meng <[email protected]>

Closes apache#8169 from mengxr/mllib-pydoc-fix.
This particular test did not load the default configurations so
it continued to start the REST server, which causes port bind
exceptions.
I skimmed through the docs for various instance of Object and replaced them with Java compaible versions of the same.

1. Some methods in LDAModel.
2. runMiniBatchSGD
3. kolmogorovSmirnovTest

Author: MechCoder <[email protected]>

Closes apache#8126 from MechCoder/java_incop.
…ndas

If pandas is broken (can't be imported, raise other exceptions other than ImportError), pyspark can't be imported, we should ignore all the exceptions.

Author: Davies Liu <[email protected]>

Closes apache#8173 from davies/fix_pandas.
What `StringIndexerInverse` does is not strictly associated with `StringIndexer`, and the name is not clearly describing the transformation. Renaming to `IndexToString` might be better.

~~I also changed `invert` to `inverse` without arguments. `inputCol` and `outputCol` could be set after.~~
I also removed `invert`.

jkbradley holdenk

Author: Xiangrui Meng <[email protected]>

Closes apache#8152 from mengxr/SPARK-9922.
@justinuang
Copy link

@mccheah, where is the code for the python destructor for the list? From a quick scan of the python code, it looks like they just create an iterable that returns a generator iterator that reads from the spill dirs and removes them right afterwards. I'm probably missing something, because this implies that the iterable actually can only be iterated over once.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally this is frowned upon, because foreach could possibly implemented in a parallel way, leading to concurrency issues if c1 isn't thread safe. I think in this case, we should be fine because c2 is not a parallel collection but just flagging the pattern.

I think you can do this instead

c1 ++= c2

http://www.scala-lang.org/api/current/index.html#scala.collection.generic.Growable@++=(xs:scala.collection.TraversableOnce[A]):Growable.this.type

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to define ++= on ExternalList to do this. Which is doable, just it wasn't done.

@justinuang
Copy link

Looks good overall, I'm mainly worried about the cleanup logic, lets discuss after you show me where the python cleanup logic is?

Conflicts:
	core/src/main/scala/org/apache/spark/ContextCleaner.scala
	core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@mccheah
Copy link
Author

mccheah commented Aug 25, 2015

k I'm going to open the PR against master now.

@mccheah mccheah closed this Aug 25, 2015
ash211 pushed a commit that referenced this pull request Feb 16, 2017
mccheah pushed a commit that referenced this pull request Apr 27, 2017
robert3005 pushed a commit that referenced this pull request May 2, 2017
…boxing/unboxing

## What changes were proposed in this pull request?

This PR improve performance of Dataset.map() for primitive types by removing boxing/unbox operations. This is based on [the discussion](apache#16391 (comment)) with cloud-fan.

Current Catalyst generates a method call to a `apply()` method of an anonymous function written in Scala. The types of an argument and return value are `java.lang.Object`. As a result, each method call for a primitive value involves a pair of unboxing and boxing for calling this `apply()` method and a pair of boxing and unboxing for returning from this `apply()` method.

This PR directly calls a specialized version of a `apply()` method without boxing and unboxing. For example, if types of an arguments ant return value is `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`.

The following is a benchmark result using [this program](https://github.com/apache/spark/pull/16391/files) with 4.7x. Here is a Dataset part of this program.

Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD                                           1923 / 1952         52.0          19.2       1.0X
DataFrame                                      526 /  548        190.2           5.3       3.7X
Dataset                                       3094 / 3154         32.3          30.9       0.6X
```

With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD                                           1883 / 1892         53.1          18.8       1.0X
DataFrame                                      502 /  642        199.1           5.0       3.7X
Dataset                                        657 /  784        152.2           6.6       2.9X
```

```java
  def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
    import spark.implicits._
    val rdd = spark.sparkContext.range(0, numRows)
    val ds = spark.range(0, numRows)
    val func = (l: Long) => l + 1
    val benchmark = new Benchmark("back-to-back map", numRows)
...
    benchmark.addCase("Dataset") { iter =>
      var res = ds.as[Long]
      var i = 0
      while (i < numChains) {
        res = res.map(func)
        i += 1
      }
      res.queryExecution.toRdd.foreach(_ => Unit)
    }
    benchmark
  }
```

A motivating example
```java
Seq(1, 2, 3).toDS.map(i => i * 7).show
```

Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           Object mapelements_funcResult = null;
/* 054 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
/* 055 */           if (mapelements_funcResult == null) {
/* 056 */             mapelements_isNull = true;
/* 057 */           } else {
/* 058 */             mapelements_value = (Integer) mapelements_funcResult;
/* 059 */           }
/* 060 */
/* 061 */         }
/* 062 */
/* 063 */       }
/* 064 */
/* 065 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 066 */
/* 067 */       if (mapelements_isNull) {
/* 068 */         serializefromobject_rowWriter.setNullAt(0);
/* 069 */       } else {
/* 070 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 071 */       }
/* 072 */       append(serializefromobject_result);
/* 073 */       if (shouldStop()) return;
/* 074 */     }
/* 075 */   }
/* 076 */ }
```

Generated code with this PR (lines 48-56 are changed)
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue);
/* 054 */         }
/* 055 */
/* 056 */       }
/* 057 */
/* 058 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 059 */
/* 060 */       if (mapelements_isNull) {
/* 061 */         serializefromobject_rowWriter.setNullAt(0);
/* 062 */       } else {
/* 063 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 064 */       }
/* 065 */       append(serializefromobject_result);
/* 066 */       if (shouldStop()) return;
/* 067 */     }
/* 068 */   }
/* 069 */ }
```

Java bytecode for methods for `i => i * 7`
```java
$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final int apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #18                 // Method apply$mcII$sp:(I)I
       5: ireturn

  public int apply$mcII$sp(int);
    Code:
       0: iload_1
       1: bipush        7
       3: imul
       4: ireturn

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #29                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #31                 // Method apply:(I)I
       8: invokestatic  #35                 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
      11: areturn

  public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
    Code:
       0: aload_0
       1: invokespecial #42                 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
       4: return
}
```
## How was this patch tested?

Added new test suites to `DatasetPrimitiveSuite`.

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#17172 from kiszk/SPARK-19008.
robert3005 pushed a commit that referenced this pull request May 2, 2017
…nd.stop

## What changes were proposed in this pull request?

`o.a.s.streaming.StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` is flaky is because there is a potential dead-lock in StandaloneSchedulerBackend which causes `await` timeout. Here is the related stack trace:
```
"Thread-31" #211 daemon prio=5 os_prio=31 tid=0x00007fedd4808000 nid=0x16403 waiting on condition [0x00007000239b7000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000079b49ca10> (a scala.concurrent.impl.Promise$CompletionLatch)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:402)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:213)
	- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
	- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:517)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1657)
	at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1302)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:708)
	at org.apache.spark.streaming.StreamingContextSuite$$anonfun$43$$anonfun$apply$mcV$sp$66$$anon$3.run(StreamingContextSuite.scala:827)

"dispatcher-event-loop-3" #18 daemon prio=5 os_prio=31 tid=0x00007fedd603a000 nid=0x6203 waiting for monitor entry [0x0000700003be4000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:253)
	- waiting to lock <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:124)
	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

This PR removes `synchronized` and changes `stopping` to AtomicBoolean to ensure idempotent to fix the dead-lock.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#17610 from zsxwing/SPARK-20131.
mccheah pushed a commit that referenced this pull request Aug 26, 2019
## What changes were proposed in this pull request?
This PR aims at improving the way physical plans are explained in spark.

Currently, the explain output for physical plan may look very cluttered and each operator's
string representation can be very wide and wraps around in the display making it little
hard to follow. This especially happens when explaining a query 1) Operating on wide tables
2) Has complex expressions etc.

This PR attempts to split the output into two sections. In the header section, we display
the basic operator tree with a number associated with each operator. In this section, we strictly
control what we output for each operator. In the footer section, each operator is verbosely
displayed. Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not included in the main plan. They are printed separately after the main plan and can be
correlated by the originating expression id from its parent plan.

To illustrate, here is a simple plan displayed in old vs new way.

Example query1 :
```
EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0
```

Old :
```
*(2) Project [key#2, max(val)#15]
+- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0))
   +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18])
      +- Exchange hashpartitioning(key#2, 200)
         +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21])
            +- *(1) Project [key#2, val#3]
               +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0))
                  +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>
```
New :
```
Project (8)
+- Filter (7)
   +- HashAggregate (6)
      +- Exchange (5)
         +- HashAggregate (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (isnotnull(key#2) AND (key#2 > 0))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]

(4) HashAggregate [codegen id : 1]
Input: [key#2, val#3]

(5) Exchange
Input: [key#2, max#11]

(6) HashAggregate [codegen id : 2]
Input: [key#2, max#11]

(7) Filter [codegen id : 2]
Input     : [key#2, max(val)#5, max(val#3)#8]
Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0))

(8) Project [codegen id : 2]
Output    : [key#2, max(val)#5]
Input     : [key#2, max(val)#5, max(val#3)#8]
```

Example Query2 (subquery):
```
SELECT * FROM   explain_temp1 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp2 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp3 WHERE  val > 0) AND val = 2) AND val > 3
```
Old:
```
*(1) Project [key#2, val#3]
+- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3))
   :  +- Subquery scalar-subquery#39
   :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)#45])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47])
   :              +- *(1) Project [key#26]
   :                 +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2))
   :                    :  +- Subquery scalar-subquery#38
   :                    :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)#43])
   :                    :        +- Exchange SinglePartition
   :                    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49])
   :                    :              +- *(1) Project [key#28]
   :                    :                 +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0))
   :                    :                    +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int>
   :                    +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int>
   +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (3)
+- Filter (2)
   +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]
===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23
HashAggregate (9)
+- Exchange (8)
   +- HashAggregate (7)
      +- Project (6)
         +- Filter (5)
            +- Scan parquet default.explain_temp2 (4)

(4) Scan parquet default.explain_temp2 [codegen id : 1]
Output: [key#26, val#27]

(5) Filter [codegen id : 1]
Input     : [key#26, val#27]
Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2))

(6) Project [codegen id : 1]
Output    : [key#26]
Input     : [key#26, val#27]

(7) HashAggregate [codegen id : 1]
Input: [key#26]

(8) Exchange
Input: [max#35]

(9) HashAggregate [codegen id : 2]
Input: [max#35]

Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22
HashAggregate (15)
+- Exchange (14)
   +- HashAggregate (13)
      +- Project (12)
         +- Filter (11)
            +- Scan parquet default.explain_temp3 (10)

(10) Scan parquet default.explain_temp3 [codegen id : 1]
Output: [key#28, val#29]

(11) Filter [codegen id : 1]
Input     : [key#28, val#29]
Condition : (isnotnull(val#29) AND (val#29 > 0))

(12) Project [codegen id : 1]
Output    : [key#28]
Input     : [key#28, val#29]

(13) HashAggregate [codegen id : 1]
Input: [key#28]

(14) Exchange
Input: [max#37]

(15) HashAggregate [codegen id : 2]
Input: [max#37]
```

Note:
I opened this PR as a WIP to start getting feedback. I will be on vacation starting tomorrow
would not be able to immediately incorporate the feedback. I will start to
work on them as soon as i can. Also, currently this PR provides a basic infrastructure
for explain enhancement. The details about individual operators will be implemented
in follow-up prs
## How was this patch tested?
Added a new test `explain.sql` that tests basic scenarios. Need to add more tests.

Closes apache#24759 from dilipbiswal/explain_feature.

Authored-by: Dilip Biswal <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.