From 63d168cbb8ebe80bfcf91a4d5070f1d9f3ec7aca Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 28 Jul 2017 11:31:40 +0100 Subject: [PATCH 1/4] [MINOR][BUILD] Fix current lint-java failures ## What changes were proposed in this pull request? Fixes current failures in dev/lint-java ## How was this patch tested? Existing linter, tests. Author: Sean Owen Closes #18757 from srowen/LintJava. --- .../apache/spark/network/TransportRequestHandlerSuite.java | 6 ++++-- .../apache/spark/shuffle/sort/ShuffleExternalSorter.java | 6 ++++-- .../spark/sql/execution/vectorized/ArrowColumnVector.java | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java index 1fb987a8a7aa7..1ed57116bc7bf 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java @@ -105,7 +105,7 @@ private class ExtendedChannelPromise extends DefaultChannelPromise { private List listeners = new ArrayList<>(); private boolean success; - public ExtendedChannelPromise(Channel channel) { + ExtendedChannelPromise(Channel channel) { super(channel); success = false; } @@ -127,7 +127,9 @@ public void finish(boolean success) { listeners.forEach(listener -> { try { listener.operationComplete(this); - } catch (Exception e) { } + } catch (Exception e) { + // do nothing + } }); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 338faaadb33d4..da6c55d9b8ac3 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -120,14 +120,16 @@ final class ShuffleExternalSorter extends MemoryConsumer { this.taskContext = taskContext; this.numPartitions = numPartitions; // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided - this.fileBufferSizeBytes = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; + this.fileBufferSizeBytes = + (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; this.numElementsForSpillThreshold = conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024); this.writeMetrics = writeMetrics; this.inMemSorter = new ShuffleInMemorySorter( this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true)); this.peakMemoryUsedBytes = getMemoryUsage(); - this.diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE()); + this.diskWriteBufferSize = + (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE()); } /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java index 31dea6ad31b12..59d66c599c518 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java @@ -322,7 +322,7 @@ public ArrowColumnVector(ValueVector vector) { anyNullsSet = numNulls > 0; } - private static abstract class ArrowVectorAccessor { + private abstract static class ArrowVectorAccessor { private final ValueVector vector; private final ValueVector.Accessor nulls; From 784680903cdf003ac965b2d97b3f793324909bd3 Mon Sep 17 00:00:00 2001 From: davidxdh Date: Fri, 28 Jul 2017 15:21:45 +0100 Subject: [PATCH 2/4] [SPARK-21553][SPARK SHELL] Add the description of the default value of master parameter in the spark-shell When I type spark-shell --help, I find that the default value description for the master parameter is missing. The user does not know what the default value is when the master parameter is not included, so we need to add the master parameter default description to the help information. [https://issues.apache.org/jira/browse/SPARK-21553](https://issues.apache.org/jira/browse/SPARK-21553) Author: davidxdh Author: Donghui Xu Closes #18755 from davidxdh/dev_0728. --- .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index fd1521193fdee..3721b98d68685 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -514,7 +514,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S outStream.println( s""" |Options: - | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. + | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local + | (Default: local[*]). | --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or | on one of the worker machines inside the cluster ("cluster") | (Default: client). From 69ab0e4bddccb461f960fcb48a390a1517e504dd Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 28 Jul 2017 09:23:08 -0500 Subject: [PATCH 3/4] [SPARK-21541][YARN] Spark Logs show incorrect job status for a job that does not create SparkContext If you run a spark job without creating the SparkSession or SparkContext, the spark job logs says it succeeded but yarn says it fails and retries 3 times. Also, since, Application Master unregisters with Resource Manager and exits successfully, it deletes the spark staging directory, so when yarn makes subsequent retries, it fails to find the staging directory and thus, the retries fail. Added a flag to check whether user has initialized SparkContext. If it is true, we let Application Master unregister with Resource Manager else, we do not let AM unregister with RM. ## How was this patch tested? Manually tested the fix. Before: screen shot-before After: screen shot-after Please review http://spark.apache.org/contributing.html before opening a pull request. Author: pgandhi Author: pgandhi999 Closes #18741 from pgandhi999/SPARK-21541. --- .../spark/deploy/yarn/ApplicationMaster.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index fc925022b2718..ca6a3ef3ebbb5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -90,6 +90,9 @@ private[spark] class ApplicationMaster( @volatile private var reporterThread: Thread = _ @volatile private var allocator: YarnAllocator = _ + // A flag to check whether user has initialized spark context + @volatile private var registered = false + private val userClassLoader = { val classpath = Client.getUserClasspath(sparkConf) val urls = classpath.map { entry => @@ -319,7 +322,7 @@ private[spark] class ApplicationMaster( */ final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { synchronized { - if (!unregistered) { + if (registered && !unregistered) { logInfo(s"Unregistering ApplicationMaster with $status" + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) unregistered = true @@ -332,10 +335,15 @@ private[spark] class ApplicationMaster( synchronized { if (!finished) { val inShutdown = ShutdownHookManager.inShutdown() - logInfo(s"Final app status: $status, exitCode: $code" + + if (registered) { + exitCode = code + finalStatus = status + } else { + finalStatus = FinalApplicationStatus.FAILED + exitCode = ApplicationMaster.EXIT_SC_NOT_INITED + } + logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) - exitCode = code - finalStatus = status finalMsg = msg finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { @@ -439,12 +447,11 @@ private[spark] class ApplicationMaster( sc.getConf.get("spark.driver.port"), isClusterMode = true) registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr) + registered = true } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. - if (!finished) { - throw new IllegalStateException("SparkContext is null but app is still running!") - } + throw new IllegalStateException("User did not initialize spark context!") } userClassThread.join() } catch { From 0ef9fe64e2d59bdf98c0f4729e5017d258f7c07d Mon Sep 17 00:00:00 2001 From: Johan Grande Date: Fri, 28 Jul 2017 16:51:18 +0100 Subject: [PATCH 4/4] Typo in comment - Author: Johan Grande Closes #18738 from nahoj/patch-1. --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 2985c90119468..5435f59ea0d28 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -55,7 +55,7 @@ import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, Poi * Doubles; and * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that * can be saved as SequenceFiles. - * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] + * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) * through implicit. * * Internally, each RDD is characterized by five main properties: