Skip to content

Commit fa3f6d9

Browse files
committed
Enable table function
Signed-off-by: Vamsi Manohar <[email protected]>
1 parent 7460273 commit fa3f6d9

File tree

13 files changed

+578
-4
lines changed

13 files changed

+578
-4
lines changed

docs/user/ppl/admin/prometheus_connector.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,27 @@ Example queries
186186
| 11 | "2022-11-03 07:18:64" | "/-/metrics" | 500 |
187187
+------------+------------------------+--------------------------------+---------------+
188188

189+
PromQL Support for prometheus Connector
190+
==========================================
191+
192+
`query_range` Table Function
193+
----------------------------
194+
Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
195+
The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
196+
Arguments should be either passed by name or positionArguments should be either passed by name or position.
197+
`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
198+
or
199+
`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
200+
Example::
201+
202+
> source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)
203+
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
204+
| @value | @timestamp | handler | code | instance | job |
205+
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
206+
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
207+
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
208+
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
209+
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus |
210+
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
211+
| 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus |
212+
+------------+------------------------+--------------------------------+---------------+-------------+-------------+

ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SearchFromFilterContext;
2121
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SortCommandContext;
2222
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StatsCommandContext;
23+
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableFunctionContext;
2324
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableSourceClauseContext;
2425
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TopCommandContext;
2526
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.WhereCommandContext;
@@ -33,7 +34,6 @@
3334
import java.util.List;
3435
import java.util.Optional;
3536
import java.util.stream.Collectors;
36-
import lombok.Generated;
3737
import lombok.RequiredArgsConstructor;
3838
import org.antlr.v4.runtime.ParserRuleContext;
3939
import org.antlr.v4.runtime.Token;
@@ -363,7 +363,7 @@ public UnresolvedPlan visitTableSourceClause(TableSourceClauseContext ctx) {
363363
}
364364

