Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.junit.ClassRule;

import java.io.IOException;
import java.util.List;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;

/**
Expand Down Expand Up @@ -98,7 +100,16 @@ private static void assertMetricResults(ObjectPath responsePath) throws IOExcept
assertThat(responsePath.evaluate("data.result"), hasSize(1));
assertThat(responsePath.evaluate("data.result.0.metric.job"), equalTo("test_job"));
assertThat(responsePath.evaluate("data.result.0.metric.instance"), equalTo("localhost:9090"));
assertThat(responsePath.evaluate("data.result.0.values"), hasSize(5));
List<List<Object>> values = responsePath.evaluate("data.result.0.values");
assertThat(values, hasSize(5));

// Assert timestamps are in strictly ascending order
double prevTimestamp = -1;
for (List<Object> point : values) {
double timestamp = ((Number) point.getFirst()).doubleValue();
assertThat(timestamp, greaterThan(prevTimestamp));
prevTimestamp = timestamp;
}
}

private ObjectPath executeQueryRange() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,16 @@
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.parser.QueryParam;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.action.PreparedEsqlQueryRequest;
import org.elasticsearch.xpack.esql.plan.EsqlStatement;

import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.VALUE;

/**
* REST handler for the Prometheus {@code GET /api/v1/query_range} endpoint.
* Translates Prometheus query_range parameters into an ES|QL {@code PROMQL} command,
* Translates Prometheus query_range parameters into an ES|QL PromqlCommand logical plan,
* executes it, and converts the result into the Prometheus matrix JSON format.
* Only GET is supported. POST with {@code application/x-www-form-urlencoded} bodies is rejected
* at the HTTP layer as a CSRF safeguard before this handler is ever reached — see
Expand All @@ -36,24 +33,10 @@
@ServerlessScope(Scope.PUBLIC)
public class PrometheusQueryRangeRestAction extends BaseRestHandler {

static final String QUERY_PARAM = "query";
static final String START_PARAM = "start";
static final String END_PARAM = "end";
static final String INDEX_PARAM = "index";

static final String ESQL_QUERY = "PROMQL step=?"
+ PrometheusQueryRangeResponseListener.STEP_PARAM
+ " start=?"
+ START_PARAM
+ " end=?"
+ END_PARAM
+ " index=?"
+ INDEX_PARAM
+ " "
+ PrometheusQueryRangeResponseListener.VALUE_COLUMN
+ "=(?"
+ QUERY_PARAM
+ ") | EVAL step = TO_LONG(step)";
private static final String INDEX_PARAM = "index";
private static final String QUERY_PARAM = "query";
private static final String START_PARAM = "start";
private static final String END_PARAM = "end";

@Override
public String getName() {
Expand All @@ -73,8 +56,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String step = getRequiredParam(request, PrometheusQueryRangeResponseListener.STEP_PARAM);
String index = request.param(INDEX_PARAM, "*");

EsqlQueryRequest esqlRequest = EsqlQueryRequest.syncEsqlQueryRequest(ESQL_QUERY);
esqlRequest.params(buildQueryParams(query, index, start, end, step));
EsqlStatement statement = PromqlQueryPlanBuilder.buildStatement(query, index, start, end, step);
var esqlRequest = PreparedEsqlQueryRequest.sync(statement, query);

return channel -> client.execute(EsqlQueryAction.INSTANCE, esqlRequest, new PrometheusQueryRangeResponseListener(channel));
}
Expand All @@ -87,19 +70,4 @@ private static String getRequiredParam(RestRequest request, String name) {
return value;
}

/**
* Creates query parameters for all Prometheus query_range values.
*/
static QueryParams buildQueryParams(String query, String index, String start, String end, String step) {
return new QueryParams(
List.of(
new QueryParam(QUERY_PARAM, query, DataType.KEYWORD, VALUE),
new QueryParam(INDEX_PARAM, index, DataType.KEYWORD, VALUE),
new QueryParam(START_PARAM, start, DataType.KEYWORD, VALUE),
new QueryParam(END_PARAM, end, DataType.KEYWORD, VALUE),
new QueryParam(PrometheusQueryRangeResponseListener.STEP_PARAM, step, DataType.KEYWORD, VALUE)
)
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.prometheus.rest;

import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.UnresolvedTimestamp;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToLong;
import org.elasticsearch.xpack.esql.parser.PromqlParser;
import org.elasticsearch.xpack.esql.parser.promql.PromqlParserUtils;
import org.elasticsearch.xpack.esql.plan.EsqlStatement;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.SourceCommand;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.promql.PromqlCommand;

import java.time.Duration;
import java.time.Instant;
import java.util.List;

/**
* Builds an {@link EsqlStatement} containing a {@link PromqlCommand} logical plan
* directly from Prometheus query_range parameters, bypassing ES|QL string construction and parsing.
*/
class PromqlQueryPlanBuilder {

private static final Duration DEFAULT_SCRAPE_INTERVAL = Duration.ofMinutes(1);

/**
* Builds an {@link EsqlStatement} containing a {@link PromqlCommand} with an {@link Eval} node
* for the {@code TO_LONG(step)} conversion.
*/
static EsqlStatement buildStatement(String query, String index, String startStr, String endStr, String stepStr) {
Instant startInstant = PromqlParserUtils.parseDate(Source.EMPTY, startStr);
Instant endInstant = PromqlParserUtils.parseDate(Source.EMPTY, endStr);
Duration stepDuration = parseStep(Source.EMPTY, stepStr);

Literal startLiteral = Literal.dateTime(Source.EMPTY, startInstant);
Literal endLiteral = Literal.dateTime(Source.EMPTY, endInstant);
Literal stepLiteral = Literal.timeDuration(Source.EMPTY, stepDuration);

IndexPattern indexPattern = new IndexPattern(Source.EMPTY, index);
UnresolvedRelation unresolvedRelation = new UnresolvedRelation(
Source.EMPTY,
indexPattern,
false,
List.of(),
null,
SourceCommand.PROMQL
);

PromqlParser promqlParser = new PromqlParser();
LogicalPlan promqlPlan = promqlParser.createStatement(query, startLiteral, endLiteral, 0, 0);

PromqlCommand promqlCommand = new PromqlCommand(
Source.EMPTY,
unresolvedRelation,
promqlPlan,
startLiteral,
endLiteral,
stepLiteral,
Literal.NULL,
Literal.timeDuration(Source.EMPTY, DEFAULT_SCRAPE_INTERVAL),
PrometheusQueryRangeResponseListener.VALUE_COLUMN,
new UnresolvedTimestamp(Source.EMPTY)
);

// TO_LONG converts the step datetime to epoch millis, avoiding the need to parse a date string in the response listener.
Alias stepAlias = new Alias(
Source.EMPTY,
PromqlCommand.STEP_COLUMN_NAME,
new ToLong(
Source.EMPTY,
promqlCommand.output().stream().filter(a -> a.name().equals(PromqlCommand.STEP_COLUMN_NAME)).findFirst().get()
)
);
// Eval's mergeOutputAttributes drops step(datetime) and appends step_alias(long) at the end,
// producing [value, ...dimensions, step(long)] — the order the response listener expects.
Eval eval = new Eval(Source.EMPTY, promqlCommand, List.of(stepAlias));

// Sort by step (timestamp) ascending so Prometheus clients receive values in chronological order.
Attribute stepAttr = stepAlias.toAttribute();
Order stepOrder = new Order(Source.EMPTY, stepAttr, Order.OrderDirection.ASC, Order.NullsPosition.LAST);
OrderBy orderBy = new OrderBy(Source.EMPTY, eval, List.of(stepOrder));

return new EsqlStatement(orderBy, List.of());
}

private static Duration parseStep(Source source, String value) {
try {
return Duration.ofSeconds(Integer.parseInt(value));
} catch (NumberFormatException ignore) {
return PromqlParserUtils.parseDuration(source, value);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.prometheus.rest;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.plan.EsqlStatement;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.promql.PromqlCommand;
import org.elasticsearch.xpack.esql.plan.logical.promql.selector.InstantSelector;

import java.time.Duration;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class PromqlQueryPlanBuilderTests extends ESTestCase {

public void testBuildStatementPlanStructure() {
EsqlStatement statement = PromqlQueryPlanBuilder.buildStatement("up", "*", "2025-01-01T00:00:00Z", "2025-01-01T01:00:00Z", "15s");
assertThat(statement.plan(), instanceOf(OrderBy.class));
Eval eval = (Eval) ((OrderBy) statement.plan()).child();
assertThat(eval.fields().size(), equalTo(1));
assertThat(eval.fields().get(0).name(), equalTo("step"));
assertThat(eval.child(), instanceOf(PromqlCommand.class));
PromqlCommand promqlCommand = (PromqlCommand) eval.child();
assertThat(promqlCommand.valueColumnName(), equalTo("value"));
assertThat(promqlCommand.isRangeQuery(), equalTo(true));
assertThat(promqlCommand.child(), instanceOf(UnresolvedRelation.class));
assertThat(((UnresolvedRelation) promqlCommand.child()).indexPattern().indexPattern(), equalTo("*"));
assertThat(promqlCommand.hasTimeRange(), equalTo(true));
assertThat(promqlCommand.step().value(), equalTo(Duration.ofSeconds(15)));
assertThat(promqlCommand.promqlPlan(), instanceOf(InstantSelector.class));
assertThat(((NamedExpression) ((InstantSelector) promqlCommand.promqlPlan()).series()).name(), equalTo("up"));
}

public void testBuildStatementWithCustomIndex() {
EsqlStatement statement = PromqlQueryPlanBuilder.buildStatement(
"up",
"metrics-*",
"2025-01-01T00:00:00Z",
"2025-01-01T01:00:00Z",
"15s"
);
assertThat(statement.plan(), instanceOf(OrderBy.class));
Eval eval = (Eval) ((OrderBy) statement.plan()).child();
PromqlCommand promqlCommand = (PromqlCommand) eval.child();
assertThat(promqlCommand.valueColumnName(), equalTo("value"));
assertThat(((UnresolvedRelation) promqlCommand.child()).indexPattern().indexPattern(), equalTo("metrics-*"));
assertThat(promqlCommand.hasTimeRange(), equalTo(true));
assertThat(promqlCommand.step().value(), equalTo(Duration.ofSeconds(15)));
assertThat(((NamedExpression) ((InstantSelector) promqlCommand.promqlPlan()).series()).name(), equalTo("up"));
}

public void testBuildStatementWithNumericStep() {
EsqlStatement statement = PromqlQueryPlanBuilder.buildStatement("up", "*", "1735689600", "1735693200", "60");
assertThat(statement.plan(), instanceOf(OrderBy.class));
Eval eval = (Eval) ((OrderBy) statement.plan()).child();
assertThat(eval.child(), instanceOf(PromqlCommand.class));
PromqlCommand promqlCommand = (PromqlCommand) eval.child();
assertThat(((UnresolvedRelation) promqlCommand.child()).indexPattern().indexPattern(), equalTo("*"));
assertThat(promqlCommand.hasTimeRange(), equalTo(true));
assertThat(promqlCommand.step().value(), equalTo(Duration.ofSeconds(60)));
assertThat(((NamedExpression) ((InstantSelector) promqlCommand.promqlPlan()).series()).name(), equalTo("up"));
}
}
Loading