Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
<version>1.0.8</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>

<build>
Expand Down
198 changes: 99 additions & 99 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
import java.nio.charset.StandardCharsets;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.*;
import java.util.*;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.dbcp2.ConnectionFactory;
import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
import org.apache.commons.dbcp2.PoolableConnectionFactory;
import org.apache.commons.dbcp2.PoolingDriver;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
Expand Down Expand Up @@ -99,14 +100,15 @@ public class JDBCInterpreter extends Interpreter {

static final String EMPTY_COLUMN_VALUE = "";


private final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use";
private final String CONCURRENT_EXECUTION_COUNT = "zeppelin.jdbc.concurrent.max_connection";

private final String DBCP_STRING = "jdbc:apache:commons:dbcp:";

private final HashMap<String, Properties> propertiesMap;
private final Map<String, Statement> paragraphIdStatementMap;

private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap;
private final Map<String, Connection> paragraphIdConnectionMap;
private final Map<String, PoolingDriver> poolingDriverMap;

private final Map<String, SqlCompleter> propertyKeySqlCompleterMap;

Expand All @@ -122,9 +124,8 @@ public InterpreterCompletion apply(CharSequence seq) {
public JDBCInterpreter(Properties property) {
super(property);
propertiesMap = new HashMap<>();
propertyKeyUnusedConnectionListMap = new HashMap<>();
paragraphIdStatementMap = new HashMap<>();
paragraphIdConnectionMap = new HashMap<>();
poolingDriverMap = new HashMap<>();
propertyKeySqlCompleterMap = new HashMap<>();
}

Expand Down Expand Up @@ -193,22 +194,41 @@ private SqlCompleter createSqlCompleter(Connection jdbcConnection) {
return completer;
}

private boolean isConnectionInPool(String driverName) {
if (poolingDriverMap.containsKey(driverName)) return true;
return false;
}

private void createConnectionPool(String url, String propertyKey, Properties properties) {
ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(url, properties);

PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory, null);
ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);

poolableConnectionFactory.setPool(connectionPool);
PoolingDriver driver = new PoolingDriver();
driver.registerPool(propertyKey, connectionPool);

poolingDriverMap.put(propertyKey, driver);
}

private Connection getConnectionFromPool(String url, String propertyKey, Properties properties)
throws SQLException {
if (!isConnectionInPool(propertyKey)) {
createConnectionPool(url, propertyKey, properties);
}

return DriverManager.getConnection(DBCP_STRING + propertyKey);
}

public Connection getConnection(String propertyKey, String user)
throws ClassNotFoundException, SQLException, InterpreterException {
Connection connection = null;
if (propertyKey == null || propertiesMap.get(propertyKey) == null) {
return null;
}
if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey);
if (0 != connectionList.size()) {
connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0);
if (null != connection && connection.isClosed()) {
connection.close();
connection = null;
}
}
}
if (null == connection) {
final Properties properties = (Properties) propertiesMap.get(propertyKey).clone();
logger.info(properties.getProperty(DRIVER_KEY));
Expand All @@ -222,28 +242,30 @@ public Connection getConnection(String propertyKey, String user)
switch (authType) {
case KERBEROS:
if (user == null) {
connection = DriverManager.getConnection(url, properties);
connection = getConnectionFromPool(url, propertyKey, properties);
} else {
if ("hive".equalsIgnoreCase(propertyKey)) {
connection = DriverManager.getConnection(url + ";hive.server2.proxy.user=" + user,
properties);
connection = getConnectionFromPool(url + ";hive.server2.proxy.user=" + user,
propertyKey, properties);
} else {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getCurrentUser());
UserGroupInformation.getCurrentUser());
} catch (Exception e) {
logger.error("Error in createProxyUser", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}

final String poolKey = propertyKey;
try {
connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
@Override
public Connection run() throws Exception {
return DriverManager.getConnection(url, properties);
return getConnectionFromPool(url, poolKey, properties);
}
});
} catch (Exception e) {
Expand All @@ -258,101 +280,71 @@ public Connection run() throws Exception {
break;

default:
connection = DriverManager.getConnection(url, properties);
connection = getConnectionFromPool(url, propertyKey, properties);
}
}
}
propertyKeySqlCompleterMap.put(propertyKey, createSqlCompleter(connection));
return connection;
}

