[SPARK-31272][SQL] Support DB2 Kerberos login in JDBC connector #28215
Changes from all commits
db2_krb_setup.sh (new file)
@@ -0,0 +1,28 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

USERPROFILE=/database/config/db2inst1/sqllib/userprofile
echo "export DB2_KRB5_PRINCIPAL=db2/[email protected]" >> $USERPROFILE
echo "export KRB5_KTNAME=/var/custom/db2.keytab" >> $USERPROFILE
# This trick is needed because DB2 forwards environment variables automatically
# only if their names start with DB2 (KRB5_KTNAME doesn't fit).
su - db2inst1 -c "db2set DB2ENVLIST=KRB5_KTNAME"

su - db2inst1 -c "db2 UPDATE DBM CFG USING SRVCON_GSSPLUGIN_LIST IBMkrb5 IMMEDIATE"
su - db2inst1 -c "db2 UPDATE DBM CFG USING SRVCON_AUTH KERBEROS IMMEDIATE"

su - db2inst1 -c "db2stop force; db2start"
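For orientation, this is roughly how a Spark job would talk to the DB2 instance prepared by the script above, using the keytab/principal JDBC options this connector work builds on. A minimal sketch only: the URL, principal, and file paths are placeholders, not values from this PR.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("db2-krb-example").getOrCreate()

// The keytab/principal options trigger the Kerberos-aware connection provider,
// which adds the DB2-specific Kerberos driver properties before connecting.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:db2://db2.example.com:50000/db2")   // placeholder host
  .option("dbtable", "bar")
  .option("keytab", "/path/to/db2.keytab")                 // placeholder path
  .option("principal", "db2/[email protected]")
  .load()
df.show()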
DB2IntegrationSuite.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{BooleanType, ByteType, ShortType, StructType}
import org.apache.spark.tags.DockerTest

@DockerTest
@Ignore // AMPLab Jenkins needs to be updated before shared memory works on docker
Contributor (author): Not a relevant change. Since docker tests are not integrated into Jenkins, we can turn this on.
Contributor: I'm seeing that other test suites for other DBMSes don't have this, so it's good to remove it for consistency.
Contributor: Not really following the discussion; are you guys saying this line should be removed? Because nothing is changing here.
Contributor: Yes, as other tests don't have this. It's not so much "should be" as "can be".
Contributor: And it's removed in the code diff as of now.
Contributor: I don't see it removed. It's still there.
Contributor: Sorry for the confusion, I should have been clearer: the change was rolled back. No change.
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
DB2KrbIntegrationSuite.scala (new file)
@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.jdbc

import java.security.PrivilegedExceptionAction
import java.sql.Connection
import javax.security.auth.login.Configuration

import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.jdbc.connection.{DB2ConnectionProvider, SecureConnectionProvider}
import org.apache.spark.tags.DockerTest

@DockerTest
class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
  override protected val userName = s"db2/$dockerIp"
  override protected val keytabFileName = "db2.keytab"

  override val db = new DatabaseOnDocker {
    override val imageName = "ibmcom/db2:11.5.0.0a"
    override val env = Map(
      "DB2INST1_PASSWORD" -> "rootpass",
      "LICENSE" -> "accept",
      "DBNAME" -> "db2"
    )
    override val usesIpc = false
    override val jdbcPort = 50000
    override val privileged = true
    override def getJdbcUrl(ip: String, port: Int): String = s"jdbc:db2://$ip:$port/db2"
    override def getJdbcProperties() = {
      val options = new JDBCOptions(Map[String, String](
        JDBCOptions.JDBC_URL -> getJdbcUrl(dockerIp, externalPort),
        JDBCOptions.JDBC_TABLE_NAME -> "bar",
        JDBCOptions.JDBC_KEYTAB -> keytabFileName,
        JDBCOptions.JDBC_PRINCIPAL -> principal
      ))
      new DB2ConnectionProvider(null, options).getAdditionalProperties()
    }

    override def beforeContainerStart(
        hostConfigBuilder: HostConfig.Builder,
        containerConfigBuilder: ContainerConfig.Builder): Unit = {
      copyExecutableResource("db2_krb_setup.sh", initDbDir, replaceIp)

      hostConfigBuilder.appendBinds(
        HostConfig.Bind.from(initDbDir.getAbsolutePath)
          .to("/var/custom").readOnly(true).build()
      )
    }
  }

  override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
    val config = new SecureConnectionProvider.JDBCConfiguration(
      Configuration.getConfiguration, "JaasClient", keytabFile, principal)
    Configuration.setConfiguration(config)
Member: Is this safe when scanning tables in different secure databases?
Contributor (author): Nice catch! I'll create a separate JIRA to handle config synchronisation globally...
Member: Spark can scan different JDBC relations concurrently, though; could we synchronize them easily?
Contributor (author): I think the solution shouldn't be complicated, but it affects all other providers that change the configuration (not just DB2).
Contributor (author): I've filed https://issues.apache.org/jira/browse/SPARK-31575 to handle the issue.
  }

  override def getConnection(): Connection = {
    val config = new org.apache.hadoop.conf.Configuration
    SecurityUtil.setAuthenticationMethod(KERBEROS, config)
    UserGroupInformation.setConfiguration(config)

    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFullPath).doAs(
      new PrivilegedExceptionAction[Connection]() {
        override def run(): Connection = {
          DB2KrbIntegrationSuite.super.getConnection()
        }
      }
    )
  }
}
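As raised in the review thread above, setAuthentication mutates the process-wide JAAS Configuration, which is risky when several secure JDBC relations are scanned concurrently; SPARK-31575 tracks the proper fix. The following is only a rough sketch of one possible direction, not part of this PR: the object and helper names are hypothetical.

import javax.security.auth.login.Configuration

object JaasConfigLock {
  // Hypothetical helper: serialize updates to the global JAAS Configuration so two
  // connection providers cannot install conflicting entries at the same time, and
  // restore the previous configuration once the protected block finishes.
  def withConfig[T](newConfig: Configuration)(body: => T): T = synchronized {
    val previous = Configuration.getConfiguration
    Configuration.setConfiguration(newConfig)
    try body finally Configuration.setConfiguration(previous)
  }
}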
DockerJDBCIntegrationSuite.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.jdbc

import java.net.ServerSocket
import java.sql.Connection
import java.sql.{Connection, DriverManager}
import java.util.Properties

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

@@ -53,11 +54,21 @@ abstract class DatabaseOnDocker {
   */
  val jdbcPort: Int

  /**
   * Parameter whether the container should run privileged.
   */
  val privileged: Boolean = false
Contributor (author): The DB2 docker image requires a privileged run.
  /**
   * Return a JDBC URL that connects to the database running at the given IP address and port.
   */
  def getJdbcUrl(ip: String, port: Int): String

  /**
   * Return the JDBC properties needed for the connection.
   */
  def getJdbcProperties(): Properties = new Properties()

  /**
   * Optional entry point when container starts
   *

@@ -118,6 +129,7 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
      port
    }
    val hostConfigBuilder = HostConfig.builder()
      .privileged(db.privileged)
      .networkMode("bridge")
      .ipcMode(if (db.usesIpc) "host" else "")
      .portBindings(

@@ -142,12 +154,11 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
    // Start the container and wait until the database can accept JDBC connections:
    docker.startContainer(containerId)
    jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
    eventually(timeout(1.minute), interval(1.second)) {
      val conn = java.sql.DriverManager.getConnection(jdbcUrl)
      conn.close()
    var conn: Connection = null
    eventually(timeout(2.minutes), interval(1.second)) {
Contributor (author): Single-connection simplification plus a timeout increase.
      conn = getConnection()
    }
    // Run any setup queries:
    val conn: Connection = java.sql.DriverManager.getConnection(jdbcUrl)
    try {
      dataPreparation(conn)
    } finally {

@@ -183,6 +194,13 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
    }
  }

  /**
   * Return the JDBC connection.
   */
  def getConnection(): Connection = {
    DriverManager.getConnection(jdbcUrl, db.getJdbcProperties())
  }

  /**
   * Prepare databases and tables for testing.
   */
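The change above waits for the database with getConnection() inside eventually and then reuses that connection for dataPreparation. Outside of ScalaTest, the same wait-until-ready idea looks roughly like the sketch below; it is illustrative only, and the helper name and retry budget are assumptions, not code from this PR.

import java.sql.{Connection, DriverManager}
import java.util.Properties

import scala.util.control.NonFatal

def waitForDatabase(url: String, props: Properties, maxAttempts: Int = 120): Connection = {
  var conn: Connection = null
  var attempt = 0
  while (conn == null) {
    try {
      // Succeeds once the container accepts JDBC connections.
      conn = DriverManager.getConnection(url, props)
    } catch {
      case NonFatal(_) if attempt < maxAttempts =>
        attempt += 1
        Thread.sleep(1000) // retry roughly once per second, as the suite does
    }
  }
  conn
}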
DockerKrbJDBCIntegrationSuite.scala
@@ -77,6 +77,8 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
    }
  }

  protected def replaceIp(s: String): String = s.replace("__IP_ADDRESS_REPLACE_ME__", dockerIp)

  protected def copyExecutableResource(
      fileName: String, dir: File, processLine: String => String = identity) = {
    val newEntry = new File(dir.getAbsolutePath, fileName)

@@ -100,7 +102,7 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
  }

  override def dataPreparation(conn: Connection): Unit = {
    conn.prepareStatement("CREATE TABLE bar (c0 text)").executeUpdate()
    conn.prepareStatement("CREATE TABLE bar (c0 VARCHAR(8))").executeUpdate()
Contributor (author): DB2 doesn't support text.
Contributor: I'd be surprised if this change affects the others, but it may be worth testing them manually and mentioning the result.
Contributor (author): When making modifications I always re-execute all of them. That happened here too.
Contributor (author): And, not to avoid the question: all passed :)
    conn.prepareStatement("INSERT INTO bar VALUES ('hello')").executeUpdate()
  }
DB2ConnectionProvider.scala (new file)
@@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.security.PrivilegedExceptionAction
import java.sql.{Connection, Driver}
import java.util.Properties

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[sql] class DB2ConnectionProvider(driver: Driver, options: JDBCOptions)
  extends SecureConnectionProvider(driver, options) {
  override val appEntry: String = "JaasClient"

  override def getConnection(): Connection = {
    setAuthenticationConfigIfNeeded()
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs(
      new PrivilegedExceptionAction[Connection]() {
        override def run(): Connection = {
          DB2ConnectionProvider.super.getConnection()
        }
      }
    )
  }

  override def getAdditionalProperties(): Properties = {
    val result = new Properties()
    // 11 is the integer value for kerberos
    result.put("securityMechanism", new String("11"))
    result.put("KerberosServerPrincipal", options.principal)
    result
  }

  override def setAuthenticationConfigIfNeeded(): Unit = {
    val (parent, configEntry) = getConfigWithAppEntry()
    if (configEntry == null || configEntry.isEmpty) {
      setAuthenticationConfig(parent)
    }
  }
}

private[sql] object DB2ConnectionProvider {
  val driverClass = "com.ibm.db2.jcc.DB2Driver"
}
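For context, the extra properties prepared by getAdditionalProperties() are what the IBM JCC driver expects for Kerberos: securityMechanism 11 selects Kerberos and KerberosServerPrincipal names the server principal. A hedged sketch of the equivalent plain-JDBC call follows; the URL and principal are placeholders, and it assumes a Kerberos login is already in effect on the calling thread, as getConnection() arranges via UserGroupInformation.

import java.sql.DriverManager
import java.util.Properties

val props = new Properties()
props.put("securityMechanism", "11")                            // 11 = Kerberos for the DB2 JCC driver
props.put("KerberosServerPrincipal", "db2/[email protected]")  // placeholder principal
val conn = DriverManager.getConnection("jdbc:db2://db2.example.com:50000/db2", props)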
Review thread on the db2set DB2ENVLIST=KRB5_KTNAME line in db2_krb_setup.sh:
Author: This trick is needed because DB2 forwards environment variables automatically only if their names start with DB2 (KRB5_KTNAME doesn't fit).
Reviewer: It would be nice to add this as a comment in the script, to reduce the hops needed to understand the trick.
Author: Good idea, added.