Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions .github/workflows/publish-async-query-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ on:
push:
branches:
- main
- 1.*
- 2.*
- '[0-9]+.[0-9]+'
- '[0-9]+.x'
paths:
- 'async-query-core/**'
- '.github/workflows/publish-async-query-core.yml'
Expand All @@ -18,7 +18,6 @@ concurrency:
cancel-in-progress: false

env:
SNAPSHOT_REPO_URL: https://central.sonatype.com/repository/maven-snapshots/
COMMIT_MAP_FILENAME: commit-history-async-query-core.json

jobs:
Expand All @@ -36,7 +35,7 @@ jobs:
- uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 21
java-version: 17

- uses: actions/checkout@v3

Expand All @@ -47,8 +46,19 @@ jobs:
export-env: true
env:
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
SONATYPE_USERNAME: op://opensearch-infra-secrets/maven-central-portal-credentials/username
SONATYPE_PASSWORD: op://opensearch-infra-secrets/maven-central-portal-credentials/password
MAVEN_SNAPSHOTS_S3_REPO: op://opensearch-infra-secrets/maven-snapshots-s3/repo
MAVEN_SNAPSHOTS_S3_ROLE: op://opensearch-infra-secrets/maven-snapshots-s3/role

- name: Export SNAPSHOT_REPO_URL
run: |
snapshot_repo_url=${{ env.MAVEN_SNAPSHOTS_S3_REPO }}
echo "SNAPSHOT_REPO_URL=$snapshot_repo_url" >> $GITHUB_ENV

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v5
with:
role-to-assume: ${{ env.MAVEN_SNAPSHOTS_S3_ROLE }}
aws-region: us-east-1

- name: Set commit ID
id: set_commit
Expand Down
2 changes: 1 addition & 1 deletion async-query-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ check.dependsOn jacocoTestCoverageVerification

