diff --git a/jdbc/pom.xml b/jdbc/pom.xml index f4e97c955ce..73c66c07201 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -104,6 +104,12 @@ 1.0.8 test + + + org.apache.commons + commons-dbcp2 + 2.0.1 + diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 0655f3a65a3..5f784d7eb7b 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -19,15 +19,16 @@ import java.nio.charset.StandardCharsets; import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.*; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.dbcp2.ConnectionFactory; +import org.apache.commons.dbcp2.DriverManagerConnectionFactory; +import org.apache.commons.dbcp2.PoolableConnectionFactory; +import org.apache.commons.dbcp2.PoolingDriver; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.hadoop.security.UserGroupInformation; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -99,14 +100,15 @@ public class JDBCInterpreter extends Interpreter { static final String EMPTY_COLUMN_VALUE = ""; + private final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use"; private final String CONCURRENT_EXECUTION_COUNT = "zeppelin.jdbc.concurrent.max_connection"; + private final String DBCP_STRING = "jdbc:apache:commons:dbcp:"; + private final HashMap propertiesMap; private final Map paragraphIdStatementMap; - - private final Map> propertyKeyUnusedConnectionListMap; - private final Map paragraphIdConnectionMap; + private final Map poolingDriverMap; private final Map propertyKeySqlCompleterMap; @@ -122,9 +124,8 @@ public InterpreterCompletion apply(CharSequence seq) { public JDBCInterpreter(Properties property) { super(property); propertiesMap = new HashMap<>(); - propertyKeyUnusedConnectionListMap = new HashMap<>(); paragraphIdStatementMap = new HashMap<>(); - paragraphIdConnectionMap = new HashMap<>(); + poolingDriverMap = new HashMap<>(); propertyKeySqlCompleterMap = new HashMap<>(); } @@ -193,22 +194,41 @@ private SqlCompleter createSqlCompleter(Connection jdbcConnection) { return completer; } + private boolean isConnectionInPool(String driverName) { + if (poolingDriverMap.containsKey(driverName)) return true; + return false; + } + + private void createConnectionPool(String url, String propertyKey, Properties properties) { + ConnectionFactory connectionFactory = + new DriverManagerConnectionFactory(url, properties); + + PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory( + connectionFactory, null); + ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory); + + poolableConnectionFactory.setPool(connectionPool); + PoolingDriver driver = new PoolingDriver(); + driver.registerPool(propertyKey, connectionPool); + + poolingDriverMap.put(propertyKey, driver); + } + + private Connection getConnectionFromPool(String url, String propertyKey, Properties properties) + throws SQLException { + if (!isConnectionInPool(propertyKey)) { + createConnectionPool(url, propertyKey, properties); + } + + return DriverManager.getConnection(DBCP_STRING + propertyKey); + } + public Connection getConnection(String propertyKey, String user) throws ClassNotFoundException, SQLException, InterpreterException { Connection connection = null; if (propertyKey == null || propertiesMap.get(propertyKey) == null) { return null; } - if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) { - ArrayList connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey); - if (0 != connectionList.size()) { - connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0); - if (null != connection && connection.isClosed()) { - connection.close(); - connection = null; - } - } - } if (null == connection) { final Properties properties = (Properties) propertiesMap.get(propertyKey).clone(); logger.info(properties.getProperty(DRIVER_KEY)); @@ -222,16 +242,16 @@ public Connection getConnection(String propertyKey, String user) switch (authType) { case KERBEROS: if (user == null) { - connection = DriverManager.getConnection(url, properties); + connection = getConnectionFromPool(url, propertyKey, properties); } else { if ("hive".equalsIgnoreCase(propertyKey)) { - connection = DriverManager.getConnection(url + ";hive.server2.proxy.user=" + user, - properties); + connection = getConnectionFromPool(url + ";hive.server2.proxy.user=" + user, + propertyKey, properties); } else { UserGroupInformation ugi = null; try { ugi = UserGroupInformation.createProxyUser(user, - UserGroupInformation.getCurrentUser()); + UserGroupInformation.getCurrentUser()); } catch (Exception e) { logger.error("Error in createProxyUser", e); StringBuilder stringBuilder = new StringBuilder(); @@ -239,11 +259,13 @@ public Connection getConnection(String propertyKey, String user) stringBuilder.append(e.getCause()); throw new InterpreterException(stringBuilder.toString()); } + + final String poolKey = propertyKey; try { connection = ugi.doAs(new PrivilegedExceptionAction() { @Override public Connection run() throws Exception { - return DriverManager.getConnection(url, properties); + return getConnectionFromPool(url, poolKey, properties); } }); } catch (Exception e) { @@ -258,7 +280,7 @@ public Connection run() throws Exception { break; default: - connection = DriverManager.getConnection(url, properties); + connection = getConnectionFromPool(url, propertyKey, properties); } } } @@ -266,75 +288,41 @@ public Connection run() throws Exception { return connection; } - public Statement getStatement(String propertyKey, String paragraphId, - InterpreterContext interpreterContext) - throws SQLException, ClassNotFoundException, InterpreterException { - Connection connection; - - if (paragraphIdConnectionMap.containsKey(paragraphId + - interpreterContext.getAuthenticationInfo().getUser())) { - connection = paragraphIdConnectionMap.get(paragraphId + - interpreterContext.getAuthenticationInfo().getUser()); - } else { - connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser()); - } - - if (connection == null) { - return null; + private void initStatementMap() { + for (Statement statement : paragraphIdStatementMap.values()) { + try { + statement.close(); + } catch (Exception e) { + logger.error("Error while closing paragraphIdStatementMap statement...", e); + } } + paragraphIdStatementMap.clear(); + } - Statement statement = connection.createStatement(); - if (isStatementClosed(statement)) { - connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser()); - statement = connection.createStatement(); + private void initConnectionPoolMap() throws SQLException { + Iterator it = poolingDriverMap.keySet().iterator(); + while (it.hasNext()) { + String driverName = it.next(); + poolingDriverMap.get(driverName).closePool(driverName); + it.remove(); } - paragraphIdConnectionMap.put(paragraphId + interpreterContext.getAuthenticationInfo().getUser(), - connection); - paragraphIdStatementMap.put(paragraphId + interpreterContext.getAuthenticationInfo().getUser(), - statement); + poolingDriverMap.clear(); + } - return statement; + private void saveStatement(String key, Statement statement) throws SQLException { + paragraphIdStatementMap.put(key, statement); + statement.setMaxRows(getMaxResult()); } - private boolean isStatementClosed(Statement statement) { - try { - return statement.isClosed(); - } catch (Throwable t) { - logger.debug("{} doesn't support isClosed method", statement); - return false; - } + private void removeStatement(String key) { + paragraphIdStatementMap.remove(key); } @Override public void close() { try { - for (List connectionList : propertyKeyUnusedConnectionListMap.values()) { - for (Connection c : connectionList) { - try { - c.close(); - } catch (Exception e) { - logger.error("Error while closing propertyKeyUnusedConnectionListMap connection...", e); - } - } - } - - for (Statement statement : paragraphIdStatementMap.values()) { - try { - statement.close(); - } catch (Exception e) { - logger.error("Error while closing paragraphIdStatementMap statement...", e); - } - } - paragraphIdStatementMap.clear(); - - for (Connection connection : paragraphIdConnectionMap.values()) { - try { - connection.close(); - } catch (Exception e) { - logger.error("Error while closing paragraphIdConnectionMap connection...", e); - } - } - paragraphIdConnectionMap.clear(); + initStatementMap(); + initConnectionPoolMap(); } catch (Exception e) { logger.error("Error while closing...", e); } @@ -342,17 +330,21 @@ public void close() { private InterpreterResult executeSql(String propertyKey, String sql, InterpreterContext interpreterContext) { - String paragraphId = interpreterContext.getParagraphId(); + Connection connection; + Statement statement; + ResultSet resultSet = null; try { + connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser()); + if (connection == null) { + return new InterpreterResult(Code.ERROR, "Prefix not found."); + } - Statement statement = getStatement(propertyKey, paragraphId, interpreterContext); - + statement = connection.createStatement(); if (statement == null) { return new InterpreterResult(Code.ERROR, "Prefix not found."); } - statement.setMaxRows(getMaxResult()); StringBuilder msg = null; boolean isTableType = false; @@ -364,8 +356,9 @@ private InterpreterResult executeSql(String propertyKey, String sql, isTableType = true; } - ResultSet resultSet = null; try { + saveStatement(paragraphId + + interpreterContext.getAuthenticationInfo().getUser(), statement); boolean isResultSetAvailable = statement.execute(sql); @@ -408,16 +401,24 @@ private InterpreterResult executeSql(String propertyKey, String sql, msg.append(updateCount).append(NEWLINE); } } finally { - try { - if (resultSet != null) { + if (resultSet != null) { + try { resultSet.close(); - } - statement.close(); - } finally { - statement = null; + } catch (SQLException e) { /*ignored*/ } } + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { /*ignored*/ } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { /*ignored*/ } + } + removeStatement(paragraphId + + interpreterContext.getAuthenticationInfo().getUser()); } - return new InterpreterResult(Code.SUCCESS, msg.toString()); } catch (Exception e) { @@ -452,7 +453,6 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr cmd = cmd.trim(); logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd); - return executeSql(propertyKey, cmd, contextInterpreter); }