Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions external/docker-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
<sbt.project.name>docker-integration-tests</sbt.project.name>
</properties>

<repositories>
<repository>
<id>db2</id>
<url>https://app.camunda.com/nexus/content/repositories/public/</url>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems fine but I'll be really mad if this repository ever goes down and breaks the build 😄

</repository>
</repositories>

<dependencies>
<dependency>
<groupId>com.spotify</groupId>
Expand Down Expand Up @@ -180,5 +187,28 @@
</exclusions>
</dependency>
<!-- End Jersey dependencies -->

<!-- DB2 JCC driver manual installation instructions

You can build this datasource if you:
1) have the DB2 artifacts installed in a local repo and supply the URL:
-Dmaven.repo.drivers=http://my.local.repo

2) have a copy of the DB2 JCC driver and run the following commands :
mvn install:install-file -Dfile=${path to db2jcc4.jar} \
-DgroupId=com.ibm.db2 \
-DartifactId=db2jcc4 \
-Dversion=10.5 \
-Dpackaging=jar

Note: IBM DB2 JCC driver is available for download at
http://www-01.ibm.com/support/docview.wss?uid=swg21363866
-->
<dependency>
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc4</artifactId>
<version>10.5.0.5</version>
<type>jar</type>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
import java.util.Properties

import org.scalatest._

import org.apache.spark.tags.DockerTest

@DockerTest
@Ignore // AMPLab Jenkins needs to be updated before shared memory works on docker
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
override val db = new DatabaseOnDocker {
override val imageName = "lresende/db2express-c:10.5.0.5-3.10.0"
override val env = Map(
"DB2INST1_PASSWORD" -> "rootpass",
"LICENSE" -> "accept"
)
override val usesIpc = true
override val jdbcPort: Int = 50000
override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:db2://$ip:$port/foo:user=db2inst1;password=rootpass;"
override def getStartupProcessName: Option[String] = Some("db2start")
}

override def dataPreparation(conn: Connection): Unit = {
conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y VARCHAR(8))").executeUpdate()
conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate()
conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate()

conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), "
+ "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, "
+ "dbl DOUBLE)").executeUpdate()
conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
+ "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, "
+ "42.75, 1.0000000000000002)").executeUpdate()

conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, "
+ "yr YEAR)").executeUpdate()
conn.prepareStatement("INSERT INTO dates VALUES ('1991-11-09', '13:31:24', "
+ "'1996-01-01 01:23:45', '2009-02-13 23:31:30', '2001')").executeUpdate()

// TODO: Test locale conversion for strings.
conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c CLOB, d BLOB, "
+ "e CHAR FOR BIT DATA)").executeUpdate()
conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', 'jumps'")
.executeUpdate()
}

test("Basic test") {
val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
val rows = df.collect()
assert(rows.length == 2)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 2)
assert(types(0).equals("class java.lang.Integer"))
assert(types(1).equals("class java.lang.String"))
}

test("Numeric types") {
val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 9)
assert(types(0).equals("class java.lang.Boolean"))
assert(types(1).equals("class java.lang.Long"))
assert(types(2).equals("class java.lang.Integer"))
assert(types(3).equals("class java.lang.Integer"))
assert(types(4).equals("class java.lang.Integer"))
assert(types(5).equals("class java.lang.Long"))
assert(types(6).equals("class java.math.BigDecimal"))
assert(types(7).equals("class java.lang.Double"))
assert(types(8).equals("class java.lang.Double"))
assert(rows(0).getBoolean(0) == false)
assert(rows(0).getLong(1) == 0x225)
assert(rows(0).getInt(2) == 17)
assert(rows(0).getInt(3) == 77777)
assert(rows(0).getInt(4) == 123456789)
assert(rows(0).getLong(5) == 123456789012345L)
val bd = new BigDecimal("123456789012345.12345678901234500000")
assert(rows(0).getAs[BigDecimal](6).equals(bd))
assert(rows(0).getDouble(7) == 42.75)
assert(rows(0).getDouble(8) == 1.0000000000000002)
}

