Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 51 additions & 30 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,16 @@ public class JDBCInterpreter extends KerberosInterpreter {
private static final String DBCP_STRING = "jdbc:apache:commons:dbcp:";
private static final String MAX_ROWS_KEY = "zeppelin.jdbc.maxRows";

private final HashMap<String, Properties> basePropretiesMap;
private static final Set<String> PRESTO_PROPERTIES = new HashSet<>(Arrays.asList(
"user", "password",
"socksProxy", "httpProxy", "clientTags", "applicationNamePrefix", "accessToken",
"SSL", "SSLKeyStorePath", "SSLKeyStorePassword", "SSLTrustStorePath",
"SSLTrustStorePassword", "KerberosRemoteServiceName", "KerberosPrincipal",
"KerberosUseCanonicalHostname", "KerberosServicePrincipalPattern",
"KerberosConfigPath", "KerberosKeytabPath", "KerberosCredentialCachePath",
"extraCredentials", "roles", "sessionProperties"));

private final HashMap<String, Properties> basePropertiesMap;
private final HashMap<String, JDBCUserConfigurations> jdbcUserConfigurationsMap;
private final HashMap<String, SqlCompleter> sqlCompletersMap;

Expand All @@ -153,7 +162,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
public JDBCInterpreter(Properties property) {
super(property);
jdbcUserConfigurationsMap = new HashMap<>();
basePropretiesMap = new HashMap<>();
basePropertiesMap = new HashMap<>();
sqlCompletersMap = new HashMap<>();
maxLineResults = MAX_LINE_DEFAULT;
}
Expand All @@ -180,7 +189,7 @@ protected boolean runKerberosLogin() {
}

public HashMap<String, Properties> getPropertiesMap() {
return basePropretiesMap;
return basePropertiesMap;
}

@Override
Expand All @@ -193,20 +202,20 @@ public void open() {
logger.debug("key: {}, value: {}", keyValue[0], keyValue[1]);

Properties prefixProperties;
if (basePropretiesMap.containsKey(keyValue[0])) {
prefixProperties = basePropretiesMap.get(keyValue[0]);
if (basePropertiesMap.containsKey(keyValue[0])) {
prefixProperties = basePropertiesMap.get(keyValue[0]);
} else {
prefixProperties = new Properties();
basePropretiesMap.put(keyValue[0].trim(), prefixProperties);
basePropertiesMap.put(keyValue[0].trim(), prefixProperties);
}
prefixProperties.put(keyValue[1].trim(), getProperty(propertyKey));
}
}

Set<String> removeKeySet = new HashSet<>();
for (String key : basePropretiesMap.keySet()) {
for (String key : basePropertiesMap.keySet()) {
if (!COMMON_KEY.equals(key)) {
Properties properties = basePropretiesMap.get(key);
Properties properties = basePropertiesMap.get(key);
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
key, DRIVER_KEY, key, key, URL_KEY);
Expand All @@ -216,9 +225,9 @@ public void open() {
}

for (String key : removeKeySet) {
basePropretiesMap.remove(key);
basePropertiesMap.remove(key);
}
logger.debug("JDBC PropretiesMap: {}", basePropretiesMap);
logger.debug("JDBC PropretiesMap: {}", basePropertiesMap);

setMaxLineResults();
setMaxRows();
Expand All @@ -238,9 +247,9 @@ protected boolean isKerboseEnabled() {
}

private void setMaxLineResults() {
if (basePropretiesMap.containsKey(COMMON_KEY) &&
basePropretiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
maxLineResults = Integer.valueOf(basePropretiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY));
if (basePropertiesMap.containsKey(COMMON_KEY) &&
basePropertiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
maxLineResults = Integer.valueOf(basePropertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY));
}
}

Expand Down Expand Up @@ -346,9 +355,9 @@ private String getJDBCDriverName(String user, String propertyKey) {
}

private boolean existAccountInBaseProperty(String propertyKey) {
return basePropretiesMap.get(propertyKey).containsKey(USER_KEY) &&
!isEmpty((String) basePropretiesMap.get(propertyKey).get(USER_KEY)) &&
basePropretiesMap.get(propertyKey).containsKey(PASSWORD_KEY);
return basePropertiesMap.get(propertyKey).containsKey(USER_KEY) &&
!isEmpty((String) basePropertiesMap.get(propertyKey).get(USER_KEY)) &&
basePropertiesMap.get(propertyKey).containsKey(PASSWORD_KEY);
}

private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext,
Expand Down Expand Up @@ -384,14 +393,14 @@ private void setUserProperty(String propertyKey, InterpreterContext interpreterC
String user = interpreterContext.getAuthenticationInfo().getUser();

JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user);
if (basePropretiesMap.get(propertyKey).containsKey(USER_KEY) &&
!basePropretiesMap.get(propertyKey).getProperty(USER_KEY).isEmpty()) {
String password = getPassword(basePropretiesMap.get(propertyKey));
if (basePropertiesMap.get(propertyKey).containsKey(USER_KEY) &&
!basePropertiesMap.get(propertyKey).getProperty(USER_KEY).isEmpty()) {
String password = getPassword(basePropertiesMap.get(propertyKey));
if (!isEmpty(password)) {
basePropretiesMap.get(propertyKey).setProperty(PASSWORD_KEY, password);
basePropertiesMap.get(propertyKey).setProperty(PASSWORD_KEY, password);
}
}
jdbcUserConfigurations.setPropertyMap(propertyKey, basePropretiesMap.get(propertyKey));
jdbcUserConfigurations.setPropertyMap(propertyKey, basePropertiesMap.get(propertyKey));
if (existAccountInBaseProperty(propertyKey)) {
return;
}
Expand All @@ -407,7 +416,19 @@ private void setUserProperty(String propertyKey, InterpreterContext interpreterC
}

private void createConnectionPool(String url, String user, String propertyKey,
Properties properties) throws SQLException, ClassNotFoundException {
Properties properties) throws SQLException, ClassNotFoundException, IOException {

String driverClass = properties.getProperty(DRIVER_KEY);
if (driverClass != null && (driverClass.equals("com.facebook.presto.jdbc.PrestoDriver")
|| driverClass.equals("io.prestosql.jdbc.PrestoDriver"))) {
// Only add valid properties otherwise presto won't work.
for (Object key : properties.keySet()) {
if (!PRESTO_PROPERTIES.contains(key.toString())) {
properties.remove(key);
}
}
}

ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(url, properties);

Expand All @@ -420,14 +441,14 @@ private void createConnectionPool(String url, String user, String propertyKey,
ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);

poolableConnectionFactory.setPool(connectionPool);
Class.forName(properties.getProperty(DRIVER_KEY));
Class.forName(driverClass);
PoolingDriver driver = new PoolingDriver();
driver.registerPool(propertyKey + user, connectionPool);
getJDBCConfiguration(user).saveDBDriverPool(propertyKey, driver);
}

private Connection getConnectionFromPool(String url, String user, String propertyKey,
Properties properties) throws SQLException, ClassNotFoundException {
Properties properties) throws SQLException, ClassNotFoundException, IOException {
String jdbcDriver = getJDBCDriverName(user, propertyKey);

if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
Expand All @@ -440,7 +461,7 @@ public Connection getConnection(String propertyKey, InterpreterContext interpret
throws ClassNotFoundException, SQLException, InterpreterException, IOException {
final String user = interpreterContext.getAuthenticationInfo().getUser();
Connection connection;
if (propertyKey == null || basePropretiesMap.get(propertyKey) == null) {
if (propertyKey == null || basePropertiesMap.get(propertyKey) == null) {
return null;
}

Expand All @@ -465,7 +486,7 @@ public Connection getConnection(String propertyKey, InterpreterContext interpret
getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) {
connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
} else {
if (basePropretiesMap.get(propertyKey).containsKey("proxy.user.property")) {
if (basePropertiesMap.get(propertyKey).containsKey("proxy.user.property")) {
connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
} else {
UserGroupInformation ugi = null;
Expand Down Expand Up @@ -505,17 +526,17 @@ private String appendProxyUserToURL(String url, String user, String propertyKey)
StringBuilder connectionUrl = new StringBuilder(url);

if (user != null && !user.equals("anonymous") &&
basePropretiesMap.get(propertyKey).containsKey("proxy.user.property")) {
basePropertiesMap.get(propertyKey).containsKey("proxy.user.property")) {

Integer lastIndexOfUrl = connectionUrl.indexOf("?");
if (lastIndexOfUrl == -1) {
lastIndexOfUrl = connectionUrl.length();
}
logger.info("Using proxy user as :" + user);
logger.info("Using proxy property for user as :" +
basePropretiesMap.get(propertyKey).getProperty("proxy.user.property"));
basePropertiesMap.get(propertyKey).getProperty("proxy.user.property"));
connectionUrl.insert(lastIndexOfUrl, ";" +
basePropretiesMap.get(propertyKey).getProperty("proxy.user.property") + "=" + user + ";");
basePropertiesMap.get(propertyKey).getProperty("proxy.user.property") + "=" + user + ";");
} else if (user != null && !user.equals("anonymous") && url.contains("hive")) {
logger.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" +
".org/docs/latest/interpreter/jdbc.html#apache-hive");
Expand Down Expand Up @@ -608,7 +629,7 @@ private boolean isDDLCommand(int updatedCount, int columnCount) throws SQLExcept

public InterpreterResult executePrecode(InterpreterContext interpreterContext) {
InterpreterResult interpreterResult = null;
for (String propertyKey : basePropretiesMap.keySet()) {
for (String propertyKey : basePropertiesMap.keySet()) {
String precode = getProperty(String.format("%s.precode", propertyKey));
if (StringUtils.isNotBlank(precode)) {
interpreterResult = executeSql(propertyKey, precode, interpreterContext);
Expand Down