Closed
Changes from all commits
Commits
23 commits
e841fa3
[SPARK-37961][SQL] Override maxRows/maxRowsPerPartition for some logi…
zhengruifeng Jun 16, 2022
264d8fd
[SPARK-39493][BUILD] Update ORC to 1.7.5
williamhyun Jun 16, 2022
75b3965
[SPARK-39495][SQL][TESTS] Support `SPARK_TEST_HIVE_CLIENT_VERSIONS` f…
dongjoon-hyun Jun 17, 2022
e5febda
[SPARK-39501][TESTS] Propagate `java.net.preferIPv6Addresses=true` in…
dongjoon-hyun Jun 17, 2022
9cc7cdc
[SPARK-39502][BUILD] Downgrade scala-maven-plugin to 4.6.1
LuciferYang Jun 17, 2022
37dbf1c
[SPARK-39505][UI] Escape log content rendered in UI
srowen Jun 17, 2022
c4d5390
[SPARK-39496][SQL] Handle null struct in `Inline.eval`
bersprockets Jun 18, 2022
e125c48
[SPARK-39456][DOCS][PYTHON] Fix broken function links in the auto-gen…
beobest2 Jun 18, 2022
0f2fe34
[SPARK-39492][SQL] Rework MISSING_COLUMN
srielau Jun 18, 2022
a859dd2
[SPARK-39509][INFRA] Support `DEFAULT_ARTIFACT_REPOSITORY` in `check-…
dongjoon-hyun Jun 18, 2022
362f27f
[SPARK-39507][CORE] `SocketAuthServer` should respect Java IPv6 options
dongjoon-hyun Jun 19, 2022
27ed89b
[SPARK-38775][ML] cleanup validation functions
zhengruifeng Jun 19, 2022
be5c85c
[SPARK-36979][SQL][TESTS][FOLLOWUP] Move the test from `SQLQuerySuite…
wangyum Jun 19, 2022
0226d18
[SPARK-39514][CORE][TESTS] `LauncherBackendSuite` should add `java.ne…
dongjoon-hyun Jun 19, 2022
77339dc
[SPARK-39508][CORE][PYTHON] Support IPv6 between JVM and Python Daemo…
dongjoon-hyun Jun 20, 2022
7c9b505
[SPARK-39517][INFRA] Recover branch-3.2 build broken by is-changed.py…
HyukjinKwon Jun 20, 2022
38e1750
[SPARK-39516][INFRA] Set a scheduled build for branch-3.3
HyukjinKwon Jun 20, 2022
1128cc0
[SPARK-39491][YARN] Fix yarn module compilation error with `-Phadoop-…
LuciferYang Jun 20, 2022
540e695
[SPARK-39464][CORE][TESTS][FOLLOWUP] Use Utils.localHostNameForURI in…
LuciferYang Jun 20, 2022
2416a8d
Combine configure-jobs and precondition
EnricoMi Jun 20, 2022
16bdc53
Move all logic (conditions) into configure-jobs
EnricoMi Jun 20, 2022
8d2b579
Fix substitution in echo
EnricoMi Jun 20, 2022
b7a5440
Fix envs output
EnricoMi Jun 20, 2022
230 changes: 128 additions & 102 deletions .github/workflows/build_and_test.yml

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions core/src/main/resources/error/error-classes.json
@@ -251,12 +251,6 @@
"Key <keyValue> does not exist. Use `try_element_at` to tolerate non-existent key and return NULL instead. If necessary set <config> to \"false\" to bypass this error."
]
},
"MISSING_COLUMN" : {
"message" : [
"Column '<columnName>' does not exist. Did you mean one of the following? [<proposal>]"
],
"sqlState" : "42000"
},
"MISSING_STATIC_PARTITION_COLUMN" : {
"message" : [
"Unknown static partition column: <columnName>"
@@ -352,6 +346,12 @@
],
"sqlState" : "42000"
},
"UNRESOLVED_COLUMN" : {
"message" : [
"A column or function parameter with name <objectName> cannot be resolved. Did you mean one of the following? [<objectList>]"
],
"sqlState" : "42000"
},
"UNSUPPORTED_DATATYPE" : {
"message" : [
"Unsupported data type <typeName>"

@@ -85,7 +85,7 @@ function loadMore() {
if (retStartByte == 0) {
disableMoreButton();
}
$("pre", ".log-content").prepend(cleanData);
$("pre", ".log-content").prepend(document.createTextNode(cleanData));

curLogLength = curLogLength + (startByte - retStartByte);
startByte = retStartByte;
@@ -115,7 +115,7 @@ function loadNew() {
var retLogLength = dataInfo[2];

var cleanData = data.substring(newlineIndex + 1);
$("pre", ".log-content").append(cleanData);
$("pre", ".log-content").append(document.createTextNode(cleanData));

curLogLength = curLogLength + (retEndByte - retStartByte);
endByte = retEndByte;

@@ -77,7 +77,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

@GuardedBy("self")
private var daemon: Process = null
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
val daemonHost = InetAddress.getLoopbackAddress()
@GuardedBy("self")
private var daemonPort: Int = 0
@GuardedBy("self")
@@ -153,7 +153,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
private def createSimpleWorker(): (Socket, Option[Int]) = {
var serverSocket: ServerSocket = null
try {
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())

// Create and start the worker
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
@@ -164,6 +164,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
workerEnv.put("PYTHONUNBUFFERED", "YES")
workerEnv.put("PYTHON_WORKER_FACTORY_PORT", serverSocket.getLocalPort.toString)
workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
if (Utils.preferIPv6) {
workerEnv.put("SPARK_PREFER_IPV6", "True")
}
val worker = pb.start()

// Redirect worker stdout and stderr
@@ -211,6 +214,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
if (Utils.preferIPv6) {
workerEnv.put("SPARK_PREFER_IPV6", "True")
}
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
daemon = pb.start()
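
The two PythonWorkerFactory changes above follow one pattern: bind to the JVM's notion of loopback instead of a hard-coded 127.0.0.1, and tell the forked Python process about the IPv6 preference through an environment variable. A standalone sketch of the loopback side (illustrative only; Utils.preferIPv6 is defined in the Utils.scala hunk below):

import java.net.{InetAddress, ServerSocket}

object LoopbackSketch {
  def main(args: Array[String]): Unit = {
    // getLoopbackAddress() typically yields ::1 on a JVM started with
    // -Djava.net.preferIPv6Addresses=true and 127.0.0.1 otherwise, whereas the
    // old InetAddress.getByAddress(Array(127, 0, 0, 1)) call is always IPv4.
    val loopback = InetAddress.getLoopbackAddress()
    val server = new ServerSocket(0, 1, loopback) // ephemeral port, backlog 1, as above
    println(s"listening on ${server.getInetAddress.getHostAddress}:${server.getLocalPort}")
    server.close()
  }
}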

@@ -49,7 +49,7 @@ private[spark] abstract class SocketAuthServer[T](

private def startServer(): (Int, String) = {
logTrace("Creating listening socket")
val serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
val serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())
// Close the socket if no connection in the configured seconds
val timeout = authHelper.conf.get(PYTHON_AUTH_SOCKET_TIMEOUT).toInt
logTrace(s"Setting timeout to $timeout sec")
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1988,6 +1988,11 @@ private[spark] object Utils extends Logging {
*/
val isMacOnAppleSilicon = SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64")

/**
* Whether the underlying JVM prefer IPv6 addresses.
*/
val preferIPv6 = "true".equals(System.getProperty("java.net.preferIPv6Addresses"))

/**
* Pattern for matching a Windows drive, which contains only a single alphabet character.
*/
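
A brief sketch of how this flag is set and consumed (simplified from the hunks in this PR; not Spark's exact code): the JVM-wide preference arrives as the standard JDK system property at launch, is read once into a val, and is then handed to child processes so the Python side can mirror it.

// 1. Launch the JVM with: -Djava.net.preferIPv6Addresses=true
// 2. Read the property once:
val preferIPv6 = "true".equals(System.getProperty("java.net.preferIPv6Addresses"))
// 3. Propagate it to a forked worker, as the PythonWorkerFactory change above does:
val workerEnv = new java.util.HashMap[String, String]()
if (preferIPv6) workerEnv.put("SPARK_PREFER_IPV6", "True")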

@@ -160,7 +160,7 @@ class SparkThrowableSuite extends SparkFunSuite {
test("Check if message parameters match message format") {
// Requires 2 args
intercept[IllegalFormatException] {
getMessage("MISSING_COLUMN", null, Array.empty)
getMessage("UNRESOLVED_COLUMN", null, Array.empty)
}

// Does not fail with too many args (expects 0 args)
@@ -172,8 +172,9 @@
}

test("Error message is formatted") {
assert(getMessage("MISSING_COLUMN", null, Array("foo", "bar, baz")) ==
"[MISSING_COLUMN] Column 'foo' does not exist. Did you mean one of the following? [bar, baz]")
assert(getMessage("UNRESOLVED_COLUMN", null, Array("`foo`", "`bar`, `baz`")) ==
"[UNRESOLVED_COLUMN] A column or function parameter with name `foo` cannot be resolved. " +
"Did you mean one of the following? [`bar`, `baz`]")
}

test("Try catching legacy SparkError") {

@@ -32,7 +32,7 @@ import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.util.Utils
import org.apache.spark.util.{DependencyUtils, Utils}

class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

@@ -304,4 +304,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
s" Resolved jars are: $jarPath")
}
}

test("SPARK-39501: Resolve maven dependenicy in IPv6") {
assume(Utils.preferIPv6)
DependencyUtils.resolveMavenDependencies(
URI.create("ivy://org.apache.logging.log4j:log4j-api:2.17.2"))
}
}

@@ -74,7 +74,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with

private var provider: FsHistoryProvider = null
private var server: HistoryServer = null
private val localhost: String = Utils.localCanonicalHostName()
private val localhost: String = Utils.localHostNameForURI()
private var port: Int = -1

protected def diskBackend: HybridStoreDiskBackend.Value
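
The test suites here and below switch from localCanonicalHostName to localHostNameForURI because an IPv6 literal has to be bracketed before it can be spliced into a URL. A sketch under that assumption (hostForURI is a hypothetical stand-in, not Spark's actual helper):

// Hypothetical "host name usable in a URI" helper: IPv6 literals are wrapped in
// brackets, while IPv4 addresses and plain host names pass through unchanged.
def hostForURI(host: String): String =
  if (host.contains(":") && !host.startsWith("[")) s"[$host]" else host

assert(s"http://${hostForURI("::1")}:18080" == "http://[::1]:18080")
assert(s"http://${hostForURI("127.0.0.1")}:18080" == "http://127.0.0.1:18080")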

@@ -325,7 +325,7 @@ class MasterSuite extends SparkFunSuite
val conf = new SparkConf()
val localCluster = LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
val masterUrl = s"http://${Utils.localCanonicalHostName()}:${localCluster.masterWebUIPort}"
val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
try {
eventually(timeout(50.seconds), interval(100.milliseconds)) {
val json = Utils
@@ -362,7 +362,7 @@ class MasterSuite extends SparkFunSuite
conf.set(UI_REVERSE_PROXY, true)
val localCluster = LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
val masterUrl = s"http://${Utils.localCanonicalHostName()}:${localCluster.masterWebUIPort}"
val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
try {
eventually(timeout(50.seconds), interval(100.milliseconds)) {
val json = Utils
@@ -400,7 +400,7 @@ class MasterSuite extends SparkFunSuite
conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
val localCluster = LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
val masterUrl = s"http://${Utils.localCanonicalHostName()}:${localCluster.masterWebUIPort}"
val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
try {
eventually(timeout(50.seconds), interval(100.milliseconds)) {
val json = Utils

@@ -69,7 +69,7 @@ class MasterWebUISuite extends SparkFunSuite with BeforeAndAfterAll {

when(master.idToApp).thenReturn(HashMap[String, ApplicationInfo]((activeApp.id, activeApp)))

val url = s"http://${Utils.localCanonicalHostName()}:${masterWebUI.boundPort}/app/kill/"
val url = s"http://${Utils.localHostNameForURI()}:${masterWebUI.boundPort}/app/kill/"
val body = convPostDataToString(Map(("id", activeApp.id), ("terminate", "true")))
val conn = sendHttpRequest(url, "POST", body)
conn.getResponseCode
@@ -80,7 +80,7 @@

test("kill driver") {
val activeDriverId = "driver-0"
val url = s"http://${Utils.localCanonicalHostName()}:${masterWebUI.boundPort}/driver/kill/"
val url = s"http://${Utils.localHostNameForURI()}:${masterWebUI.boundPort}/driver/kill/"
val body = convPostDataToString(Map(("id", activeDriverId), ("terminate", "true")))
val conn = sendHttpRequest(url, "POST", body)
conn.getResponseCode
@@ -90,7 +90,7 @@
}

private def testKillWorkers(hostnames: Seq[String]): Unit = {
val url = s"http://${Utils.localCanonicalHostName()}:${masterWebUI.boundPort}/workers/kill/"
val url = s"http://${Utils.localHostNameForURI()}:${masterWebUI.boundPort}/workers/kill/"
val body = convPostDataToString(hostnames.map(("host", _)))
val conn = sendHttpRequest(url, "POST", body)
// The master is mocked here, so cannot assert on the response code
@@ -100,7 +100,7 @@
}

test("Kill one host") {
testKillWorkers(Seq("${Utils.localCanonicalHostName()}"))
testKillWorkers(Seq(s"${Utils.localHostNameForURI()}"))
}

test("Kill multiple hosts") {

@@ -50,7 +50,8 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
.setSparkHome(sys.props("spark.test.home"))
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.setConf(UI_ENABLED.key, "false")
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
s"-Dtest.appender=console -Djava.net.preferIPv6Addresses=${Utils.preferIPv6}")
.setMaster(master)
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(TestApp.getClass.getName().stripSuffix("$"))
2 changes: 1 addition & 1 deletion dev/check-license
@@ -20,7 +20,7 @@

acquire_rat_jar () {

URL="https://repo.maven.apache.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"
URL="${DEFAULT_ARTIFACT_REPOSITORY:-https://repo1.maven.org/maven2/}org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"

JAR="$rat_jar"

6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -219,9 +219,9 @@ objenesis/3.2//objenesis-3.2.jar
okhttp/3.12.12//okhttp-3.12.12.jar
okio/1.14.0//okio-1.14.0.jar
opencsv/2.3//opencsv-2.3.jar
orc-core/1.7.4//orc-core-1.7.4.jar
orc-mapreduce/1.7.4//orc-mapreduce-1.7.4.jar
orc-shims/1.7.4//orc-shims-1.7.4.jar
orc-core/1.7.5//orc-core-1.7.5.jar
orc-mapreduce/1.7.5//orc-mapreduce-1.7.5.jar
orc-shims/1.7.5//orc-shims-1.7.5.jar
oro/2.0.8//oro-2.0.8.jar
osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -208,9 +208,9 @@ opencsv/2.3//opencsv-2.3.jar
opentracing-api/0.33.0//opentracing-api-0.33.0.jar
opentracing-noop/0.33.0//opentracing-noop-0.33.0.jar
opentracing-util/0.33.0//opentracing-util-0.33.0.jar
orc-core/1.7.4//orc-core-1.7.4.jar
orc-mapreduce/1.7.4//orc-mapreduce-1.7.4.jar
orc-shims/1.7.4//orc-shims-1.7.4.jar
orc-core/1.7.5//orc-core-1.7.5.jar
orc-mapreduce/1.7.5//orc-mapreduce-1.7.5.jar
orc-shims/1.7.5//orc-shims-1.7.5.jar
oro/2.0.8//oro-2.0.8.jar
osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
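
Both dependency manifests above pin ORC 1.7.5. For reference, the same upgrade expressed as sbt coordinates (illustrative only; Spark itself manages the version through its Maven POM, which is not part of this excerpt):

libraryDependencies ++= Seq(
  "org.apache.orc" % "orc-core"      % "1.7.5",
  "org.apache.orc" % "orc-mapreduce" % "1.7.5",
  "org.apache.orc" % "orc-shims"     % "1.7.5"
)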

@@ -24,6 +24,7 @@ import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.col

/**
* A simple example demonstrating how to write your own learning algorithm using Estimator,
@@ -120,8 +121,10 @@

// This method is used by fit()
override protected def train(dataset: Dataset[_]): MyLogisticRegressionModel = {
// Extract columns from data using helper method.
val oldDataset = extractLabeledPoints(dataset)
// Extract columns from data.
val oldDataset = dataset.select(col($(labelCol)).cast("double"), col($(featuresCol)))
.rdd
.map { case Row(l: Double, f: Vector) => LabeledPoint(l, f) }

// Do learning to estimate the coefficients vector.
val numFeatures = oldDataset.take(1)(0).features.size
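
With the protected extractLabeledPoints helper removed from Predictor (see the Predictor.scala hunk below), the example now extracts its training data inline. A self-contained sketch of that pattern, where labelCol and featuresCol stand in for the estimator's resolved column names:

import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col

// Select the label (cast to double) and features columns, then map each row
// to a strongly typed LabeledPoint.
def toLabeledPoints(dataset: Dataset[_], labelCol: String, featuresCol: String): RDD[LabeledPoint] =
  dataset.select(col(labelCol).cast("double"), col(featuresCol))
    .rdd
    .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }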
51 changes: 2 additions & 49 deletions mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -18,14 +18,11 @@
package org.apache.spark.ml

import org.apache.spark.annotation.Since
import org.apache.spark.ml.feature.{Instance, LabeledPoint}
import org.apache.spark.ml.functions.checkNonNegativeWeight
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.SchemaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}

@@ -63,40 +60,6 @@ private[ml] trait PredictorParams extends Params
}
SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType)
}

/**
* Extract [[labelCol]], weightCol(if any) and [[featuresCol]] from the given dataset,
* and put it in an RDD with strong types.
*/
protected def extractInstances(dataset: Dataset[_]): RDD[Instance] = {
val w = this match {
case p: HasWeightCol =>
if (isDefined(p.weightCol) && $(p.weightCol).nonEmpty) {
checkNonNegativeWeight((col($(p.weightCol)).cast(DoubleType)))
} else {
lit(1.0)
}
}

dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd.map {
case Row(label: Double, weight: Double, features: Vector) =>
Instance(label, weight, features)
}
}

/**
* Extract [[labelCol]], weightCol(if any) and [[featuresCol]] from the given dataset,
* and put it in an RDD with strong types.
* Validate the output instances with the given function.
*/
protected def extractInstances(
dataset: Dataset[_],
validateInstance: Instance => Unit): RDD[Instance] = {
extractInstances(dataset).map { instance =>
validateInstance(instance)
instance
}
}
}

/**
@@ -176,16 +139,6 @@ abstract class Predictor[
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema, fitting = true, featuresDataType)
}

/**
* Extract [[labelCol]] and [[featuresCol]] from the given dataset,
* and put it in an RDD with strong types.
*/
protected def extractLabeledPoints(dataset: Dataset[_]): RDD[LabeledPoint] = {
dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
case Row(label: Double, features: Vector) => LabeledPoint(label, features)
}
}
}

/**