Merged

sync #14

Changes from all commits
20 commits
b333ed0
[SPARK-31923][CORE] Ignore internal accumulators that use unrecognize…
zsxwing Jun 8, 2020
37b7d32
[SPARK-30845] Do not upload local pyspark archives for spark-submit o…
shanyu Jun 8, 2020
06959eb
[SPARK-31934][BUILD] Remove set -x from docker image tool
holdenk Jun 8, 2020
e289140
[SPARK-31849][PYTHON][SQL][FOLLOW-UP] More correct error message in P…
HyukjinKwon Jun 9, 2020
8305b77
[SPARK-28199][SPARK-28199][SS][FOLLOWUP] Mention the change of into t…
HeartSaVioR Jun 9, 2020
ddd8d5f
[SPARK-31932][SQL][TESTS] Add date/timestamp benchmarks for `HiveResu…
MaxGekk Jun 9, 2020
ca2cfd4
[SPARK-31906][SQL][DOCS] Enhance comments in NamedExpression.qualifier
zhulipeng Jun 9, 2020
6befb2d
[SPARK-31486][CORE] spark.submit.waitAppCompletion flag to control sp…
akshatb1 Jun 9, 2020
de91915
[SPARK-31940][SQL][DOCS] Document the default JVM time zone in to/fro…
MaxGekk Jun 9, 2020
717ec5e
[SPARK-29295][SQL][FOLLOWUP] Dynamic partition map parsed from partit…
turboFei Jun 9, 2020
1d1eacd
[SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiv…
wangyum Jun 9, 2020
38873d5
[SPARK-31921][CORE] Fix the wrong warning: "App app-xxx requires more…
Ngone51 Jun 9, 2020
02f32cf
[SPARK-31926][SQL][TEST-HIVE1.2] Fix concurrency issue for ThriftCLIS…
yaooqinn Jun 9, 2020
6a424b9
[SPARK-31830][SQL] Consistent error handling for datetime formatting …
yaooqinn Jun 9, 2020
f3771c6
[SPARK-31935][SQL] Hadoop file system config should be effective in d…
gengliangwang Jun 9, 2020
e14029b
[SPARK-26905][SQL] Add `TYPE` in the ANSI non-reserved list
maropu Jun 10, 2020
032d179
[SPARK-31945][SQL][PYSPARK] Enable cache for the same Python function
ueshin Jun 10, 2020
8490eab
[SPARK-31486][CORE][FOLLOW-UP] Use ConfigEntry for config "spark.stan…
Ngone51 Jun 10, 2020
82ff29b
[SPARK-31941][CORE] Replace SparkException to NoSuchElementException …
SaurabhChawla100 Jun 10, 2020
43063e2
[SPARK-27217][SQL] Nested column aliasing for more operators which ca…
viirya Jun 10, 2020
2 changes: 0 additions & 2 deletions bin/docker-image-tool.sh
@@ -19,8 +19,6 @@
# This script builds and pushes docker images when run from a release of Spark
# with Kubernetes support.

set -x

function error {
echo "$@" 1>&2
exit 1
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -74,13 +74,25 @@ private[spark] class PythonRDD(
* runner.
*/
private[spark] case class PythonFunction(
command: Array[Byte],
command: Seq[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: PythonAccumulatorV2)
accumulator: PythonAccumulatorV2) {

def this(
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: PythonAccumulatorV2) = {
this(command.toSeq, envVars, pythonIncludes, pythonExec, pythonVer, broadcastVars, accumulator)
}
}

/**
* A wrapper for chained Python functions (from bottom to top).
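The `Array[Byte]` → `Seq[Byte]` change above is what makes SPARK-31945 possible: case classes compare arrays by reference but compare `Seq`s element-wise, so two `PythonFunction`s built from the same serialized command can now be recognized as equal and cached. A minimal, self-contained sketch of that difference (the case classes below are illustrative stand-ins, not Spark's):

```scala
// Illustrative stand-ins, not Spark's classes: only the Seq-based wrapper gives
// structural equality, which is what a cache keyed by the function relies on.
case class ArrayFunc(command: Array[Byte])
case class SeqFunc(command: Seq[Byte])

val bytes1 = Array[Byte](1, 2, 3)
val bytes2 = Array[Byte](1, 2, 3)

println(ArrayFunc(bytes1) == ArrayFunc(bytes2))          // false: arrays use reference equality
println(SeqFunc(bytes1.toSeq) == SeqFunc(bytes2.toSeq))  // true: Seq compares elements
```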
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -613,7 +613,7 @@ private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
protected override def writeCommand(dataOut: DataOutputStream): Unit = {
val command = funcs.head.funcs.head.command
dataOut.writeInt(command.length)
dataOut.write(command)
dataOut.write(command.toArray)
}

protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
94 changes: 68 additions & 26 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,6 +17,8 @@

package org.apache.spark.deploy

import java.util.concurrent.TimeUnit

import scala.collection.mutable.HashSet
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
@@ -27,6 +29,7 @@ import org.apache.log4j.Logger
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
@@ -61,6 +64,11 @@ private class ClientEndpoint(

private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
private val waitAppCompletion = conf.get(config.STANDALONE_SUBMIT_WAIT_APP_COMPLETION)
private val REPORT_DRIVER_STATUS_INTERVAL = 10000
private var submittedDriverID = ""
private var driverStatusReported = false


private def getProperty(key: String, conf: SparkConf): Option[String] = {
sys.props.get(key).orElse(conf.getOption(key))
@@ -107,8 +115,13 @@

case "kill" =>
val driverId = driverArgs.driverId
submittedDriverID = driverId
asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
logInfo("... waiting before polling master for driver state")
forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
monitorDriverStatus()
}, 5000, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)
}

/**
@@ -124,58 +137,87 @@
}
}

/* Find out driver status then exit the JVM */
def pollAndReportStatus(driverId: String): Unit = {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
logInfo("... polling master for driver state")
val statusResponse =
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
private def monitorDriverStatus(): Unit = {
if (submittedDriverID != "") {
asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID))
}
}

/**
* Processes and reports the driver status, then exits the JVM if
* waitAppCompletion is set to false; otherwise keeps reporting the driver
* status (at debug level) until the driver completes.
*/

def reportDriverStatus(
found: Boolean,
state: Option[DriverState],
workerId: Option[String],
workerHostPort: Option[String],
exception: Option[Exception]): Unit = {
if (found) {
// Using driverStatusReported to avoid writing following
// logs again when waitAppCompletion is set to true
if (!driverStatusReported) {
driverStatusReported = true
logInfo(s"State of $submittedDriverID is ${state.get}")
// Worker node, if present
(workerId, workerHostPort, state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
}
// Exception, if present
statusResponse.exception match {
exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
case _ =>
System.exit(0)
state.get match {
case DriverState.FINISHED | DriverState.FAILED |
DriverState.ERROR | DriverState.KILLED =>
logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
s"exiting spark-submit JVM.")
System.exit(0)
case _ =>
if (!waitAppCompletion) {
logInfo(s"spark-submit not configured to wait for completion, " +
s"exiting spark-submit JVM.")
System.exit(0)
} else {
logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
s"continue monitoring driver status.")
}
}
}
} else {
logError(s"ERROR: Cluster master did not recognize $submittedDriverID")
System.exit(-1)
}
} else {
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
}
}

override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(master, success, driverId, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId.get)
submittedDriverID = driverId.get
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}


case KillDriverResponse(master, driverId, success, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}

case DriverStatusResponse(found, state, workerId, workerHostPort, exception) =>
reportDriverStatus(found, state, workerId, workerHostPort, exception)
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
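The new flow replaces the one-shot `Thread.sleep` poll with a repeating task on a scheduler thread. A minimal sketch of that scheduling pattern in plain `java.util.concurrent` (the 5 s initial delay and 10 s period mirror the constants above; the task body is a placeholder, not the real `RequestDriverStatus` round trip):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Placeholder polling task: in ClientEndpoint the scheduled task asks the active master
// for the driver status; here it only prints, to keep the sketch self-contained.
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = println("polling master for driver state...")
}, 5000L, 10000L, TimeUnit.MILLISECONDS)
```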
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -715,7 +715,9 @@
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canLaunchExecutor(_, app.desc))
.sortBy(_.coresFree).reverse
if (waitingApps.length == 1 && usableWorkers.isEmpty) {
val appMayHang = waitingApps.length == 1 &&
waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
if (appMayHang) {
logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
}
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1864,4 +1864,13 @@ package object config {
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STANDALONE_SUBMIT_WAIT_APP_COMPLETION =
ConfigBuilder("spark.standalone.submit.waitAppCompletion")
.doc("In standalone cluster mode, controls whether the client waits to exit until the " +
"application completes. If set to true, the client process will stay alive polling " +
"the driver's status. Otherwise, the client process will exit after submission.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
}
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -42,7 +42,7 @@ private[spark] class AppStatusStore(
store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
} catch {
case _: NoSuchElementException =>
throw new SparkException("Failed to get the application information. " +
throw new NoSuchElementException("Failed to get the application information. " +
"If you are starting up Spark, please wait a while until it's ready.")
}
}
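With the exception type changed, callers can rely on the standard `NoSuchElementException`. A self-contained sketch of the failure mode (the `applicationInfo` helper and the application id below are stand-ins for the store lookup above, not Spark's API):

```scala
// Stand-in for the AppStatusStore lookup: throws NoSuchElementException while the
// application info has not been written to the store yet.
def applicationInfo(ready: Boolean): String =
  if (ready) "app-20200610-0001"  // hypothetical application id
  else throw new NoSuchElementException(
    "Failed to get the application information. " +
      "If you are starting up Spark, please wait a while until it's ready.")

try {
  println(s"Found ${applicationInfo(ready = false)}")
} catch {
  case e: NoSuchElementException => println(s"Not ready yet: ${e.getMessage}")
}
```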
20 changes: 15 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -363,12 +363,22 @@ private[spark] object JsonProtocol {
case v: Long => JInt(v)
// We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
// the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
case v =>
JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
case v: java.util.List[_] =>
JArray(v.asScala.toList.flatMap {
case (id: BlockId, status: BlockStatus) =>
Some(
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
)
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
// not crash.
None
})
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
// crash.
JNothing
}
} else {
// For all external accumulators, just use strings
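The essence of the fix is to match on the element shape and drop anything that is not a `(BlockId, BlockStatus)` pair, instead of casting the whole list. A stand-alone sketch of that defensive pattern, using plain strings in place of the Spark types:

```scala
import scala.collection.JavaConverters._

// A heterogeneous list standing in for an accumulator value whose name merely starts
// with METRICS_PREFIX: flatMap keeps recognized pairs and silently drops the rest,
// so no ClassCastException is thrown while writing the event log.
val mixed: java.util.List[Any] =
  java.util.Arrays.asList[Any]("not-a-block-entry", ("rdd_0_0", "MEMORY_ONLY"))

val kept = mixed.asScala.toList.flatMap {
  case (id: String, status: String) => Some(id -> status) // recognized entry
  case _                            => None               // unsupported entry, ignored
}
println(kept) // List((rdd_0_0,MEMORY_ONLY))
```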
48 changes: 48 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -507,6 +507,54 @@ class JsonProtocolSuite extends SparkFunSuite {
testAccumValue(Some("anything"), 123, JString("123"))
}

/** Create an AccumulableInfo and verify we can serialize and deserialize it. */
private def testAccumulableInfo(
name: String,
value: Option[Any],
expectedValue: Option[Any]): Unit = {
val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
val accum = AccumulableInfo(
123L,
Some(name),
update = value,
value = value,
internal = isInternal,
countFailedValues = false)
val json = JsonProtocol.accumulableInfoToJson(accum)
val newAccum = JsonProtocol.accumulableInfoFromJson(json)
assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
}

test("SPARK-31923: unexpected value type of internal accumulator") {
// Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected
// types to make sure we don't crash.
import InternalAccumulator.METRICS_PREFIX
testAccumulableInfo(
METRICS_PREFIX + "fooString",
value = Some("foo"),
expectedValue = None)
testAccumulableInfo(
METRICS_PREFIX + "fooList",
value = Some(java.util.Arrays.asList("string")),
expectedValue = Some(java.util.Collections.emptyList())
)
val blocks = Seq(
(TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
(TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
testAccumulableInfo(
METRICS_PREFIX + "fooList",
value = Some(java.util.Arrays.asList(
"string",
blocks(0),
blocks(1))),
expectedValue = Some(blocks.asJava)
)
testAccumulableInfo(
METRICS_PREFIX + "fooSet",
value = Some(Set("foo")),
expectedValue = None)
}

test("SPARK-30936: forwards compatibility - ignore unknown fields") {
val expected = TestListenerEvent("foo", 123)
val unknownFieldsJson =
19 changes: 19 additions & 0 deletions docs/spark-standalone.md
@@ -374,6 +374,25 @@ To run an interactive Spark shell against the cluster, run the following command

You can also pass an option `--total-executor-cores <numCores>` to control the number of cores that spark-shell uses on the cluster.

# Client Properties

Spark applications support the following configuration properties specific to standalone mode:

<table class="table">
<tr><th style="width:21%">Property Name</th><th>Default Value</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.standalone.submit.waitAppCompletion</code></td>
<td><code>false</code></td>
<td>
In standalone cluster mode, controls whether the client waits to exit until the application completes.
If set to <code>true</code>, the client process will stay alive polling the driver's status.
Otherwise, the client process will exit after submission.
</td>
<td>3.1.0</td>
</tr>
</table>


# Launching Spark Applications

The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -27,6 +27,8 @@ license: |
- In Spark 3.1, grouping_id() returns long values. In Spark version 3.0 and earlier, this function returns int values. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.integerGroupingId` to `true`.

- In Spark 3.1, SQL UI data adopts the `formatted` mode for the query plan explain results. To restore the behavior before Spark 3.0, you can set `spark.sql.ui.explainMode` to `extended`.

- In Spark 3.1, `from_unixtime`, `unix_timestamp`, `to_unix_timestamp`, `to_timestamp` and `to_date` will fail if the specified datetime pattern is invalid. In Spark 3.0 or earlier, they result in `NULL`.

## Upgrading from Spark SQL 2.4 to 3.0

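A hedged illustration of the migration note above: the pattern below uses an unsupported pattern letter purely as an example, and the exact exception raised by Spark 3.1 is not spelled out here.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: 'iii' is not a valid datetime pattern. Spark 3.0 and earlier returned
// NULL for this expression; Spark 3.1 is expected to fail with an error about the
// invalid pattern instead.
val spark = SparkSession.builder().master("local[1]").appName("invalid-pattern").getOrCreate()
import spark.implicits._

val df = Seq("2020-06-10").toDF("s")
df.selectExpr("to_timestamp(s, 'yyyy-MM-dd iii')").show()

spark.stop()
```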
2 changes: 2 additions & 0 deletions docs/ss-migration-guide.md
@@ -31,3 +31,5 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.
- In Spark 3.0, Structured Streaming forces the source schema into nullable when file-based datasources such as text, json, csv, parquet and orc are used via `spark.readStream(...)`. Previously, it respected the nullability in source schema; however, it caused issues tricky to debug with NPE. To restore the previous behavior, set `spark.sql.streaming.fileSource.schema.forceNullable` to `false`.

- Spark 3.0 fixes the correctness issue on Stream-stream outer join, which changes the schema of state. (See [SPARK-26154](https://issues.apache.org/jira/browse/SPARK-26154) for more details). If you start your query from checkpoint constructed from Spark 2.x which uses stream-stream outer join, Spark 3.0 fails the query. To recalculate outputs, discard the checkpoint and replay previous inputs.

- In Spark 3.0, the deprecated class `org.apache.spark.sql.streaming.ProcessingTime` has been removed. Use `org.apache.spark.sql.streaming.Trigger.ProcessingTime` instead. Likewise, `org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger` has been removed in favor of `Trigger.Continuous`, and `org.apache.spark.sql.execution.streaming.OneTimeTrigger` has been hidden in favor of `Trigger.Once`.
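For reference, a minimal sketch of the replacement API named above; the rate source and the timeout value are only there to make the example self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Use Trigger.ProcessingTime / Trigger.Continuous / Trigger.Once in place of the removed
// ProcessingTime, ContinuousTrigger and OneTimeTrigger classes.
val spark = SparkSession.builder().master("local[2]").appName("trigger-example").getOrCreate()

val stream = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

val query = stream.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))  // was: ProcessingTime("10 seconds")
  .start()

query.awaitTermination(30000L)  // illustrative: run briefly, then let the example end
spark.stop()
```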
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
@@ -480,7 +480,8 @@ object SparkParallelTestGrouping {
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite",
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInBinarySuite",
"org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
)

9 changes: 9 additions & 0 deletions python/pyspark/sql/tests/test_udf.py
@@ -642,6 +642,15 @@ def f(*a):
r = df.select(fUdf(*df.columns))
self.assertEqual(r.first()[0], "success")

    def test_udf_cache(self):
        func = lambda x: x

        df = self.spark.range(1)
        df.select(udf(func)("id")).cache()

        self.assertEqual(df.select(udf(func)("id"))._jdf.queryExecution()
                         .withCachedData().getClass().getSimpleName(), 'InMemoryRelation')


class UDFInitializationTests(unittest.TestCase):
def tearDown(self):
4 changes: 2 additions & 2 deletions python/pyspark/sql/utils.py
@@ -116,8 +116,8 @@ def convert_exception(e):
# To make sure this only catches Python UDFs.
and any(map(lambda v: "org.apache.spark.sql.execution.python" in v.toString(),
c.getStackTrace()))):
msg = ("\n An exception was thrown from Python worker in the executor. "
"The below is the Python worker stacktrace.\n%s" % c.getMessage())
msg = ("\n An exception was thrown from the Python worker. "
"Please see the stack trace below.\n%s" % c.getMessage())
return PythonException(msg, stacktrace)
return UnknownException(s, stacktrace, c)