test("Date types") {
val df = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 5)
assert(types(0).equals("class java.sql.Date"))
assert(types(1).equals("class java.sql.Timestamp"))
assert(types(2).equals("class java.sql.Timestamp"))
assert(types(3).equals("class java.sql.Timestamp"))
assert(types(4).equals("class java.sql.Date"))
assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09")))
assert(rows(0).getAs[Timestamp](1).equals(Timestamp.valueOf("1970-01-01 13:31:24")))
assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("1996-01-01 01:23:45")))
assert(rows(0).getAs[Timestamp](3).equals(Timestamp.valueOf("2009-02-13 23:31:30")))
assert(rows(0).getAs[Date](4).equals(Date.valueOf("2001-01-01")))
}

test("String types") {
val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 9)
assert(types(0).equals("class java.lang.String"))
assert(types(1).equals("class java.lang.String"))
assert(types(2).equals("class java.lang.String"))
assert(types(3).equals("class java.lang.String"))
assert(types(4).equals("class java.lang.String"))
assert(types(5).equals("class java.lang.String"))
assert(types(6).equals("class [B"))
assert(types(7).equals("class [B"))
assert(types(8).equals("class [B"))
assert(rows(0).getString(0).equals("the"))
assert(rows(0).getString(1).equals("quick"))
assert(rows(0).getString(2).equals("brown"))
assert(rows(0).getString(3).equals("fox"))
assert(rows(0).getString(4).equals("jumps"))
assert(rows(0).getString(5).equals("over"))
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](116, 104, 101, 0)))
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), Array[Byte](108, 97, 122, 121)))
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), Array[Byte](100, 111, 103)))
}

test("Basic write test") {
val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ abstract class DatabaseOnDocker {
*/
val env: Map[String, String]

/**
* Wheather or not to use ipc mode for shared memory when starting docker image
*/
val usesIpc: Boolean

/**
* The container-internal JDBC port that the database listens on.
*/
Expand All @@ -53,6 +58,11 @@ abstract class DatabaseOnDocker {
* Return a JDBC URL that connects to the database running at the given IP address and port.
*/
def getJdbcUrl(ip: String, port: Int): String

/**
* Optional process to run when container starts
*/
def getStartupProcessName: Option[String]
}

abstract class DockerJDBCIntegrationSuite
Expand Down Expand Up @@ -97,17 +107,23 @@ abstract class DockerJDBCIntegrationSuite
val dockerIp = DockerUtils.getDockerIp()
val hostConfig: HostConfig = HostConfig.builder()
.networkMode("bridge")
.ipcMode(if (db.usesIpc) "host" else "")
.portBindings(
Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava)
.build()
// Create the database container:
val config = ContainerConfig.builder()
val containerConfigBuilder = ContainerConfig.builder()
.image(db.imageName)
.networkDisabled(false)
.env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
.hostConfig(hostConfig)
.exposedPorts(s"${db.jdbcPort}/tcp")
.build()
if(db.getStartupProcessName.isDefined) {
containerConfigBuilder
.cmd(db.getStartupProcessName.get)
}
val config = containerConfigBuilder.build()
// Create the database container:
containerId = docker.createContainer(config).id
// Start the container and wait until the database can accept JDBC connections:
docker.startContainer(containerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
override val env = Map(
"MYSQL_ROOT_PASSWORD" -> "rootpass"
)
override val usesIpc = false
override val jdbcPort: Int = 3306
override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
override def getStartupProcessName: Option[String] = None
}

override def dataPreparation(conn: Connection): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
override val env = Map(
"ORACLE_ROOT_PASSWORD" -> "oracle"
)
override val usesIpc = false
override val jdbcPort: Int = 1521
override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:oracle:thin:system/oracle@//$ip:$port/xe"
override def getStartupProcessName: Option[String] = None
}

override def dataPreparation(conn: Connection): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
override val env = Map(
"POSTGRES_PASSWORD" -> "rootpass"
)
override val usesIpc = false
override val jdbcPort = 5432
override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass"
override def getStartupProcessName: Option[String] = None
}

override def dataPreparation(conn: Connection): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@
<groupId>com.spotify</groupId>
<artifactId>docker-client</artifactId>
<classifier>shaded</classifier>
<version>3.4.0</version>
<version>3.6.6</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down
4 changes: 3 additions & 1 deletion project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,10 @@ object Flume {
object DockerIntegrationTests {
// This serves to override the override specified in DependencyOverrides:
lazy val settings = Seq(
dependencyOverrides += "com.google.guava" % "guava" % "18.0"
dependencyOverrides += "com.google.guava" % "guava" % "18.0",
resolvers ++= Seq("DB2" at "https://app.camunda.com/nexus/content/repositories/public/")
)

}

/**
Expand Down