diff --git a/docs/category.json b/docs/category.json index e90c674a2e9..563aacbecd0 100644 --- a/docs/category.json +++ b/docs/category.json @@ -33,7 +33,8 @@ "user/ppl/functions/string.rst", "user/ppl/functions/condition.rst", "user/ppl/functions/relevance.rst", - "user/ppl/functions/expressions.rst" + "user/ppl/functions/expressions.rst", + "user/ppl/admin/jdbc.rst" ], "sql_cli": [ "user/dql/expressions.rst", diff --git a/docs/user/ppl/admin/jdbc.rst b/docs/user/ppl/admin/jdbc.rst new file mode 100644 index 00000000000..343e28034c3 --- /dev/null +++ b/docs/user/ppl/admin/jdbc.rst @@ -0,0 +1,80 @@ +.. highlight:: sh + +============== +JDBC Connector +============== + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 1 + + +Introduction +============ + +This page covers JDBC connector properties for dataSource configuration and the nuances associated with the JDBC connector. + + +JDBC Connector Properties in DataSource Configuration +===================================================== +JDBC Connector Properties. + +* ``url`` [Required]. + * This parameter provides the URL used to connect to a database instance at the provided endpoint. +* ``driver`` [Required]. + * This parameter provides the driver used to connect to a database instance at the provided endpoint. Only ``org.apache.hive.jdbc.HiveDriver`` is supported. +* ``username`` [Optional]. + * The username for basic auth. +* ``password`` [Optional]. + * The password for basic auth. 
+ +Example dataSource configurations with basic authentication +=========================================================== + +No Auth :: + + [{ + "name" : "myspark", + "connector": "jdbc", + "properties" : { + "url" : "jdbc:hive2://localhost:10000/default", + "driver" : "org.apache.hive.jdbc.HiveDriver" + } + }] + +Basic Auth :: + + [{ + "name" : "myspark", + "connector": "jdbc", + "properties" : { + "url" : "jdbc:hive2://localhost:10000/default", + "driver" : "org.apache.hive.jdbc.HiveDriver", + "username" : "username", + "password" : "password" + } + }] + +PPL supported for jdbc connector +================================ + +JDBC Table Function +------------------- +The JDBC datasource can execute SQL directly against the target database. The SQL must be supported by the target database. + +Example:: + + os> source = myspark.jdbc('SHOW DATABASES'); + fetched rows / total rows = 1/1 + +-------------+ + | namespace | + |-------------| + | default | + +-------------+ + +Limitation +================================ + +* PPL commands other than ``source`` are not supported. For example, if a user runs ``source = myspark.jdbc('SHOW DATABASES') | fields namespace``, the query engine will throw an exception. 
diff --git a/docs/user/ppl/index.rst b/docs/user/ppl/index.rst index a69136bb192..9ac0a1f3a3d 100644 --- a/docs/user/ppl/index.rst +++ b/docs/user/ppl/index.rst @@ -38,6 +38,8 @@ The query start with search command and then flowing a set of command delimited - `Prometheus Connector `_ + - `JDBC Connector `_ + * **Commands** - `Syntax `_ diff --git a/doctest/build.gradle b/doctest/build.gradle index 7b52a1cfa4e..d1a73a5c33b 100644 --- a/doctest/build.gradle +++ b/doctest/build.gradle @@ -32,6 +32,54 @@ task bootstrap(type: Exec, dependsOn: ['cloneSqlCli']) { } +String SPARK_VERSION = "spark-3.3.2" +String SPARK_BINARY = "${SPARK_VERSION}-bin-hadoop3"; + +task startMaster(type: SpawnProcessTask) { + doFirst { + download.run { + src "https://dlcdn.apache.org/spark/${SPARK_VERSION}/${SPARK_BINARY}.tgz" + dest new File("$projectDir/bin", "${SPARK_BINARY}.tgz") + overwrite false + } + copy { + from tarTree("$projectDir/bin/${SPARK_BINARY}.tgz") + into "$projectDir/bin" + } + } + command "$projectDir/bin/${SPARK_BINARY}/bin/spark-class org.apache.spark.deploy.master.Master -h localhost -p 7077 --webui-port 8080" + ready 'started' + pidLockFileName '.spark-master.pid.lock' +} + +task startWorker(type: SpawnProcessTask, dependsOn: startMaster) { + command "$projectDir/bin/${SPARK_BINARY}/bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077" + ready 'started' + pidLockFileName '.spark-worker.pid.lock' +} + +task startThrift(type: SpawnProcessTask, dependsOn: startWorker) { + command "$projectDir/bin/${SPARK_BINARY}/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 spark://localhost:7077" + ready 'started' + pidLockFileName '.spark-thriftserver.pid.lock' +} + +task stopMaster(type: KillProcessTask) { + pidLockFileName '.spark-master.pid.lock' +} + +task stopWorker(type: KillProcessTask, dependsOn: stopMaster) { + pidLockFileName '.spark-worker.pid.lock' +} + +task stopThrift(type: KillProcessTask, dependsOn: stopWorker) 
{ + pidLockFileName '.spark-thriftserver.pid.lock' + doLast { + file("$projectDir/bin/${SPARK_BINARY}").deleteDir() + file("$projectDir/bin/${SPARK_BINARY}.tgz").delete() + } +} + task startPrometheus(type: SpawnProcessTask) { doFirst { download.run { @@ -99,51 +147,27 @@ task stopPrometheus() { } } } + + +clean.doLast { + file("$projectDir/bin/${SPARK_BINARY}").deleteDir() + file("$projectDir/bin/${SPARK_BINARY}.tgz").delete() +} + if(getOSFamilyType() != "windows") { stopPrometheus.mustRunAfter startPrometheus - startOpenSearch.dependsOn startPrometheus - stopOpenSearch.finalizedBy stopPrometheus + stopThrift.mustRunAfter startThrift + startOpenSearch.dependsOn(startPrometheus, startThrift) + stopOpenSearch.finalizedBy(stopPrometheus, stopThrift) } doctest.dependsOn startOpenSearch doctest.finalizedBy stopOpenSearch check.dependsOn doctest clean.dependsOn(cleanBootstrap) -// 2.0.0-alpha1-SNAPSHOT -> 2.0.0.0-alpha1-SNAPSHOT -String opensearch_no_snapshot = opensearch_version.replace('-SNAPSHOT', '') -String[] version_tokens = opensearch_no_snapshot.tokenize('-') -String opensearch_build = version_tokens[0] + '.0' -if (version_tokens.length > 1) { - opensearch_build += '-' + version_tokens[1] -} -String mlCommonsRemoteFile = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-ml-' + opensearch_build + '.zip' -String mlCommonsPlugin = 'opensearch-ml' - testClusters { docTestCluster { keystore 'plugins.query.federation.datasources.config', new File("$projectDir/datasource", 'datasources.json') - // Disable loading of `ML-commons` plugin, because it might be unavailable (not released yet). 
- /* - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - File dir = new File('./doctest/' + mlCommonsPlugin) - if (!dir.exists()) { - dir.mkdirs() - } - File f = new File(dir, mlCommonsPlugin + '-' + opensearch_build + '.zip') - if (!f.exists()) { - new URL(mlCommonsRemoteFile).withInputStream{ ins -> f.withOutputStream{ it << ins } } - } - return fileTree(mlCommonsPlugin).getSingleFile() - } - } - } - })) - */ plugin ':opensearch-sql-plugin' testDistribution = 'integ_test' } diff --git a/doctest/datasource/datasources.json b/doctest/datasource/datasources.json index 5f195747ae0..78ee0d05c90 100644 --- a/doctest/datasource/datasources.json +++ b/doctest/datasource/datasources.json @@ -5,5 +5,13 @@ "properties" : { "prometheus.uri" : "http://localhost:9090" } + }, + { + "name" : "myspark", + "connector": "jdbc", + "properties" : { + "url" : "jdbc:hive2://localhost:10000/default", + "driver": "org.apache.hive.jdbc.HiveDriver" + } } -] \ No newline at end of file +] diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 5a707a17b0d..fdcaf3563c0 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -65,7 +65,7 @@ configurations.all { resolutionStrategy.force 'junit:junit:4.13.2' resolutionStrategy.force "commons-logging:commons-logging:1.2" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 - resolutionStrategy.force 'commons-codec:commons-codec:1.13' + resolutionStrategy.force 'commons-codec:commons-codec:1.15' resolutionStrategy.force 'com.google.guava:guava:31.0.1-jre' resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" @@ -78,6 +78,7 @@ configurations.all { resolutionStrategy.force "org.apache.httpcomponents:httpcore:4.4.13" resolutionStrategy.force 
"joda-time:joda-time:2.10.12" resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" + resolutionStrategy.force "org.apache.httpcomponents:httpclient:4.5.13" } configurations { diff --git a/jdbc/build.gradle b/jdbc/build.gradle new file mode 100644 index 00000000000..a1153457a1c --- /dev/null +++ b/jdbc/build.gradle @@ -0,0 +1,118 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java-library' + id "io.freefair.lombok" + id 'jacoco' + id 'info.solidsoft.pitest' version '1.9.0' +} + +dependencies { + implementation project(':core') + implementation project(':common') + runtimeOnly('org.apache.hive:hive-jdbc:3.1.3') { + exclude group: "org.apache.hadoop", module: "hadoop-common" + exclude group: 'org.apache.zookeeper' + exclude group: "org.apache.hive", module: "hive-metastore" + exclude group: "org.apache.hive", module: "hive-shims" + exclude group: 'org.apache.hive', module: 'hive-llap-server' + exclude group: 'org.apache.hive', module: 'hive-upgrade-acid' + + // exclude for resolving version conflict + exclude group: 'org.apache.httpcomponents', module: 'httpcore' + exclude group: 'org.apache.httpcomponents', module: 'httpclient' + + // exclude for include with transitive = false + exclude group: "org.apache.hive", module: "hive-common" + exclude group: "org.apache.hive", module: "hive-service" + exclude group: "org.apache.hive", module: "hive-serde" + exclude group: 'org.apache.hive', module: 'hive-service-rpc' + + // exclude because of CVE-2019-0205 + exclude group: 'org.apache.thrift', module: 'libthrift' + } + runtimeOnly('org.apache.httpcomponents:httpcore:4.4.12') + runtimeOnly('org.apache.httpcomponents:httpclient:4.5.13') { + exclude group: 'commons-codec', module: 'commons-codec' + } + runtimeOnly('org.apache.hive:hive-service:3.1.3') { + transitive = false + } + runtimeOnly('org.apache.hive:hive-serde:3.1.3') { + transitive = false + } + runtimeOnly('org.apache.hive:hive-common:3.1.3') { + 
transitive = false + } + runtimeOnly('org.apache.hive:hive-service-rpc:3.1.3') { + transitive = false + } + runtimeOnly('commons-codec:commons-codec:1.13') { + transitive = false + } + runtimeOnly('org.apache.thrift:libthrift:0.18.1') + runtimeOnly 'commons-lang:commons-lang:2.6' + + testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') + testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' +} + +pitest { + targetClasses = ['org.opensearch.sql.*'] + pitestVersion = '1.9.0' + threads = 4 + outputFormats = ['HTML', 'XML'] + timestampedReports = false + junit5PluginVersion = '1.0.0' +} + +test { + useJUnitPlatform() + testLogging { + events "skipped", "failed" + exceptionFormat "full" + } +} + +jacocoTestReport { + reports { + html.enabled true + xml.enabled true + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +test.finalizedBy(project.tasks.jacocoTestReport) + +jacocoTestCoverageVerification { + violationRules { + rule { + element = 'CLASS' + excludes = [ + ] + limit { + counter = 'LINE' + minimum = 1.0 + } + limit { + counter = 'BRANCH' + minimum = 1.0 + } + } + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +check.dependsOn jacocoTestCoverageVerification +jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/JDBCStorageEngine.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/JDBCStorageEngine.java new file mode 100644 index 00000000000..f1d35040946 --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/JDBCStorageEngine.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
org.opensearch.sql.jdbc; + +import java.util.Collection; +import java.util.Collections; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.DataSourceSchemaName; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.expression.function.FunctionResolver; +import org.opensearch.sql.jdbc.functions.JDBCTableFunctionResolver; +import org.opensearch.sql.jdbc.parser.PropertiesParser; +import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.Table; + +/** + * JDBC {@link StorageEngine} only support direct query. + */ +@RequiredArgsConstructor +public class JDBCStorageEngine implements StorageEngine { + + private final DataSourceMetadata dataSourceMetadata; + + @Override + public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) { + throw new SemanticCheckException("JDBC does not support getTable operation"); + } + + @Override + public Collection getFunctions() { + return Collections.singletonList( + new JDBCTableFunctionResolver(dataSourceMetadata, new PropertiesParser())); + } +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/JDBCStorageFactory.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/JDBCStorageFactory.java new file mode 100644 index 00000000000..79fd980a081 --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/JDBCStorageFactory.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc; + +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; + +public class JDBCStorageFactory implements DataSourceFactory { + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.JDBC; + } + + @Override + public 
DataSource createDataSource(DataSourceMetadata metadata) { + return new DataSource(metadata.getName(), DataSourceType.JDBC, new JDBCStorageEngine(metadata)); + } +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/functions/JDBCFunction.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/functions/JDBCFunction.java new file mode 100644 index 00000000000..0a3c816b5c0 --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/functions/JDBCFunction.java @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc.functions; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.FunctionExpression; +import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.TableFunctionImplementation; +import org.opensearch.sql.jdbc.operator.JDBCQueryOperator; +import org.opensearch.sql.planner.DefaultImplementor; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalProject; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.storage.Table; +import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.read.TableScanBuilder; + +/** + * JDBC function definition. + */ +public class JDBCFunction extends FunctionExpression implements TableFunctionImplementation { + + private final String sqlQuery; + + private final Properties properties; + + /** + * constructor. 
+ */ + public JDBCFunction( + FunctionName functionName, String sqlQuery, Properties properties) { + super(functionName, List.of()); + this.sqlQuery = sqlQuery; + this.properties = properties; + } + + @Override + public ExprValue valueOf(Environment valueEnv) { + throw new SemanticCheckException("JDBC function is only supported in source command"); + } + + @Override + public ExprType type() { + return ExprCoreType.STRUCT; + } + + @Override + public Table applyArguments() { + return new JDBCFunctionTable(); + } + + /** + * The table created from {@link JDBCFunction}. + */ + @VisibleForTesting + protected class JDBCFunctionTable implements Table { + /** + * return empty map at query analysis stage. + */ + @Override + public Map getFieldTypes() { + return ImmutableMap.of(); + } + + // todo, the implement interface should be removed. https://github.com/opensearch-project/sql/issues/1463 + @SuppressWarnings("deprecation") + @Override + public PhysicalPlan implement(LogicalPlan plan) { + return plan.accept(new DefaultImplementor<>(), null); + } + + @Override + public TableScanBuilder createScanBuilder() { + return new JDBCFunctionTableScanBuilder(); + } + } + + /** + * {@link JDBCFunctionTable} scan builder. + */ + @VisibleForTesting + protected class JDBCFunctionTableScanBuilder extends TableScanBuilder { + + @Override + public TableScanOperator build() { + return new JDBCQueryOperator(sqlQuery, properties); + } + + /** + * PPL by default add a LogicalProject operator. It should be ignored. 
+ */ + @Override + public boolean pushDownProject(LogicalProject project) { + return true; + } + } +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/functions/JDBCTableFunctionResolver.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/functions/JDBCTableFunctionResolver.java new file mode 100644 index 00000000000..541ad085ab5 --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/functions/JDBCTableFunctionResolver.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc.functions; + +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Locale; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.sql.common.antlr.SyntaxCheckException; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.exception.ExpressionEvaluationException; +import org.opensearch.sql.expression.function.FunctionBuilder; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.FunctionResolver; +import org.opensearch.sql.expression.function.FunctionSignature; +import org.opensearch.sql.jdbc.parser.PropertiesParser; + +/** + * JDBC datasource defined {@link FunctionResolver}. 
+ */ +@RequiredArgsConstructor +public class JDBCTableFunctionResolver implements FunctionResolver { + + public static final FunctionName JDBC_FUNCTION_NAME = FunctionName.of("jdbc"); + + @VisibleForTesting + public static final FunctionSignature JDBC_FUNCTION_SIGNATURE = + new FunctionSignature(JDBC_FUNCTION_NAME, List.of(STRING)); + + private final DataSourceMetadata dataSourceMetadata; + + private final PropertiesParser propertiesParser; + + @Override + public Pair resolve(FunctionSignature functionSignature) { + FunctionBuilder functionBuilder = + (properties, arguments) -> { + try { + String sqlQuery = arguments.get(0).valueOf().stringValue(); + return new JDBCFunction( + JDBC_FUNCTION_NAME, + sqlQuery, + propertiesParser.parse(dataSourceMetadata.getProperties())); + } catch (ExpressionEvaluationException e) { + throw new SyntaxCheckException( + String.format( + Locale.ROOT, + "SQL statement is required. For example %s.jdbc('select * from table')", + dataSourceMetadata.getName())); + } + }; + return Pair.of(JDBC_FUNCTION_SIGNATURE, functionBuilder); + } + + @Override + public FunctionName getFunctionName() { + return JDBC_FUNCTION_NAME; + } +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCQueryOperator.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCQueryOperator.java new file mode 100644 index 00000000000..37baa648841 --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCQueryOperator.java @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc.operator; + +import static org.opensearch.sql.jdbc.operator.JDBCResponseHandle.execute; +import static org.opensearch.sql.jdbc.parser.PropertiesParser.DRIVER; +import static org.opensearch.sql.jdbc.parser.PropertiesParser.URL; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.sql.Connection; +import java.sql.DriverManager; +import 
java.sql.SQLException; +import java.sql.SQLNonTransientException; +import java.sql.Statement; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.storage.TableScanOperator; + +/** + * Define JDBC function as TableScanOperator. + */ +@RequiredArgsConstructor +public class JDBCQueryOperator extends TableScanOperator { + + private final String url; + + private final String driver; + + private final String sqlQuery; + + private final Properties properties; + + private Connection connection; + + private Statement statement; + + private JDBCResponseHandle jdbcResponseHandle; + + /** + * constructor. + */ + public JDBCQueryOperator(String sqlQuery, Properties properties) { + this.sqlQuery = sqlQuery; + this.properties = properties; + this.url = properties.getProperty(URL); + this.driver = properties.getProperty(DRIVER); + } + + @Override + public String explain() { + return String.format(Locale.ROOT, "jdbc(%s)", sqlQuery); + } + + @Override + public void open() { + AccessController.doPrivileged( + (PrivilegedAction>) + () -> + handleSQLException( + () -> { + try { + Class.forName(driver); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + connection = DriverManager.getConnection(url, properties); + statement = connection.createStatement(); + jdbcResponseHandle = execute(statement, sqlQuery); + return null; + })); + } + + @Override + public void close() { + try { + if (jdbcResponseHandle != null) { + jdbcResponseHandle.close(); + } + if (statement != null) { + statement.close(); + } + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return handleSQLException(() -> jdbcResponseHandle.hasNext()); + } + + @Override + public ExprValue next() 
{ + return handleSQLException(jdbcResponseHandle::next); + } + + /** + * Schema is determined at query execution time. + */ + @Override + public ExecutionEngine.Schema schema() { + return jdbcResponseHandle.schema(); + } + + private static T handleSQLException(SQLOperation action) { + try { + return action.execute(); + } catch (SQLNonTransientException e) { + throw new IllegalArgumentException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + interface SQLOperation { + T execute() throws SQLException; + } +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCResponseHandle.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCResponseHandle.java new file mode 100644 index 00000000000..b203311cfe5 --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCResponseHandle.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc.operator; + +import java.sql.SQLException; +import java.sql.Statement; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; + +/** + * handle JDBC response. + */ +public interface JDBCResponseHandle { + + /** + * execute sql query. + * + * @param statement {@link Statement} + * @param sqlQuery sql query + * @return {@link JDBCResponseHandle} + */ + static JDBCResponseHandle execute(Statement statement, String sqlQuery) throws SQLException { + boolean execute = statement.execute(sqlQuery); + return execute ? new JDBCResultSetResponseHandle(statement.getResultSet()) : + new JDBCUpdateCountResponseHandle(statement.getUpdateCount()); + } + + /** + * Return true if JDBC response has more result. + */ + boolean hasNext() throws SQLException; + + /** + * Return JDBC response as {@link ExprValue}. Attention, the method must been called when + * hasNext return true. 
+ */ + ExprValue next() throws SQLException; + + /** + * Close {@link JDBCResponseHandle}. + */ + void close() throws SQLException; + + /** + * Return ExecutionEngine.Schema of the JDBC response. + */ + ExecutionEngine.Schema schema(); +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCResultSetResponseHandle.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCResultSetResponseHandle.java new file mode 100644 index 00000000000..68016dd12bb --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCResultSetResponseHandle.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc.operator; + +import static org.opensearch.sql.data.type.ExprCoreType.BOOLEAN; +import static org.opensearch.sql.data.type.ExprCoreType.BYTE; +import static org.opensearch.sql.data.type.ExprCoreType.DATE; +import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.LONG; +import static org.opensearch.sql.data.type.ExprCoreType.SHORT; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.data.type.ExprCoreType.TIME; +import static org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.exception.ExpressionEvaluationException; +import org.opensearch.sql.executor.ExecutionEngine; + +/** + * 
Handle JDBC {@link ResultSet} response. + */ +public class JDBCResultSetResponseHandle implements JDBCResponseHandle { + + private final ResultSet resultSet; + + private final List columnHandleList; + + /** + * constructor. + */ + public JDBCResultSetResponseHandle(ResultSet resultSet) { + this.resultSet = resultSet; + try { + ResultSetMetaData metaData = resultSet.getMetaData(); + columnHandleList = new ArrayList<>(); + int columnCount = metaData.getColumnCount(); + for (int i = 1; i <= columnCount; i++) { + // the default type is STRING. + ExprType exprType = jdbcTypeToCoreType(metaData.getColumnType(i)); + columnHandleList.add(new ColumnHandle(i, metaData.getColumnName(i), exprType)); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws SQLException { + resultSet.close(); + } + + @Override + public boolean hasNext() throws SQLException { + return resultSet.next(); + } + + @Override + public ExprValue next() throws SQLException { + LinkedHashMap result = new LinkedHashMap<>(); + for (ColumnHandle columnHandle : columnHandleList) { + result.put(columnHandle.getName(), columnHandle.parse(resultSet)); + } + return new ExprTupleValue(result); + } + + @Override + public ExecutionEngine.Schema schema() { + return new ExecutionEngine.Schema( + columnHandleList.stream() + .map(c -> new ExecutionEngine.Schema.Column(c.getName(), c.getName(), c.getType())) + .collect(Collectors.toList())); + } + + @Getter + @RequiredArgsConstructor + static class ColumnHandle { + private final int index; + + private final String name; + + private final ExprType type; + + ExprValue parse(ResultSet rs) throws SQLException { + return new JDBCRowExprValue(rs.getObject(index)); + } + } + + @RequiredArgsConstructor + static class JDBCRowExprValue implements ExprValue { + + private final Object value; + + @Override + public Object value() { + return value; + } + + @Override + public ExprType type() { + throw new 
ExpressionEvaluationException("[BUG] - invalid to get type on JDBCRowExprValue"); + } + + @Override + public int compareTo(ExprValue o) { + throw new ExpressionEvaluationException("[BUG] - invalid to compare on JDBCRowExprValue"); + } + } + + /** + * Mapping JDBC type to Core Engine {@link ExprType}. + * + * @param type jdbc type. + * @return ExprType. + */ + static ExprType jdbcTypeToCoreType(int type) { + switch (type) { + case Types.BIT: + case Types.BOOLEAN: + return BOOLEAN; + + case Types.TINYINT: + return BYTE; + + case Types.SMALLINT: + return SHORT; + + case Types.INTEGER: + return INTEGER; + + case Types.BIGINT: + return LONG; + + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + case Types.NUMERIC: + case Types.DECIMAL: + return DOUBLE; + + case Types.CHAR: + case Types.NCHAR: + case Types.VARCHAR: + case Types.NVARCHAR: + case Types.LONGVARCHAR: + case Types.LONGNVARCHAR: + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + return STRING; + + case Types.DATE: + return DATE; + + case Types.TIME: + return TIME; + + case Types.TIMESTAMP: + return TIMESTAMP; + + // we assume the result is json encoded string. 
refer https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.0.0.2/ds_Hive/jdbc-hs2.html, + case Types.ARRAY: + case Types.JAVA_OBJECT: + case Types.STRUCT: + return STRING; + default: + return STRING; + } + } +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCUpdateCountResponseHandle.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCUpdateCountResponseHandle.java new file mode 100644 index 00000000000..3b5cf80afc7 --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/operator/JDBCUpdateCountResponseHandle.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc.operator; + +import static org.opensearch.sql.data.model.ExprValueUtils.integerValue; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import com.google.common.collect.UnmodifiableIterator; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; + +public class JDBCUpdateCountResponseHandle implements JDBCResponseHandle { + + private final UnmodifiableIterator iterator; + + public JDBCUpdateCountResponseHandle(int count) { + iterator = Iterators.singletonIterator(integerValue(count)); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ExprValue next() { + return iterator.next(); + } + + @Override + public void close() { + // do nothing + } + + @Override + public ExecutionEngine.Schema schema() { + return new ExecutionEngine.Schema( + ImmutableList.of(new ExecutionEngine.Schema.Column("result", "result", INTEGER))); + } +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/parser/Option.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/parser/Option.java new file mode 100644 index 00000000000..c394f6dd2e2 --- /dev/null +++ 
b/jdbc/src/main/java/org/opensearch/sql/jdbc/parser/Option.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc.parser; + +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import lombok.Getter; +import org.opensearch.sql.exception.SemanticCheckException; + +/** Describe a single datasource configuration option. */ +@Builder +public class Option { + /** Configuration name. */ + @Getter private final String name; + + /** It the configuration required. */ + private final boolean required; + + /** + * Resolve the datasource property. + * + * @param properties properties. + * @return optional value. + */ + public Optional resolve(Map properties) { + if (required && !properties.containsKey(name)) { + throw new SemanticCheckException( + String.format(Locale.ROOT, "%s is required in datasource configuration", name)); + } else { + return Optional.ofNullable(properties.get(name)); + } + } +} diff --git a/jdbc/src/main/java/org/opensearch/sql/jdbc/parser/PropertiesParser.java b/jdbc/src/main/java/org/opensearch/sql/jdbc/parser/PropertiesParser.java new file mode 100644 index 00000000000..84e6119904b --- /dev/null +++ b/jdbc/src/main/java/org/opensearch/sql/jdbc/parser/PropertiesParser.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.jdbc.parser; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * JDBC datasource properties parser. + */ +public class PropertiesParser { + + public static final String URL = "url"; + + public static final String USERNAME = "username"; + + public static final String PASSWORD = "password"; + + public static final String DRIVER = "driver"; + + private final List