diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 6d03f6e8e66..8088dfc7592 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -72,7 +72,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/jdbc/README.md b/jdbc/README.md new file mode 100644 index 00000000000..74088e9f509 --- /dev/null +++ b/jdbc/README.md @@ -0,0 +1,25 @@ +# Interpreter Overview # + +The Zeppelin JDBC interpreter is meant to connect to JDBC backends whose drivers cannot be packaged with +Zeppelin due to licensing concerns. Examples are: SQL Server, Mysql. This interpreter is currently +only compatible with those and Postgresql, although it is easy to add support for your favorite JDBC backend. + +### Setting up a driver + +You should download your JDBC driver and place the .jar file somewhere that the interpreter can locate it, +e.g., "your_zeppelin_home"/interpreter/jdbc/. The interpreter will load and register your driver dynamically. + +### Interpreter Settings + +After launching Zeppelin, go to the interpreter settings menu, create a '%jdbc' interpreter, and set your +driver name, type, and location. + +### Adding support for a JDBC backend + +Since this interpreter uses java.sql.DriverManager, it supports all JDBC drivers with the same java code. +However, JDBC driver names and connection url formats vary. The only thing needed to support a new backend +is to modify JDBCConnectionUrlBuilder.java, and add a connection url method to use the right format for +your backend. + + + diff --git a/jdbc/pom.xml b/jdbc/pom.xml new file mode 100644 index 00000000000..8dbf40c22e1 --- /dev/null +++ b/jdbc/pom.xml @@ -0,0 +1,135 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + + + org.apache.zeppelin + zeppelin-jdbc + jar + 0.6.0-incubating-SNAPSHOT + Zeppelin: JDBC interpreter + http://zeppelin.incubator.apache.org + + + + ${project.groupId} + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.commons + commons-exec + 1.1 + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + junit + junit + test + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/jdbc + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/jdbc + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + + + diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/DriverShim.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/DriverShim.java new file mode 100644 index 00000000000..75d54872988 --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/DriverShim.java @@ -0,0 +1,57 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ + +package org.apache.zeppelin.jdbc; + +import java.util.Properties; +import java.util.logging.Logger; +import java.sql.*; + +/** + * JDBC interpreter for Zeppelin. + * + * @author Andres Celis t-ancel@microsoft.com + * + */ +public class DriverShim implements Driver { + private Driver driver; + DriverShim(Driver d) { + this.driver = d; + } + public boolean acceptsURL(String u) throws SQLException { + return this.driver.acceptsURL(u); + } + public Connection connect(String u, Properties p) throws SQLException { + return this.driver.connect(u, p); + } + public int getMajorVersion() { + return this.driver.getMajorVersion(); + } + public int getMinorVersion() { + return this.driver.getMinorVersion(); + } + public DriverPropertyInfo[] getPropertyInfo(String u, Properties p) throws SQLException { + return this.driver.getPropertyInfo(u, p); + } + public Logger getParentLogger() { + return null; + } + public boolean jdbcCompliant() { + return this.driver.jdbcCompliant(); + } +} diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCConnectionUrlBuilder.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCConnectionUrlBuilder.java new file mode 100644 index 00000000000..dc05065c0eb --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCConnectionUrlBuilder.java @@ -0,0 +1,78 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ + +package org.apache.zeppelin.jdbc; + +/** + * JDBC connection url builder for Zeppelin. + * + * @author Andres Celis t-ancel@microsoft.com + * + */ + +// add case and connection url method to support other jdbc backends +public class JDBCConnectionUrlBuilder { + + private String connectionUrl; + + public JDBCConnectionUrlBuilder(String driverType, String host, String port, + String dbName, String windowsAuth) { + // determine format + switch(driverType) { + case "sqlserver": + buildSqlserverConnectionUrl(host, port, dbName, windowsAuth); + break; + case "postgresql": + buildPostgresqlConnectionUrl(host, port, dbName); + break; + case "mysql": + buildMysqlConnectionUrl(host, port, dbName); + break; + default: + this.connectionUrl = null; + } + } + + private void buildSqlserverConnectionUrl(String host, String port, + String dbName, String windowsAuth) { + this.connectionUrl = "jdbc:sqlserver://"; + this.connectionUrl += (host.equals("")) ? "localhost" : host; + this.connectionUrl += (port.equals("")) ? ":1433;" : ":" + port + ";"; + this.connectionUrl += (dbName.equals("")) ? "" : "database=" + dbName + ";"; + // assume false or empty is SQL authentication + this.connectionUrl += (windowsAuth.equals("true")) ? "integratedsecurity=true;" : ""; + } + + private void buildPostgresqlConnectionUrl(String host, String port, String dbName) { + this.connectionUrl = "jdbc:postgresql://"; + this.connectionUrl += (host.equals("")) ? "localhost" : host; + this.connectionUrl += (port.equals("")) ? "" : ":" + port; + this.connectionUrl += (dbName.equals("")) ? "" : "/" + dbName; + } + + private void buildMysqlConnectionUrl(String host, String port, String dbName) { + this.connectionUrl = "jdbc:mysql://"; + this.connectionUrl += host; + this.connectionUrl += (port.equals("")) ? "" : ":" + port; + this.connectionUrl += (dbName.equals("")) ? "" : "/" + dbName; + } + + public String getConnectionUrl() { + return this.connectionUrl; + } +} diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java new file mode 100644 index 00000000000..bcf8fa7d4e8 --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -0,0 +1,316 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ + +package org.apache.zeppelin.jdbc; + +import org.apache.zeppelin.interpreter.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; + +import java.util.List; +import java.util.HashMap; +import java.util.Properties; +import java.util.Vector; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * JDBC interpreter for Zeppelin. + * + * @author Andres Celis t-ancel@microsoft.com + * + */ + +public class JDBCInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class); + int commandTimeOut = 600000; + Connection conn = null; + ResultSet rs = null; + String connectionUrl; + HashMap stmts; + + static { + Interpreter.register( + "jdbc", + "jdbc", + JDBCInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add("jdbc.host", + "localhost", + "your hostname") + .add("jdbc.user", + "", + "your ID") + .add("jdbc.password", + "", + "your password") + .add("jdbc.port", + "1433", + "your port") + .add("jdbc.driver.name", + "", + "your JDBC driver name (e.g., sqlserver, postgresql, mysql)") + .add("jdbc.driver.classname", + "", + "your jdbc driver class name (e.g., com.microsoft.sqlserver.jdbc." + + "SQLServerDriver, org.postgresql.Driver, com.mysql.jdbc.Driver)") + .add("jdbc.driver.url", + "", + "url to your JDBC driver (e.g.," + + " jar:file://interpreter/jdbc/JDBCdriver.jar!/)") + .add("jdbc.database.name", + "", + "your database") + .add("jdbc.windows.auth", + "", + "true/false, sql authentication if false (optional)") + .build()); + } + + + public JDBCInterpreter(Properties property) { + super(property); + } + + + private String loadJDBCDriver(String url, String classname) { + try { + URL u = new URL(url); + URLClassLoader ucl = new URLClassLoader(new URL[] { u }); + Driver d = (Driver) Class.forName(classname, true, ucl).newInstance(); + DriverManager.registerDriver(new DriverShim(d)); + } + catch (Exception e) { + logger.error("Error loading driver: ", e); + return e.getMessage(); + } + return null; + } + + @Override + public void open() { + String host = ""; + String port = ""; + String user = ""; + String password = ""; + String driverName = ""; + String driverClassName = ""; + String driverUrl = ""; + String dbName = ""; + String windowsAuth = ""; + stmts = new HashMap(); + + // get Properties + Properties intpProperty = getProperty(); + for (Object k : intpProperty.keySet()) { + String key = (String) k; + String value = (String) intpProperty.get(key); + + switch (key) { + case "jdbc.host": + host = value; + break; + case "jdbc.port": + port = value; + break; + case "jdbc.user": + user = value; + break; + case "jdbc.password": + password = value; + break; + case "jdbc.driver.name": + driverName = value; + break; + case "jdbc.driver.classname": + driverClassName = value; + break; + case "jdbc.driver.url": + driverUrl = value; + break; + case "jdbc.database.name": + dbName = value; + break; + case "jdbc.windows.auth": + windowsAuth = value; + break; + default: + logger.info("else key : /" + key + "/"); + break; + } + } + + // enforce populated properties + if (driverClassName.equals("") || driverUrl.equals("") || driverName.equals("")) { + logger.info("Must specify JDBC driver in interpreter settings"); + } else { + // try loading driver, report errors + String msg = loadJDBCDriver(driverUrl, driverClassName); + if (msg != null) { + logger.info("Cannot load JDBC driver: " + msg); + } + } + + // build connection string + JDBCConnectionUrlBuilder jdbcUrl = new JDBCConnectionUrlBuilder( + driverName, host, port, dbName, windowsAuth); + connectionUrl = jdbcUrl.getConnectionUrl(); + if (connectionUrl == null) { + logger.info("Connection URL format unknown for: " + driverName); + } + + logger.info("Connect to " + connectionUrl); + + try { + + // connect to JDBC backend + Properties info = new Properties(); + info.setProperty("user", user); + info.setProperty("password", password); + conn = DriverManager.getConnection(connectionUrl, info); + // connection achieved + logger.info("Connection achieved: " + connectionUrl); + } catch ( SQLException e ) { + logger.error("Connection failed: ", e); + } + } + + @Override + public void close() { + try { + conn.close(); + } catch ( SQLException e ) { + logger.error("Close connection failed: ", e); + } + } + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext context) { + logger.info("Run SQL command '" + cmd + "'"); + Statement stmt; + String pId = context.getParagraphId(); + try { + stmt = conn.createStatement(); + stmts.put(pId, stmt); + + // execute the query + rs = stmt.executeQuery(cmd); + + // format result as zeppelin table + StringBuilder queryResult = new StringBuilder(); + queryResult.append("%table "); + Vector columnNames = new Vector(); + + if (rs != null) { + ResultSetMetaData columns = rs.getMetaData(); + + for ( int i = 1; i <= columns.getColumnCount(); ++i ) { + if ( i != 1 ) { + queryResult.append("\t"); + } + queryResult.append(columns.getColumnName(i)); + columnNames.add(columns.getColumnName(i)); + } + queryResult.append("\n"); + + logger.info(columnNames.toString()); + + while ( rs.next() ) { + for ( int i = 0; i < columnNames.size(); ++i) { + if ( i != 0 ) { + queryResult.append("\t"); + } + queryResult.append(rs.getString(columnNames.get(i))); + } + queryResult.append("\n"); + } + } + + // disconnect + if (stmt != null) stmt.close(); + stmts.remove(pId); + + return new InterpreterResult(InterpreterResult.Code.SUCCESS, queryResult.toString()); + } catch ( SQLException e ) { + logger.error("Can not run " + cmd, e); + stmts.remove(pId); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + } + + @Override + public void cancel(InterpreterContext context) { + Statement stmt = stmts.get(context.getParagraphId()); + if (stmt != null) { + try { + stmt.cancel(); + } + catch (SQLException ex) { + logger.info("Connection: " + connectionUrl); + logger.info("Cancel failed on " + stmt); + logger.info("Caused by: " + ex.getMessage()); + } + finally { + stmts.remove(context.getParagraphId()); + } + } + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + int maxConcurrency = 10; + return SchedulerFactory.singleton().createOrGetParallelScheduler( + JDBCInterpreter.class.getName() + this.hashCode(), maxConcurrency); + } + + @Override + public List completion(String buf, int cursor) { + return null; + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 0d2eb5804b7..e369aff9d9f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -396,6 +396,7 @@ public static enum ConfVars { + "org.apache.zeppelin.lens.LensInterpreter," + "org.apache.zeppelin.cassandra.CassandraInterpreter," + "org.apache.zeppelin.geode.GeodeOqlInterpreter," + + "org.apache.zeppelin.jdbc.JDBCInterpreter," + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),