public Statement getStatement(String propertyKey, String paragraphId,
InterpreterContext interpreterContext)
throws SQLException, ClassNotFoundException, InterpreterException {
Connection connection;

if (paragraphIdConnectionMap.containsKey(paragraphId +
interpreterContext.getAuthenticationInfo().getUser())) {
connection = paragraphIdConnectionMap.get(paragraphId +
interpreterContext.getAuthenticationInfo().getUser());
} else {
connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser());
}

if (connection == null) {
return null;
private void initStatementMap() {
for (Statement statement : paragraphIdStatementMap.values()) {
try {
statement.close();
} catch (Exception e) {
logger.error("Error while closing paragraphIdStatementMap statement...", e);
}
}
paragraphIdStatementMap.clear();
}

Statement statement = connection.createStatement();
if (isStatementClosed(statement)) {
connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser());
statement = connection.createStatement();
private void initConnectionPoolMap() throws SQLException {
Iterator<String> it = poolingDriverMap.keySet().iterator();
while (it.hasNext()) {
String driverName = it.next();
poolingDriverMap.get(driverName).closePool(driverName);
it.remove();
}
paragraphIdConnectionMap.put(paragraphId + interpreterContext.getAuthenticationInfo().getUser(),
connection);
paragraphIdStatementMap.put(paragraphId + interpreterContext.getAuthenticationInfo().getUser(),
statement);
poolingDriverMap.clear();
}

return statement;
private void saveStatement(String key, Statement statement) throws SQLException {
paragraphIdStatementMap.put(key, statement);
statement.setMaxRows(getMaxResult());
}

private boolean isStatementClosed(Statement statement) {
try {
return statement.isClosed();
} catch (Throwable t) {
logger.debug("{} doesn't support isClosed method", statement);
return false;
}
private void removeStatement(String key) {
paragraphIdStatementMap.remove(key);
}

@Override
public void close() {
try {
for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values()) {
for (Connection c : connectionList) {
try {
c.close();
} catch (Exception e) {
logger.error("Error while closing propertyKeyUnusedConnectionListMap connection...", e);
}
}
}

for (Statement statement : paragraphIdStatementMap.values()) {
try {
statement.close();
} catch (Exception e) {
logger.error("Error while closing paragraphIdStatementMap statement...", e);
}
}
paragraphIdStatementMap.clear();

for (Connection connection : paragraphIdConnectionMap.values()) {
try {
connection.close();
} catch (Exception e) {
logger.error("Error while closing paragraphIdConnectionMap connection...", e);
}
}
paragraphIdConnectionMap.clear();
initStatementMap();
initConnectionPoolMap();
} catch (Exception e) {
logger.error("Error while closing...", e);
}
}

private InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {

String paragraphId = interpreterContext.getParagraphId();
Connection connection;
Statement statement;
ResultSet resultSet = null;

try {
connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser());
if (connection == null) {
return new InterpreterResult(Code.ERROR, "Prefix not found.");
}

Statement statement = getStatement(propertyKey, paragraphId, interpreterContext);

statement = connection.createStatement();
if (statement == null) {
return new InterpreterResult(Code.ERROR, "Prefix not found.");
}
statement.setMaxRows(getMaxResult());

StringBuilder msg = null;
boolean isTableType = false;
Expand All @@ -364,8 +356,9 @@ private InterpreterResult executeSql(String propertyKey, String sql,
isTableType = true;
}

ResultSet resultSet = null;
try {
saveStatement(paragraphId +
interpreterContext.getAuthenticationInfo().getUser(), statement);

boolean isResultSetAvailable = statement.execute(sql);

Expand Down Expand Up @@ -408,16 +401,24 @@ private InterpreterResult executeSql(String propertyKey, String sql,
msg.append(updateCount).append(NEWLINE);
}
} finally {
try {
if (resultSet != null) {
if (resultSet != null) {
try {
resultSet.close();
}
statement.close();
} finally {
statement = null;
} catch (SQLException e) { /*ignored*/ }
}
if (statement != null) {
try {
statement.close();
} catch (SQLException e) { /*ignored*/ }
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) { /*ignored*/ }
}
removeStatement(paragraphId +
interpreterContext.getAuthenticationInfo().getUser());
}

return new InterpreterResult(Code.SUCCESS, msg.toString());

} catch (Exception e) {
Expand Down Expand Up @@ -452,7 +453,6 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr
cmd = cmd.trim();

logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);

return executeSql(propertyKey, cmd, contextInterpreter);
}

Expand Down
Loading