diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 610618afad4..54c897b45fa 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -98,6 +98,9 @@ public class JDBCInterpreter extends Interpreter { static final String EMPTY_COLUMN_VALUE = ""; + private final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use"; + private final String CONCURRENT_EXECUTION_COUNT = "zeppelin.jdbc.concurrent.max_connection"; + private final HashMap propertiesMap; private final Map paragraphIdStatementMap; @@ -433,8 +436,10 @@ public int getProgress(InterpreterContext context) { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - JDBCInterpreter.class.getName() + this.hashCode()); + String schedulerName = JDBCInterpreter.class.getName() + this.hashCode(); + return isConcurrentExecution() ? + SchedulerFactory.singleton().createOrGetParallelScheduler(schedulerName, 10) + : SchedulerFactory.singleton().createOrGetFIFOScheduler(schedulerName); } @Override @@ -452,5 +457,17 @@ public int getMaxResult() { return Integer.valueOf( propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT)); } + + boolean isConcurrentExecution() { + return Boolean.valueOf(getProperty(CONCURRENT_EXECUTION_KEY)); + } + + int getMaxConcurrentConnection() { + try { + return Integer.valueOf(getProperty(CONCURRENT_EXECUTION_COUNT)); + } catch (Exception e) { + return 10; + } + } } diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json index 97b2c618103..069880c2d02 100644 --- a/jdbc/src/main/resources/interpreter-setting.json +++ b/jdbc/src/main/resources/interpreter-setting.json @@ -33,6 +33,18 @@ "propertyName": "common.max_count", "defaultValue": "1000", "description": "Max number of SQL result to display." + }, + "zeppelin.jdbc.concurrent.use": { + "envName": null, + "propertyName": "zeppelin.jdbc.concurrent.use", + "defaultValue": "true", + "description": "Use parallel scheduler" + }, + "zeppelin.jdbc.concurrent.max_connection": { + "envName": null, + "propertyName": "zeppelin.jdbc.concurrent.max_connection", + "defaultValue": "10", + "description": "Number of concurrent execution" } } } diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index 065f4ed5b88..317dbcf7778 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -22,6 +22,8 @@ import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER; import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL; import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Files; @@ -32,6 +34,9 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.jdbc.JDBCInterpreter; +import org.apache.zeppelin.scheduler.FIFOScheduler; +import org.apache.zeppelin.scheduler.ParallelScheduler; +import org.apache.zeppelin.scheduler.Scheduler; import org.junit.Before; import org.junit.Test; @@ -200,4 +205,27 @@ public void testSelectQueryMaxResult() throws SQLException, IOException { assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type()); assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message()); } + + @Test + public void concurrentSettingTest() { + Properties properties = new Properties(); + properties.setProperty("zeppelin.jdbc.concurrent.use", "true"); + properties.setProperty("zeppelin.jdbc.concurrent.max_connection", "10"); + JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties); + + assertTrue(jdbcInterpreter.isConcurrentExecution()); + assertEquals(10, jdbcInterpreter.getMaxConcurrentConnection()); + + Scheduler scheduler = jdbcInterpreter.getScheduler(); + assertTrue(scheduler instanceof ParallelScheduler); + + properties.clear(); + properties.setProperty("zeppelin.jdbc.concurrent.use", "false"); + jdbcInterpreter = new JDBCInterpreter(properties); + + assertFalse(jdbcInterpreter.isConcurrentExecution()); + + scheduler = jdbcInterpreter.getScheduler(); + assertTrue(scheduler instanceof FIFOScheduler); + } }