diff --git a/docs/user/ppl/admin/prometheus_connector.rst b/docs/user/ppl/admin/prometheus_connector.rst index 237181778ec..15f1f2445cf 100644 --- a/docs/user/ppl/admin/prometheus_connector.rst +++ b/docs/user/ppl/admin/prometheus_connector.rst @@ -186,3 +186,27 @@ Example queries | 11 | "2022-11-03 07:18:64" | "/-/metrics" | 500 | +------------+------------------------+--------------------------------+---------------+ +PromQL Support for prometheus Connector +========================================== + +`query_range` Table Function +---------------------------- +Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL. +The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/ +Arguments should be either passed by name or positionArguments should be either passed by name or position. +`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)` +or +`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)` +Example:: + + > source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14) + +------------+------------------------+--------------------------------+---------------+-------------+-------------+ + | @value | @timestamp | handler | code | instance | job | + |------------+------------------------+--------------------------------+---------------+-------------+-------------| + | 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus | + | 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus | + | 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus | + +------------+------------------------+--------------------------------+---------------+-------------+-------------+ diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 3a282102718..d98ee3cd1c6 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -183,8 +183,11 @@ mlArg fromClause : SOURCE EQUAL tableSourceClause | INDEX EQUAL tableSourceClause + | SOURCE EQUAL tableFunction + | INDEX EQUAL tableFunction ; + tableSourceClause : tableSource (COMMA tableSource)* ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 73c72386248..292ca795ae8 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -20,6 +20,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SearchFromFilterContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SortCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StatsCommandContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableFunctionContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableSourceClauseContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TopCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.WhereCommandContext; @@ -33,7 +34,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import lombok.Generated; import lombok.RequiredArgsConstructor; import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.Token; @@ -46,6 +46,7 @@ import org.opensearch.sql.ast.expression.Map; import org.opensearch.sql.ast.expression.ParseMethod; import org.opensearch.sql.ast.expression.QualifiedName; +import org.opensearch.sql.ast.expression.UnresolvedArgument; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.tree.AD; import org.opensearch.sql.ast.tree.Aggregation; @@ -62,6 +63,7 @@ import org.opensearch.sql.ast.tree.Relation; import org.opensearch.sql.ast.tree.Rename; import org.opensearch.sql.ast.tree.Sort; +import org.opensearch.sql.ast.tree.TableFunction; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.utils.StringUtils; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser; @@ -346,7 +348,11 @@ public UnresolvedPlan visitTopCommand(TopCommandContext ctx) { */ @Override public UnresolvedPlan visitFromClause(FromClauseContext ctx) { - return visitTableSourceClause(ctx.tableSourceClause()); + if (ctx.tableFunction() != null) { + return visitTableFunction(ctx.tableFunction()); + } else { + return visitTableSourceClause(ctx.tableSourceClause()); + } } @Override @@ -357,10 +363,16 @@ public UnresolvedPlan visitTableSourceClause(TableSourceClauseContext ctx) { } @Override - @Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019 - public UnresolvedPlan visitTableFunction(OpenSearchPPLParser.TableFunctionContext ctx) { - // - return null; + public UnresolvedPlan visitTableFunction(TableFunctionContext ctx) { + ImmutableList.Builder builder = ImmutableList.builder(); + ctx.functionArgs().functionArg().forEach(arg + -> { + String argName = (arg.ident() != null) ? arg.ident().getText() : null; + builder.add( + new UnresolvedArgument(argName, + this.internalVisitExpression(arg.valueExpression()))); + }); + return new TableFunction(this.internalVisitExpression(ctx.qualifiedName()), builder.build()); } /** diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 4ce9ce2789f..eddee3064ee 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -6,7 +6,6 @@ package org.opensearch.sql.ppl.parser; -import static org.opensearch.sql.ast.dsl.AstDSL.qualifiedName; import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NOT_NULL; import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NULL; import static org.opensearch.sql.expression.function.BuiltinFunctionName.POSITION; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 504469a4b23..2f520b55c6c 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -101,10 +101,13 @@ public String visitRelation(Relation node, String context) { } @Override - @Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019 public String visitTableFunction(TableFunction node, String context) { - // - return null; + String arguments = + node.getArguments().stream() + .map(unresolvedExpression + -> this.expressionAnalyzer.analyze(unresolvedExpression, context)) + .collect(Collectors.joining(",")); + return StringUtils.format("source=%s(%s)", node.getFunctionName().toString(), arguments); } @Override diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 8249619b376..65341b66c1d 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -112,7 +112,6 @@ public void testSearchCommandWithDotInIndexName() { ); } - @Ignore @Test public void testSearchWithPrometheusQueryRangeWithPositionedArguments() { assertEqual("search source = prometheus.query_range(\"test{code='200'}\",1234, 12345, 3)", @@ -124,7 +123,6 @@ public void testSearchWithPrometheusQueryRangeWithPositionedArguments() { )); } - @Ignore @Test public void testSearchWithPrometheusQueryRangeWithNamedArguments() { assertEqual("search source = prometheus.query_range(query = \"test{code='200'}\", " diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 1e4af28ecf1..1998647dba3 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -12,7 +12,6 @@ import static org.opensearch.sql.ast.dsl.AstDSL.relation; import java.util.Collections; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @@ -36,7 +35,6 @@ public void testSearchCommand() { } @Test - @Ignore public void testTableFunctionCommand() { assertEquals("source=prometheus.query_range(***,***,***,***)", anonymize("source=prometheus.query_range('afsd',123,123,3)") diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java new file mode 100644 index 00000000000..7f261360f77 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java @@ -0,0 +1,117 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.functions.response; + +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import org.jetbrains.annotations.NotNull; +import org.json.JSONArray; +import org.json.JSONObject; +import org.opensearch.sql.data.model.ExprDoubleValue; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTimestampValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants; + +/** + * Default implementation of QueryRangeFunctionResponseHandle. + */ +public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFunctionResponseHandle { + + private final JSONObject responseObject; + private Iterator responseIterator; + private ExecutionEngine.Schema schema; + + /** + * Constructor. + * + * @param responseObject Prometheus responseObject. + */ + public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) { + this.responseObject = responseObject; + constructIteratorAndSchema(); + } + + private void constructIteratorAndSchema() { + List result = new ArrayList<>(); + List columnList = new ArrayList<>(); + if ("matrix".equals(responseObject.getString("resultType"))) { + JSONArray itemArray = responseObject.getJSONArray("result"); + for (int i = 0; i < itemArray.length(); i++) { + JSONObject item = itemArray.getJSONObject(i); + JSONObject metric = item.getJSONObject("metric"); + JSONArray values = item.getJSONArray("values"); + if (i == 0) { + columnList = getColumnList(metric); + } + for (int j = 0; j < values.length(); j++) { + LinkedHashMap linkedHashMap = + extractRow(metric, values.getJSONArray(j), columnList); + result.add(new ExprTupleValue(linkedHashMap)); + } + } + } else { + throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus " + + "Response Parsing. 'matrix' resultType is expected", + responseObject.getString("resultType"))); + } + this.schema = new ExecutionEngine.Schema(columnList); + this.responseIterator = result.iterator(); + } + + @NotNull + private static LinkedHashMap extractRow(JSONObject metric, + JSONArray values, List columnList) { + LinkedHashMap linkedHashMap = new LinkedHashMap<>(); + for (ExecutionEngine.Schema.Column column : columnList) { + if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) { + linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP, + new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000)))); + } else if (column.getName().equals(VALUE)) { + linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1))); + } else { + linkedHashMap.put(column.getName(), + new ExprStringValue(metric.getString(column.getName()))); + } + } + return linkedHashMap; + } + + + private List getColumnList(JSONObject metric) { + List columnList = new ArrayList<>(); + columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP, + PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP)); + columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE)); + for (String key : metric.keySet()) { + columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING)); + } + return columnList; + } + + @Override + public boolean hasNext() { + return responseIterator.hasNext(); + } + + @Override + public ExprValue next() { + return responseIterator.next(); + } + + @Override + public ExecutionEngine.Schema schema() { + return schema; + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryRangeFunctionResponseHandle.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryRangeFunctionResponseHandle.java new file mode 100644 index 00000000000..80a75cfae7d --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryRangeFunctionResponseHandle.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.functions.response; + +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; + +/** + * Handle Prometheus response. + */ +public interface QueryRangeFunctionResponseHandle { + + /** + * Return true if Prometheus response has more result. + */ + boolean hasNext(); + + /** + * Return Prometheus response as {@link ExprValue}. Attention, the method must been called when + * hasNext return true. + */ + ExprValue next(); + + /** + * Return ExecutionEngine.Schema of the Prometheus response. + */ + ExecutionEngine.Schema schema(); +} \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilder.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilder.java new file mode 100644 index 00000000000..00e2191d09f --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilder.java @@ -0,0 +1,38 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.scan; + +import lombok.AllArgsConstructor; +import org.opensearch.sql.planner.logical.LogicalProject; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; +import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.read.TableScanBuilder; + +/** + * TableScanBuilder for query_range table function of prometheus connector. + * we can merge this when we refactor for existing + * ppl queries based on prometheus connector. + */ +@AllArgsConstructor +public class QueryRangeFunctionTableScanBuilder extends TableScanBuilder { + + private final PrometheusClient prometheusClient; + + private final PrometheusQueryRequest prometheusQueryRequest; + + @Override + public TableScanOperator build() { + return new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest); + } + + @Override + public boolean pushDownProject(LogicalProject project) { + return true; + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperator.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperator.java new file mode 100644 index 00000000000..019f9cffcb5 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperator.java @@ -0,0 +1,85 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.scan; + +import java.io.IOException; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Locale; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.response.DefaultQueryRangeFunctionResponseHandle; +import org.opensearch.sql.prometheus.functions.response.QueryRangeFunctionResponseHandle; +import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; +import org.opensearch.sql.storage.TableScanOperator; + +/** + * This a table scan operator to handle Query Range table function. + */ +@RequiredArgsConstructor +public class QueryRangeFunctionTableScanOperator extends TableScanOperator { + + private final PrometheusClient prometheusClient; + + private final PrometheusQueryRequest request; + private QueryRangeFunctionResponseHandle prometheusResponseHandle; + + private static final Logger LOG = LogManager.getLogger(); + + @Override + public void open() { + super.open(); + this.prometheusResponseHandle + = AccessController.doPrivileged((PrivilegedAction) () -> { + try { + JSONObject responseObject = prometheusClient.queryRange( + request.getPromQl(), + request.getStartTime(), request.getEndTime(), request.getStep()); + return new DefaultQueryRangeFunctionResponseHandle(responseObject); + } catch (IOException e) { + LOG.error(e.getMessage()); + throw new RuntimeException( + String.format("Error fetching data from prometheus server: %s", e.getMessage())); + } + }); + } + + @Override + public void close() { + super.close(); + } + + @Override + public boolean hasNext() { + return this.prometheusResponseHandle.hasNext(); + } + + @Override + public ExprValue next() { + return this.prometheusResponseHandle.next(); + } + + @Override + public String explain() { + return String.format(Locale.ROOT, "query_range(%s, %s, %s, %s)", + request.getPromQl(), + request.getStartTime(), + request.getEndTime(), + request.getStep()); + } + + @Override + public ExecutionEngine.Schema schema() { + return this.prometheusResponseHandle.schema(); + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java index ef7f19ba2f0..331605b1d59 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java @@ -15,7 +15,6 @@ import java.util.LinkedHashMap; import java.util.List; import lombok.NonNull; -import org.apache.commons.lang3.StringUtils; import org.json.JSONArray; import org.json.JSONObject; import org.opensearch.sql.data.model.ExprDoubleValue; diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java index 83384ff7602..a03d69bc41f 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java @@ -17,11 +17,13 @@ import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.scan.QueryRangeFunctionTableScanBuilder; import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalPlanOptimizerFactory; import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; import org.opensearch.sql.prometheus.request.system.PrometheusDescribeMetricRequest; import org.opensearch.sql.prometheus.storage.implementor.PrometheusDefaultImplementor; import org.opensearch.sql.storage.Table; +import org.opensearch.sql.storage.read.TableScanBuilder; /** * Prometheus table (metric) implementation. @@ -107,4 +109,14 @@ public LogicalPlan optimize(LogicalPlan plan) { return PrometheusLogicalPlanOptimizerFactory.create().optimize(plan); } + //Only handling query_range function for now. + //we need to move PPL implementations to ScanBuilder in future. + @Override + public TableScanBuilder createScanBuilder() { + if (metricName == null) { + return new QueryRangeFunctionTableScanBuilder(prometheusClient, prometheusQueryRequest); + } else { + return null; + } + } } \ No newline at end of file diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilderTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilderTest.java new file mode 100644 index 00000000000..59974390291 --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilderTest.java @@ -0,0 +1,61 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.scan; + + +import static org.opensearch.sql.prometheus.constants.TestConstants.ENDTIME; +import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY; +import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME; +import static org.opensearch.sql.prometheus.constants.TestConstants.STEP; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.opensearch.sql.planner.logical.LogicalProject; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; +import org.opensearch.sql.storage.TableScanOperator; + +public class QueryRangeFunctionTableScanBuilderTest { + + @Mock + private PrometheusClient prometheusClient; + + @Mock + private LogicalProject logicalProject; + + @Test + void testBuild() { + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl(QUERY); + prometheusQueryRequest.setStartTime(STARTTIME); + prometheusQueryRequest.setEndTime(ENDTIME); + prometheusQueryRequest.setStep(STEP); + + QueryRangeFunctionTableScanBuilder queryRangeFunctionTableScanBuilder + = new QueryRangeFunctionTableScanBuilder(prometheusClient, prometheusQueryRequest); + TableScanOperator queryRangeFunctionTableScanOperator + = queryRangeFunctionTableScanBuilder.build(); + Assertions.assertNotNull(queryRangeFunctionTableScanOperator); + Assertions.assertTrue(queryRangeFunctionTableScanOperator + instanceof QueryRangeFunctionTableScanOperator); + } + + @Test + void testPushProject() { + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl(QUERY); + prometheusQueryRequest.setStartTime(STARTTIME); + prometheusQueryRequest.setEndTime(ENDTIME); + prometheusQueryRequest.setStep(STEP); + + QueryRangeFunctionTableScanBuilder queryRangeFunctionTableScanBuilder + = new QueryRangeFunctionTableScanBuilder(prometheusClient, prometheusQueryRequest); + Assertions.assertTrue(queryRangeFunctionTableScanBuilder.pushDownProject(logicalProject)); + } +} diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java new file mode 100644 index 00000000000..3aa992fc65b --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java @@ -0,0 +1,182 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.scan; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.prometheus.constants.TestConstants.ENDTIME; +import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY; +import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME; +import static org.opensearch.sql.prometheus.constants.TestConstants.STEP; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; +import static org.opensearch.sql.prometheus.utils.TestUtils.getJson; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import lombok.SneakyThrows; +import org.json.JSONObject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.model.ExprDoubleValue; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTimestampValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; + +@ExtendWith(MockitoExtension.class) +public class QueryRangeFunctionTableScanOperatorTest { + @Mock + private PrometheusClient prometheusClient; + + @Test + @SneakyThrows + void testQueryResponseIterator() { + + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl(QUERY); + prometheusQueryRequest.setStartTime(STARTTIME); + prometheusQueryRequest.setEndTime(ENDTIME); + prometheusQueryRequest.setStep(STEP); + + QueryRangeFunctionTableScanOperator queryRangeFunctionTableScanOperator + = new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest); + + when(prometheusClient.queryRange(any(), any(), any(), any())) + .thenReturn(new JSONObject(getJson("query_range_result.json"))); + queryRangeFunctionTableScanOperator.open(); + Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext()); + ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); + put(VALUE, new ExprDoubleValue(1)); + put("instance", new ExprStringValue("localhost:9090")); + put("__name__", new ExprStringValue("up")); + put("job", new ExprStringValue("prometheus")); + } + }); + assertEquals(firstRow, queryRangeFunctionTableScanOperator.next()); + Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext()); + ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put("@timestamp", new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); + put("@value", new ExprDoubleValue(0)); + put("instance", new ExprStringValue("localhost:9091")); + put("__name__", new ExprStringValue("up")); + put("job", new ExprStringValue("node")); + } + }); + assertEquals(secondRow, queryRangeFunctionTableScanOperator.next()); + Assertions.assertFalse(queryRangeFunctionTableScanOperator.hasNext()); + } + + @Test + @SneakyThrows + void testEmptyQueryWithNoMatrixKeyInResultJson() { + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl(QUERY); + prometheusQueryRequest.setStartTime(STARTTIME); + prometheusQueryRequest.setEndTime(ENDTIME); + prometheusQueryRequest.setStep(STEP); + + QueryRangeFunctionTableScanOperator queryRangeFunctionTableScanOperator + = new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest); + + when(prometheusClient.queryRange(any(), any(), any(), any())) + .thenReturn(new JSONObject(getJson("no_matrix_query_range_result.json"))); + RuntimeException runtimeException + = assertThrows(RuntimeException.class, queryRangeFunctionTableScanOperator::open); + assertEquals( + "Unexpected Result Type: vector during Prometheus Response Parsing. " + + "'matrix' resultType is expected", runtimeException.getMessage()); + } + + @Test + @SneakyThrows + void testQuerySchema() { + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl(QUERY); + prometheusQueryRequest.setStartTime(STARTTIME); + prometheusQueryRequest.setEndTime(ENDTIME); + prometheusQueryRequest.setStep(STEP); + + QueryRangeFunctionTableScanOperator queryRangeFunctionTableScanOperator + = new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest); + + when(prometheusClient.queryRange(any(), any(), any(), any())) + .thenReturn(new JSONObject(getJson("query_range_result.json"))); + queryRangeFunctionTableScanOperator.open(); + ArrayList columns = new ArrayList<>(); + columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.TIMESTAMP)); + columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE)); + columns.add(new ExecutionEngine.Schema.Column("instance", "instance", ExprCoreType.STRING)); + columns.add(new ExecutionEngine.Schema.Column("__name__", "__name__", ExprCoreType.STRING)); + columns.add(new ExecutionEngine.Schema.Column("job", "job", ExprCoreType.STRING)); + ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns); + assertEquals(expectedSchema, queryRangeFunctionTableScanOperator.schema()); + } + + @Test + @SneakyThrows + void testEmptyQueryWithException() { + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl(QUERY); + prometheusQueryRequest.setStartTime(STARTTIME); + prometheusQueryRequest.setEndTime(ENDTIME); + prometheusQueryRequest.setStep(STEP); + + QueryRangeFunctionTableScanOperator queryRangeFunctionTableScanOperator + = new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest); + + when(prometheusClient.queryRange(any(), any(), any(), any())) + .thenThrow(new IOException("Error Message")); + RuntimeException runtimeException + = assertThrows(RuntimeException.class, queryRangeFunctionTableScanOperator::open); + assertEquals("Error fetching data from prometheus server: Error Message", + runtimeException.getMessage()); + } + + + @Test + @SneakyThrows + void testExplain() { + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl(QUERY); + prometheusQueryRequest.setStartTime(STARTTIME); + prometheusQueryRequest.setEndTime(ENDTIME); + prometheusQueryRequest.setStep(STEP); + + QueryRangeFunctionTableScanOperator queryRangeFunctionTableScanOperator + = new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest); + + Assertions.assertEquals("query_range(test_query, 1664767694133, 1664771294133, 14)", + queryRangeFunctionTableScanOperator.explain()); + } + + @Test + @SneakyThrows + void testClose() { + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl(QUERY); + prometheusQueryRequest.setStartTime(STARTTIME); + prometheusQueryRequest.setEndTime(ENDTIME); + prometheusQueryRequest.setStep(STEP); + + QueryRangeFunctionTableScanOperator queryRangeFunctionTableScanOperator + = new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest); + queryRangeFunctionTableScanOperator.close(); + } +} diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java index 01e3e8d899f..de95b2bd640 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java @@ -55,7 +55,9 @@ import org.opensearch.sql.planner.physical.ProjectOperator; import org.opensearch.sql.prometheus.client.PrometheusClient; import org.opensearch.sql.prometheus.constants.TestConstants; +import org.opensearch.sql.prometheus.functions.scan.QueryRangeFunctionTableScanBuilder; import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; +import org.opensearch.sql.storage.read.TableScanBuilder; @ExtendWith(MockitoExtension.class) class PrometheusMetricTableTest { @@ -900,5 +902,24 @@ void testImplementPrometheusQueryWithUnsupportedFilterQuery() { } + @Test + void testCreateScanBuilderWithQueryRangeTableFunction() { + PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl("test"); + prometheusQueryRequest.setStep("15m"); + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, prometheusQueryRequest); + TableScanBuilder tableScanBuilder = prometheusMetricTable.createScanBuilder(); + Assertions.assertNotNull(tableScanBuilder); + Assertions.assertTrue(tableScanBuilder instanceof QueryRangeFunctionTableScanBuilder); + } + + @Test + void testCreateScanBuilderWithPPLQuery() { + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, TestConstants.METRIC_NAME); + TableScanBuilder tableScanBuilder = prometheusMetricTable.createScanBuilder(); + Assertions.assertNull(tableScanBuilder); + } }