Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions connector/docker-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.spotify</groupId>
<artifactId>docker-client</artifactId>
<scope>test</scope>
<classifier>shaded</classifier>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down Expand Up @@ -101,14 +95,6 @@
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
<!-- Although SPARK-28737 upgraded Jersey to 2.29 for JDK11, 'com.spotify.docker-client' still
uses this repackaged 'jersey-guava'. We add this back for JDK8/JDK11 testing. -->
<dependency>
<groupId>org.glassfish.jersey.bundles.repackaged</groupId>
<artifactId>jersey-guava</artifactId>
<version>2.25.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
Expand Down Expand Up @@ -139,5 +125,15 @@
<artifactId>mysql-connector-j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-transport-zerodep</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.security.PrivilegedExceptionAction
import java.sql.Connection
import javax.security.auth.login.Configuration

import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.github.dockerjava.api.model.{AccessMode, Bind, ContainerConfig, HostConfig, Volume}
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS
import org.scalatest.time.SpanSugar._
Expand Down Expand Up @@ -66,14 +66,15 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
}

/**
 * Prepares the DB2 container for Kerberos before it is started.
 *
 * Passes `db2_krb_setup.sh` to `copyExecutableResource` together with `initDbDir`, then
 * bind-mounts `initDbDir` read-only at `/var/custom` inside the container.
 *
 * @param hostConfigBuilder docker-java host configuration; mutated in place to add the bind
 * @param containerConfigBuilder docker-java container configuration; not used here
 */
override def beforeContainerStart(
    hostConfigBuilder: HostConfig,
    containerConfigBuilder: ContainerConfig): Unit = {
  copyExecutableResource("db2_krb_setup.sh", initDbDir, replaceIp)

  val newBind = new Bind(
    initDbDir.getAbsolutePath,
    new Volume("/var/custom"),
    AccessMode.ro)
  // `withBinds` replaces the whole bind list, so re-submit the existing binds plus the new one.
  hostConfigBuilder.withBinds(hostConfigBuilder.getBinds :+ newBind: _*)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ package org.apache.spark.sql.jdbc
import java.net.ServerSocket
import java.sql.{Connection, DriverManager}
import java.util.Properties
import java.util.concurrent.TimeUnit

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import com.spotify.docker.client._
import com.spotify.docker.client.DockerClient.{ListContainersParam, LogsParam}
import com.spotify.docker.client.exceptions.ImageNotFoundException
import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding}
import com.github.dockerjava.api.DockerClient
import com.github.dockerjava.api.async.{ResultCallback, ResultCallbackTemplate}
import com.github.dockerjava.api.command.CreateContainerResponse
import com.github.dockerjava.api.exception.NotFoundException
import com.github.dockerjava.api.model._
import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientImpl}
import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

Expand Down Expand Up @@ -88,8 +92,8 @@ abstract class DatabaseOnDocker {
* Optional step before container starts
*/
def beforeContainerStart(
    hostConfigBuilder: HostConfig,
    containerConfigBuilder: ContainerConfig): Unit = {
  // Intentionally a no-op. Database definitions override this hook to adjust the docker-java
  // HostConfig (for example to add read-only bind mounts) before the container is created.
}
}

abstract class DockerJDBCIntegrationSuite
Expand All @@ -111,56 +115,75 @@ abstract class DockerJDBCIntegrationSuite
sock.close()
port
}
private var containerId: String = _
private var container: CreateContainerResponse = _
private var pulled: Boolean = false
protected var jdbcUrl: String = _

