Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/user/ppl/admin/prometheus_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,27 @@ Example queries
| 11 | "2022-11-03 07:18:64" | "/-/metrics" | 500 |
+------------+------------------------+--------------------------------+---------------+

PromQL Support for prometheus Connector
==========================================

`query_range` Table Function
----------------------------
Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
Arguments should be either passed by name or positionArguments should be either passed by name or position.
`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
or
`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
Example::

> source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
| @value | @timestamp | handler | code | instance | job |
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
3 changes: 3 additions & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,11 @@ mlArg
fromClause
: SOURCE EQUAL tableSourceClause
| INDEX EQUAL tableSourceClause
| SOURCE EQUAL tableFunction
| INDEX EQUAL tableFunction
;


tableSourceClause
: tableSource (COMMA tableSource)*
;
Expand Down
24 changes: 18 additions & 6 deletions ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SearchFromFilterContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SortCommandContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StatsCommandContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableFunctionContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableSourceClauseContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TopCommandContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.WhereCommandContext;
Expand All @@ -33,7 +34,6 @@
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.RequiredArgsConstructor;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.Token;
Expand All @@ -46,6 +46,7 @@
import org.opensearch.sql.ast.expression.Map;
import org.opensearch.sql.ast.expression.ParseMethod;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.UnresolvedArgument;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
Expand All @@ -62,6 +63,7 @@
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser;
Expand Down Expand Up @@ -346,7 +348,11 @@ public UnresolvedPlan visitTopCommand(TopCommandContext ctx) {
*/
@Override
public UnresolvedPlan visitFromClause(FromClauseContext ctx) {
return visitTableSourceClause(ctx.tableSourceClause());
if (ctx.tableFunction() != null) {
return visitTableFunction(ctx.tableFunction());
} else {
return visitTableSourceClause(ctx.tableSourceClause());
}
}

@Override
Expand All @@ -357,10 +363,16 @@ public UnresolvedPlan visitTableSourceClause(TableSourceClauseContext ctx) {
}

@Override
@Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019
public UnresolvedPlan visitTableFunction(OpenSearchPPLParser.TableFunctionContext ctx) {
//<TODO>
return null;
public UnresolvedPlan visitTableFunction(TableFunctionContext ctx) {
ImmutableList.Builder<UnresolvedExpression> builder = ImmutableList.builder();
ctx.functionArgs().functionArg().forEach(arg
-> {
String argName = (arg.ident() != null) ? arg.ident().getText() : null;
builder.add(
new UnresolvedArgument(argName,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When argName could be null? Are you protected from NPE when it is used?

this.internalVisitExpression(arg.valueExpression())));
});
return new TableFunction(this.internalVisitExpression(ctx.qualifiedName()), builder.build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.opensearch.sql.ppl.parser;

import static org.opensearch.sql.ast.dsl.AstDSL.qualifiedName;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NOT_NULL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NULL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.POSITION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,13 @@ public String visitRelation(Relation node, String context) {
}

@Override
@Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019
public String visitTableFunction(TableFunction node, String context) {
//<TODO>
return null;
String arguments =
node.getArguments().stream()
.map(unresolvedExpression
-> this.expressionAnalyzer.analyze(unresolvedExpression, context))
.collect(Collectors.joining(","));
return StringUtils.format("source=%s(%s)", node.getFunctionName().toString(), arguments);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ public void testSearchCommandWithDotInIndexName() {
);
}

@Ignore
@Test
public void testSearchWithPrometheusQueryRangeWithPositionedArguments() {
assertEqual("search source = prometheus.query_range(\"test{code='200'}\",1234, 12345, 3)",
Expand All @@ -124,7 +123,6 @@ public void testSearchWithPrometheusQueryRangeWithPositionedArguments() {
));
}

@Ignore
@Test
public void testSearchWithPrometheusQueryRangeWithNamedArguments() {
assertEqual("search source = prometheus.query_range(query = \"test{code='200'}\", "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static org.opensearch.sql.ast.dsl.AstDSL.relation;

import java.util.Collections;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
Expand All @@ -36,7 +35,6 @@ public void testSearchCommand() {
}

@Test
@Ignore
public void testTableFunctionCommand() {
assertEquals("source=prometheus.query_range(***,***,***,***)",
anonymize("source=prometheus.query_range('afsd',123,123,3)")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.prometheus.functions.response;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants;

/**
* Default implementation of QueryRangeFunctionResponseHandle.
*/
public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFunctionResponseHandle {

private final JSONObject responseObject;
private Iterator<ExprValue> responseIterator;
private ExecutionEngine.Schema schema;

/**
* Constructor.
*
* @param responseObject Prometheus responseObject.
*/
public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) {
this.responseObject = responseObject;
constructIteratorAndSchema();
}

private void constructIteratorAndSchema() {
List<ExprValue> result = new ArrayList<>();
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
if ("matrix".equals(responseObject.getString("resultType"))) {
JSONArray itemArray = responseObject.getJSONArray("result");
for (int i = 0; i < itemArray.length(); i++) {
JSONObject item = itemArray.getJSONObject(i);
JSONObject metric = item.getJSONObject("metric");
JSONArray values = item.getJSONArray("values");
if (i == 0) {
columnList = getColumnList(metric);
}
for (int j = 0; j < values.length(); j++) {
LinkedHashMap<String, ExprValue> linkedHashMap =
extractRow(metric, values.getJSONArray(j), columnList);
result.add(new ExprTupleValue(linkedHashMap));
}
}
} else {
throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus "
+ "Response Parsing. 'matrix' resultType is expected",
responseObject.getString("resultType")));
}
this.schema = new ExecutionEngine.Schema(columnList);
this.responseIterator = result.iterator();
}

@NotNull
private static LinkedHashMap<String, ExprValue> extractRow(JSONObject metric,
JSONArray values, List<ExecutionEngine.Schema.Column> columnList) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
for (ExecutionEngine.Schema.Column column : columnList) {
if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) {
linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP,
new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000))));
} else if (column.getName().equals(VALUE)) {
linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1)));
} else {
linkedHashMap.put(column.getName(),
new ExprStringValue(metric.getString(column.getName())));
}
}
return linkedHashMap;
}


private List<ExecutionEngine.Schema.Column> getColumnList(JSONObject metric) {
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP,
PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP));
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
for (String key : metric.keySet()) {
columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING));
}
return columnList;
}

@Override
public boolean hasNext() {
return responseIterator.hasNext();
}

@Override
public ExprValue next() {
return responseIterator.next();
}

@Override
public ExecutionEngine.Schema schema() {
return schema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.prometheus.functions.response;

import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;

/**
* Handle Prometheus response.
*/
public interface QueryRangeFunctionResponseHandle {

/**
* Return true if Prometheus response has more result.
*/
boolean hasNext();

/**
* Return Prometheus response as {@link ExprValue}. Attention, the method must been called when
* hasNext return true.
*/
ExprValue next();

/**
* Return ExecutionEngine.Schema of the Prometheus response.
*/
ExecutionEngine.Schema schema();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.prometheus.functions.scan;

import lombok.AllArgsConstructor;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.request.PrometheusQueryRequest;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* TableScanBuilder for query_range table function of prometheus connector.
* we can merge this when we refactor for existing
* ppl queries based on prometheus connector.
*/
@AllArgsConstructor
public class QueryRangeFunctionTableScanBuilder extends TableScanBuilder {

private final PrometheusClient prometheusClient;

private final PrometheusQueryRequest prometheusQueryRequest;

@Override
public TableScanOperator build() {
return new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest);
}

@Override
public boolean pushDownProject(LogicalProject project) {
return true;
}
}
Loading