Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0ee85a3
Add direct-query-core module for prometheus integration (#3440)
joshuali925 Sep 4, 2025
81b8896
Add direct-query module for prometheus integration (#3441)
joshuali925 Sep 4, 2025
a0ae814
Add experimental annotations to direct-query and direct-query-core (#…
lezzago Sep 9, 2025
16cff50
Doc enhancement for eventstats and bin command (#4117)
ahkcs Sep 4, 2025
ed8557f
Implement `Append` command with Calcite (#4123)
songkant-aws Sep 5, 2025
0439445
`Bin` command big5 queries (#4163)
ahkcs Sep 5, 2025
02f7ecb
Don't recreate indices on every test (#4222)
Swiddis Sep 5, 2025
5a5de29
Enable pushdown optimization for filtered aggregation (#4213)
dai-chen Sep 5, 2025
269c573
Split up our test actions into unit, integ, and doctest. (#4193)
Swiddis Sep 5, 2025
11502be
[Feature] Core Implementation of `rex` Command In PPL (#4109)
RyanL1997 Sep 5, 2025
0e4a054
Add wildcard support for rename command (#4019)
ritvibhatt Sep 8, 2025
bb6f612
Add support for `median(<value>)` (#4234)
aalva500-prog Sep 8, 2025
33f221f
Dynamic source selector (#4116)
vamsimanohar Sep 9, 2025
4b34b3b
Add gitignore (#4258)
ykmr1224 Sep 9, 2025
1962aed
Support join field list and join options (#3803)
LantaoJin Sep 10, 2025
3733c19
Support first/last aggregate functions for PPL (#4223)
ahkcs Sep 10, 2025
e89a68b
Fix gitignore to ignore symbolic link (#4263)
ykmr1224 Sep 10, 2025
7a2b6bb
Push down limit operator into aggregation bucket size (#4228)
qianheng-aws Sep 10, 2025
3d4d7de
Fix the IT issue caused by merging conflict (#4270)
qianheng-aws Sep 10, 2025
1be925a
Print links to test logs after integTest (#4273)
ykmr1224 Sep 11, 2025
cccc951
[Feature] Implementation of mode `sed` and `offset_field` in rex PPL …
RyanL1997 Sep 11, 2025
bffd3f5
Add earliest/latest aggregate function for eventstats PPL command (#4…
ykmr1224 Sep 11, 2025
e1983bd
Speed up aggregation pushdown for single group-by expression (#3550)
LantaoJin Sep 11, 2025
18b5864
Introduce YAML formatter for better testing/debugging (#4274)
ykmr1224 Sep 11, 2025
e603920
doctest: Use 1.0 branch instead of main (#4219)
Swiddis Sep 12, 2025
9acbd0f
Fix doctest (#4292)
Swiddis Sep 12, 2025
09fd753
Search Command Revamp (#4152)
vamsimanohar Sep 15, 2025
cc71808
`mvjoin` support in PPL Caclite (#4217)
ps48 Sep 15, 2025
03e8ab1
strftime function implementation (#4106)
vamsimanohar Sep 15, 2025
933c6b5
Add support for writing resources and include more tests (#4300)
lezzago Sep 23, 2025
77ea4e9
Merge changes from main (#4374)
lezzago Sep 24, 2025
08b2838
Merge remote-tracking branch 'origin/feature/direct-query-prometheus'…
lezzago Sep 24, 2025
cca6966
Address comments
lezzago Sep 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
/** Language type accepted in async query apis. */
public enum LangType {
SQL("sql"),
PPL("ppl");
PPL("ppl"),
PROMQL("promql");
private final String text;

LangType(String text) {
Expand Down
105 changes: 105 additions & 0 deletions direct-query-core/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'java-test-fixtures'
}

repositories {
mavenCentral()
}

dependencies {
api project(':core')
implementation project(':datasources')
implementation project(':async-query-core')

// Common dependencies
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20231013'
implementation group: 'commons-io', name: 'commons-io', version: "${commons_io_version}"

// Test dependencies
testImplementation(platform("org.junit:junit-bom:5.9.3"))
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.3'
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${mockito_version}"
testImplementation group: 'com.squareup.okhttp3', name: 'mockwebserver', version: '4.12.0'

testCompileOnly('junit:junit:4.13.1') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.vintage:junit-vintage-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testImplementation("org.opensearch.test:framework:${opensearch_version}")
}

test {
useJUnitPlatform()
testLogging {
events "failed"
exceptionFormat "full"
}
}
task junit4(type: Test) {
useJUnitPlatform {
includeEngines("junit-vintage")
}
systemProperty 'tests.security.manager', 'false'
testLogging {
events "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
dependsOn test, junit4
executionData test, junit4
reports {
html.required = true
xml.required = true
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}

jacocoTestCoverageVerification {
dependsOn test, junit4
executionData test, junit4
violationRules {
rule {
element = 'CLASS'
excludes = [
'org.opensearch.sql.prometheus.model.*',
'org.opensearch.sql.directquery.rest.model.*'
]
limit {
counter = 'LINE'
minimum = 1.0
}
limit {
counter = 'BRANCH'
minimum = 1.0
}
}
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.client;

/**
* Base interface for all data source clients. This interface serves as a marker interface for all
* client implementations.
*
* @opensearch.experimental
*/
public interface DataSourceClient {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.client;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.inject.Inject;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.client.exceptions.DataSourceClientException;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.prometheus.utils.PrometheusClientUtils;

/**
* Factory for creating data source clients based on the data source type.
*
* @opensearch.experimental
*/
public class DataSourceClientFactory {

private static final Logger LOG = LogManager.getLogger();

private final Settings settings;
private final DataSourceService dataSourceService;

@Inject
public DataSourceClientFactory(DataSourceService dataSourceService, Settings settings) {
this.settings = settings;
this.dataSourceService = dataSourceService;
}

/**
* Creates a client for the specified data source with appropriate type.
*
* @param <T> The type of client to create, must implement DataSourceClient
* @param dataSourceName The name of the data source
* @return The appropriate client for the data source type
* @throws DataSourceClientException If client creation fails
*/
@SuppressWarnings("unchecked")
public <T extends DataSourceClient> T createClient(String dataSourceName)
throws DataSourceClientException {
try {
if (!dataSourceService.dataSourceExists(dataSourceName)) {
throw new DataSourceClientException("Data source does not exist: " + dataSourceName);
}

DataSourceMetadata metadata =
dataSourceService.verifyDataSourceAccessAndGetRawMetadata(dataSourceName, null);
DataSourceType dataSourceType = metadata.getConnector();

return (T) createClientForType(dataSourceType.name(), metadata);
} catch (Exception e) {
if (e instanceof DataSourceClientException) {
throw e;
}
LOG.error("Failed to create client for data source: " + dataSourceName, e);
throw new DataSourceClientException(
"Failed to create client for data source: " + dataSourceName, e);
}
}

/**
* Gets the data source type for a given data source name.
*
* @param dataSourceName The name of the data source
* @return The type of the data source
* @throws DataSourceClientException If the data source doesn't exist
*/
public DataSourceType getDataSourceType(String dataSourceName) throws DataSourceClientException {
if (!dataSourceService.dataSourceExists(dataSourceName)) {
throw new DataSourceClientException("Data source does not exist: " + dataSourceName);
}

return dataSourceService.getDataSourceMetadata(dataSourceName).getConnector();
}

private DataSourceClient createClientForType(String dataSourceType, DataSourceMetadata metadata)
throws DataSourceClientException {
switch (dataSourceType) {
case "PROMETHEUS":
return PrometheusClientUtils.createPrometheusClient(metadata, settings);
default:
throw new DataSourceClientException("Unsupported data source type: " + dataSourceType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.client.exceptions;

/**
* Exception thrown when there are issues with data source client operations.
*
* @opensearch.experimental
*/
public class DataSourceClientException extends RuntimeException {

public DataSourceClientException(String message) {
super(message);
}

public DataSourceClientException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.query;

import java.io.IOException;
import org.opensearch.sql.datasource.client.DataSourceClient;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryRequest;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesRequest;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesResponse;
import org.opensearch.sql.directquery.rest.model.WriteDirectQueryResourcesRequest;
import org.opensearch.sql.directquery.rest.model.WriteDirectQueryResourcesResponse;

/**
* Interface for handling queries for specific data source types.
*
* @param <T> The client type this handler works with, extending DataSourceClient
*
* @opensearch.experimental
*/
public interface QueryHandler<T extends DataSourceClient> {

/**
* Returns the data source type this handler supports.
*
* @return The supported data source type
*/
DataSourceType getSupportedDataSourceType();

/**
* Executes a query for the supported data source type.
*
* @param client The client instance to use
* @param request The query request
* @return JSON string result of the query
* @throws IOException If query execution fails
*/
String executeQuery(T client, ExecuteDirectQueryRequest request) throws IOException;

/**
* Gets resources from the data source.
*
* @param client The client instance to use
* @param request The resources request
* @return Response containing the requested resources
* @throws IOException If resource retrieval fails
*/
GetDirectQueryResourcesResponse<?> getResources(T client, GetDirectQueryResourcesRequest request)
throws IOException;

/**
* Writes resources to the data source.
*
* @param client The client instance to use
* @param request The resources request
* @return Response containing the requested resources
* @throws IOException If resource retrieval fails
*/
WriteDirectQueryResourcesResponse<?> writeResources(T client, WriteDirectQueryResourcesRequest request)
throws IOException;

/**
* Checks if this handler can handle the given client type.
*
* @param client The client to check
* @return true if this handler can handle the client
*/
boolean canHandle(DataSourceClient client);

/**
* Gets the client class this handler supports.
*
* @return The class of client this handler supports
*/
Class<T> getClientClass();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.query;

import java.util.List;
import java.util.Optional;
import org.opensearch.common.inject.Inject;
import org.opensearch.sql.datasource.client.DataSourceClient;

/**
* Registry for all query handlers.
*
* @opensearch.experimental
*/
public class QueryHandlerRegistry {

private final List<QueryHandler<?>> handlers;

@Inject
public QueryHandlerRegistry(List<QueryHandler<?>> handlers) {
this.handlers = handlers;
}

/**
* Finds a handler that can process the given client.
*
* @param client The client to find a handler for
* @param <T> The type of client, extending DataSourceClient
* @return An optional containing the handler if found
*/
@SuppressWarnings("unchecked")
public <T extends DataSourceClient> Optional<QueryHandler<T>> getQueryHandler(T client) {
return handlers.stream()
.filter(
handler -> {
try {
// Get the handler's client class and check if it's compatible with our client
Class<?> handlerClientClass = handler.getClientClass();
return handlerClientClass.isInstance(client)
&& ((QueryHandler<T>) handler).canHandle(client);
} catch (ClassCastException e) {
return false;
}
})
.map(handler -> (QueryHandler<T>) handler)
.findFirst();
}
}
Loading
Loading