365365
@Override
366-
public UnresolvedPlan visitTableFunction(OpenSearchPPLParser.TableFunctionContext ctx) {
366+
public UnresolvedPlan visitTableFunction(TableFunctionContext ctx) {
367367
ImmutableList.Builder<UnresolvedExpression> builder = ImmutableList.builder();
368368
ctx.functionArgs().functionArg().forEach(arg
369369
-> {

ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package org.opensearch.sql.ppl.parser;
88

9-
import static org.opensearch.sql.ast.dsl.AstDSL.qualifiedName;
109
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NOT_NULL;
1110
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NULL;
1211
import static org.opensearch.sql.expression.function.BuiltinFunctionName.POSITION;
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.prometheus.functions.response;
7+
8+
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
9+
10+
import java.time.Instant;
11+
import java.util.ArrayList;
12+
import java.util.Iterator;
13+
import java.util.LinkedHashMap;
14+
import java.util.List;
15+
import org.jetbrains.annotations.NotNull;
16+
import org.json.JSONArray;
17+
import org.json.JSONObject;
18+
import org.opensearch.sql.data.model.ExprDoubleValue;
19+
import org.opensearch.sql.data.model.ExprStringValue;
20+
import org.opensearch.sql.data.model.ExprTimestampValue;
21+
import org.opensearch.sql.data.model.ExprTupleValue;
22+
import org.opensearch.sql.data.model.ExprValue;
23+
import org.opensearch.sql.data.type.ExprCoreType;
24+
import org.opensearch.sql.executor.ExecutionEngine;
25+
import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants;
26+
27+
/**
28+
* Default implementation of QueryRangeFunctionResponseHandle.
29+
*/
30+
public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFunctionResponseHandle {
31+
32+
private final JSONObject responseObject;
33+
private Iterator<ExprValue> responseIterator;
34+
private ExecutionEngine.Schema schema;
35+
36+
/**
37+
* Constructor.
38+
*
39+
* @param responseObject Prometheus responseObject.
40+
*/
41+
public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) {
42+
this.responseObject = responseObject;
43+
constructIteratorAndSchema();
44+
}
45+
46+
private void constructIteratorAndSchema() {
47+
List<ExprValue> result = new ArrayList<>();
48+
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
49+
if ("matrix".equals(responseObject.getString("resultType"))) {
50+
JSONArray itemArray = responseObject.getJSONArray("result");
51+
for (int i = 0; i < itemArray.length(); i++) {
52+
JSONObject item = itemArray.getJSONObject(i);
53+
JSONObject metric = item.getJSONObject("metric");
54+
JSONArray values = item.getJSONArray("values");
55+
if (i == 0) {
56+
columnList = getColumnList(metric);
57+
}
58+
for (int j = 0; j < values.length(); j++) {
59+
LinkedHashMap<String, ExprValue> linkedHashMap =
60+
extractRow(metric, values.getJSONArray(j), columnList);
61+
result.add(new ExprTupleValue(linkedHashMap));
62+
}
63+
}
64+
} else {
65+
throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus "
66+
+ "Response Parsing. 'matrix' resultType is expected",
67+
responseObject.getString("resultType")));
68+
}
69+
this.schema = new ExecutionEngine.Schema(columnList);
70+
this.responseIterator = result.iterator();
71+
}
72+
73+
@NotNull
74+
private static LinkedHashMap<String, ExprValue> extractRow(JSONObject metric,
75+
JSONArray values, List<ExecutionEngine.Schema.Column> columnList) {
76+
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
77+
for (ExecutionEngine.Schema.Column column : columnList) {
78+
if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) {
79+
linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP,
80+
new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000))));
81+
} else if (column.getName().equals(VALUE)) {
82+
linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1)));
83+
} else {
84+
linkedHashMap.put(column.getName(),
85+
new ExprStringValue(metric.getString(column.getName())));
86+
}
87+
}
88+
return linkedHashMap;
89+
}
90+
91+
92+
private List<ExecutionEngine.Schema.Column> getColumnList(JSONObject metric) {
93+
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
94+
columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP,
95+
PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP));
96+
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
97+
for (String key : metric.keySet()) {
98+
columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING));
99+
}
100+
return columnList;
101+
}
102+
103+
@Override
104+
public boolean hasNext() {
105+
return responseIterator.hasNext();
106+
}
107+
108+
@Override
109+
public ExprValue next() {
110+
return responseIterator.next();
111+
}
112+
113+
@Override
114+
public ExecutionEngine.Schema schema() {
115+
return schema;
116+
}
117+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.prometheus.functions.response;
7+
8+
import org.opensearch.sql.data.model.ExprValue;
9+
import org.opensearch.sql.executor.ExecutionEngine;
10+
11+
/**
12+
* Handle Prometheus response.
13+
*/
14+
public interface QueryRangeFunctionResponseHandle {
15+
16+
/**
17+
* Return true if Prometheus response has more result.
18+
*/
19+
boolean hasNext();
20+
21+
/**
22+
* Return Prometheus response as {@link ExprValue}. Attention, the method must been called when
23+
* hasNext return true.
24+
*/
25+
ExprValue next();
26+
27+
/**
28+
* Return ExecutionEngine.Schema of the Prometheus response.
29+
*/
30+
ExecutionEngine.Schema schema();
31+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
*
3+
* * Copyright OpenSearch Contributors
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package org.opensearch.sql.prometheus.functions.scan;
9+
10+
import lombok.AllArgsConstructor;
11+
import org.opensearch.sql.planner.logical.LogicalProject;
12+
import org.opensearch.sql.prometheus.client.PrometheusClient;
13+
import org.opensearch.sql.prometheus.request.PrometheusQueryRequest;
14+
import org.opensearch.sql.storage.TableScanOperator;
15+
import org.opensearch.sql.storage.read.TableScanBuilder;
16+
17+
/**
18+
* TableScanBuilder for query_range table function of prometheus connector.
19+
* we can merge this when we refactor for existing
20+
* ppl queries based on prometheus connector.
21+
*/
22+
@AllArgsConstructor
23+
public class QueryRangeFunctionTableScanBuilder extends TableScanBuilder {
24+
25+
private final PrometheusClient prometheusClient;
26+
27+
private final PrometheusQueryRequest prometheusQueryRequest;
28+
29+
@Override
30+
public TableScanOperator build() {
31+
return new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest);
32+
}
33+
34+
@Override
35+
public boolean pushDownProject(LogicalProject project) {
36+
return true;
37+
}
38+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
*
3+
* * Copyright OpenSearch Contributors
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package org.opensearch.sql.prometheus.functions.scan;
9+
10+
import java.io.IOException;
11+
import java.security.AccessController;
12+
import java.security.PrivilegedAction;
13+
import java.util.Locale;
14+
import lombok.RequiredArgsConstructor;
15+
import org.apache.logging.log4j.LogManager;
16+
import org.apache.logging.log4j.Logger;
17+
import org.json.JSONObject;
18+
import org.opensearch.sql.data.model.ExprValue;
19+
import org.opensearch.sql.executor.ExecutionEngine;
20+
import org.opensearch.sql.prometheus.client.PrometheusClient;
21+
import org.opensearch.sql.prometheus.functions.response.DefaultQueryRangeFunctionResponseHandle;
22+
import org.opensearch.sql.prometheus.functions.response.QueryRangeFunctionResponseHandle;
23+
import org.opensearch.sql.prometheus.request.PrometheusQueryRequest;
24+
import org.opensearch.sql.storage.TableScanOperator;
25+
26+
/**
27+
* This a table scan operator to handle Query Range table function.
28+
*/
29+
@RequiredArgsConstructor
30+
public class QueryRangeFunctionTableScanOperator extends TableScanOperator {
31+
32+
private final PrometheusClient prometheusClient;
33+
34+
private final PrometheusQueryRequest request;
35+
private QueryRangeFunctionResponseHandle prometheusResponseHandle;
36+
37+
private static final Logger LOG = LogManager.getLogger();
38+
39+
@Override
40+
public void open() {
41+
super.open();
42+
this.prometheusResponseHandle
43+
= AccessController.doPrivileged((PrivilegedAction<QueryRangeFunctionResponseHandle>) () -> {
44+
try {
45+
JSONObject responseObject = prometheusClient.queryRange(
46+
request.getPromQl(),
47+
request.getStartTime(), request.getEndTime(), request.getStep());
48+
return new DefaultQueryRangeFunctionResponseHandle(responseObject);
49+
} catch (IOException e) {
50+
LOG.error(e.getMessage());
51+
throw new RuntimeException(
52+
String.format("Error fetching data from prometheus server: %s", e.getMessage()));
53+
}
54+
});
55+
}
56+
57+
@Override
58+
public void close() {
59+
super.close();
60+
}
61+
62+
@Override
63+
public boolean hasNext() {
64+
return this.prometheusResponseHandle.hasNext();
65+
}
66+
67+
@Override
68+
public ExprValue next() {
69+
return this.prometheusResponseHandle.next();
70+
}
71+
72+
@Override
73+
public String explain() {
74+
return String.format(Locale.ROOT, "query_range(%s, %s, %s, %s)",
75+
request.getPromQl(),
76+
request.getStartTime(),
77+
request.getEndTime(),
78+
request.getStep());
79+
}
80+
81+
@Override
82+
public ExecutionEngine.Schema schema() {
83+
return this.prometheusResponseHandle.schema();
84+
}
85+
}

prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.util.LinkedHashMap;
1616
import java.util.List;
1717
import lombok.NonNull;
18-
import org.apache.commons.lang3.StringUtils;
1918
import org.json.JSONArray;
2019
import org.json.JSONObject;
2120
import org.opensearch.sql.data.model.ExprDoubleValue;

prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
import org.opensearch.sql.planner.logical.LogicalPlan;
1818
import org.opensearch.sql.planner.physical.PhysicalPlan;
1919
import org.opensearch.sql.prometheus.client.PrometheusClient;
20+
import org.opensearch.sql.prometheus.functions.scan.QueryRangeFunctionTableScanBuilder;
2021
import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalPlanOptimizerFactory;
2122
import org.opensearch.sql.prometheus.request.PrometheusQueryRequest;
2223
import org.opensearch.sql.prometheus.request.system.PrometheusDescribeMetricRequest;
2324
import org.opensearch.sql.prometheus.storage.implementor.PrometheusDefaultImplementor;
2425
import org.opensearch.sql.storage.Table;
26+
import org.opensearch.sql.storage.read.TableScanBuilder;
2527

2628
/**
2729
* Prometheus table (metric) implementation.
@@ -107,4 +109,14 @@ public LogicalPlan optimize(LogicalPlan plan) {
107109
return PrometheusLogicalPlanOptimizerFactory.create().optimize(plan);
108110
}
109111

112+
//Only handling query_range function for now.
113+
//we need to move PPL implementations to ScanBuilder in future.
114+
@Override
115+
public TableScanBuilder createScanBuilder() {
116+
if (metricName == null) {
117+
return new QueryRangeFunctionTableScanBuilder(prometheusClient, prometheusQueryRequest);
118+
} else {
119+
return null;
120+
}
121+
}
110122
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.opensearch.sql.prometheus.functions.response;
2+
3+
public class DefaultQueryRangeFunctionResponseHandleTest {
4+
5+
}

0 commit comments

Comments
 (0)