From 70588d649c1a4a26f1de90bfa4bc6a068cd859de Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Wed, 7 Jun 2023 17:14:12 -0700 Subject: [PATCH 01/15] Create Spark Connector Signed-off-by: Vamsi Manohar --- .../sql/datasource/model/DataSourceType.java | 3 +- settings.gradle | 4 +- spark/.gitignore | 42 ++++++++++++ spark/build.gradle | 66 +++++++++++++++++++ .../sql/spark/SparkDataSourceFactory.java | 18 +++++ 5 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 spark/.gitignore create mode 100644 spark/build.gradle create mode 100644 spark/src/main/java/org/opensearch/sql/spark/SparkDataSourceFactory.java diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java index 48098b97410..5010e41942d 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -8,8 +8,7 @@ public enum DataSourceType { PROMETHEUS("prometheus"), OPENSEARCH("opensearch"), - JDBC("jdbc"); - + SPARK("spark"); private String text; DataSourceType(String text) { diff --git a/settings.gradle b/settings.gradle index 6f7214cb3ad..2140ad6c9e0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,4 +19,6 @@ include 'legacy' include 'sql' include 'prometheus' include 'benchmarks' -include 'datasources' \ No newline at end of file +include 'datasources' +include 'spark' + diff --git a/spark/.gitignore b/spark/.gitignore new file mode 100644 index 00000000000..b63da4551b2 --- /dev/null +++ b/spark/.gitignore @@ -0,0 +1,42 @@ +.gradle +build/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project 
+.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/spark/build.gradle b/spark/build.gradle new file mode 100644 index 00000000000..05b9d38864d --- /dev/null +++ b/spark/build.gradle @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java-library' + id "io.freefair.lombok" + id 'jacoco' +} + +repositories { + mavenCentral() +} + +dependencies { + api project(':core') + implementation project(':datasources') +} + +test { + useJUnitPlatform() + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + } +} + +jacocoTestReport { + reports { + html.enabled true + xml.enabled true + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +test.finalizedBy(project.tasks.jacocoTestReport) + +jacocoTestCoverageVerification { + violationRules { + rule { + element = 'CLASS' + excludes = [ + 'org.opensearch.sql.prometheus.data.constants.*' + ] + limit { + counter = 'LINE' + minimum = 1.0 + } + limit { + counter = 'BRANCH' + minimum = 1.0 + } + } + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +check.dependsOn jacocoTestCoverageVerification +jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/spark/src/main/java/org/opensearch/sql/spark/SparkDataSourceFactory.java b/spark/src/main/java/org/opensearch/sql/spark/SparkDataSourceFactory.java new file mode 100644 index 00000000000..0484213ae33 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/SparkDataSourceFactory.java @@ -0,0 +1,18 @@ +package org.opensearch.sql.spark; + +import org.opensearch.sql.datasource.model.DataSource; +import 
org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; + +public class SparkDataSourceFactory implements DataSourceFactory { + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.SPARK; + } + + @Override + public DataSource createDataSource(DataSourceMetadata metadata) { + return null; + } +} From 2b8ceb7999d1d0436d664566174b6b6e06bd0d0e Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Wed, 21 Jun 2023 10:48:59 -0700 Subject: [PATCH 02/15] Add spark client and engine Signed-off-by: Rupal Mahajan --- .../sql/datasource/model/DataSourceType.java | 1 + spark/build.gradle | 3 ++ .../sql/spark/SparkDataSourceFactory.java | 18 ------- .../sql/spark/client/SparkClient.java | 15 ++++++ .../sql/spark/storage/SparkStorageEngine.java | 39 ++++++++++++++ .../spark/storage/SparkStorageFactory.java | 52 +++++++++++++++++++ 6 files changed, 110 insertions(+), 18 deletions(-) delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/SparkDataSourceFactory.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java index 5010e41942d..78c431a81f5 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -8,6 +8,7 @@ public enum DataSourceType { PROMETHEUS("prometheus"), OPENSEARCH("opensearch"), + JDBC("jdbc"), SPARK("spark"); private String text; diff --git a/spark/build.gradle b/spark/build.gradle index 05b9d38864d..4b661360b03 100644 
--- a/spark/build.gradle +++ b/spark/build.gradle @@ -16,6 +16,9 @@ repositories { dependencies { api project(':core') implementation project(':datasources') + + implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" + implementation group: 'org.json', name: 'json', version: '20230227' } test { diff --git a/spark/src/main/java/org/opensearch/sql/spark/SparkDataSourceFactory.java b/spark/src/main/java/org/opensearch/sql/spark/SparkDataSourceFactory.java deleted file mode 100644 index 0484213ae33..00000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/SparkDataSourceFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.opensearch.sql.spark; - -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.storage.DataSourceFactory; - -public class SparkDataSourceFactory implements DataSourceFactory { - @Override - public DataSourceType getDataSourceType() { - return DataSourceType.SPARK; - } - - @Override - public DataSource createDataSource(DataSourceMetadata metadata) { - return null; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java new file mode 100644 index 00000000000..f6ffc446483 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.client; + +import org.json.JSONObject; + +import java.io.IOException; + +public interface SparkClient { + + JSONObject sql(String query) throws IOException; +} \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java new file 
mode 100644 index 00000000000..1a39cfbd138 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -0,0 +1,39 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.storage; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.DataSourceSchemaName; +import org.opensearch.sql.expression.function.FunctionResolver; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.Table; + +import java.util.Collection; +import java.util.Collections; + + +/** + * Spark storage engine implementation. + */ +@RequiredArgsConstructor +public class SparkStorageEngine implements StorageEngine { + + private final SparkClient sparkClient; + + @Override + public Collection getFunctions() { + //TODO: add SqlTableFunctionResolver to list + return Collections.singletonList(null); + } + + @Override + public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { + return null; + } +} \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java new file mode 100644 index 00000000000..ab9db5e749e --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java @@ -0,0 +1,52 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.storage; + +import lombok.RequiredArgsConstructor; +import org.opensearch.client.Client; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.spark.client.SparkClient; +import 
org.opensearch.sql.storage.DataSourceFactory; +import org.opensearch.sql.storage.StorageEngine; + +import java.util.Map; + +@RequiredArgsConstructor +public class SparkStorageFactory implements DataSourceFactory { + private final Client client; + private final Settings settings; + public static final String EMR_CLUSTER = "emr.cluster"; + public static final String OPENSEARCH_DOMAIN_ENDPOINT = "opensearch.domain"; + public static final String AUTH_TYPE = "emr.auth.type"; + public static final String REGION = "emr.auth.region"; + public static final String ROLE_ARN = "emr.auth.role_arn"; + public static final String ACCESS_KEY = "emr.auth.access_key"; + public static final String SECRET_KEY = "emr.auth.secret_key"; + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.SPARK; + } + + @Override + public DataSource createDataSource(DataSourceMetadata metadata) { + return new DataSource( + metadata.getName(), + DataSourceType.SPARK, + getStorageEngine(metadata.getProperties())); + } + + StorageEngine getStorageEngine(Map requiredConfig) { + SparkClient sparkClient; + //TODO: Initialize spark client send to storage engine + return new SparkStorageEngine(null); + } +} \ No newline at end of file From f595ff4ab4ecd550239112669f1c97f0519e91fc Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Wed, 21 Jun 2023 11:18:20 -0700 Subject: [PATCH 03/15] Remove vars Signed-off-by: Rupal Mahajan --- .../opensearch/sql/spark/storage/SparkStorageFactory.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java index ab9db5e749e..947945b9fc0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java @@ -23,13 +23,6 @@ public class SparkStorageFactory implements DataSourceFactory { 
private final Client client; private final Settings settings; - public static final String EMR_CLUSTER = "emr.cluster"; - public static final String OPENSEARCH_DOMAIN_ENDPOINT = "opensearch.domain"; - public static final String AUTH_TYPE = "emr.auth.type"; - public static final String REGION = "emr.auth.region"; - public static final String ROLE_ARN = "emr.auth.role_arn"; - public static final String ACCESS_KEY = "emr.auth.access_key"; - public static final String SECRET_KEY = "emr.auth.secret_key"; @Override public DataSourceType getDataSourceType() { From d0d50429cd5717c39edbb08f08aebb98ff885629 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 09:19:16 -0700 Subject: [PATCH 04/15] Spark connector draft Signed-off-by: Rupal Mahajan --- plugin/build.gradle | 1 + .../org/opensearch/sql/plugin/SQLPlugin.java | 19 ++-- spark/build.gradle | 4 + spark/lombok.config | 3 + .../SqlFunctionImplementation.java | 98 +++++++++++++++++++ .../resolver/SqlTableFunctionResolver.java | 93 ++++++++++++++++++ .../scan/SqlFunctionTableScanBuilder.java | 35 +++++++ .../sql/spark/request/SparkQueryRequest.java | 29 ++++++ .../sql/spark/storage/SparkMetricTable.java | 69 +++++++++++++ .../sql/spark/storage/SparkStorageEngine.java | 6 +- .../spark/storage/SparkStorageFactory.java | 6 +- .../SqlFunctionImplementationTest.java | 78 +++++++++++++++ 12 files changed, 427 insertions(+), 14 deletions(-) create mode 100644 spark/lombok.config create mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java create 
mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java diff --git a/plugin/build.gradle b/plugin/build.gradle index 42d57231940..11f97ea8572 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -126,6 +126,7 @@ dependencies { api project(':opensearch') api project(':prometheus') api project(':datasources') + api project(':spark') testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.12.13' testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 36986c9afce..463a326b183 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -5,16 +5,8 @@ package org.opensearch.sql.plugin; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; - import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; @@ -81,12 +73,22 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; +import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; +import 
java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; + public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin { private static final Logger LOG = LogManager.getLogger(); @@ -221,6 +223,7 @@ private DataSourceServiceImpl createDataSourceService() { .add(new OpenSearchDataSourceFactory( new OpenSearchNodeClient(this.client), pluginSettings)) .add(new PrometheusStorageFactory(pluginSettings)) + .add(new SparkStorageFactory(this.client, pluginSettings)) .build(), dataSourceMetadataStorage, dataSourceUserAuthorizationHelper); diff --git a/spark/build.gradle b/spark/build.gradle index 4b661360b03..86a22b7462b 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -19,6 +19,10 @@ dependencies { implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" implementation group: 'org.json', name: 'json', version: '20230227' + + testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') + testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' } test { diff --git a/spark/lombok.config b/spark/lombok.config new file mode 100644 index 00000000000..aac13295bd7 --- /dev/null +++ b/spark/lombok.config @@ -0,0 +1,3 @@ +# This file is generated by the 'io.freefair.lombok' Gradle plugin +config.stopBubbling = true +lombok.addLombokGeneratedAnnotation = true \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java new file mode 100644 index 00000000000..ade91bbb4e8 --- /dev/null +++ 
b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java @@ -0,0 +1,98 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.functions.implementation; + +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.exception.ExpressionEvaluationException; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.FunctionExpression; +import org.opensearch.sql.expression.NamedArgumentExpression; +import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.TableFunctionImplementation; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.request.SparkQueryRequest; +import org.opensearch.sql.spark.storage.SparkMetricTable; +import org.opensearch.sql.storage.Table; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver.QUERY; + +public class SqlFunctionImplementation extends FunctionExpression implements + TableFunctionImplementation { + + private final FunctionName functionName; + private final List arguments; + private final SparkClient sparkClient; + + /** + * Required argument constructor. 
+ * + * @param functionName name of the function + * @param arguments a list of expressions + */ + public SqlFunctionImplementation(FunctionName functionName, List arguments, + SparkClient sparkClient) { + super(functionName, arguments); + this.functionName = functionName; + this.arguments = arguments; + this.sparkClient = sparkClient; + } + + @Override + public ExprValue valueOf(Environment valueEnv) { + throw new UnsupportedOperationException(String.format( + "Spark defined function [%s] is only " + + "supported in SOURCE clause with spark connector catalog", + functionName)); + } + + @Override + public ExprType type() { + return ExprCoreType.STRUCT; + } + + @Override + public String toString() { + List args = arguments.stream() + .map(arg -> String.format("%s=%s", ((NamedArgumentExpression) arg) + .getArgName(), ((NamedArgumentExpression) arg).getValue().toString())) + .collect(Collectors.toList()); + return String.format("%s(%s)", functionName, String.join(", ", args)); + } + + @Override + public Table applyArguments() { + return new SparkMetricTable(sparkClient, buildQueryFromSqlFunction(arguments)); + } + + private SparkQueryRequest buildQueryFromSqlFunction(List arguments) { + + SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); + arguments.forEach(arg -> { + String argName = ((NamedArgumentExpression) arg).getArgName(); + Expression argValue = ((NamedArgumentExpression) arg).getValue(); + ExprValue literalValue = argValue.valueOf(); + switch (argName) { + case QUERY: + sparkQueryRequest + .setSql((String) literalValue.value()); + break; + default: + throw new ExpressionEvaluationException( + String.format("Invalid Function Argument:%s", argName)); + } + }); + return sparkQueryRequest; + } + +} \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java new file mode 100644 
index 00000000000..dc4f8e150a6 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java @@ -0,0 +1,93 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.functions.resolver; + +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.NamedArgumentExpression; +import org.opensearch.sql.expression.function.FunctionBuilder; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.FunctionResolver; +import org.opensearch.sql.expression.function.FunctionSignature; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +@RequiredArgsConstructor +public class SqlTableFunctionResolver implements FunctionResolver { + private final SparkClient sparkClient; + + public static final String SQL = "sql"; + public static final String QUERY = "query"; + + @Override + public Pair resolve(FunctionSignature unresolvedSignature) { + FunctionName functionName = FunctionName.of(SQL); + FunctionSignature functionSignature = + new FunctionSignature(functionName, List.of(STRING)); + final List argumentNames = List.of(QUERY); + + FunctionBuilder functionBuilder = (functionProperties, arguments) -> { + Boolean argumentsPassedByName = arguments.stream() + .noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); + Boolean argumentsPassedByPosition = arguments.stream() + 
.allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); + if (!(argumentsPassedByName || argumentsPassedByPosition)) { + throw new SemanticCheckException("Arguments should be either passed by name or position"); + } + + if (arguments.size() != argumentNames.size()) { + throw new SemanticCheckException( + generateErrorMessageForMissingArguments(argumentsPassedByPosition, arguments, + argumentNames)); + } + + if (argumentsPassedByPosition) { + List namedArguments = new ArrayList<>(); + for (int i = 0; i < arguments.size(); i++) { + namedArguments.add(new NamedArgumentExpression(argumentNames.get(i), + ((NamedArgumentExpression) arguments.get(i)).getValue())); + } + return new SqlFunctionImplementation(functionName, namedArguments, sparkClient); + } + return new SqlFunctionImplementation(functionName, arguments, sparkClient); + }; + return Pair.of(functionSignature, functionBuilder); + } + + private String generateErrorMessageForMissingArguments(Boolean argumentsPassedByPosition, + List arguments, + List argumentNames) { + if (argumentsPassedByPosition) { + return String.format("Missing arguments:[%s]", + String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))); + } else { + Set requiredArguments = new HashSet<>(argumentNames); + Set providedArguments = + arguments.stream().map(expression -> ((NamedArgumentExpression) expression).getArgName()) + .collect(Collectors.toSet()); + requiredArguments.removeAll(providedArguments); + return String.format("Missing arguments:[%s]", String.join(",", requiredArguments)); + } + } + + @Override + public FunctionName getFunctionName() { + return FunctionName.of(SQL); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java new file mode 100644 index 00000000000..3804072f3dd --- /dev/null +++ 
b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.functions.scan; + +import lombok.AllArgsConstructor; +import org.opensearch.sql.planner.logical.LogicalProject; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.request.SparkQueryRequest; +import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.read.TableScanBuilder; + +/** + * TableScanBuilder for sql table function of spark connector. + */ +@AllArgsConstructor +public class SqlFunctionTableScanBuilder extends TableScanBuilder { + + private final SparkClient sparkClient; + + private final SparkQueryRequest sparkQueryRequest; + + @Override + public TableScanOperator build() { + //TODO: return SqlFunctionTableScanOperator + return null; + } + + @Override + public boolean pushDownProject(LogicalProject project) { + return true; + } +} \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java new file mode 100644 index 00000000000..a2e7b2fa115 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + * Spark query request. + */ +@EqualsAndHashCode +@Data +@ToString +@AllArgsConstructor +@NoArgsConstructor +public class SparkQueryRequest { + + /** + * SQL. 
+ */ + private String sql; + +} \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java new file mode 100644 index 00000000000..96a534da7d5 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.storage; + +import lombok.Getter; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.functions.scan.SqlFunctionTableScanBuilder; +import org.opensearch.sql.spark.request.SparkQueryRequest; +import org.opensearch.sql.storage.Table; +import org.opensearch.sql.storage.read.TableScanBuilder; + +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.Map; + +/** + * Spark table implementation. + * This can be constructed from SparkQueryRequest. + */ +public class SparkMetricTable implements Table { + + private final SparkClient sparkClient; + + @Getter + private final SparkQueryRequest sparkQueryRequest; + + /** + * Constructor for entire Sql Request. 
+ */ + public SparkMetricTable(SparkClient sparkService, + @Nonnull SparkQueryRequest sparkQueryRequest) { + this.sparkClient = sparkService; + this.sparkQueryRequest = sparkQueryRequest; + } + + @Override + public boolean exists() { + throw new UnsupportedOperationException( + "Exists operation is not supported in spark datasource"); + } + + @Override + public void create(Map schema) { + throw new UnsupportedOperationException( + "Create operation is not supported in spark datasource"); + } + + @Override + public Map getFieldTypes() { + return new HashMap<>(); + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + //TODO: Add plan + return null; + } + + @Override + public TableScanBuilder createScanBuilder() { + return new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); + } +} \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java index 1a39cfbd138..bdb3803c907 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -11,13 +11,13 @@ import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.expression.function.FunctionResolver; import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver; import org.opensearch.sql.storage.StorageEngine; import org.opensearch.sql.storage.Table; import java.util.Collection; import java.util.Collections; - /** * Spark storage engine implementation. 
*/ @@ -28,8 +28,8 @@ public class SparkStorageEngine implements StorageEngine { @Override public Collection getFunctions() { - //TODO: add SqlTableFunctionResolver to list - return Collections.singletonList(null); + return Collections.singletonList( + new SqlTableFunctionResolver(sparkClient)); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java index 947945b9fc0..84c693e841b 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java @@ -38,8 +38,8 @@ public DataSource createDataSource(DataSourceMetadata metadata) { } StorageEngine getStorageEngine(Map requiredConfig) { - SparkClient sparkClient; - //TODO: Initialize spark client send to storage engine - return new SparkStorageEngine(null); + SparkClient sparkClient = null; + //TODO: Initialize spark client + return new SparkStorageEngine(sparkClient); } } \ No newline at end of file diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java new file mode 100644 index 00000000000..ca40b42b23d --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.functions; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.exception.ExpressionEvaluationException; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; +import 
org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation; +import org.opensearch.sql.spark.request.SparkQueryRequest; +import org.opensearch.sql.spark.storage.SparkMetricTable; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@ExtendWith(MockitoExtension.class) +public class SqlFunctionImplementationTest { + @Mock + private SparkClient client; + + @Test + void testValueOfAndTypeToString() { + FunctionName functionName = new FunctionName("sql"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); + SqlFunctionImplementation sqlFunctionImplementation + = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, + () -> sqlFunctionImplementation.valueOf()); + assertEquals("Spark defined function [sql] is only " + + "supported in SOURCE clause with spark connector catalog", exception.getMessage()); + assertEquals("sql(query=\"select 1\")", + sqlFunctionImplementation.toString()); + assertEquals(ExprCoreType.STRUCT, sqlFunctionImplementation.type()); + } + + + @Test + void testApplyArguments() { + FunctionName functionName = new FunctionName("sql"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); + SqlFunctionImplementation sqlFunctionImplementation + = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + SparkMetricTable sparkMetricTable + = (SparkMetricTable) sqlFunctionImplementation.applyArguments(); + assertNotNull(sparkMetricTable.getSparkQueryRequest()); + SparkQueryRequest sparkQueryRequest + = 
sparkMetricTable.getSparkQueryRequest(); + assertEquals("select 1", sparkQueryRequest.getSql()); + } + + @Test + void testApplyArgumentsException() { + FunctionName functionName = new FunctionName("sql"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("select 1")), + DSL.namedArgument("tmp", DSL.literal(12345))); + SqlFunctionImplementation sqlFunctionImplementation + = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + ExpressionEvaluationException exception = assertThrows(ExpressionEvaluationException.class, + () -> sqlFunctionImplementation.applyArguments()); + assertEquals("Invalid Function Argument:tmp", exception.getMessage()); + } + +} \ No newline at end of file From 5ac01b251c6e1e8338b59a222d1651c4eb87630e Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 09:33:03 -0700 Subject: [PATCH 05/15] nit Signed-off-by: Rupal Mahajan --- spark/build.gradle | 2 +- .../opensearch/sql/spark/storage/SparkStorageFactory.java | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/spark/build.gradle b/spark/build.gradle index 86a22b7462b..58103dc67fe 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -51,7 +51,7 @@ jacocoTestCoverageVerification { rule { element = 'CLASS' excludes = [ - 'org.opensearch.sql.prometheus.data.constants.*' + 'org.opensearch.sql.spark.data.constants.*' ] limit { counter = 'LINE' diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java index 84c693e841b..784b284d683 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java @@ -1,8 +1,6 @@ /* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: 
Apache-2.0 */ package org.opensearch.sql.spark.storage; From 4fcf4ba829f44af2b90ee2ec5b5142cae5527acc Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 11:13:11 -0700 Subject: [PATCH 06/15] Fix checkstyle errors Signed-off-by: Rupal Mahajan --- .../org/opensearch/sql/plugin/SQLPlugin.java | 16 +- .../sql/spark/client/SparkClient.java | 6 +- .../SqlFunctionImplementation.java | 126 ++++++++-------- .../resolver/SqlTableFunctionResolver.java | 119 ++++++++------- .../scan/SqlFunctionTableScanBuilder.java | 22 +-- .../sql/spark/request/SparkQueryRequest.java | 8 +- .../sql/spark/storage/SparkMetricTable.java | 75 +++++----- .../sql/spark/storage/SparkStorageEngine.java | 26 ++-- .../spark/storage/SparkStorageFactory.java | 9 +- .../SqlTableFunctionResolverTest.java | 141 ++++++++++++++++++ 10 files changed, 340 insertions(+), 208 deletions(-) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 463a326b183..d99b7d63173 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -5,8 +5,16 @@ package org.opensearch.sql.plugin; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; + import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; @@ -80,14 +88,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -import java.util.Arrays; 
-import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.function.Supplier; - -import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin { diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java index f6ffc446483..ffe979f5329 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java @@ -5,11 +5,9 @@ package org.opensearch.sql.spark.client; -import org.json.JSONObject; - import java.io.IOException; +import org.json.JSONObject; public interface SparkClient { - - JSONObject sql(String query) throws IOException; + JSONObject sql(String query) throws IOException; } \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java index ade91bbb4e8..4bfebeeb471 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java @@ -7,6 +7,10 @@ package org.opensearch.sql.spark.functions.implementation; +import static org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver.QUERY; + +import java.util.List; +import java.util.stream.Collectors; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; @@ -22,77 +26,71 @@ import org.opensearch.sql.spark.storage.SparkMetricTable; import org.opensearch.sql.storage.Table; -import java.util.List; -import 
java.util.stream.Collectors; - -import static org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver.QUERY; - -public class SqlFunctionImplementation extends FunctionExpression implements - TableFunctionImplementation { +public class SqlFunctionImplementation extends FunctionExpression + implements TableFunctionImplementation { - private final FunctionName functionName; - private final List arguments; - private final SparkClient sparkClient; + private final FunctionName functionName; + private final List arguments; + private final SparkClient sparkClient; - /** - * Required argument constructor. - * - * @param functionName name of the function - * @param arguments a list of expressions - */ - public SqlFunctionImplementation(FunctionName functionName, List arguments, - SparkClient sparkClient) { - super(functionName, arguments); - this.functionName = functionName; - this.arguments = arguments; - this.sparkClient = sparkClient; - } + /** + * Required argument constructor. + * + * @param functionName name of the function + * @param arguments a list of expressions + */ + public SqlFunctionImplementation( + FunctionName functionName, List arguments, SparkClient sparkClient) { + super(functionName, arguments); + this.functionName = functionName; + this.arguments = arguments; + this.sparkClient = sparkClient; + } - @Override - public ExprValue valueOf(Environment valueEnv) { - throw new UnsupportedOperationException(String.format( - "Spark defined function [%s] is only " - + "supported in SOURCE clause with spark connector catalog", - functionName)); - } + @Override + public ExprValue valueOf(Environment valueEnv) { + throw new UnsupportedOperationException(String.format( + "Spark defined function [%s] is only " + + "supported in SOURCE clause with spark connector catalog", functionName)); + } - @Override - public ExprType type() { - return ExprCoreType.STRUCT; - } + @Override + public ExprType type() { + return ExprCoreType.STRUCT; + } - @Override - public 
String toString() { - List args = arguments.stream() - .map(arg -> String.format("%s=%s", ((NamedArgumentExpression) arg) - .getArgName(), ((NamedArgumentExpression) arg).getValue().toString())) - .collect(Collectors.toList()); - return String.format("%s(%s)", functionName, String.join(", ", args)); - } + @Override + public String toString() { + List args = arguments.stream() + .map(arg -> String.format("%s=%s", + ((NamedArgumentExpression) arg).getArgName(), + ((NamedArgumentExpression) arg).getValue().toString())) + .collect(Collectors.toList()); + return String.format("%s(%s)", functionName, String.join(", ", args)); + } - @Override - public Table applyArguments() { - return new SparkMetricTable(sparkClient, buildQueryFromSqlFunction(arguments)); - } + @Override + public Table applyArguments() { + return new SparkMetricTable(sparkClient, buildQueryFromSqlFunction(arguments)); + } - private SparkQueryRequest buildQueryFromSqlFunction(List arguments) { + private SparkQueryRequest buildQueryFromSqlFunction(List arguments) { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - arguments.forEach(arg -> { - String argName = ((NamedArgumentExpression) arg).getArgName(); - Expression argValue = ((NamedArgumentExpression) arg).getValue(); - ExprValue literalValue = argValue.valueOf(); - switch (argName) { - case QUERY: - sparkQueryRequest - .setSql((String) literalValue.value()); - break; - default: - throw new ExpressionEvaluationException( - String.format("Invalid Function Argument:%s", argName)); - } - }); - return sparkQueryRequest; - } + SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); + arguments.forEach(arg -> { + String argName = ((NamedArgumentExpression) arg).getArgName(); + Expression argValue = ((NamedArgumentExpression) arg).getValue(); + ExprValue literalValue = argValue.valueOf(); + switch (argName) { + case QUERY: + sparkQueryRequest.setSql((String) literalValue.value()); + break; + default: + throw new 
ExpressionEvaluationException( + String.format("Invalid Function Argument:%s", argName)); + } + }); + return sparkQueryRequest; + } } \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java index dc4f8e150a6..2bb6e4f005d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java @@ -7,6 +7,13 @@ package org.opensearch.sql.spark.functions.resolver; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -20,74 +27,66 @@ import org.opensearch.sql.spark.client.SparkClient; import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.opensearch.sql.data.type.ExprCoreType.STRING; - @RequiredArgsConstructor public class SqlTableFunctionResolver implements FunctionResolver { - private final SparkClient sparkClient; + private final SparkClient sparkClient; - public static final String SQL = "sql"; - public static final String QUERY = "query"; + public static final String SQL = "sql"; + public static final String QUERY = "query"; - @Override - public Pair resolve(FunctionSignature unresolvedSignature) { - FunctionName functionName = FunctionName.of(SQL); - FunctionSignature functionSignature = - new FunctionSignature(functionName, List.of(STRING)); - final List argumentNames = List.of(QUERY); + 
@Override + public Pair resolve(FunctionSignature unresolvedSignature) { + FunctionName functionName = FunctionName.of(SQL); + FunctionSignature functionSignature = + new FunctionSignature(functionName, List.of(STRING)); + final List argumentNames = List.of(QUERY); - FunctionBuilder functionBuilder = (functionProperties, arguments) -> { - Boolean argumentsPassedByName = arguments.stream() - .noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); - Boolean argumentsPassedByPosition = arguments.stream() - .allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); - if (!(argumentsPassedByName || argumentsPassedByPosition)) { - throw new SemanticCheckException("Arguments should be either passed by name or position"); - } + FunctionBuilder functionBuilder = (functionProperties, arguments) -> { + Boolean argumentsPassedByName = arguments.stream() + .noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); + Boolean argumentsPassedByPosition = arguments.stream() + .allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); + if (!(argumentsPassedByName || argumentsPassedByPosition)) { + throw new SemanticCheckException("Arguments should be either passed by name or position"); + } - if (arguments.size() != argumentNames.size()) { - throw new SemanticCheckException( - generateErrorMessageForMissingArguments(argumentsPassedByPosition, arguments, - argumentNames)); - } + if (arguments.size() != argumentNames.size()) { + throw new SemanticCheckException( + generateErrorMessageForMissingArguments(argumentsPassedByPosition, arguments, + argumentNames)); + } - if (argumentsPassedByPosition) { - List namedArguments = new ArrayList<>(); - for (int i = 0; i < arguments.size(); i++) { - namedArguments.add(new NamedArgumentExpression(argumentNames.get(i), - ((NamedArgumentExpression) arguments.get(i)).getValue())); - } - return new SqlFunctionImplementation(functionName, 
namedArguments, sparkClient); - } - return new SqlFunctionImplementation(functionName, arguments, sparkClient); - }; - return Pair.of(functionSignature, functionBuilder); - } - - private String generateErrorMessageForMissingArguments(Boolean argumentsPassedByPosition, - List arguments, - List argumentNames) { - if (argumentsPassedByPosition) { - return String.format("Missing arguments:[%s]", - String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))); - } else { - Set requiredArguments = new HashSet<>(argumentNames); - Set providedArguments = - arguments.stream().map(expression -> ((NamedArgumentExpression) expression).getArgName()) - .collect(Collectors.toSet()); - requiredArguments.removeAll(providedArguments); - return String.format("Missing arguments:[%s]", String.join(",", requiredArguments)); + if (argumentsPassedByPosition) { + List namedArguments = new ArrayList<>(); + for (int i = 0; i < arguments.size(); i++) { + namedArguments.add(new NamedArgumentExpression(argumentNames.get(i), + ((NamedArgumentExpression) arguments.get(i)).getValue())); } - } + return new SqlFunctionImplementation(functionName, namedArguments, sparkClient); + } + return new SqlFunctionImplementation(functionName, arguments, sparkClient); + }; + return Pair.of(functionSignature, functionBuilder); + } - @Override - public FunctionName getFunctionName() { - return FunctionName.of(SQL); + private String generateErrorMessageForMissingArguments(Boolean argumentsPassedByPosition, + List arguments, + List argumentNames) { + if (argumentsPassedByPosition) { + return String.format("Missing arguments:[%s]", + String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))); + } else { + Set requiredArguments = new HashSet<>(argumentNames); + Set providedArguments = + arguments.stream().map(expression -> ((NamedArgumentExpression) expression).getArgName()) + .collect(Collectors.toSet()); + requiredArguments.removeAll(providedArguments); + return 
String.format("Missing arguments:[%s]", String.join(",", requiredArguments)); } + } + + @Override + public FunctionName getFunctionName() { + return FunctionName.of(SQL); + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java index 3804072f3dd..0a60affebc2 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java @@ -18,18 +18,18 @@ @AllArgsConstructor public class SqlFunctionTableScanBuilder extends TableScanBuilder { - private final SparkClient sparkClient; + private final SparkClient sparkClient; - private final SparkQueryRequest sparkQueryRequest; + private final SparkQueryRequest sparkQueryRequest; - @Override - public TableScanOperator build() { - //TODO: return SqlFunctionTableScanOperator - return null; - } + @Override + public TableScanOperator build() { + //TODO: return SqlFunctionTableScanOperator + return null; + } - @Override - public boolean pushDownProject(LogicalProject project) { - return true; - } + @Override + public boolean pushDownProject(LogicalProject project) { + return true; + } } \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java index a2e7b2fa115..93e4ad94191 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java @@ -21,9 +21,9 @@ @NoArgsConstructor public class SparkQueryRequest { - /** - * SQL. - */ - private String sql; + /** + * SQL. 
+ */ + private String sql; } \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java index 96a534da7d5..b82ad3fa72e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java @@ -5,6 +5,9 @@ package org.opensearch.sql.spark.storage; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; import lombok.Getter; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -15,55 +18,51 @@ import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.read.TableScanBuilder; -import javax.annotation.Nonnull; -import java.util.HashMap; -import java.util.Map; - /** * Spark table implementation. * This can be constructed from SparkQueryRequest. */ public class SparkMetricTable implements Table { - private final SparkClient sparkClient; + private final SparkClient sparkClient; - @Getter - private final SparkQueryRequest sparkQueryRequest; + @Getter + private final SparkQueryRequest sparkQueryRequest; - /** - * Constructor for entire Sql Request. - */ - public SparkMetricTable(SparkClient sparkService, - @Nonnull SparkQueryRequest sparkQueryRequest) { - this.sparkClient = sparkService; - this.sparkQueryRequest = sparkQueryRequest; - } + /** + * Constructor for entire Sql Request. 
+ */ + public SparkMetricTable(SparkClient sparkService, + @Nonnull SparkQueryRequest sparkQueryRequest) { + this.sparkClient = sparkService; + this.sparkQueryRequest = sparkQueryRequest; + } - @Override - public boolean exists() { - throw new UnsupportedOperationException( - "Exists operation is not supported in spark datasource"); - } + @Override + public boolean exists() { + throw new UnsupportedOperationException( + "Exists operation is not supported in spark datasource"); + } - @Override - public void create(Map schema) { - throw new UnsupportedOperationException( - "Create operation is not supported in spark datasource"); - } + @Override + public void create(Map schema) { + throw new UnsupportedOperationException( + "Create operation is not supported in spark datasource"); + } - @Override - public Map getFieldTypes() { - return new HashMap<>(); - } + @Override + public Map getFieldTypes() { + return new HashMap<>(); + } - @Override - public PhysicalPlan implement(LogicalPlan plan) { - //TODO: Add plan - return null; - } + @Override + public PhysicalPlan implement(LogicalPlan plan) { + //TODO: Add plan + return null; + } - @Override - public TableScanBuilder createScanBuilder() { - return new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); - } + @Override + public TableScanBuilder createScanBuilder() { + return new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); + } } \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java index bdb3803c907..fba37847eaf 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -7,6 +7,8 @@ package org.opensearch.sql.spark.storage; +import java.util.Collection; +import java.util.Collections; import lombok.RequiredArgsConstructor; import 
org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.expression.function.FunctionResolver; @@ -15,25 +17,21 @@ import org.opensearch.sql.storage.StorageEngine; import org.opensearch.sql.storage.Table; -import java.util.Collection; -import java.util.Collections; - /** * Spark storage engine implementation. */ @RequiredArgsConstructor public class SparkStorageEngine implements StorageEngine { + private final SparkClient sparkClient; - private final SparkClient sparkClient; - - @Override - public Collection getFunctions() { - return Collections.singletonList( - new SqlTableFunctionResolver(sparkClient)); - } + @Override + public Collection getFunctions() { + return Collections.singletonList( + new SqlTableFunctionResolver(sparkClient)); + } - @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { - return null; - } + @Override + public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { + return null; + } } \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java index 784b284d683..35d53cfb3f9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.storage; +import java.util.Map; import lombok.RequiredArgsConstructor; import org.opensearch.client.Client; import org.opensearch.sql.common.setting.Settings; @@ -15,8 +16,6 @@ import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.sql.storage.StorageEngine; -import java.util.Map; - @RequiredArgsConstructor public class SparkStorageFactory implements DataSourceFactory { private final Client client; @@ -30,9 +29,9 @@ public DataSourceType getDataSourceType() { @Override public DataSource 
createDataSource(DataSourceMetadata metadata) { return new DataSource( - metadata.getName(), - DataSourceType.SPARK, - getStorageEngine(metadata.getProperties())); + metadata.getName(), + DataSourceType.SPARK, + getStorageEngine(metadata.getProperties())); } StorageEngine getStorageEngine(Map requiredConfig) { diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java new file mode 100644 index 00000000000..18b856f13dd --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java @@ -0,0 +1,141 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.functions; + +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.function.FunctionBuilder; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.FunctionProperties; +import org.opensearch.sql.expression.function.FunctionSignature; +import org.opensearch.sql.expression.function.TableFunctionImplementation; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation; +import org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver; +import org.opensearch.sql.spark.request.SparkQueryRequest; +import org.opensearch.sql.spark.storage.SparkMetricTable; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +@ExtendWith(MockitoExtension.class) +public class SqlTableFunctionResolverTest { + @Mock + private SparkClient client; + + @Mock + private FunctionProperties functionProperties; + + @Test + void testResolve() { + SqlTableFunctionResolver sqlTableFunctionResolver + = new SqlTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("sql"); + List expressions + = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); + FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + Pair resolution + = sqlTableFunctionResolver.resolve(functionSignature); + assertEquals(functionName, resolution.getKey().getFunctionName()); + assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); + FunctionBuilder functionBuilder = resolution.getValue(); + TableFunctionImplementation functionImplementation + = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); + assertTrue(functionImplementation instanceof SqlFunctionImplementation); + SparkMetricTable sparkMetricTable + = (SparkMetricTable) functionImplementation.applyArguments(); + assertNotNull(sparkMetricTable.getSparkQueryRequest()); + SparkQueryRequest sparkQueryRequest = + sparkMetricTable.getSparkQueryRequest(); + assertEquals("select 1", sparkQueryRequest.getSql()); + } + + @Test + void testArgumentsPassedByPosition() { + SqlTableFunctionResolver sqlTableFunctionResolver + = new SqlTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("sql"); + List expressions + = List.of(DSL.namedArgument(null, DSL.literal("select 1"))); + 
FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + + Pair resolution + = sqlTableFunctionResolver.resolve(functionSignature); + + assertEquals(functionName, resolution.getKey().getFunctionName()); + assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); + FunctionBuilder functionBuilder = resolution.getValue(); + TableFunctionImplementation functionImplementation + = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); + assertTrue(functionImplementation instanceof SqlFunctionImplementation); + SparkMetricTable sparkMetricTable + = (SparkMetricTable) functionImplementation.applyArguments(); + assertNotNull(sparkMetricTable.getSparkQueryRequest()); + SparkQueryRequest sparkQueryRequest = + sparkMetricTable.getSparkQueryRequest(); + assertEquals("select 1", sparkQueryRequest.getSql()); + } + + @Test + void testMixedArgumentTypes() { + SqlTableFunctionResolver sqlTableFunctionResolver + = new SqlTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("sql"); + List expressions + = List.of(DSL.namedArgument("query", DSL.literal("select 1")), + DSL.namedArgument(null, DSL.literal(12345))); + FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + Pair resolution + = sqlTableFunctionResolver.resolve(functionSignature); + + assertEquals(functionName, resolution.getKey().getFunctionName()); + assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); + SemanticCheckException exception = assertThrows(SemanticCheckException.class, + () -> resolution.getValue().apply(functionProperties, expressions)); + + assertEquals("Arguments should be either passed by 
name or position", exception.getMessage()); + } + + @Test + void testWrongArgumentsSizeWhenPassedByName() { + SqlTableFunctionResolver sqlTableFunctionResolver + = new SqlTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("sql"); + List expressions + = List.of(); + FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + Pair resolution + = sqlTableFunctionResolver.resolve(functionSignature); + + assertEquals(functionName, resolution.getKey().getFunctionName()); + assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); + SemanticCheckException exception = assertThrows(SemanticCheckException.class, + () -> resolution.getValue().apply(functionProperties, expressions)); + + assertEquals("Missing arguments:[query]", exception.getMessage()); + } + +} From 52a044d987f464001ea82be709fb8fd3ff87bf09 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 11:14:31 -0700 Subject: [PATCH 07/15] nit Signed-off-by: Rupal Mahajan --- plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java | 1 - 1 file changed, 1 deletion(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index d99b7d63173..7e867be9678 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -88,7 +88,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; - public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin { private static final Logger LOG = LogManager.getLogger(); From 69ca11692e631a544bd47a3d4441dc3ebd7d22bd Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 11:17:50 -0700 Subject: [PATCH 08/15] Fix license header 
Signed-off-by: Rupal Mahajan --- .../functions/implementation/SqlFunctionImplementation.java | 6 ++---- .../spark/functions/resolver/SqlTableFunctionResolver.java | 6 ++---- .../opensearch/sql/spark/storage/SparkStorageEngine.java | 6 ++---- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java index 4bfebeeb471..7c3f1309d14 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java @@ -1,8 +1,6 @@ /* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.sql.spark.functions.implementation; diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java index 2bb6e4f005d..25f850ddb33 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java @@ -1,8 +1,6 @@ /* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.sql.spark.functions.resolver; diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java index fba37847eaf..565a4a85854 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ 
b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -1,8 +1,6 @@ /* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.sql.spark.storage; From 901e762d8c36b2d8c104a4b49abee172708832ba Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 12:41:51 -0700 Subject: [PATCH 09/15] Add spark storage test Signed-off-by: Rupal Mahajan --- .../SqlFunctionImplementationTest.java | 97 ++++---- .../SqlTableFunctionResolverTest.java | 221 +++++++++--------- .../spark/storage/SparkStorageEngineTest.java | 37 +++ .../storage/SparkStorageFactoryTest.java | 44 ++++ 4 files changed, 239 insertions(+), 160 deletions(-) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java index ca40b42b23d..9b854bf02c4 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java @@ -5,6 +5,11 @@ package org.opensearch.sql.spark.functions; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -19,60 +24,54 @@ import org.opensearch.sql.spark.request.SparkQueryRequest; import org.opensearch.sql.spark.storage.SparkMetricTable; -import java.util.List; - -import static 
org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; - @ExtendWith(MockitoExtension.class) public class SqlFunctionImplementationTest { - @Mock - private SparkClient client; + @Mock + private SparkClient client; - @Test - void testValueOfAndTypeToString() { - FunctionName functionName = new FunctionName("sql"); - List namedArgumentExpressionList - = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); - SqlFunctionImplementation sqlFunctionImplementation - = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, - () -> sqlFunctionImplementation.valueOf()); - assertEquals("Spark defined function [sql] is only " - + "supported in SOURCE clause with spark connector catalog", exception.getMessage()); - assertEquals("sql(query=\"select 1\")", - sqlFunctionImplementation.toString()); - assertEquals(ExprCoreType.STRUCT, sqlFunctionImplementation.type()); - } + @Test + void testValueOfAndTypeToString() { + FunctionName functionName = new FunctionName("sql"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); + SqlFunctionImplementation sqlFunctionImplementation + = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, + () -> sqlFunctionImplementation.valueOf()); + assertEquals("Spark defined function [sql] is only " + + "supported in SOURCE clause with spark connector catalog", exception.getMessage()); + assertEquals("sql(query=\"select 1\")", + sqlFunctionImplementation.toString()); + assertEquals(ExprCoreType.STRUCT, sqlFunctionImplementation.type()); + } - @Test - void testApplyArguments() { - FunctionName functionName = new FunctionName("sql"); - List 
namedArgumentExpressionList - = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); - SqlFunctionImplementation sqlFunctionImplementation - = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - SparkMetricTable sparkMetricTable - = (SparkMetricTable) sqlFunctionImplementation.applyArguments(); - assertNotNull(sparkMetricTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest - = sparkMetricTable.getSparkQueryRequest(); - assertEquals("select 1", sparkQueryRequest.getSql()); - } + @Test + void testApplyArguments() { + FunctionName functionName = new FunctionName("sql"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); + SqlFunctionImplementation sqlFunctionImplementation + = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + SparkMetricTable sparkMetricTable + = (SparkMetricTable) sqlFunctionImplementation.applyArguments(); + assertNotNull(sparkMetricTable.getSparkQueryRequest()); + SparkQueryRequest sparkQueryRequest + = sparkMetricTable.getSparkQueryRequest(); + assertEquals("select 1", sparkQueryRequest.getSql()); + } - @Test - void testApplyArgumentsException() { - FunctionName functionName = new FunctionName("sql"); - List namedArgumentExpressionList - = List.of(DSL.namedArgument("query", DSL.literal("select 1")), - DSL.namedArgument("tmp", DSL.literal(12345))); - SqlFunctionImplementation sqlFunctionImplementation - = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - ExpressionEvaluationException exception = assertThrows(ExpressionEvaluationException.class, - () -> sqlFunctionImplementation.applyArguments()); - assertEquals("Invalid Function Argument:tmp", exception.getMessage()); - } + @Test + void testApplyArgumentsException() { + FunctionName functionName = new FunctionName("sql"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("select 1")), + 
DSL.namedArgument("tmp", DSL.literal(12345))); + SqlFunctionImplementation sqlFunctionImplementation + = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + ExpressionEvaluationException exception = assertThrows(ExpressionEvaluationException.class, + () -> sqlFunctionImplementation.applyArguments()); + assertEquals("Invalid Function Argument:tmp", exception.getMessage()); + } } \ No newline at end of file diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java index 18b856f13dd..0a251ef05cf 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java @@ -5,6 +5,14 @@ package org.opensearch.sql.spark.functions; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -24,118 +32,109 @@ import org.opensearch.sql.spark.request.SparkQueryRequest; import org.opensearch.sql.spark.storage.SparkMetricTable; -import java.util.List; -import java.util.stream.Collectors; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.opensearch.sql.data.type.ExprCoreType.STRING; - @ExtendWith(MockitoExtension.class) public class 
SqlTableFunctionResolverTest { - @Mock - private SparkClient client; - - @Mock - private FunctionProperties functionProperties; - - @Test - void testResolve() { - SqlTableFunctionResolver sqlTableFunctionResolver - = new SqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions - = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); - FunctionSignature functionSignature = new FunctionSignature(functionName, expressions - .stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution - = sqlTableFunctionResolver.resolve(functionSignature); - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - FunctionBuilder functionBuilder = resolution.getValue(); - TableFunctionImplementation functionImplementation - = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); - assertTrue(functionImplementation instanceof SqlFunctionImplementation); - SparkMetricTable sparkMetricTable - = (SparkMetricTable) functionImplementation.applyArguments(); - assertNotNull(sparkMetricTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest = - sparkMetricTable.getSparkQueryRequest(); - assertEquals("select 1", sparkQueryRequest.getSql()); - } - - @Test - void testArgumentsPassedByPosition() { - SqlTableFunctionResolver sqlTableFunctionResolver - = new SqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions - = List.of(DSL.namedArgument(null, DSL.literal("select 1"))); - FunctionSignature functionSignature = new FunctionSignature(functionName, expressions - .stream().map(Expression::type).collect(Collectors.toList())); - - Pair resolution - = sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - 
assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - FunctionBuilder functionBuilder = resolution.getValue(); - TableFunctionImplementation functionImplementation - = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); - assertTrue(functionImplementation instanceof SqlFunctionImplementation); - SparkMetricTable sparkMetricTable - = (SparkMetricTable) functionImplementation.applyArguments(); - assertNotNull(sparkMetricTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest = - sparkMetricTable.getSparkQueryRequest(); - assertEquals("select 1", sparkQueryRequest.getSql()); - } - - @Test - void testMixedArgumentTypes() { - SqlTableFunctionResolver sqlTableFunctionResolver - = new SqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions - = List.of(DSL.namedArgument("query", DSL.literal("select 1")), - DSL.namedArgument(null, DSL.literal(12345))); - FunctionSignature functionSignature = new FunctionSignature(functionName, expressions - .stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution - = sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - SemanticCheckException exception = assertThrows(SemanticCheckException.class, - () -> resolution.getValue().apply(functionProperties, expressions)); - - assertEquals("Arguments should be either passed by name or position", exception.getMessage()); - } - - @Test - void testWrongArgumentsSizeWhenPassedByName() { - SqlTableFunctionResolver sqlTableFunctionResolver - = new SqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions - = List.of(); - 
FunctionSignature functionSignature = new FunctionSignature(functionName, expressions - .stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution - = sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - SemanticCheckException exception = assertThrows(SemanticCheckException.class, - () -> resolution.getValue().apply(functionProperties, expressions)); - - assertEquals("Missing arguments:[query]", exception.getMessage()); - } + @Mock + private SparkClient client; + + @Mock + private FunctionProperties functionProperties; + + @Test + void testResolve() { + SqlTableFunctionResolver sqlTableFunctionResolver + = new SqlTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("sql"); + List expressions + = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); + FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + Pair resolution + = sqlTableFunctionResolver.resolve(functionSignature); + assertEquals(functionName, resolution.getKey().getFunctionName()); + assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); + FunctionBuilder functionBuilder = resolution.getValue(); + TableFunctionImplementation functionImplementation + = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); + assertTrue(functionImplementation instanceof SqlFunctionImplementation); + SparkMetricTable sparkMetricTable + = (SparkMetricTable) functionImplementation.applyArguments(); + assertNotNull(sparkMetricTable.getSparkQueryRequest()); + SparkQueryRequest sparkQueryRequest = + sparkMetricTable.getSparkQueryRequest(); + 
assertEquals("select 1", sparkQueryRequest.getSql()); + } + + @Test + void testArgumentsPassedByPosition() { + SqlTableFunctionResolver sqlTableFunctionResolver + = new SqlTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("sql"); + List expressions + = List.of(DSL.namedArgument(null, DSL.literal("select 1"))); + FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + + Pair resolution + = sqlTableFunctionResolver.resolve(functionSignature); + + assertEquals(functionName, resolution.getKey().getFunctionName()); + assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); + FunctionBuilder functionBuilder = resolution.getValue(); + TableFunctionImplementation functionImplementation + = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); + assertTrue(functionImplementation instanceof SqlFunctionImplementation); + SparkMetricTable sparkMetricTable + = (SparkMetricTable) functionImplementation.applyArguments(); + assertNotNull(sparkMetricTable.getSparkQueryRequest()); + SparkQueryRequest sparkQueryRequest = + sparkMetricTable.getSparkQueryRequest(); + assertEquals("select 1", sparkQueryRequest.getSql()); + } + + @Test + void testMixedArgumentTypes() { + SqlTableFunctionResolver sqlTableFunctionResolver + = new SqlTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("sql"); + List expressions + = List.of(DSL.namedArgument("query", DSL.literal("select 1")), + DSL.namedArgument(null, DSL.literal(12345))); + FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + Pair resolution + = sqlTableFunctionResolver.resolve(functionSignature); + + assertEquals(functionName, resolution.getKey().getFunctionName()); + 
assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); + SemanticCheckException exception = assertThrows(SemanticCheckException.class, + () -> resolution.getValue().apply(functionProperties, expressions)); + + assertEquals("Arguments should be either passed by name or position", exception.getMessage()); + } + + @Test + void testWrongArgumentsSizeWhenPassedByName() { + SqlTableFunctionResolver sqlTableFunctionResolver + = new SqlTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("sql"); + List expressions + = List.of(); + FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + Pair resolution + = sqlTableFunctionResolver.resolve(functionSignature); + + assertEquals(functionName, resolution.getKey().getFunctionName()); + assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); + SemanticCheckException exception = assertThrows(SemanticCheckException.class, + () -> resolution.getValue().apply(functionProperties, expressions)); + + assertEquals("Missing arguments:[query]", exception.getMessage()); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java new file mode 100644 index 00000000000..475a7cb1b29 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import 
java.util.Collection; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.expression.function.FunctionResolver; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver; + +@ExtendWith(MockitoExtension.class) +public class SparkStorageEngineTest { + @Mock + private SparkClient client; + + @Test + public void getFunctions() { + SparkStorageEngine engine = new SparkStorageEngine(client); + Collection functionResolverCollection + = engine.getFunctions(); + assertNotNull(functionResolverCollection); + assertEquals(1, functionResolverCollection.size()); + assertTrue( + functionResolverCollection.iterator().next() instanceof SqlTableFunctionResolver); + } + +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java new file mode 100644 index 00000000000..c8c2bd5d9ff --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.storage; + +import java.util.HashMap; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.Client; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.StorageEngine; + +@ExtendWith(MockitoExtension.class) +public class SparkStorageFactoryTest { + @Mock + private Settings settings; + + @Mock + private Client client; + + @Test + void 
testGetConnectorType() { + SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); + Assertions.assertEquals( + DataSourceType.SPARK, sparkStorageFactory.getDataSourceType()); + } + + @Test + @SneakyThrows + void testGetStorageEngine() { + SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); + StorageEngine storageEngine + = sparkStorageFactory.getStorageEngine(new HashMap<>()); + Assertions.assertTrue(storageEngine instanceof SparkStorageEngine); + } + +} From 1b81453809bfe9c1010d6bb55db5b528491481ae Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 13:11:41 -0700 Subject: [PATCH 10/15] Update comments Signed-off-by: Rupal Mahajan --- .../org/opensearch/sql/spark/client/SparkClient.java | 8 ++++++++ .../implementation/SqlFunctionImplementation.java | 10 ++++++++-- .../functions/resolver/SqlTableFunctionResolver.java | 3 +++ .../functions/scan/SqlFunctionTableScanBuilder.java | 2 +- .../sql/spark/storage/SparkStorageFactory.java | 7 +++++++ 5 files changed, 27 insertions(+), 3 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java index ffe979f5329..5444dff0b14 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java @@ -8,6 +8,14 @@ import java.io.IOException; import org.json.JSONObject; +/** + * Interface class for Spark Client. 
+ */ public interface SparkClient { + /** + * @param query spark sql query + * @return spark query response + * @throws IOException + */ JSONObject sql(String query) throws IOException; } \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java index 7c3f1309d14..b5b56452219 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java @@ -24,6 +24,9 @@ import org.opensearch.sql.spark.storage.SparkMetricTable; import org.opensearch.sql.storage.Table; +/** + * Spark SQL function implementation. + */ public class SqlFunctionImplementation extends FunctionExpression implements TableFunctionImplementation { @@ -32,10 +35,9 @@ public class SqlFunctionImplementation extends FunctionExpression private final SparkClient sparkClient; /** - * Required argument constructor. 
- * * @param functionName name of the function * @param arguments a list of expressions + * @param sparkClient spark client */ public SqlFunctionImplementation( FunctionName functionName, List arguments, SparkClient sparkClient) { @@ -72,6 +74,10 @@ public Table applyArguments() { return new SparkMetricTable(sparkClient, buildQueryFromSqlFunction(arguments)); } + /** + * @param arguments spark sql function arguments + * @return spark query request + */ private SparkQueryRequest buildQueryFromSqlFunction(List arguments) { SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java index 25f850ddb33..1959cb1fd89 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java @@ -25,6 +25,9 @@ import org.opensearch.sql.spark.client.SparkClient; import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation; +/** + * Function resolver for sql function of spark connector. + */ @RequiredArgsConstructor public class SqlTableFunctionResolver implements FunctionResolver { private final SparkClient sparkClient; diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java index 0a60affebc2..45f768340fa 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java @@ -13,7 +13,7 @@ import org.opensearch.sql.storage.read.TableScanBuilder; /** - * TableScanBuilder for sql table function of spark connector. 
+ * TableScanBuilder for sql function of spark connector. */ @AllArgsConstructor public class SqlFunctionTableScanBuilder extends TableScanBuilder { diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java index 35d53cfb3f9..0d79e684693 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java @@ -16,6 +16,9 @@ import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.sql.storage.StorageEngine; +/** + * Storage factory implementation for spark connector. + */ @RequiredArgsConstructor public class SparkStorageFactory implements DataSourceFactory { private final Client client; @@ -34,6 +37,10 @@ public DataSource createDataSource(DataSourceMetadata metadata) { getStorageEngine(metadata.getProperties())); } + /** + * @param requiredConfig spark config options + * @return spark client + */ StorageEngine getStorageEngine(Map requiredConfig) { SparkClient sparkClient = null; //TODO: Initialize spark client From 106218e319f7415c13580f432481cc2303d68f08 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 13:52:45 -0700 Subject: [PATCH 11/15] Fix checkstyle in comments Signed-off-by: Rupal Mahajan --- .../opensearch/sql/spark/client/SparkClient.java | 3 ++- .../implementation/SqlFunctionImplementation.java | 4 ++++ .../sql/spark/storage/SparkStorageFactory.java | 4 +++- .../sql/spark/storage/SparkStorageFactoryTest.java | 13 +++++++++++++ 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java index 5444dff0b14..3ceeb0e94c7 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java +++ 
b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java @@ -13,9 +13,10 @@ */ public interface SparkClient { /** + * This method executes spark sql query. + * * @param query spark sql query * @return spark query response - * @throws IOException */ JSONObject sql(String query) throws IOException; } \ No newline at end of file diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java index b5b56452219..2aebaef4ae7 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java @@ -35,6 +35,8 @@ public class SqlFunctionImplementation extends FunctionExpression private final SparkClient sparkClient; /** + * Constructor for spark sql function. + * * @param functionName name of the function * @param arguments a list of expressions * @param sparkClient spark client @@ -75,6 +77,8 @@ public Table applyArguments() { } /** + * This method builds a spark query request. + * * @param arguments spark sql function arguments * @return spark query request */ diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java index 0d79e684693..25366cdc9a4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java @@ -38,8 +38,10 @@ public DataSource createDataSource(DataSourceMetadata metadata) { } /** + * This function gets spark storage engine.
+ * * @param requiredConfig spark config options - * @return spark client + * @return spark storage engine object */ StorageEngine getStorageEngine(Map requiredConfig) { SparkClient sparkClient = null; diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java index c8c2bd5d9ff..4142cfe3556 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java @@ -14,6 +14,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.Client; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.storage.StorageEngine; @@ -41,4 +43,15 @@ void testGetStorageEngine() { Assertions.assertTrue(storageEngine instanceof SparkStorageEngine); } + @Test + void createDataSourceSuccessWithLocalhost() { + DataSourceMetadata metadata = new DataSourceMetadata(); + metadata.setName("spark"); + metadata.setConnector(DataSourceType.SPARK); + metadata.setProperties(new HashMap<>()); + + DataSource dataSource = new SparkStorageFactory(client, settings).createDataSource(metadata); + Assertions.assertTrue(dataSource.getStorageEngine() instanceof SparkStorageEngine); + } + } From b953aa6030bd18709c259cd17dc89acbd05379de Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 26 Jun 2023 15:24:40 -0700 Subject: [PATCH 12/15] Update tests Signed-off-by: Rupal Mahajan --- .../resolver/SqlTableFunctionResolver.java | 20 +---- .../SqlFunctionTableScanBuilderTest.java | 45 ++++++++++ .../spark/storage/SparkMetricTableTest.java | 87 +++++++++++++++++++ .../spark/storage/SparkStorageEngineTest.java | 9 ++ 4 files changed, 143 insertions(+), 18 
deletions(-) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionTableScanBuilderTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkMetricTableTest.java diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java index 1959cb1fd89..09a6d3d12c4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java @@ -53,8 +53,8 @@ public Pair resolve(FunctionSignature unreso if (arguments.size() != argumentNames.size()) { throw new SemanticCheckException( - generateErrorMessageForMissingArguments(argumentsPassedByPosition, arguments, - argumentNames)); + String.format("Missing arguments:[%s]", + String.join(",", argumentNames.subList(arguments.size(), argumentNames.size())))); } if (argumentsPassedByPosition) { @@ -70,22 +70,6 @@ public Pair resolve(FunctionSignature unreso return Pair.of(functionSignature, functionBuilder); } - private String generateErrorMessageForMissingArguments(Boolean argumentsPassedByPosition, - List arguments, - List argumentNames) { - if (argumentsPassedByPosition) { - return String.format("Missing arguments:[%s]", - String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))); - } else { - Set requiredArguments = new HashSet<>(argumentNames); - Set providedArguments = - arguments.stream().map(expression -> ((NamedArgumentExpression) expression).getArgName()) - .collect(Collectors.toSet()); - requiredArguments.removeAll(providedArguments); - return String.format("Missing arguments:[%s]", String.join(",", requiredArguments)); - } - } - @Override public FunctionName getFunctionName() { return FunctionName.of(SQL); diff --git 
a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionTableScanBuilderTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionTableScanBuilderTest.java new file mode 100644 index 00000000000..d52cdb70309 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionTableScanBuilderTest.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.functions; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.opensearch.sql.planner.logical.LogicalProject; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.functions.scan.SqlFunctionTableScanBuilder; +import org.opensearch.sql.spark.request.SparkQueryRequest; +import org.opensearch.sql.storage.TableScanOperator; + +public class SqlFunctionTableScanBuilderTest { + @Mock + private SparkClient sparkClient; + + @Mock + private LogicalProject logicalProject; + + @Test + void testBuild() { + SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); + sparkQueryRequest.setSql("select 1"); + + SqlFunctionTableScanBuilder sqlFunctionTableScanBuilder + = new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); + TableScanOperator sqlFunctionTableScanOperator + = sqlFunctionTableScanBuilder.build(); + Assertions.assertNull(sqlFunctionTableScanOperator); + } + + @Test + void testPushProject() { + SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); + sparkQueryRequest.setSql("select 1"); + + SqlFunctionTableScanBuilder sqlFunctionTableScanBuilder + = new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); + Assertions.assertTrue(sqlFunctionTableScanBuilder.pushDownProject(logicalProject)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkMetricTableTest.java 
b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkMetricTableTest.java new file mode 100644 index 00000000000..dce4a7f0ffb --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkMetricTableTest.java @@ -0,0 +1,87 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.relation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.spark.client.SparkClient; +import org.opensearch.sql.spark.functions.scan.SqlFunctionTableScanBuilder; +import org.opensearch.sql.spark.request.SparkQueryRequest; +import org.opensearch.sql.storage.read.TableScanBuilder; + +@ExtendWith(MockitoExtension.class) +public class SparkMetricTableTest { + @Mock + private SparkClient client; + + @Test + void testUnsupportedOperation() { + SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); + SparkMetricTable sparkMetricTable = + new SparkMetricTable(client, sparkQueryRequest); + + assertThrows(UnsupportedOperationException.class, 
sparkMetricTable::exists); + assertThrows(UnsupportedOperationException.class, + () -> sparkMetricTable.create(Collections.emptyMap())); + } + + @Test + void testCreateScanBuilderWithSqlTableFunction() { + SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); + sparkQueryRequest.setSql("select 1"); + SparkMetricTable sparkMetricTable = + new SparkMetricTable(client, sparkQueryRequest); + TableScanBuilder tableScanBuilder = sparkMetricTable.createScanBuilder(); + Assertions.assertNotNull(tableScanBuilder); + Assertions.assertTrue(tableScanBuilder instanceof SqlFunctionTableScanBuilder); + } + + @Test + @SneakyThrows + void testGetFieldTypesFromSparkQueryRequest() { + SparkMetricTable sparkMetricTable + = new SparkMetricTable(client, new SparkQueryRequest()); + Map expectedFieldTypes = new HashMap<>(); + Map fieldTypes = sparkMetricTable.getFieldTypes(); + + assertEquals(expectedFieldTypes, fieldTypes); + verifyNoMoreInteractions(client); + assertNotNull(sparkMetricTable.getSparkQueryRequest()); + } + + @Test + void testImplementWithSqlFunction() { + SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); + sparkQueryRequest.setSql("select 1"); + SparkMetricTable sparkMetricTable = + new SparkMetricTable(client, sparkQueryRequest); + List finalProjectList = new ArrayList<>(); + PhysicalPlan plan = sparkMetricTable.implement( + project(relation("sql", sparkMetricTable), + finalProjectList, null)); + assertNull(plan); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java index 475a7cb1b29..9f0e0d00c72 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import 
static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Collection; @@ -14,9 +15,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.expression.function.FunctionResolver; import org.opensearch.sql.spark.client.SparkClient; import org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver; +import org.opensearch.sql.storage.Table; @ExtendWith(MockitoExtension.class) public class SparkStorageEngineTest { @@ -34,4 +37,10 @@ public void getFunctions() { functionResolverCollection.iterator().next() instanceof SqlTableFunctionResolver); } + @Test + public void getTable() { + SparkStorageEngine engine = new SparkStorageEngine(client); + Table table = engine.getTable(new DataSourceSchemaName("spark", "default"), ""); + assertNull(table); + } } From 0df37a891be38a3fb407c73489d86f47ab721e1f Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Wed, 28 Jun 2023 10:39:11 -0700 Subject: [PATCH 13/15] Address PR comments Signed-off-by: Rupal Mahajan --- spark/lombok.config | 2 +- .../opensearch/sql/spark/client/SparkClient.java | 2 +- .../SqlFunctionImplementation.java | 16 +++++++--------- .../scan/SqlFunctionTableScanBuilder.java | 2 +- .../sql/spark/request/SparkQueryRequest.java | 2 +- .../sql/spark/storage/SparkMetricTable.java | 2 +- .../sql/spark/storage/SparkStorageEngine.java | 2 +- .../sql/spark/storage/SparkStorageFactory.java | 2 +- .../functions/SqlFunctionImplementationTest.java | 2 +- 9 files changed, 15 insertions(+), 17 deletions(-) diff --git a/spark/lombok.config b/spark/lombok.config index aac13295bd7..189c0bef98b 100644 --- a/spark/lombok.config +++ b/spark/lombok.config @@ -1,3 +1,3 @@ # This file is generated by the 'io.freefair.lombok' Gradle plugin config.stopBubbling = true -lombok.addLombokGeneratedAnnotation = true \ No 
newline at end of file +lombok.addLombokGeneratedAnnotation = true diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java index 3ceeb0e94c7..99d8600dd06 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java @@ -19,4 +19,4 @@ public interface SparkClient { * @return spark query response */ JSONObject sql(String query) throws IOException; -} \ No newline at end of file +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java index 2aebaef4ae7..977561fb054 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java @@ -77,7 +77,7 @@ public Table applyArguments() { } /** - * This method build spark query request. + * This method builds a spark query request. 
* * @param arguments spark sql function arguments * @return spark query request @@ -89,16 +89,14 @@ private SparkQueryRequest buildQueryFromSqlFunction(List arguments) String argName = ((NamedArgumentExpression) arg).getArgName(); Expression argValue = ((NamedArgumentExpression) arg).getValue(); ExprValue literalValue = argValue.valueOf(); - switch (argName) { - case QUERY: - sparkQueryRequest.setSql((String) literalValue.value()); - break; - default: - throw new ExpressionEvaluationException( - String.format("Invalid Function Argument:%s", argName)); + if (argName.equals(QUERY)) { + sparkQueryRequest.setSql((String) literalValue.value()); + } else { + throw new ExpressionEvaluationException( + String.format("Invalid Function Argument:%s", argName)); } }); return sparkQueryRequest; } -} \ No newline at end of file +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java index 45f768340fa..1eb249cde6e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java @@ -32,4 +32,4 @@ public TableScanOperator build() { public boolean pushDownProject(LogicalProject project) { return true; } -} \ No newline at end of file +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java index 93e4ad94191..8465b6a3426 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java @@ -26,4 +26,4 @@ public class SparkQueryRequest { */ private String sql; -} \ No newline at end of file +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java 
b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java index b82ad3fa72e..21534a09c29 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java @@ -65,4 +65,4 @@ public PhysicalPlan implement(LogicalPlan plan) { public TableScanBuilder createScanBuilder() { return new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); } -} \ No newline at end of file +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java index 565a4a85854..3c46f4a8cb1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -32,4 +32,4 @@ public Collection getFunctions() { public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { return null; } -} \ No newline at end of file +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java index 25366cdc9a4..e4da29f6b49 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java @@ -48,4 +48,4 @@ StorageEngine getStorageEngine(Map requiredConfig) { //TODO: Initialize spark client return new SparkStorageEngine(sparkClient); } -} \ No newline at end of file +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java index 9b854bf02c4..118f24e3ac1 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java +++ 
b/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java @@ -74,4 +74,4 @@ void testApplyArgumentsException() { assertEquals("Invalid Function Argument:tmp", exception.getMessage()); } -} \ No newline at end of file +} From 776bc3660c56893f9042c70fac73d477129cd57f Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Tue, 4 Jul 2023 15:34:05 -0700 Subject: [PATCH 14/15] Refactor class name Signed-off-by: Rupal Mahajan --- .../sql/datasource/model/DataSourceType.java | 1 - spark/.gitignore | 42 ------------------ spark/lombok.config | 3 -- ...va => SparkSqlFunctionImplementation.java} | 10 ++--- ...ava => SparkSqlTableFunctionResolver.java} | 11 ++--- ... => SparkSqlFunctionTableScanBuilder.java} | 2 +- .../sql/spark/request/SparkQueryRequest.java | 8 ---- .../sql/spark/storage/SparkStorageEngine.java | 4 +- ...{SparkMetricTable.java => SparkTable.java} | 10 ++--- ...> SparkSqlFunctionImplementationTest.java} | 34 +++++++------- ...SparkSqlFunctionTableScanBuilderTest.java} | 16 +++---- ...=> SparkSqlTableFunctionResolverTest.java} | 44 +++++++++---------- .../spark/storage/SparkStorageEngineTest.java | 4 +- ...tricTableTest.java => SparkTableTest.java} | 36 +++++++-------- 14 files changed, 84 insertions(+), 141 deletions(-) delete mode 100644 spark/.gitignore delete mode 100644 spark/lombok.config rename spark/src/main/java/org/opensearch/sql/spark/functions/implementation/{SqlFunctionImplementation.java => SparkSqlFunctionImplementation.java} (90%) rename spark/src/main/java/org/opensearch/sql/spark/functions/resolver/{SqlTableFunctionResolver.java => SparkSqlTableFunctionResolver.java} (86%) rename spark/src/main/java/org/opensearch/sql/spark/functions/scan/{SqlFunctionTableScanBuilder.java => SparkSqlFunctionTableScanBuilder.java} (92%) rename spark/src/main/java/org/opensearch/sql/spark/storage/{SparkMetricTable.java => SparkTable.java} (82%) rename 
spark/src/test/java/org/opensearch/sql/spark/functions/{SqlFunctionImplementationTest.java => SparkSqlFunctionImplementationTest.java} (66%) rename spark/src/test/java/org/opensearch/sql/spark/functions/{SqlFunctionTableScanBuilderTest.java => SparkSqlFunctionTableScanBuilderTest.java} (62%) rename spark/src/test/java/org/opensearch/sql/spark/functions/{SqlTableFunctionResolverTest.java => SparkSqlTableFunctionResolverTest.java} (81%) rename spark/src/test/java/org/opensearch/sql/spark/storage/{SparkMetricTableTest.java => SparkTableTest.java} (68%) diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java index 78c431a81f5..5010e41942d 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -8,7 +8,6 @@ public enum DataSourceType { PROMETHEUS("prometheus"), OPENSEARCH("opensearch"), - JDBC("jdbc"), SPARK("spark"); private String text; diff --git a/spark/.gitignore b/spark/.gitignore deleted file mode 100644 index b63da4551b2..00000000000 --- a/spark/.gitignore +++ /dev/null @@ -1,42 +0,0 @@ -.gradle -build/ -!gradle/wrapper/gradle-wrapper.jar -!**/src/main/**/build/ -!**/src/test/**/build/ - -### IntelliJ IDEA ### -.idea/modules.xml -.idea/jarRepositories.xml -.idea/compiler.xml -.idea/libraries/ -*.iws -*.iml -*.ipr -out/ -!**/src/main/**/out/ -!**/src/test/**/out/ - -### Eclipse ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache -bin/ -!**/src/main/**/bin/ -!**/src/test/**/bin/ - -### NetBeans ### -/nbproject/private/ -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ - -### VS Code ### -.vscode/ - -### Mac OS ### -.DS_Store \ No newline at end of file diff --git a/spark/lombok.config b/spark/lombok.config deleted file mode 100644 index 189c0bef98b..00000000000 --- a/spark/lombok.config +++ /dev/null @@ -1,3 
+0,0 @@ -# This file is generated by the 'io.freefair.lombok' Gradle plugin -config.stopBubbling = true -lombok.addLombokGeneratedAnnotation = true diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java similarity index 90% rename from spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java rename to spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java index 977561fb054..1936c266def 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java @@ -5,7 +5,7 @@ package org.opensearch.sql.spark.functions.implementation; -import static org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver.QUERY; +import static org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver.QUERY; import java.util.List; import java.util.stream.Collectors; @@ -21,13 +21,13 @@ import org.opensearch.sql.expression.function.TableFunctionImplementation; import org.opensearch.sql.spark.client.SparkClient; import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkMetricTable; +import org.opensearch.sql.spark.storage.SparkTable; import org.opensearch.sql.storage.Table; /** * Spark SQL function implementation. 
*/ -public class SqlFunctionImplementation extends FunctionExpression +public class SparkSqlFunctionImplementation extends FunctionExpression implements TableFunctionImplementation { private final FunctionName functionName; @@ -41,7 +41,7 @@ public class SqlFunctionImplementation extends FunctionExpression * @param arguments a list of expressions * @param sparkClient spark client */ - public SqlFunctionImplementation( + public SparkSqlFunctionImplementation( FunctionName functionName, List arguments, SparkClient sparkClient) { super(functionName, arguments); this.functionName = functionName; @@ -73,7 +73,7 @@ public String toString() { @Override public Table applyArguments() { - return new SparkMetricTable(sparkClient, buildQueryFromSqlFunction(arguments)); + return new SparkTable(sparkClient, buildQueryFromSqlFunction(arguments)); } /** diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java similarity index 86% rename from spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java rename to spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java index 09a6d3d12c4..624600e1a8e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java @@ -8,10 +8,7 @@ import static org.opensearch.sql.data.type.ExprCoreType.STRING; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -23,13 +20,13 @@ import org.opensearch.sql.expression.function.FunctionResolver; import 
org.opensearch.sql.expression.function.FunctionSignature; import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation; +import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; /** * Function resolver for sql function of spark connector. */ @RequiredArgsConstructor -public class SqlTableFunctionResolver implements FunctionResolver { +public class SparkSqlTableFunctionResolver implements FunctionResolver { private final SparkClient sparkClient; public static final String SQL = "sql"; @@ -63,9 +60,9 @@ public Pair resolve(FunctionSignature unreso namedArguments.add(new NamedArgumentExpression(argumentNames.get(i), ((NamedArgumentExpression) arguments.get(i)).getValue())); } - return new SqlFunctionImplementation(functionName, namedArguments, sparkClient); + return new SparkSqlFunctionImplementation(functionName, namedArguments, sparkClient); } - return new SqlFunctionImplementation(functionName, arguments, sparkClient); + return new SparkSqlFunctionImplementation(functionName, arguments, sparkClient); }; return Pair.of(functionSignature, functionBuilder); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java similarity index 92% rename from spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java rename to spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java index 1eb249cde6e..561f6f29330 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SqlFunctionTableScanBuilder.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java @@ -16,7 +16,7 @@ * TableScanBuilder for sql function of spark connector. 
*/ @AllArgsConstructor -public class SqlFunctionTableScanBuilder extends TableScanBuilder { +public class SparkSqlFunctionTableScanBuilder extends TableScanBuilder { private final SparkClient sparkClient; diff --git a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java index 8465b6a3426..bc0944a7841 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java @@ -5,20 +5,12 @@ package org.opensearch.sql.spark.request; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import lombok.ToString; /** * Spark query request. */ -@EqualsAndHashCode @Data -@ToString -@AllArgsConstructor -@NoArgsConstructor public class SparkQueryRequest { /** diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java index 3c46f4a8cb1..86697833a15 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -11,7 +11,7 @@ import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.expression.function.FunctionResolver; import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver; +import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; import org.opensearch.sql.storage.StorageEngine; import org.opensearch.sql.storage.Table; @@ -25,7 +25,7 @@ public class SparkStorageEngine implements StorageEngine { @Override public Collection getFunctions() { return Collections.singletonList( - new SqlTableFunctionResolver(sparkClient)); + new SparkSqlTableFunctionResolver(sparkClient)); } @Override 
diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java similarity index 82% rename from spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java rename to spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java index 21534a09c29..344db8ab7a9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkMetricTable.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java @@ -13,7 +13,7 @@ import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.scan.SqlFunctionTableScanBuilder; +import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder; import org.opensearch.sql.spark.request.SparkQueryRequest; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.read.TableScanBuilder; @@ -22,7 +22,7 @@ * Spark table implementation. * This can be constructed from SparkQueryRequest. */ -public class SparkMetricTable implements Table { +public class SparkTable implements Table { private final SparkClient sparkClient; @@ -32,8 +32,8 @@ public class SparkMetricTable implements Table { /** * Constructor for entire Sql Request. 
*/ - public SparkMetricTable(SparkClient sparkService, - @Nonnull SparkQueryRequest sparkQueryRequest) { + public SparkTable(SparkClient sparkService, + @Nonnull SparkQueryRequest sparkQueryRequest) { this.sparkClient = sparkService; this.sparkQueryRequest = sparkQueryRequest; } @@ -63,6 +63,6 @@ public PhysicalPlan implement(LogicalPlan plan) { @Override public TableScanBuilder createScanBuilder() { - return new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); + return new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java similarity index 66% rename from spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java rename to spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java index 118f24e3ac1..33c65ff2780 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionImplementationTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java @@ -20,12 +20,12 @@ import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.function.FunctionName; import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation; +import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkMetricTable; +import org.opensearch.sql.spark.storage.SparkTable; @ExtendWith(MockitoExtension.class) -public class SqlFunctionImplementationTest { +public class SparkSqlFunctionImplementationTest { @Mock private SparkClient client; @@ -34,15 +34,15 @@ void testValueOfAndTypeToString() { FunctionName functionName 
= new FunctionName("sql"); List namedArgumentExpressionList = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); - SqlFunctionImplementation sqlFunctionImplementation - = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + SparkSqlFunctionImplementation sparkSqlFunctionImplementation + = new SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, - () -> sqlFunctionImplementation.valueOf()); + () -> sparkSqlFunctionImplementation.valueOf()); assertEquals("Spark defined function [sql] is only " + "supported in SOURCE clause with spark connector catalog", exception.getMessage()); assertEquals("sql(query=\"select 1\")", - sqlFunctionImplementation.toString()); - assertEquals(ExprCoreType.STRUCT, sqlFunctionImplementation.type()); + sparkSqlFunctionImplementation.toString()); + assertEquals(ExprCoreType.STRUCT, sparkSqlFunctionImplementation.type()); } @@ -51,13 +51,13 @@ void testApplyArguments() { FunctionName functionName = new FunctionName("sql"); List namedArgumentExpressionList = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); - SqlFunctionImplementation sqlFunctionImplementation - = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - SparkMetricTable sparkMetricTable - = (SparkMetricTable) sqlFunctionImplementation.applyArguments(); - assertNotNull(sparkMetricTable.getSparkQueryRequest()); + SparkSqlFunctionImplementation sparkSqlFunctionImplementation + = new SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + SparkTable sparkTable + = (SparkTable) sparkSqlFunctionImplementation.applyArguments(); + assertNotNull(sparkTable.getSparkQueryRequest()); SparkQueryRequest sparkQueryRequest - = sparkMetricTable.getSparkQueryRequest(); + = sparkTable.getSparkQueryRequest(); assertEquals("select 1", sparkQueryRequest.getSql()); } @@ 
-67,10 +67,10 @@ void testApplyArgumentsException() { List namedArgumentExpressionList = List.of(DSL.namedArgument("query", DSL.literal("select 1")), DSL.namedArgument("tmp", DSL.literal(12345))); - SqlFunctionImplementation sqlFunctionImplementation - = new SqlFunctionImplementation(functionName, namedArgumentExpressionList, client); + SparkSqlFunctionImplementation sparkSqlFunctionImplementation + = new SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); ExpressionEvaluationException exception = assertThrows(ExpressionEvaluationException.class, - () -> sqlFunctionImplementation.applyArguments()); + () -> sparkSqlFunctionImplementation.applyArguments()); assertEquals("Invalid Function Argument:tmp", exception.getMessage()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionTableScanBuilderTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java similarity index 62% rename from spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionTableScanBuilderTest.java rename to spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java index d52cdb70309..f5fb0983cc2 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlFunctionTableScanBuilderTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java @@ -10,11 +10,11 @@ import org.mockito.Mock; import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.scan.SqlFunctionTableScanBuilder; +import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder; import org.opensearch.sql.spark.request.SparkQueryRequest; import org.opensearch.sql.storage.TableScanOperator; -public class SqlFunctionTableScanBuilderTest { +public class SparkSqlFunctionTableScanBuilderTest { @Mock private SparkClient 
sparkClient; @@ -26,10 +26,10 @@ void testBuild() { SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); sparkQueryRequest.setSql("select 1"); - SqlFunctionTableScanBuilder sqlFunctionTableScanBuilder - = new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); + SparkSqlFunctionTableScanBuilder sparkSqlFunctionTableScanBuilder + = new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); TableScanOperator sqlFunctionTableScanOperator - = sqlFunctionTableScanBuilder.build(); + = sparkSqlFunctionTableScanBuilder.build(); Assertions.assertNull(sqlFunctionTableScanOperator); } @@ -38,8 +38,8 @@ void testPushProject() { SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); sparkQueryRequest.setSql("select 1"); - SqlFunctionTableScanBuilder sqlFunctionTableScanBuilder - = new SqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); - Assertions.assertTrue(sqlFunctionTableScanBuilder.pushDownProject(logicalProject)); + SparkSqlFunctionTableScanBuilder sparkSqlFunctionTableScanBuilder + = new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); + Assertions.assertTrue(sparkSqlFunctionTableScanBuilder.pushDownProject(logicalProject)); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java similarity index 81% rename from spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java rename to spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java index 0a251ef05cf..491e9bbd735 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SqlTableFunctionResolverTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java @@ -27,13 +27,13 @@ import org.opensearch.sql.expression.function.FunctionSignature; import 
org.opensearch.sql.expression.function.TableFunctionImplementation; import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation; -import org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver; +import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; +import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkMetricTable; +import org.opensearch.sql.spark.storage.SparkTable; @ExtendWith(MockitoExtension.class) -public class SqlTableFunctionResolverTest { +public class SparkSqlTableFunctionResolverTest { @Mock private SparkClient client; @@ -42,8 +42,8 @@ public class SqlTableFunctionResolverTest { @Test void testResolve() { - SqlTableFunctionResolver sqlTableFunctionResolver - = new SqlTableFunctionResolver(client); + SparkSqlTableFunctionResolver sqlTableFunctionResolver + = new SparkSqlTableFunctionResolver(client); FunctionName functionName = FunctionName.of("sql"); List expressions = List.of(DSL.namedArgument("query", DSL.literal("select 1"))); @@ -57,19 +57,19 @@ void testResolve() { FunctionBuilder functionBuilder = resolution.getValue(); TableFunctionImplementation functionImplementation = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); - assertTrue(functionImplementation instanceof SqlFunctionImplementation); - SparkMetricTable sparkMetricTable - = (SparkMetricTable) functionImplementation.applyArguments(); - assertNotNull(sparkMetricTable.getSparkQueryRequest()); + assertTrue(functionImplementation instanceof SparkSqlFunctionImplementation); + SparkTable sparkTable + = (SparkTable) functionImplementation.applyArguments(); + assertNotNull(sparkTable.getSparkQueryRequest()); SparkQueryRequest sparkQueryRequest = - sparkMetricTable.getSparkQueryRequest(); + 
sparkTable.getSparkQueryRequest(); assertEquals("select 1", sparkQueryRequest.getSql()); } @Test void testArgumentsPassedByPosition() { - SqlTableFunctionResolver sqlTableFunctionResolver - = new SqlTableFunctionResolver(client); + SparkSqlTableFunctionResolver sqlTableFunctionResolver + = new SparkSqlTableFunctionResolver(client); FunctionName functionName = FunctionName.of("sql"); List expressions = List.of(DSL.namedArgument(null, DSL.literal("select 1"))); @@ -85,19 +85,19 @@ void testArgumentsPassedByPosition() { FunctionBuilder functionBuilder = resolution.getValue(); TableFunctionImplementation functionImplementation = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); - assertTrue(functionImplementation instanceof SqlFunctionImplementation); - SparkMetricTable sparkMetricTable - = (SparkMetricTable) functionImplementation.applyArguments(); - assertNotNull(sparkMetricTable.getSparkQueryRequest()); + assertTrue(functionImplementation instanceof SparkSqlFunctionImplementation); + SparkTable sparkTable + = (SparkTable) functionImplementation.applyArguments(); + assertNotNull(sparkTable.getSparkQueryRequest()); SparkQueryRequest sparkQueryRequest = - sparkMetricTable.getSparkQueryRequest(); + sparkTable.getSparkQueryRequest(); assertEquals("select 1", sparkQueryRequest.getSql()); } @Test void testMixedArgumentTypes() { - SqlTableFunctionResolver sqlTableFunctionResolver - = new SqlTableFunctionResolver(client); + SparkSqlTableFunctionResolver sqlTableFunctionResolver + = new SparkSqlTableFunctionResolver(client); FunctionName functionName = FunctionName.of("sql"); List expressions = List.of(DSL.namedArgument("query", DSL.literal("select 1")), @@ -118,8 +118,8 @@ void testMixedArgumentTypes() { @Test void testWrongArgumentsSizeWhenPassedByName() { - SqlTableFunctionResolver sqlTableFunctionResolver - = new SqlTableFunctionResolver(client); + SparkSqlTableFunctionResolver sqlTableFunctionResolver + = new 
SparkSqlTableFunctionResolver(client); FunctionName functionName = FunctionName.of("sql"); List expressions = List.of(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java index 9f0e0d00c72..2c4f8a3567f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java @@ -18,7 +18,7 @@ import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.expression.function.FunctionResolver; import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver; +import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; import org.opensearch.sql.storage.Table; @ExtendWith(MockitoExtension.class) @@ -34,7 +34,7 @@ public void getFunctions() { assertNotNull(functionResolverCollection); assertEquals(1, functionResolverCollection.size()); assertTrue( - functionResolverCollection.iterator().next() instanceof SqlTableFunctionResolver); + functionResolverCollection.iterator().next() instanceof SparkSqlTableFunctionResolver); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkMetricTableTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java similarity index 68% rename from spark/src/test/java/org/opensearch/sql/spark/storage/SparkMetricTableTest.java rename to spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java index dce4a7f0ffb..d3487d65c13 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkMetricTableTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java @@ -28,59 +28,59 @@ import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.planner.physical.PhysicalPlan; import 
org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.scan.SqlFunctionTableScanBuilder; +import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder; import org.opensearch.sql.spark.request.SparkQueryRequest; import org.opensearch.sql.storage.read.TableScanBuilder; @ExtendWith(MockitoExtension.class) -public class SparkMetricTableTest { +public class SparkTableTest { @Mock private SparkClient client; @Test void testUnsupportedOperation() { SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - SparkMetricTable sparkMetricTable = - new SparkMetricTable(client, sparkQueryRequest); + SparkTable sparkTable = + new SparkTable(client, sparkQueryRequest); - assertThrows(UnsupportedOperationException.class, sparkMetricTable::exists); + assertThrows(UnsupportedOperationException.class, sparkTable::exists); assertThrows(UnsupportedOperationException.class, - () -> sparkMetricTable.create(Collections.emptyMap())); + () -> sparkTable.create(Collections.emptyMap())); } @Test void testCreateScanBuilderWithSqlTableFunction() { SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); sparkQueryRequest.setSql("select 1"); - SparkMetricTable sparkMetricTable = - new SparkMetricTable(client, sparkQueryRequest); - TableScanBuilder tableScanBuilder = sparkMetricTable.createScanBuilder(); + SparkTable sparkTable = + new SparkTable(client, sparkQueryRequest); + TableScanBuilder tableScanBuilder = sparkTable.createScanBuilder(); Assertions.assertNotNull(tableScanBuilder); - Assertions.assertTrue(tableScanBuilder instanceof SqlFunctionTableScanBuilder); + Assertions.assertTrue(tableScanBuilder instanceof SparkSqlFunctionTableScanBuilder); } @Test @SneakyThrows void testGetFieldTypesFromSparkQueryRequest() { - SparkMetricTable sparkMetricTable - = new SparkMetricTable(client, new SparkQueryRequest()); + SparkTable sparkTable + = new SparkTable(client, new SparkQueryRequest()); Map expectedFieldTypes = new HashMap<>(); - 
Map fieldTypes = sparkMetricTable.getFieldTypes(); + Map fieldTypes = sparkTable.getFieldTypes(); assertEquals(expectedFieldTypes, fieldTypes); verifyNoMoreInteractions(client); - assertNotNull(sparkMetricTable.getSparkQueryRequest()); + assertNotNull(sparkTable.getSparkQueryRequest()); } @Test void testImplementWithSqlFunction() { SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); sparkQueryRequest.setSql("select 1"); - SparkMetricTable sparkMetricTable = - new SparkMetricTable(client, sparkQueryRequest); + SparkTable sparkTable = + new SparkTable(client, sparkQueryRequest); List finalProjectList = new ArrayList<>(); - PhysicalPlan plan = sparkMetricTable.implement( - project(relation("sql", sparkMetricTable), + PhysicalPlan plan = sparkTable.implement( + project(relation("sql", sparkTable), finalProjectList, null)); assertNull(plan); } From be15dae4bb5130ac2c2ed7b19c7270eadd130a87 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Wed, 5 Jul 2023 08:29:47 -0700 Subject: [PATCH 15/15] Address PR comment Signed-off-by: Rupal Mahajan --- .../opensearch/sql/spark/storage/SparkStorageEngine.java | 2 +- .../sql/spark/storage/SparkStorageEngineTest.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java index 86697833a15..a5e35ecc4cb 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -30,6 +30,6 @@ public Collection getFunctions() { @Override public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { - return null; + throw new RuntimeException("Unable to get table from storage engine."); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java 
b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java index 2c4f8a3567f..7adcc725fa5 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java @@ -8,6 +8,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Collection; @@ -40,7 +41,8 @@ public void getFunctions() { @Test public void getTable() { SparkStorageEngine engine = new SparkStorageEngine(client); - Table table = engine.getTable(new DataSourceSchemaName("spark", "default"), ""); - assertNull(table); + RuntimeException exception = assertThrows(RuntimeException.class, + () -> engine.getTable(new DataSourceSchemaName("spark", "default"), "")); + assertEquals("Unable to get table from storage engine.", exception.getMessage()); } }