diff --git a/connector/docker-integration-tests/pom.xml b/connector/docker-integration-tests/pom.xml index 3f73177d7dd4..4cca3ef12ae5 100644 --- a/connector/docker-integration-tests/pom.xml +++ b/connector/docker-integration-tests/pom.xml @@ -35,12 +35,6 @@ - - com.spotify - docker-client - test - shaded - org.apache.httpcomponents httpclient @@ -101,14 +95,6 @@ hadoop-minikdc test - - - org.glassfish.jersey.bundles.repackaged - jersey-guava - 2.25.1 - test - org.mariadb.jdbc mariadb-java-client @@ -139,5 +125,15 @@ mysql-connector-j test + + com.github.docker-java + docker-java + test + + + com.github.docker-java + docker-java-transport-zerodep + test + diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala index 9b518d61d252..66e2afbb6eff 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala @@ -21,7 +21,7 @@ import java.security.PrivilegedExceptionAction import java.sql.Connection import javax.security.auth.login.Configuration -import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} +import com.github.dockerjava.api.model.{AccessMode, Bind, ContainerConfig, HostConfig, Volume} import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS import org.scalatest.time.SpanSugar._ @@ -66,14 +66,15 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { } override def beforeContainerStart( - hostConfigBuilder: HostConfig.Builder, - containerConfigBuilder: ContainerConfig.Builder): Unit = { + hostConfigBuilder: HostConfig, + containerConfigBuilder: ContainerConfig): Unit = { copyExecutableResource("db2_krb_setup.sh", initDbDir, replaceIp) - hostConfigBuilder.appendBinds( - HostConfig.Bind.from(initDbDir.getAbsolutePath) - .to("/var/custom").readOnly(true).build() - ) + val newBind = new Bind( + initDbDir.getAbsolutePath, + new Volume("/var/custom"), + AccessMode.ro) + hostConfigBuilder.withBinds(hostConfigBuilder.getBinds :+ newBind: _*) } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index bcad9ae874eb..fde228939dd4 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -20,14 +20,18 @@ package org.apache.spark.sql.jdbc import java.net.ServerSocket import java.sql.{Connection, DriverManager} import java.util.Properties +import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal -import com.spotify.docker.client._ -import com.spotify.docker.client.DockerClient.{ListContainersParam, LogsParam} -import com.spotify.docker.client.exceptions.ImageNotFoundException -import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding} +import com.github.dockerjava.api.DockerClient +import com.github.dockerjava.api.async.{ResultCallback, ResultCallbackTemplate} +import com.github.dockerjava.api.command.CreateContainerResponse +import com.github.dockerjava.api.exception.NotFoundException +import com.github.dockerjava.api.model._ +import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientImpl} +import com.github.dockerjava.zerodep.ZerodepDockerHttpClient import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar._ @@ -88,8 +92,8 @@ abstract class DatabaseOnDocker { * Optional step before container starts */ def beforeContainerStart( - hostConfigBuilder: HostConfig.Builder, - containerConfigBuilder: ContainerConfig.Builder): Unit = {} + hostConfigBuilder: HostConfig, + containerConfigBuilder: ContainerConfig): Unit = {} } abstract class DockerJDBCIntegrationSuite @@ -111,56 +115,75 @@ abstract class DockerJDBCIntegrationSuite sock.close() port } - private var containerId: String = _ + private var container: CreateContainerResponse = _ private var pulled: Boolean = false protected var jdbcUrl: String = _ override def beforeAll(): Unit = runIfTestsEnabled(s"Prepare for ${this.getClass.getName}") { super.beforeAll() try { - docker = DefaultDockerClient.fromEnv.build() + val config = DefaultDockerClientConfig.createDefaultConfigBuilder.build + val httpClient = new ZerodepDockerHttpClient.Builder() + .dockerHost(config.getDockerHost) + .sslConfig(config.getSSLConfig) + .build() + docker = DockerClientImpl.getInstance(config, httpClient) // Check that Docker is actually up try { - docker.ping() + docker.pingCmd().exec() } catch { case NonFatal(e) => log.error("Exception while connecting to Docker. Check whether Docker is running.") throw e } - // Ensure that the Docker image is installed: try { - docker.inspectImage(db.imageName) + // Ensure that the Docker image is installed: + docker.inspectImageCmd(db.imageName).exec() } catch { - case e: ImageNotFoundException => + case e: NotFoundException => log.warn(s"Docker image ${db.imageName} not found; pulling image from registry") - docker.pull(db.imageName) + docker.pullImageCmd(db.imageName) + .start() + .awaitCompletion(connectionTimeout.value.toSeconds, TimeUnit.SECONDS) pulled = true } - val hostConfigBuilder = HostConfig.builder() - .privileged(db.privileged) - .networkMode("bridge") - .ipcMode(if (db.usesIpc) "host" else "") - .portBindings( - Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava) - // Create the database container: - val containerConfigBuilder = ContainerConfig.builder() - .image(db.imageName) - .networkDisabled(false) - .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava) - .exposedPorts(s"${db.jdbcPort}/tcp") - if (db.getEntryPoint.isDefined) { - containerConfigBuilder.entrypoint(db.getEntryPoint.get) - } - if (db.getStartupProcessName.isDefined) { - containerConfigBuilder.cmd(db.getStartupProcessName.get) + + docker.pullImageCmd(db.imageName) + .start() + .awaitCompletion(connectionTimeout.value.toSeconds, TimeUnit.SECONDS) + + val hostConfig = HostConfig + .newHostConfig() + .withNetworkMode("bridge") + .withPrivileged(db.privileged) + .withPortBindings(PortBinding.parse(s"$dockerIp:$externalPort:${db.jdbcPort}")) + + if (db.usesIpc) { + hostConfig.withIpcMode("host") } - db.beforeContainerStart(hostConfigBuilder, containerConfigBuilder) - containerConfigBuilder.hostConfig(hostConfigBuilder.build()) - val config = containerConfigBuilder.build() + + val containerConfig = new ContainerConfig() + + db.beforeContainerStart(hostConfig, containerConfig) + // Create the database container: - containerId = docker.createContainer(config).id + val createContainerCmd = docker.createContainerCmd(db.imageName) + .withHostConfig(hostConfig) + .withExposedPorts(ExposedPort.tcp(db.jdbcPort)) + .withEnv(db.env.map { case (k, v) => s"$k=$v" }.toList.asJava) + .withNetworkDisabled(false) + + + db.getEntryPoint.foreach(ep => createContainerCmd.withEntrypoint(ep)) + db.getStartupProcessName.foreach(n => createContainerCmd.withCmd(n)) + + container = createContainerCmd.exec() // Start the container and wait until the database can accept JDBC connections: - docker.startContainer(containerId) + docker.startContainerCmd(container.getId).exec() + eventually(connectionTimeout, interval(1.second)) { + val response = docker.inspectContainerCmd(container.getId).exec() + assert(response.getState.getRunning) + } jdbcUrl = db.getJdbcUrl(dockerIp, externalPort) var conn: Connection = null eventually(connectionTimeout, interval(1.second)) { @@ -206,36 +229,35 @@ abstract class DockerJDBCIntegrationSuite def dataPreparation(connection: Connection): Unit private def cleanupContainer(): Unit = { - if (docker != null && containerId != null && !keepContainer) { + if (docker != null && container != null && !keepContainer) { try { - docker.killContainer(containerId) + docker.killContainerCmd(container.getId).exec() } catch { case NonFatal(e) => - val exitContainerIds = - docker.listContainers(ListContainersParam.withStatusExited()).asScala.map(_.id()) - if (exitContainerIds.contains(containerId)) { - logWarning(s"Container $containerId already stopped") - } else { - logWarning(s"Could not stop container $containerId", e) - } + val response = docker.inspectContainerCmd(container.getId).exec() + logWarning(s"Container $container already stopped") + val status = Option(response).map(_.getState.getStatus).getOrElse("unknown") + logWarning(s"Could not stop container $container at stage '$status'", e) } finally { logContainerOutput() - docker.removeContainer(containerId) + docker.removeContainerCmd(container.getId).exec() if (removePulledImage && pulled) { - docker.removeImage(db.imageName) + docker.removeImageCmd(db.imageName).exec() } } } } private def logContainerOutput(): Unit = { - val logStream = docker.logs(containerId, LogsParam.stdout(), LogsParam.stderr()) - try { - logInfo("\n\n===== CONTAINER LOGS FOR container Id: " + containerId + " =====") - logInfo(logStream.readFully()) - logInfo("\n\n===== END OF CONTAINER LOGS FOR container Id: " + containerId + " =====") - } finally { - logStream.close() - } + logInfo("\n\n===== CONTAINER LOGS FOR container Id: " + container + " =====") + docker.logContainerCmd(container.getId) + .withStdOut(true) + .withStdErr(true) + .withFollowStream(true) + .withSince(0).exec( + new ResultCallbackTemplate[ResultCallback[Frame], Frame] { + override def onNext(f: Frame): Unit = logInfo(f.toString) + }) + logInfo("\n\n===== END OF CONTAINER LOGS FOR container Id: " + container + " =====") } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala index 873d5ad1ee43..49c9e3dba0d7 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc import javax.security.auth.login.Configuration -import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} +import com.github.dockerjava.api.model.{AccessMode, Bind, ContainerConfig, HostConfig, Volume} import org.apache.spark.sql.execution.datasources.jdbc.connection.SecureConnectionProvider import org.apache.spark.tags.DockerTest @@ -52,17 +52,17 @@ class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { Some("/docker-entrypoint/mariadb_docker_entrypoint.sh") override def beforeContainerStart( - hostConfigBuilder: HostConfig.Builder, - containerConfigBuilder: ContainerConfig.Builder): Unit = { + hostConfigBuilder: HostConfig, + containerConfigBuilder: ContainerConfig): Unit = { copyExecutableResource("mariadb_docker_entrypoint.sh", entryPointDir, replaceIp) copyExecutableResource("mariadb_krb_setup.sh", initDbDir, replaceIp) - hostConfigBuilder.appendBinds( - HostConfig.Bind.from(entryPointDir.getAbsolutePath) - .to("/docker-entrypoint").readOnly(true).build(), - HostConfig.Bind.from(initDbDir.getAbsolutePath) - .to("/docker-entrypoint-initdb.d").readOnly(true).build() - ) + val binds = + Seq(entryPointDir -> "/docker-entrypoint", initDbDir -> "/docker-entrypoint-initdb.d") + .map { case (from, to) => + new Bind(from.getAbsolutePath, new Volume(to), AccessMode.ro) + } + hostConfigBuilder.withBinds(hostConfigBuilder.getBinds ++ binds: _*) } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala index 4debe24754de..baf24b3c1357 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc import javax.security.auth.login.Configuration -import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} +import com.github.dockerjava.api.model.{AccessMode, Bind, ContainerConfig, HostConfig, Volume} import org.apache.spark.sql.execution.datasources.jdbc.connection.SecureConnectionProvider import org.apache.spark.tags.DockerTest @@ -48,14 +48,14 @@ class PostgresKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { s"jdbc:postgresql://$ip:$port/postgres?user=$principal&gsslib=gssapi" override def beforeContainerStart( - hostConfigBuilder: HostConfig.Builder, - containerConfigBuilder: ContainerConfig.Builder): Unit = { + hostConfigBuilder: HostConfig, + containerConfigBuilder: ContainerConfig): Unit = { copyExecutableResource("postgres_krb_setup.sh", initDbDir, replaceIp) - - hostConfigBuilder.appendBinds( - HostConfig.Bind.from(initDbDir.getAbsolutePath) - .to("/docker-entrypoint-initdb.d").readOnly(true).build() - ) + val newBind = new Bind( + initDbDir.getAbsolutePath, + new Volume("/docker-entrypoint-initdb.d"), + AccessMode.ro) + hostConfigBuilder.withBinds(hostConfigBuilder.getBinds :+ newBind: _*) } } diff --git a/pom.xml b/pom.xml index d2f8ff5777e7..aeade2bccc21 100644 --- a/pom.xml +++ b/pom.xml @@ -1237,22 +1237,31 @@ test - com.spotify - docker-client - 8.14.1 + com.github.docker-java + docker-java + 3.3.4 test - shaded - - guava - com.google.guava - commons-logging commons-logging + + com.github.docker-java + docker-java-transport-netty + + + com.github.docker-java + docker-java-transport-jersey + + + com.github.docker-java + docker-java-transport-zerodep + 3.3.4 + test + com.mysql mysql-connector-j diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ac86aeee3d28..5523932ac0ad 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -415,8 +415,7 @@ object SparkBuild extends PomBuild { /* Protobuf settings */ enable(SparkProtobuf.settings)(protobuf) - // SPARK-14738 - Remove docker tests from main Spark build - // enable(DockerIntegrationTests.settings)(dockerIntegrationTests) + enable(DockerIntegrationTests.settings)(dockerIntegrationTests) if (!profiles.contains("volcano")) { enable(Volcano.settings)(kubernetes)