shadowJar {
archiveBaseName.set('async-query-core')
archiveVersion.set('1.0.0') // Set the desired version
archiveVersion.set('0.0.0.1') // Set the desired version
archiveClassifier.set('all')

from sourceSets.main.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,24 @@ public class SQLQueryUtils {
private static final Logger logger = LogManager.getLogger(SQLQueryUtils.class);

public static List<FullyQualifiedTableName> extractFullyQualifiedTableNames(String sqlQuery) {
SqlBaseParser sqlBaseParser =
new SqlBaseParser(
new CommonTokenStream(new SqlBaseLexer(new CaseInsensitiveCharStream(sqlQuery))));
sqlBaseParser.addErrorListener(new SyntaxAnalysisErrorListener());
return extractFullyQualifiedTableNamesWithMetadata(sqlQuery).getFullyQualifiedTableNames();
}

public static TableExtractionResult extractFullyQualifiedTableNamesWithMetadata(String sqlQuery) {
SqlBaseParser sqlBaseParser = getBaseParser(sqlQuery);
StatementContext statement = sqlBaseParser.statement();
SparkSqlTableNameVisitor sparkSqlTableNameVisitor = new SparkSqlTableNameVisitor();
statement.accept(sparkSqlTableNameVisitor);
return sparkSqlTableNameVisitor.getFullyQualifiedTableNames();
SparkSqlTableNameVisitor visitor = new SparkSqlTableNameVisitor();
statement.accept(visitor);

// Remove duplicate table names
List<FullyQualifiedTableName> uniqueFullyQualifiedTableNames = new LinkedList<>();
for (FullyQualifiedTableName fullyQualifiedTableName : visitor.getFullyQualifiedTableNames()) {
if (!uniqueFullyQualifiedTableNames.contains(fullyQualifiedTableName)) {
uniqueFullyQualifiedTableNames.add(fullyQualifiedTableName);
}
}

return new TableExtractionResult(uniqueFullyQualifiedTableNames, visitor.isCreateTable());
}

public static IndexQueryDetails extractIndexDetails(String sqlQuery) {
Expand Down Expand Up @@ -92,6 +102,8 @@ public static class SparkSqlTableNameVisitor extends SqlBaseParserBaseVisitor<Vo

@Getter private List<FullyQualifiedTableName> fullyQualifiedTableNames = new LinkedList<>();

@Getter private boolean isCreateTable = false;

public SparkSqlTableNameVisitor() {}

@Override
Expand Down Expand Up @@ -130,6 +142,12 @@ public Void visitCreateTableHeader(SqlBaseParser.CreateTableHeaderContext ctx) {
}
return super.visitCreateTableHeader(ctx);
}

@Override
public Void visitCreateTable(SqlBaseParser.CreateTableContext ctx) {
isCreateTable = true;
return super.visitCreateTable(ctx);
}
}

public static class FlintSQLIndexDetailsVisitor extends FlintSparkSqlExtensionsBaseVisitor<Void> {
Expand Down Expand Up @@ -380,4 +398,15 @@ public String removeUnwantedQuotes(String input) {
return input.replaceAll("^\"|\"$", "");
}
}

public static class TableExtractionResult {
@Getter private final List<FullyQualifiedTableName> fullyQualifiedTableNames;
@Getter private final boolean isCreateTableQuery;

public TableExtractionResult(
List<FullyQualifiedTableName> fullyQualifiedTableNames, boolean isCreateTableQuery) {
this.fullyQualifiedTableNames = fullyQualifiedTableNames;
this.isCreateTableQuery = isCreateTableQuery;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.flint.FlintIndexType;
import org.opensearch.sql.spark.utils.SQLQueryUtils.TableExtractionResult;

@ExtendWith(MockitoExtension.class)
public class SQLQueryUtilsTest {
Expand Down Expand Up @@ -444,6 +445,69 @@ void testRecoverIndex() {
assertEquals(IndexQueryActionType.RECOVER, indexDetails.getIndexQueryActionType());
}

@Test
void testExtractFullyQualifiedTableNamesWithMetadata() {
// Test CREATE TABLE queries
String createTableQuery =
"CREATE EXTERNAL TABLE\n"
+ "myS3.default.alb_logs\n"
+ "[ PARTITIONED BY (col_name [, … ] ) ]\n"
+ "[ ROW FORMAT DELIMITED row_format ]\n"
+ "STORED AS file_format\n"
+ "LOCATION { 's3://bucket/folder/' }";

TableExtractionResult result =
SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(createTableQuery);
assertTrue(result.isCreateTableQuery());
assertEquals(1, result.getFullyQualifiedTableNames().size());
assertFullyQualifiedTableName(
"myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0));

String createTableQuery2 =
"CREATE TABLE myS3.default.new_table (id INT, name STRING) USING PARQUET";
result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(createTableQuery2);
assertTrue(result.isCreateTableQuery());
assertEquals(1, result.getFullyQualifiedTableNames().size());
assertFullyQualifiedTableName(
"myS3", "default", "new_table", result.getFullyQualifiedTableNames().get(0));

// Test SELECT queries
String selectQuery = "SELECT * FROM myS3.default.alb_logs";
result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(selectQuery);
assertFalse(result.isCreateTableQuery());
assertEquals(1, result.getFullyQualifiedTableNames().size());
assertFullyQualifiedTableName(
"myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0));

// Test DROP TABLE queries
String dropTableQuery = "DROP TABLE myS3.default.alb_logs";
result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(dropTableQuery);
assertFalse(result.isCreateTableQuery());
assertEquals(1, result.getFullyQualifiedTableNames().size());
assertFullyQualifiedTableName(
"myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0));

// Test DESCRIBE TABLE queries
String describeTableQuery = "DESCRIBE TABLE myS3.default.alb_logs";
result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(describeTableQuery);
assertFalse(result.isCreateTableQuery());
assertEquals(1, result.getFullyQualifiedTableNames().size());
assertFullyQualifiedTableName(
"myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0));

// Test JOIN queries
String joinQuery =
"SELECT * FROM myS3.default.alb_logs JOIN myS3.default.http_logs ON alb_logs.id ="
+ " http_logs.id";
result = SQLQueryUtils.extractFullyQualifiedTableNamesWithMetadata(joinQuery);
assertFalse(result.isCreateTableQuery());
assertEquals(2, result.getFullyQualifiedTableNames().size());
assertFullyQualifiedTableName(
"myS3", "default", "alb_logs", result.getFullyQualifiedTableNames().get(0));
assertFullyQualifiedTableName(
"myS3", "default", "http_logs", result.getFullyQualifiedTableNames().get(1));
}

@Getter
protected static class IndexQuery {
private String query;
Expand Down