From be9909ec5ecda1025da8a11f1f303aec57b20718 Mon Sep 17 00:00:00 2001 From: Mayank Aggarwal <32715597+aggarwalmayank@users.noreply.github.com> Date: Wed, 20 Aug 2025 22:48:35 +0530 Subject: [PATCH 1/2] adding capability in SQLQueryUtils to identify if SQL query is for creating a table or not. (#4029) * adding capability to identify if SQL query is create table or not Signed-off-by: Mayank Aggarwal * fixing codestyle violations Signed-off-by: Mayank Aggarwal * updating variable name Signed-off-by: Mayank Aggarwal --------- Signed-off-by: Mayank Aggarwal Co-authored-by: Mayank Aggarwal --- .../sql/spark/utils/SQLQueryUtils.java | 43 +++++++++++-- .../sql/spark/utils/SQLQueryUtilsTest.java | 64 +++++++++++++++++++ 2 files changed, 100 insertions(+), 7 deletions(-) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index f1ec148c622..de14eee54e6 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -43,14 +43,24 @@ public class SQLQueryUtils { private static final Logger logger = LogManager.getLogger(SQLQueryUtils.class); public static List extractFullyQualifiedTableNames(String sqlQuery) { - SqlBaseParser sqlBaseParser = - new SqlBaseParser( - new CommonTokenStream(new SqlBaseLexer(new CaseInsensitiveCharStream(sqlQuery)))); - sqlBaseParser.addErrorListener(new SyntaxAnalysisErrorListener()); + return extractFullyQualifiedTableNamesWithMetadata(sqlQuery).getFullyQualifiedTableNames(); + } + + public static TableExtractionResult extractFullyQualifiedTableNamesWithMetadata(String sqlQuery) { + SqlBaseParser sqlBaseParser = getBaseParser(sqlQuery); StatementContext statement = sqlBaseParser.statement(); - SparkSqlTableNameVisitor sparkSqlTableNameVisitor = new SparkSqlTableNameVisitor(); - statement.accept(sparkSqlTableNameVisitor); - return sparkSqlTableNameVisitor.getFullyQualifiedTableNames(); + SparkSqlTableNameVisitor visitor = new SparkSqlTableNameVisitor(); + statement.accept(visitor); + + // Remove duplicate table names + List uniqueFullyQualifiedTableNames = new LinkedList<>(); + for (FullyQualifiedTableName fullyQualifiedTableName : visitor.getFullyQualifiedTableNames()) { + if (!uniqueFullyQualifiedTableNames.contains(fullyQualifiedTableName)) { + uniqueFullyQualifiedTableNames.add(fullyQualifiedTableName); + } + } + + return new TableExtractionResult(uniqueFullyQualifiedTableNames, visitor.isCreateTable()); } public static IndexQueryDetails extractIndexDetails(String sqlQuery) { @@ -92,6 +102,8 @@ public static class SparkSqlTableNameVisitor extends SqlBaseParserBaseVisitor fullyQualifiedTableNames = new LinkedList<>(); + @Getter private boolean isCreateTable = false; + public SparkSqlTableNameVisitor() {} @Override @@ -130,6 +142,12 @@ public Void visitCreateTableHeader(SqlBaseParser.CreateTableHeaderContext ctx) { } return super.visitCreateTableHeader(ctx); } + + @Override + public Void visitCreateTable(SqlBaseParser.CreateTableContext ctx) { + isCreateTable = true; + return super.visitCreateTable(ctx); + } } public static class FlintSQLIndexDetailsVisitor extends FlintSparkSqlExtensionsBaseVisitor { @@ -380,4 +398,15 @@ public String removeUnwantedQuotes(String input) { return input.replaceAll("^\"|\"$", ""); } } + + public static class TableExtractionResult { + @Getter private final List fullyQualifiedTableNames; + @Getter private final boolean isCreateTableQuery; + + public TableExtractionResult( + List fullyQualifiedTableNames, boolean isCreateTableQuery) { + this.fullyQualifiedTableNames = fullyQualifiedTableNames; + this.isCreateTableQuery = isCreateTableQuery; + } + } } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index f860c6a3bc9..1a251b32fdc 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -25,6 +25,7 @@ import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.utils.SQLQueryUtils.TableExtractionResult; @ExtendWith(MockitoExtension.class) public class SQLQueryUtilsTest { @@ -444,6 +445,69 @@ void testRecoverIndex() { assertEquals(IndexQueryActionType.RECOVER, indexDetails.getIndexQueryActionType()); } + @Test + void testExtractFullyQualifiedTableNamesWithMetadata() { + // Test CREATE TABLE queries + String createTableQuery = + "CREATE EXTERNAL TABLE\n" + + "myS3.default.alb_logs\n" + + "[ PARTITIONED BY (col_name [, … ] ) ]\n" + + "[ ROW FORMAT DELIMITED row_format ]\n" + + "STORED AS file_format\n" + + "LOCATION { 's3://bucket/folder/' }"; + + TableExtractionResult result = + SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(createTableQuery); + assertTrue(result.isCreateTableQuery()); + assertEquals(1, result.getFullyQualifiedTableNames().size()); + assertFullyQualifiedTableName( + "myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0)); + + String createTableQuery2 = + "CREATE TABLE myS3.default.new_table (id INT, name STRING) USING PARQUET"; + result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(createTableQuery2); + assertTrue(result.isCreateTableQuery()); + assertEquals(1, result.getFullyQualifiedTableNames().size()); + assertFullyQualifiedTableName( + "myS3", "default", "new_table", result.getFullyQualifiedTableNames().get(0)); + + // Test SELECT queries + String selectQuery = "SELECT * FROM myS3.default.alb_logs"; + result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(selectQuery); + assertFalse(result.isCreateTableQuery()); + assertEquals(1, result.getFullyQualifiedTableNames().size()); + assertFullyQualifiedTableName( + "myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0)); + + // Test DROP TABLE queries + String dropTableQuery = "DROP TABLE myS3.default.alb_logs"; + result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(dropTableQuery); + assertFalse(result.isCreateTableQuery()); + assertEquals(1, result.getFullyQualifiedTableNames().size()); + assertFullyQualifiedTableName( + "myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0)); + + // Test DESCRIBE TABLE queries + String describeTableQuery = "DESCRIBE TABLE myS3.default.alb_logs"; + result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(describeTableQuery); + assertFalse(result.isCreateTableQuery()); + assertEquals(1, result.getFullyQualifiedTableNames().size()); + assertFullyQualifiedTableName( + "myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0)); + + // Test JOIN queries + String joinQuery = + "SELECT * FROM myS3.default.alb_logs JOIN myS3.default.http_logs ON alb_logs.id =" + + " http_logs.id"; + result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(joinQuery); + assertFalse(result.isCreateTableQuery()); + assertEquals(2, result.getFullyQualifiedTableNames().size()); + assertFullyQualifiedTableName( + "myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0)); + assertFullyQualifiedTableName( + "myS3", "default", "http_logs", result.getFullyQualifiedTableNames().get(1)); + } + @Getter protected static class IndexQuery { private String query; From 1a78574c4281ba0f86950bc9c6a07a674a0059c1 Mon Sep 17 00:00:00 2001 From: Paras jaggi Date: Wed, 24 Dec 2025 10:06:16 +0000 Subject: [PATCH 2/2] Sync up this path publish-async-query-core.yml from main to 2.19-dev and also changing the JAVA to 17 Signed-off-by: Paras jaggi --- .../workflows/publish-async-query-core.yml | 22 ++++++++++++++----- async-query-core/build.gradle | 2 +- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/.github/workflows/publish-async-query-core.yml b/.github/workflows/publish-async-query-core.yml index 8433cc873a3..02a3fadc27b 100644 --- a/.github/workflows/publish-async-query-core.yml +++ b/.github/workflows/publish-async-query-core.yml @@ -5,8 +5,8 @@ on: push: branches: - main - - 1.* - - 2.* + - '[0-9]+.[0-9]+' + - '[0-9]+.x' paths: - 'async-query-core/**' - '.github/workflows/publish-async-query-core.yml' @@ -18,7 +18,6 @@ concurrency: cancel-in-progress: false env: - SNAPSHOT_REPO_URL: https://central.sonatype.com/repository/maven-snapshots/ COMMIT_MAP_FILENAME: commit-history-async-query-core.json jobs: @@ -36,7 +35,7 @@ jobs: - uses: actions/setup-java@v3 with: distribution: temurin - java-version: 21 + java-version: 17 - uses: actions/checkout@v3 @@ -47,8 +46,19 @@ jobs: export-env: true env: OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }} - SONATYPE_USERNAME: op://opensearch-infra-secrets/maven-central-portal-credentials/username - SONATYPE_PASSWORD: op://opensearch-infra-secrets/maven-central-portal-credentials/password + MAVEN_SNAPSHOTS_S3_REPO: op://opensearch-infra-secrets/maven-snapshots-s3/repo + MAVEN_SNAPSHOTS_S3_ROLE: op://opensearch-infra-secrets/maven-snapshots-s3/role + + - name: Export SNAPSHOT_REPO_URL + run: | + snapshot_repo_url=${{ env.MAVEN_SNAPSHOTS_S3_REPO }} + echo "SNAPSHOT_REPO_URL=$snapshot_repo_url" >> $GITHUB_ENV + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v5 + with: + role-to-assume: ${{ env.MAVEN_SNAPSHOTS_S3_ROLE }} + aws-region: us-east-1 - name: Set commit ID id: set_commit diff --git a/async-query-core/build.gradle b/async-query-core/build.gradle index 147e59bd6a4..892b49a58ad 100644 --- a/async-query-core/build.gradle +++ b/async-query-core/build.gradle @@ -144,7 +144,7 @@ check.dependsOn jacocoTestCoverageVerification shadowJar { archiveBaseName.set('async-query-core') - archiveVersion.set('1.0.0') // Set the desired version + archiveVersion.set('0.0.0.1') // Set the desired version archiveClassifier.set('all') from sourceSets.main.output