diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java index 1fb987a8a7aa7..1ed57116bc7bf 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java @@ -105,7 +105,7 @@ private class ExtendedChannelPromise extends DefaultChannelPromise { private List listeners = new ArrayList<>(); private boolean success; - public ExtendedChannelPromise(Channel channel) { + ExtendedChannelPromise(Channel channel) { super(channel); success = false; } @@ -127,7 +127,9 @@ public void finish(boolean success) { listeners.forEach(listener -> { try { listener.operationComplete(this); - } catch (Exception e) { } + } catch (Exception e) { + // do nothing + } }); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 338faaadb33d4..da6c55d9b8ac3 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -120,14 +120,16 @@ final class ShuffleExternalSorter extends MemoryConsumer { this.taskContext = taskContext; this.numPartitions = numPartitions; // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided - this.fileBufferSizeBytes = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; + this.fileBufferSizeBytes = + (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; this.numElementsForSpillThreshold = conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024); this.writeMetrics = writeMetrics; this.inMemSorter = new ShuffleInMemorySorter( this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true)); this.peakMemoryUsedBytes = getMemoryUsage(); - this.diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE()); + this.diskWriteBufferSize = + (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE()); } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index fd1521193fdee..3721b98d68685 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -514,7 +514,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S outStream.println( s""" |Options: - | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. + | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local + | (Default: local[*]). | --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or | on one of the worker machines inside the cluster ("cluster") | (Default: client). diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 2985c90119468..5435f59ea0d28 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -55,7 +55,7 @@ import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, Poi * Doubles; and * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that * can be saved as SequenceFiles. - * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] + * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) * through implicit. * * Internally, each RDD is characterized by five main properties: diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index fc925022b2718..ca6a3ef3ebbb5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -90,6 +90,9 @@ private[spark] class ApplicationMaster( @volatile private var reporterThread: Thread = _ @volatile private var allocator: YarnAllocator = _ + // A flag to check whether user has initialized spark context + @volatile private var registered = false + private val userClassLoader = { val classpath = Client.getUserClasspath(sparkConf) val urls = classpath.map { entry => @@ -319,7 +322,7 @@ private[spark] class ApplicationMaster( */ final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { synchronized { - if (!unregistered) { + if (registered && !unregistered) { logInfo(s"Unregistering ApplicationMaster with $status" + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) unregistered = true @@ -332,10 +335,15 @@ private[spark] class ApplicationMaster( synchronized { if (!finished) { val inShutdown = ShutdownHookManager.inShutdown() - logInfo(s"Final app status: $status, exitCode: $code" + + if (registered) { + exitCode = code + finalStatus = status + } else { + finalStatus = FinalApplicationStatus.FAILED + exitCode = ApplicationMaster.EXIT_SC_NOT_INITED + } + logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) - exitCode = code - finalStatus = status finalMsg = msg finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { @@ -439,12 +447,11 @@ private[spark] class ApplicationMaster( sc.getConf.get("spark.driver.port"), isClusterMode = true) registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr) + registered = true } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. - if (!finished) { - throw new IllegalStateException("SparkContext is null but app is still running!") - } + throw new IllegalStateException("User did not initialize spark context!") } userClassThread.join() } catch { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java index 31dea6ad31b12..59d66c599c518 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java @@ -322,7 +322,7 @@ public ArrowColumnVector(ValueVector vector) { anyNullsSet = numNulls > 0; } - private static abstract class ArrowVectorAccessor { + private abstract static class ArrowVectorAccessor { private final ValueVector vector; private final ValueVector.Accessor nulls;