Update upstream#121
Merged
GulajavaMinistudio merged 13 commits into GulajavaMinistudio:master on Aug 2, 2017
Conversation
…ypes

## What changes were proposed in this pull request?

Currently, `StructType.merge()` only reports the data types of conflicting fields when merging two incompatible schemas. It would be nice to also report the field names for easier debugging.

## How was this patch tested?

Unit test in DataTypeSuite. Print the exception message when a conflict is triggered.

Author: bravo-zhang <mzhang1230@gmail.com>

Closes #16365 from bravo-zhang/spark-18950.
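As a sketch of one user-facing path that exercises this merge (paths and the column name are made up for the example), Parquet schema merging calls `StructType.merge()` and, with this change, the failure names the offending field rather than only the two incompatible data types:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Two Parquet partitions whose schemas conflict on column `a`
// (bigint in one, string in the other).
Seq(1L).toDF("a").write.parquet("/tmp/merge-demo/key=1")
Seq("x").toDF("a").write.parquet("/tmp/merge-demo/key=2")

// Schema merging fails; the exception now reports the field name,
// e.g. something like "Failed to merge fields 'a' and 'a' ...".
spark.read.option("mergeSchema", "true").parquet("/tmp/merge-demo")
```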
…tion algorithms

## What changes were proposed in this pull request?

SPARK-20307 added a handleInvalid option to RFormula for tree-based classification algorithms. We should add this parameter for the other classification algorithms in SparkR. This is a follow-up PR for SPARK-20307.

## How was this patch tested?

New unit tests are added.

Author: wangmiao1981 <wm624@hotmail.com>

Closes #18605 from wangmiao1981/class.
…leOutputStream in some critical paths

## What changes were proposed in this pull request?

Java's `FileInputStream` and `FileOutputStream` override `finalize()`. Even when such a stream is closed correctly and promptly, it still leaves a memory footprint that only gets cleaned up in a full GC. This introduces two side effects:

1. Many Finalizer references are kept in memory, which increases memory overhead. In our use case of the external shuffle service, a busy shuffle service accumulates a large number of these objects, potentially leading to OOM.
2. The finalizers only run during a full GC, which increases the overhead of full GC and leads to long GC pauses.

https://bugs.openjdk.java.net/browse/JDK-8080225
https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful

To fix this potential issue, this PR proposes to use NIO's `Files#newInputStream`/`newOutputStream` instead in some critical paths such as shuffle. The following `FileInputStream` usages in core are left unchanged, since I think they are not so critical:

```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:467: val file = new DataInputStream(new FileInputStream(filename))
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:942: val in = new FileInputStream(new File(path))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:76: val fileIn = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:248: val fis = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:910: input = new FileInputStream(new File(t))
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:132: case Some(f) => new FileInputStream(f)
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:77: val fis = new FileInputStream(f)
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:27:import org.apache.spark.io.NioBufferedFileInputStream
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:94: new DataInputStream(new NioBufferedFileInputStream(index))
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:111: val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:219: val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:106: val in = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:89: inputStream = new FileInputStream(activeFile)
./core/src/main/scala/org/apache/spark/util/Utils.scala:329: if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:332: val inChannel = in.asInstanceOf[FileInputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:1533: gzInputStream = new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1560: new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1562: new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/Utils.scala:2090: val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
```

`FileOutputStream` usages left unchanged in core:

```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:957: val out = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:20:import java.io.{DataOutputStream, File, FileOutputStream, IOException}
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:131: val dos = new DataOutputStream(new FileOutputStream(f))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:62: val fileOut = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:160: val outStream = new FileOutputStream(outPath)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:239: val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:949: val out = new FileOutputStream(tempFile)
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:106: val out = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:109: * Therefore, for local files, use FileOutputStream instead. */
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:112: new FileOutputStream(uri.getPath)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:20:import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:71: private var fos: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:102: fos = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:213: var truncateStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:215: truncateStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:153: val out = new FileOutputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:81: val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
./core/src/main/scala/org/apache/spark/TestUtils.scala:96: val jarFileStream = new FileOutputStream(jarFile)
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:31: volatile private var outputStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:97: outputStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:90: gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
./core/src/main/scala/org/apache/spark/util/Utils.scala:329: if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:333: val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:527: val out = new FileOutputStream(tempFile)
```

`DiskBlockObjectWriter` uses `FileDescriptor`, so it is not easy to change it to the NIO Files API. I changed all of the `FileInputStream` and `FileOutputStream` usages in common/shuffle*.

## How was this patch tested?

Existing tests and manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #18684 from jerryshao/SPARK-21475.
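For reference, a minimal sketch of the substitution (the file path is illustrative): NIO's stream factories do not override `finalize()`, so a promptly closed stream leaves no work for the finalizer thread.

```scala
import java.io.File
import java.nio.file.Files

val file = new File("/tmp/example.bin")

// Before: FileInputStream overrides finalize(), so every instance is
// tracked by the Finalizer thread until a full GC runs.
// val in = new FileInputStream(file)

// After: the NIO factory returns a stream with no finalizer.
val in = Files.newInputStream(file.toPath)
try {
  val buf = new Array[Byte](8192)
  var n = in.read(buf)
  while (n != -1) { n = in.read(buf) } // consume the stream
} finally {
  in.close()
}
```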
…from HasThreshold

## What changes were proposed in this pull request?

GBTs inherit from HasStepSize, and LinearSVC/Binarizer inherit from HasThreshold.

## How was this patch tested?

Existing tests.

Author: Zheng RuiFeng <ruifengz@foxmail.com>
Author: Ruifeng Zheng <ruifengz@foxmail.com>

Closes #18612 from zhengruifeng/override_HasXXX.
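As a rough sketch of the shared-param pattern this touches (trait and member names are simplified stand-ins, not Spark ML's actual definitions), estimators pick up a common param by mixing in a shared trait instead of each redefining it:

```scala
// Simplified stand-ins for Spark ML's shared param traits.
trait HasStepSize { def stepSize: Double = 0.1 }
trait HasThreshold { def threshold: Double = 0.5 }

// Estimators inherit the param (name, doc, default) from the trait
// rather than declaring their own duplicate copy.
class GBTClassifier extends HasStepSize
class LinearSVC extends HasThreshold
class Binarizer extends HasThreshold
```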
… for Client Mode

The fix deployed for SPARK-21541 caused the Application Master to set the final status of a Spark application to Failed in client mode, because the flag `registered` was not being set to true for client mode. To fix the issue, the flag `registered` is now set to true in client mode upon successfully registering the Application Master.

## How was this patch tested?

Tested the patch manually.

Before: https://user-images.githubusercontent.com/22228190/28799641-02b5ed78-760f-11e7-9eb0-bf8407dad0ad.png

After: https://user-images.githubusercontent.com/22228190/28799646-0ac9ef14-760f-11e7-8bf5-9dfd743d0f2f.png

Author: pgandhi <pgandhi@yahoo-inc.com>
Author: pgandhi999 <parthkgandhi9@gmail.com>

Closes #18788 from pgandhi999/SPARK-21585.
Handle the case where the server closes the socket before the full message has been written by the client.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18727 from vanzin/SPARK-21522.
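A hedged sketch of the pattern (the method and names are illustrative, not the launcher's actual code): if the server closes its end early, the client's write fails with an `IOException` that can be treated as a normal end of the exchange rather than an error.

```scala
import java.io.IOException
import java.net.Socket

def writeBestEffort(socket: Socket, payload: Array[Byte]): Unit = {
  try {
    val out = socket.getOutputStream
    out.write(payload)
    out.flush()
  } catch {
    case _: IOException =>
      // The server hung up before the full message was written;
      // there is nothing more to send on this connection.
  }
}
```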
…art.

The main goal of this change is to avoid the situation described in the bug, where an AM restart in the middle of a job may cause no new executors to be allocated because of faulty logic in the reset path. The change does two things:

- fixes the executor allocation manager's reset() so that it does not stop allocation after a reset() in the middle of a job
- re-orders the initialization of the YarnAllocator class so that it fetches the current executor ID before triggering the reset() above; this ensures both that the new allocator gets new requests for executors and that it starts from the correct executor ID

Tested with unit tests and by manually causing AM restarts while running jobs using spark-shell in YARN mode.

Closes #17882

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Guoqiang Li <witgo@qq.com>

Closes #18663 from vanzin/SPARK-20079.
…tions in Maven build

`scala-maven-plugin` in `incremental` mode compiles both Scala and Java classes, so there is no need to execute `maven-compiler-plugin` goals to compile (in fact, recompile) the Java classes. This change reduces compilation time (by over 10% on my machine).

Author: Grzegorz Slowikowski <gslowikowski@gmail.com>

Closes #18750 from gslowikowski/remove-redundant-compilation-from-maven.
## What changes were proposed in this pull request?

Fix 2 rendering errors on the configuration doc page, due to SPARK-21243 and SPARK-15355.

## How was this patch tested?

Manually built and viewed docs with Jekyll.

Author: Sean Owen <sowen@cloudera.com>

Closes #18793 from srowen/SPARK-21593.
…o classpath on windows

The --packages option jars are added to the classpath with the scheme "file:///". On Unix this causes no problem, since the scheme contains the Unix path separator, which separates the jar name from its location in the classpath. On Windows, the jar file cannot be resolved from the classpath because of the scheme.

Windows: file:///C:/Users/<user>/.ivy2/jars/<jar-name>.jar
Unix: file:///home/<user>/.ivy2/jars/<jar-name>.jar

With this PR, we avoid adding the `file://` scheme to the packages jar files. I have verified manually in Windows and Unix environments; with the change, the jar is added to the classpath as below:

Windows: C:\Users\<user>\.ivy2\jars\<jar-name>.jar
Unix: /home/<user>/.ivy2/jars/<jar-name>.jar

Author: Devaraj K <devaraj@apache.org>

Closes #18708 from devaraj-kavali/SPARK-21339.
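The core of the fix can be sketched as converting the `file://` URI into a plain, platform-native local path before placing it on the classpath (the URI below is illustrative, not from the patch):

```scala
import java.net.URI
import java.nio.file.Paths

val jarUri = new URI("file:///C:/Users/someuser/.ivy2/jars/some-dep.jar")

// Paths.get(URI) yields a platform-native path; on Windows this is
// C:\Users\someuser\.ivy2\jars\some-dep.jar, which the classpath
// loader can resolve, unlike the file:/// form.
val classpathEntry = Paths.get(jarUri).toString
```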
## What changes were proposed in this pull request?

When using PySpark broadcast variables in a multi-threaded environment, `SparkContext._pickled_broadcast_vars` becomes a shared resource. A race condition can occur when broadcast variables pickled from one thread are added to the shared `_pickled_broadcast_vars` and become part of the Python command from another thread. This PR introduces a thread-safe pickled registry using thread-local storage, so that when the Python command is pickled (causing the broadcast variable to be pickled and added to the registry), each thread has its own view of the pickle registry from which to retrieve and clear the broadcast variables used.

## How was this patch tested?

Added a unit test that triggers this race condition using another thread.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #18695 from BryanCutler/pyspark-bcast-threadsafe-SPARK-12717.
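The thread-local idea can be sketched as follows (in Scala rather than Python, and with illustrative names rather than PySpark's actual code): each thread registers, reads, and clears only its own set of pending variables, so concurrent jobs cannot leak entries into each other's commands.

```scala
import scala.collection.mutable

// Illustrative sketch of a registry backed by thread-local storage.
object PickledBroadcastRegistry {
  private val vars = new ThreadLocal[mutable.Set[Long]] {
    override def initialValue(): mutable.Set[Long] = mutable.Set.empty
  }

  // Called when a broadcast variable is pickled on this thread.
  def add(broadcastId: Long): Unit = vars.get() += broadcastId

  // Snapshot and clear this thread's entries, analogous to attaching
  // the broadcast variables to a command when it is pickled.
  def drain(): Set[Long] = {
    val snapshot = vars.get().toSet
    vars.get().clear()
    snapshot
  }
}
```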
### What changes were proposed in this pull request?

The original error message is pretty confusing: it is unable to tell which number is the number of partitions and which one is the RDD ID. This PR improves the checkpoint checking.

### How was this patch tested?

N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18796 from gatorsmile/improveErrMsgForCheckpoint.
GulajavaMinistudio pushed a commit that referenced this pull request on Oct 19, 2019
### What changes were proposed in this pull request?

Updated the Kubernetes client.

### Why are the changes needed?

https://issues.apache.org/jira/browse/SPARK-27812
https://issues.apache.org/jira/browse/SPARK-27927

We need the fix fabric8io/kubernetes-client#1768, which was released in version 4.6 of the client. The root cause of the problem is better explained in apache#25785.

### Does this PR introduce any user-facing change?

No, it should be transparent to users.

### How was this patch tested?

This patch was tested manually using a simple PySpark job:

```python
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
```

The expected behaviour of this "job" is that both the Python and JVM processes exit automatically after the main runs. This is the case for Spark versions <= 2.4. On version 2.4.3, the JVM process hangs because there is a non-daemon thread running:

```
"OkHttp WebSocket https://10.96.0.1/..." #121 prio=5 os_prio=0 tid=0x00007fb27c005800 nid=0x24b waiting on condition [0x00007fb300847000]
"OkHttp WebSocket https://10.96.0.1/..." #117 prio=5 os_prio=0 tid=0x00007fb28c004000 nid=0x247 waiting on condition [0x00007fb300e4b000]
```

This is caused by a bug in the `kubernetes-client` library, which is fixed in the version we are upgrading to. When the mentioned job is run with this patch applied, the behaviour from Spark <= 2.4.3 is restored and both processes terminate successfully.

Closes apache#26093 from igorcalabria/k8s-client-update.

Authored-by: igor.calabria <igor.calabria@ubee.in>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
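To see why a lingering non-daemon thread keeps the JVM alive after `main` returns, here is a minimal, self-contained demonstration (independent of the Kubernetes client itself):

```scala
object NonDaemonHangDemo {
  def main(args: Array[String]): Unit = {
    val t = new Thread(new Runnable {
      def run(): Unit = Thread.sleep(Long.MaxValue)
    }, "lingerer")
    t.setDaemon(false) // non-daemon: the JVM waits for it indefinitely
    t.start()
    println("main returns here, but the process does not exit")
    // Marking the thread as a daemon (or having the library shut its
    // threads down, as the upgraded client does) lets the JVM exit.
  }
}
```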
GulajavaMinistudio pushed a commit that referenced this pull request on Jul 20, 2020
…or its output partitioning
### What changes were proposed in this pull request?
Currently, the `BroadcastHashJoinExec`'s `outputPartitioning` only uses the streamed side's `outputPartitioning`. However, if the join type of `BroadcastHashJoinExec` is an inner-like join, the build side's info (the join keys) can be added to `BroadcastHashJoinExec`'s `outputPartitioning`.
For example,
```Scala
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")
// join1 is a sort merge join.
val join1 = t1.join(t2, t1("i1") === t2("i2"))
// join2 is a broadcast join where t3 is broadcasted.
val join2 = join1.join(t3, join1("i1") === t3("i3"))
// Join on the column from the broadcasted side (i3).
val join3 = join2.join(t4, join2("i3") === t4("i4"))
join3.explain
```
You can see that `Exchange hashpartitioning(i3#29, 200)` is introduced because there is no output partitioning info from the build side.
```
== Physical Plan ==
*(6) SortMergeJoin [i3#29], [i4#40], Inner
:- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i3#29, 200), true, [id=#55]
: +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
: :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
: : :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
: : : +- LocalTableScan [i1#7, j1#8]
: : +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
: : +- LocalTableScan [i2#18, j2#19]
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
: +- LocalTableScan [i3#29, j3#30]
+- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i4#40, 200), true, [id=#39]
+- LocalTableScan [i4#40, j4#41]
```
This PR proposes to introduce output partitioning for the build side for `BroadcastHashJoinExec` if the streamed side has a `HashPartitioning` or a collection of `HashPartitioning`s.
There is a new internal config, `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit`, which limits the number of partitionings a `HashPartitioning` can expand to. It can be set to "0" to disable this feature.
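For example, the expansion can be turned off with the disable setting described above (assuming an active `SparkSession` named `spark`):

```scala
// "0" disables build-side output partitioning expansion entirely.
spark.conf.set(
  "spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit", "0")
```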
### Why are the changes needed?
To remove unnecessary shuffle.
### Does this PR introduce _any_ user-facing change?
Yes, now the shuffle in the above example can be eliminated:
```
== Physical Plan ==
*(5) SortMergeJoin [i3#108], [i4#119], Inner
:- *(3) Sort [i3#108 ASC NULLS FIRST], false, 0
: +- *(3) BroadcastHashJoin [i1#86], [i3#108], Inner, BuildRight
: :- *(3) SortMergeJoin [i1#86], [i2#97], Inner
: : :- *(1) Sort [i1#86 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(i1#86, 200), true, [id=#120]
: : : +- LocalTableScan [i1#86, j1#87]
: : +- *(2) Sort [i2#97 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i2#97, 200), true, [id=#121]
: : +- LocalTableScan [i2#97, j2#98]
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#126]
: +- LocalTableScan [i3#108, j3#109]
+- *(4) Sort [i4#119 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i4#119, 200), true, [id=#130]
+- LocalTableScan [i4#119, j4#120]
```
### How was this patch tested?
Added new tests.
Closes apache#28676 from imback82/broadcast_join_output.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.