Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions connect/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the file target/unit-tests.log
rootLogger.level = info
# The root logger writes to whichever appender the "test.appender" system
# property names; when the property is unset it defaults to the "File"
# appender defined below (Log4j2 ${sys:...:-default} lookup syntax).
rootLogger.appenderRef.file.ref = ${sys:test.appender:-File}

# File appender: accumulates all test logging in target/unit-tests.log.
appender.file.type = File
appender.file.name = File
appender.file.fileName = target/unit-tests.log
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex

# Tests that launch java subprocesses can set the "test.appender" system property to
# "console" to avoid having the child process's logs overwrite the unit test's
# log file.
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %t: %m%n%ex

# Ignore messages below warning level from Jetty, because it's a bit verbose
logger.jetty.name = org.sparkproject.jetty
logger.jetty.level = warn
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.connect.planner

import scala.collection.JavaConverters._

import org.apache.spark.{SparkFunSuite, TestUtils}
import org.apache.spark.SparkFunSuite
import org.apache.spark.connect.proto
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -58,11 +58,6 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {

protected var spark: SparkSession = null

override def beforeAll(): Unit = {
super.beforeAll()
TestUtils.configTestLog4j2("INFO")
}

test("Simple Limit") {
assertThrows[IndexOutOfBoundsException] {
new SparkConnectPlanner(
Expand Down
47 changes: 24 additions & 23 deletions dev/deps/spark-deps-hadoop-2-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
arrow-vector/9.0.0//arrow-vector-9.0.0.jar
audience-annotations/0.5.0//audience-annotations-0.5.0.jar
automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.1//avro-ipc-1.11.1.jar
avro-mapred/1.11.1//avro-mapred-1.11.1.jar
avro/1.11.1//avro-1.11.1.jar
Expand Down Expand Up @@ -69,7 +68,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
gcs-connector/hadoop2-2.2.7/shaded/gcs-connector-hadoop2-2.2.7-shaded.jar
generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
grpc-api/1.47.0//grpc-api-1.47.0.jar
grpc-context/1.47.0//grpc-context-1.47.0.jar
Expand Down Expand Up @@ -175,27 +173,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
lapack/3.0.2//lapack-3.0.2.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
Expand Down
47 changes: 24 additions & 23 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
arrow-vector/9.0.0//arrow-vector-9.0.0.jar
audience-annotations/0.5.0//audience-annotations-0.5.0.jar
automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.1//avro-ipc-1.11.1.jar
avro-mapred/1.11.1//avro-mapred-1.11.1.jar
avro/1.11.1//avro-1.11.1.jar
Expand Down Expand Up @@ -66,7 +65,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar
generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
grpc-api/1.47.0//grpc-api-1.47.0.jar
grpc-context/1.47.0//grpc-context-1.47.0.jar
Expand Down Expand Up @@ -159,27 +157,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
lapack/3.0.2//lapack-3.0.2.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
<arrow.version>9.0.0</arrow.version>
<!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
<leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
<kubernetes-client.version>5.12.3</kubernetes-client.version>
<kubernetes-client.version>6.1.1</kubernetes-client.version>

<test.java.home>${java.home}</test.java.home>

Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3")

addSbtPlugin("com.typesafe.sbt" % "sbt-pom-reader" % "2.2.0")

addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.1")
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6")
5 changes: 5 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-okhttp</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
Expand Down Expand Up @@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}
logDebug("Kubernetes client config: " +
new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
new KubernetesClientBuilder()
.withHttpClientFactory(factoryWithCustomDispatcher)
.withConfig(config)
.build()
}

private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
import scala.collection.JavaConverters._

import K8SSparkSubmitOperation.getGracePeriod
import io.fabric8.kubernetes.api.model.{Pod, PodList}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
import io.fabric8.kubernetes.client.dsl.PodResource

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation
Expand All @@ -32,25 +32,23 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
import org.apache.spark.util.{CommandLineLoggingUtils, Utils}

private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
type NON_NAMESPACED_PODS =
NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def listPodsInNameSpace(namespace: Option[String])
(implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
def getPod(namespace: Option[String], name: String)
(implicit client: KubernetesClient): PodResource = {
namespace match {
case Some(ns) => client.pods.inNamespace(ns)
case None => client.pods
case Some(ns) => client.pods.inNamespace(ns).withName(name)
case None => client.pods.withName(name)
}
}
}

private class KillApplication extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
val podToDelete = listPodsInNameSpace(namespace).withName(pName)
val podToDelete = getPod(namespace, pName)

if (Option(podToDelete).isDefined) {
getGracePeriod(sparkConf) match {
Expand All @@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp {
(implicit client: KubernetesClient): Unit = {
if (pods.nonEmpty) {
pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
val listedPods = listPodsInNameSpace(namespace)

getGracePeriod(sparkConf) match {
case Some(period) =>
// this is not using the batch api because no option is provided
// when using the grace period.
pods.foreach { pod =>
listedPods
.withName(pod.getMetadata.getName)
.withGracePeriod(period)
.delete()
}
case _ => listedPods.delete(pods.asJava)
client.resourceList(pods.asJava).withGracePeriod(period).delete()
case _ =>
client.resourceList(pods.asJava).delete()
}
} else {
printMessage("No applications found.")
Expand All @@ -89,7 +79,7 @@ private class KillApplication extends K8sSubmitOp {
private class ListStatus extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
val pod = listPodsInNameSpace(namespace).withName(pName).get()
val pod = getPod(namespace, pName).get()
if (Option(pod).isDefined) {
printMessage("Application status (driver): " +
Option(pod).map(formatPodState).getOrElse("unknown."))
Expand Down Expand Up @@ -145,13 +135,12 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
.pods
}
val pods = ops
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
.list()
.getItems
.asScala
.filter { pod =>
val meta = pod.getMetadata
meta.getName.startsWith(pName.stripSuffix("*")) &&
meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
}.toList
op.executeOnGlob(pods, namespace, sparkConf)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ private[spark] class Client(
var watch: Watch = null
var createdDriverPod: Pod = null
try {
createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
createdDriverPod =
kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
} catch {
case NonFatal(e) =>
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
Expand All @@ -163,7 +164,7 @@ private[spark] class Client(
kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.pods().resource(createdDriverPod).delete()
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
throw e
}
Expand All @@ -175,7 +176,7 @@ private[spark] class Client(
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.pods().resource(createdDriverPod).delete()
throw e
}

Expand All @@ -185,6 +186,7 @@ private[spark] class Client(
while (true) {
val podWithName = kubernetesClient
.pods()
.inNamespace(conf.namespace)
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
Expand Down
Loading