override def beforeAll(): Unit = runIfTestsEnabled(s"Prepare for ${this.getClass.getName}") {
super.beforeAll()
try {
docker = DefaultDockerClient.fromEnv.build()
val config = DefaultDockerClientConfig.createDefaultConfigBuilder.build
val httpClient = new ZerodepDockerHttpClient.Builder()
.dockerHost(config.getDockerHost)
.sslConfig(config.getSSLConfig)
.build()
docker = DockerClientImpl.getInstance(config, httpClient)
// Check that Docker is actually up
try {
docker.ping()
docker.pingCmd().exec()
} catch {
case NonFatal(e) =>
log.error("Exception while connecting to Docker. Check whether Docker is running.")
throw e
}
// Ensure that the Docker image is installed:
try {
docker.inspectImage(db.imageName)
// Ensure that the Docker image is installed:
docker.inspectImageCmd(db.imageName).exec()
} catch {
case e: ImageNotFoundException =>
case e: NotFoundException =>
log.warn(s"Docker image ${db.imageName} not found; pulling image from registry")
docker.pull(db.imageName)
docker.pullImageCmd(db.imageName)
.start()
.awaitCompletion(connectionTimeout.value.toSeconds, TimeUnit.SECONDS)
pulled = true
}
val hostConfigBuilder = HostConfig.builder()
.privileged(db.privileged)
.networkMode("bridge")
.ipcMode(if (db.usesIpc) "host" else "")
.portBindings(
Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava)
// Create the database container:
val containerConfigBuilder = ContainerConfig.builder()
.image(db.imageName)
.networkDisabled(false)
.env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
.exposedPorts(s"${db.jdbcPort}/tcp")
if (db.getEntryPoint.isDefined) {
containerConfigBuilder.entrypoint(db.getEntryPoint.get)
}
if (db.getStartupProcessName.isDefined) {
containerConfigBuilder.cmd(db.getStartupProcessName.get)

docker.pullImageCmd(db.imageName)
.start()
.awaitCompletion(connectionTimeout.value.toSeconds, TimeUnit.SECONDS)

val hostConfig = HostConfig
.newHostConfig()
.withNetworkMode("bridge")
.withPrivileged(db.privileged)
.withPortBindings(PortBinding.parse(s"$dockerIp:$externalPort:${db.jdbcPort}"))

if (db.usesIpc) {
hostConfig.withIpcMode("host")
}
db.beforeContainerStart(hostConfigBuilder, containerConfigBuilder)
containerConfigBuilder.hostConfig(hostConfigBuilder.build())
val config = containerConfigBuilder.build()

val containerConfig = new ContainerConfig()

db.beforeContainerStart(hostConfig, containerConfig)

// Create the database container:
containerId = docker.createContainer(config).id
val createContainerCmd = docker.createContainerCmd(db.imageName)
.withHostConfig(hostConfig)
.withExposedPorts(ExposedPort.tcp(db.jdbcPort))
.withEnv(db.env.map { case (k, v) => s"$k=$v" }.toList.asJava)
.withNetworkDisabled(false)


db.getEntryPoint.foreach(ep => createContainerCmd.withEntrypoint(ep))
db.getStartupProcessName.foreach(n => createContainerCmd.withCmd(n))

container = createContainerCmd.exec()
// Start the container and wait until the database can accept JDBC connections:
docker.startContainer(containerId)
docker.startContainerCmd(container.getId).exec()
eventually(connectionTimeout, interval(1.second)) {
val response = docker.inspectContainerCmd(container.getId).exec()
assert(response.getState.getRunning)
}
jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
var conn: Connection = null
eventually(connectionTimeout, interval(1.second)) {
Expand Down Expand Up @@ -206,36 +229,35 @@ abstract class DockerJDBCIntegrationSuite
def dataPreparation(connection: Connection): Unit

/**
 * Kills and removes the database container created by this suite.
 *
 * Does nothing when no Docker client or container exists, or when `keepContainer` is set.
 * A failure to kill the container is logged rather than rethrown; the container logs are then
 * dumped, the container is removed, and the image is removed too when it was pulled by this
 * suite and `removePulledImage` is set.
 */
private def cleanupContainer(): Unit = {
  if (docker != null && container != null && !keepContainer) {
    // Log the id, not the CreateContainerResponse object, so messages are greppable.
    val containerId = container.getId
    try {
      docker.killContainerCmd(containerId).exec()
    } catch {
      case NonFatal(e) =>
        // Inspecting may fail as well (e.g. the container is already gone). Never let that
        // escape from the handler and hide the original kill failure.
        val status: Option[String] =
          try {
            Option(docker.inspectContainerCmd(containerId).exec())
              .flatMap(response => Option(response.getState))
              .flatMap(state => Option(state.getStatus))
          } catch {
            case NonFatal(_) => None
          }
        status match {
          case Some("exited") =>
            // Killing an exited container fails; that is expected and not an error.
            logWarning(s"Container $containerId already stopped")
          case other =>
            val stage = other.getOrElse("unknown")
            logWarning(s"Could not stop container $containerId at stage '$stage'", e)
        }
    } finally {
      logContainerOutput()
      docker.removeContainerCmd(containerId).exec()
      if (removePulledImage && pulled) {
        docker.removeImageCmd(db.imageName).exec()
      }
    }
  }
}

/**
 * Writes the container's stdout and stderr to the test log, framed by begin/end markers.
 *
 * docker-java delivers log frames asynchronously through a callback, so this method waits
 * (bounded by `connectionTimeout`) for the stream to finish before logging the end marker.
 * Otherwise the caller could remove the container before any output has been received.
 */
private def logContainerOutput(): Unit = {
  val containerId = container.getId
  logInfo("\n\n===== CONTAINER LOGS FOR container Id: " + containerId + " =====")
  val callback = new ResultCallbackTemplate[ResultCallback[Frame], Frame] {
    override def onNext(f: Frame): Unit = logInfo(f.toString)
  }
  try {
    docker.logContainerCmd(containerId)
      .withStdOut(true)
      .withStdErr(true)
      // Do not follow: only the output produced so far is wanted, and following a container
      // that is still running would never complete.
      .withFollowStream(false)
      .withSince(0)
      .exec(callback)
    val finished =
      callback.awaitCompletion(connectionTimeout.value.toSeconds, TimeUnit.SECONDS)
    if (!finished) {
      logWarning(s"Timed out while reading logs of container $containerId")
    }
  } finally {
    // Releases the underlying HTTP stream.
    callback.close()
  }
  logInfo("\n\n===== END OF CONTAINER LOGS FOR container Id: " + containerId + " =====")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc

import javax.security.auth.login.Configuration

import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.github.dockerjava.api.model.{AccessMode, Bind, ContainerConfig, HostConfig, Volume}

import org.apache.spark.sql.execution.datasources.jdbc.connection.SecureConnectionProvider
import org.apache.spark.tags.DockerTest
Expand Down Expand Up @@ -52,17 +52,17 @@ class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
Some("/docker-entrypoint/mariadb_docker_entrypoint.sh")

/**
 * Prepares the MariaDB container for Kerberos before it is started.
 *
 * Passes the entrypoint and Kerberos setup scripts to `copyExecutableResource`, then
 * bind-mounts `entryPointDir` at `/docker-entrypoint` and `initDbDir` at
 * `/docker-entrypoint-initdb.d`, both read-only.
 *
 * @param hostConfigBuilder docker-java host configuration; mutated in place to add the binds
 * @param containerConfigBuilder docker-java container configuration; not used here
 */
override def beforeContainerStart(
    hostConfigBuilder: HostConfig,
    containerConfigBuilder: ContainerConfig): Unit = {
  copyExecutableResource("mariadb_docker_entrypoint.sh", entryPointDir, replaceIp)
  copyExecutableResource("mariadb_krb_setup.sh", initDbDir, replaceIp)

  val binds =
    Seq(entryPointDir -> "/docker-entrypoint", initDbDir -> "/docker-entrypoint-initdb.d")
      .map { case (from, to) =>
        new Bind(from.getAbsolutePath, new Volume(to), AccessMode.ro)
      }
  // `withBinds` replaces the whole bind list, so re-submit the existing binds plus the new ones.
  hostConfigBuilder.withBinds(hostConfigBuilder.getBinds ++ binds: _*)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc

import javax.security.auth.login.Configuration

import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.github.dockerjava.api.model.{AccessMode, Bind, ContainerConfig, HostConfig, Volume}

import org.apache.spark.sql.execution.datasources.jdbc.connection.SecureConnectionProvider
import org.apache.spark.tags.DockerTest
Expand Down Expand Up @@ -48,14 +48,14 @@ class PostgresKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
s"jdbc:postgresql://$ip:$port/postgres?user=$principal&gsslib=gssapi"

/**
 * Prepares the PostgreSQL container for Kerberos before it is started.
 *
 * Passes `postgres_krb_setup.sh` to `copyExecutableResource` together with `initDbDir`, then
 * bind-mounts `initDbDir` read-only at `/docker-entrypoint-initdb.d` inside the container.
 *
 * @param hostConfigBuilder docker-java host configuration; mutated in place to add the bind
 * @param containerConfigBuilder docker-java container configuration; not used here
 */
override def beforeContainerStart(
    hostConfigBuilder: HostConfig,
    containerConfigBuilder: ContainerConfig): Unit = {
  copyExecutableResource("postgres_krb_setup.sh", initDbDir, replaceIp)

  val newBind = new Bind(
    initDbDir.getAbsolutePath,
    new Volume("/docker-entrypoint-initdb.d"),
    AccessMode.ro)
  // `withBinds` replaces the whole bind list, so re-submit the existing binds plus the new one.
  hostConfigBuilder.withBinds(hostConfigBuilder.getBinds :+ newBind: _*)
}
}

Expand Down
25 changes: 17 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1237,22 +1237,31 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.spotify</groupId>
<artifactId>docker-client</artifactId>
<version>8.14.1</version>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<version>3.3.4</version>
<scope>test</scope>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-transport-netty</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-transport-jersey</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-transport-zerodep</artifactId>
<version>3.3.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
Expand Down
3 changes: 1 addition & 2 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,7 @@ object SparkBuild extends PomBuild {
/* Protobuf settings */
enable(SparkProtobuf.settings)(protobuf)

// SPARK-14738 - Remove docker tests from main Spark build
// enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
enable(DockerIntegrationTests.settings)(dockerIntegrationTests)

if (!profiles.contains("volcano")) {
enable(Volcano.settings)(kubernetes)
Expand Down