diff --git a/x-pack/plugin/prometheus/src/javaRestTest/java/org/elasticsearch/xpack/prometheus/PrometheusSeriesRestIT.java b/x-pack/plugin/prometheus/src/javaRestTest/java/org/elasticsearch/xpack/prometheus/PrometheusSeriesRestIT.java new file mode 100644 index 0000000000000..2a7d7f132adff --- /dev/null +++ b/x-pack/plugin/prometheus/src/javaRestTest/java/org/elasticsearch/xpack/prometheus/PrometheusSeriesRestIT.java @@ -0,0 +1,226 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.compression.Snappy; + +import org.apache.http.HttpHeaders; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.prometheus.proto.RemoteWrite; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; + +/** + * Integration tests for the Prometheus {@code GET /api/v1/series} endpoint. + * + *

Tests focus on high-level HTTP concerns: routing, request/response format, status codes. + * Detailed plan-building and response-parsing logic is covered by unit tests. + */ +public class PrometheusSeriesRestIT extends ESRestTestCase { + + private static final String USER = "test_admin"; + private static final String PASS = "x-pack-test-password"; + private static final String DEFAULT_DATA_STREAM = "metrics-generic.prometheus-default"; + + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .distribution(DistributionType.DEFAULT) + .user(USER, PASS, "superuser", false) + .setting("xpack.security.enabled", "true") + .setting("xpack.security.autoconfiguration.enabled", "false") + .setting("xpack.license.self_generated.type", "trial") + .setting("xpack.ml.enabled", "false") + .setting("xpack.watcher.enabled", "false") + .build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected Settings restClientSettings() { + String token = basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray())); + return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + + // Validation — 400 responses + + public void testMissingMatchSelectorReturnsBadRequest() throws Exception { + Request request = new Request("GET", "/_prometheus/api/v1/series"); + ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request)); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("match[]")); + } + + public void testInvalidSelectorSyntaxReturnsBadRequest() throws Exception { + // {not valid!!!} is not valid PromQL + Request request = seriesRequest("{not valid!!!}"); + ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request)); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + } + + public void testRangeSelectorReturnsBadRequest() throws Exception { + // up[5m] is a range vector, not an instant vector + Request request = seriesRequest("up[5m]"); + ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request)); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + } + + // Response format — 200 with JSON envelope + + public void testGetResponseIsJsonWithSuccessEnvelope() throws Exception { + writeMetric("test_gauge", Map.of()); + + Response response = querySeries("test_gauge"); + + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(response.getEntity().getContentType().getValue(), containsString("application/json")); + + Map body = entityAsMap(response); + assertThat(body.get("status"), equalTo("success")); + assertThat(body.get("data"), notNullValue()); + } + + // Data round-trip — write via remote write, read back via series + + public void testGetReturnsIndexedSeries() throws Exception { + writeMetric("test_gauge", Map.of("job", "series_test", "instance", "localhost:9090"), 42.0); + + List> data = querySeriesData("test_gauge"); + + assertThat(data, hasSize(1)); + Map series = data.getFirst(); + assertThat(series.get("__name__"), equalTo("test_gauge")); + assertThat(series.get("job"), equalTo("series_test")); + assertThat(series.get("instance"), equalTo("localhost:9090")); + } + + public void testGetWithLabelMatcherFiltersCorrectly() throws Exception { + writeMetric("matched_metric", Map.of("job", "target_job")); + writeMetric("other_metric", Map.of("job", "other_job")); + + // Query by exact metric name — should only return the matched metric + List> data = querySeriesData("matched_metric"); + + assertThat(data, hasSize(1)); + assertThat(data.getFirst().get("__name__"), equalTo("matched_metric")); + assertThat(data.getFirst().get("job"), equalTo("target_job")); + } + + public void testSeriesWithIndexPattern() throws Exception { + writeMetric("test_gauge_idx", Map.of("job", "index_test")); + + List> data = querySeriesData("metrics-generic.prometheus-*", "test_gauge_idx"); + + assertThat(data, hasSize(1)); + assertThat(data.getFirst().get("__name__"), equalTo("test_gauge_idx")); + } + + // Helpers + + /** + * Builds a series request with a single {@code match[]} parameter. + * TODO: support multiple {@code match[]} values once multi-value query param support lands. + */ + private static Request seriesRequest(String matcher) { + return seriesRequest(null, matcher); + } + + private static Request seriesRequest(String index, String matcher) { + String path = index == null ? "/_prometheus/api/v1/series" : "/_prometheus/" + index + "/api/v1/series"; + Request request = new Request("GET", path); + request.addParameter("match[]", matcher); + return request; + } + + private Response querySeries(String matcher) throws IOException { + return querySeries(null, matcher); + } + + private Response querySeries(String index, String matcher) throws IOException { + return client().performRequest(seriesRequest(index, matcher)); + } + + @SuppressWarnings("unchecked") + private List> querySeriesData(String matcher) throws IOException { + return querySeriesData(null, matcher); + } + + @SuppressWarnings("unchecked") + private List> querySeriesData(String index, String matcher) throws IOException { + return seriesData(querySeries(index, matcher)); + } + + @SuppressWarnings("unchecked") + private List> seriesData(Response response) throws IOException { + Map body = entityAsMap(response); + return (List>) body.get("data"); + } + + private void writeMetric(String metricName, Map labels) throws IOException { + writeMetric(metricName, labels, 1.0); + } + + private void writeMetric(String metricName, Map labels, double value) throws IOException { + RemoteWrite.TimeSeries.Builder ts = RemoteWrite.TimeSeries.newBuilder().addLabels(label("__name__", metricName)); + labels.forEach((k, v) -> ts.addLabels(label(k, v))); + ts.addSamples(sample(value, System.currentTimeMillis())); + + RemoteWrite.WriteRequest writeRequest = RemoteWrite.WriteRequest.newBuilder().addTimeseries(ts.build()).build(); + + Request request = new Request("POST", "/_prometheus/api/v1/write"); + request.setEntity(new ByteArrayEntity(snappyEncode(writeRequest.toByteArray()), ContentType.create("application/x-protobuf"))); + request.setOptions(request.getOptions().toBuilder().addHeader(HttpHeaders.CONTENT_ENCODING, "snappy")); + client().performRequest(request); + client().performRequest(new Request("POST", "/" + DEFAULT_DATA_STREAM + "/_refresh")); + } + + private static RemoteWrite.Label label(String name, String value) { + return RemoteWrite.Label.newBuilder().setName(name).setValue(value).build(); + } + + private static RemoteWrite.Sample sample(double value, long timestamp) { + return RemoteWrite.Sample.newBuilder().setValue(value).setTimestamp(timestamp).build(); + } + + private static byte[] snappyEncode(byte[] input) { + ByteBuf in = Unpooled.wrappedBuffer(input); + ByteBuf out = Unpooled.buffer(input.length); + try { + new Snappy().encode(in, out, input.length); + byte[] result = new byte[out.readableBytes()]; + out.readBytes(result); + return result; + } finally { + in.release(); + out.release(); + } + } +} diff --git a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/PrometheusPlugin.java b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/PrometheusPlugin.java index 815391a27f6a5..d6392b4a15afc 100644 --- a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/PrometheusPlugin.java +++ b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/PrometheusPlugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.prometheus.rest.PrometheusQueryRangeRestAction; import org.elasticsearch.xpack.prometheus.rest.PrometheusRemoteWriteRestAction; import org.elasticsearch.xpack.prometheus.rest.PrometheusRemoteWriteTransportAction; +import org.elasticsearch.xpack.prometheus.rest.PrometheusSeriesRestAction; import java.util.Collection; import java.util.List; @@ -98,6 +99,7 @@ public Collection getRestHandlers( assert indexingPressure.get() != null : "indexing pressure must be set if plugin is enabled"; return List.of( new PrometheusRemoteWriteRestAction(indexingPressure.get(), maxProtobufContentLengthBytes, recycler.get()), + new PrometheusSeriesRestAction(), new PrometheusQueryRangeRestAction(), new PrometheusLabelValuesRestAction() ); diff --git a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesRestAction.java b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesRestAction.java new file mode 100644 index 0000000000000..8ce07cf3aca4b --- /dev/null +++ b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesRestAction.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import org.elasticsearch.xpack.esql.action.PreparedEsqlQueryRequest; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.parser.promql.PromqlParserUtils; +import org.elasticsearch.xpack.esql.plan.EsqlStatement; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import static java.time.temporal.ChronoUnit.HOURS; +import static org.elasticsearch.rest.RestRequest.Method.GET; + +/** + * REST handler for the Prometheus {@code GET /api/v1/series} endpoint. + * Returns the list of time series matching a label selector. + * Only GET is supported. POST with {@code application/x-www-form-urlencoded} bodies is rejected + * at the HTTP layer as a CSRF safeguard before this handler is ever reached — see + * {@code RestController#isContentTypeDisallowed}. + */ +@ServerlessScope(Scope.PUBLIC) +public class PrometheusSeriesRestAction extends BaseRestHandler { + + private static final String MATCH_PARAM = "match[]"; + private static final String START_PARAM = "start"; + private static final String END_PARAM = "end"; + private static final String LIMIT_PARAM = "limit"; + private static final String INDEX_PARAM = "index"; + + private static final int DEFAULT_LIMIT = 0; // 0 = no limit, matching Prometheus semantics + private static final long DEFAULT_LOOKBACK_HOURS = 24; + + @Override + public String getName() { + return "prometheus_series_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_prometheus/api/v1/series"), new Route(GET, "/_prometheus/{index}/api/v1/series")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + // TODO: support multiple match[] values once multi-value query param support lands + String matchSelector = request.param(MATCH_PARAM); + if (matchSelector == null) { + throw new IllegalArgumentException("At least one [match[]] selector is required"); + } + List matchSelectors = List.of(matchSelector); + + // Time range + String endParam = request.param(END_PARAM); + String startParam = request.param(START_PARAM); + Instant end = endParam != null ? PromqlParserUtils.parseDate(Source.EMPTY, endParam) : Instant.now(); + Instant start = startParam != null + ? PromqlParserUtils.parseDate(Source.EMPTY, startParam) + : end.minus(DEFAULT_LOOKBACK_HOURS, HOURS); + + // Optional limit; 0 means "disabled" (Prometheus semantics), which defers to the ESQL + // result_truncation_max_size cluster setting (default 10 000). Positive values use a + // limit+1 sentinel to detect and report truncation. + int limit = request.paramAsInt(LIMIT_PARAM, DEFAULT_LIMIT); + + String index = request.param(INDEX_PARAM, "*"); + LogicalPlan plan = PrometheusSeriesPlanBuilder.buildPlan(index, matchSelectors, start, end, limit); + EsqlStatement statement = new EsqlStatement(plan, List.of()); + PreparedEsqlQueryRequest esqlRequest = PreparedEsqlQueryRequest.sync(statement, "prometheus_series"); + + return channel -> client.execute(EsqlQueryAction.INSTANCE, esqlRequest, new PrometheusSeriesResponseListener(channel, limit)); + } + +} diff --git a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListenerTests.java b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListenerTests.java index a8beb68ef8ea1..3b10a221f6ddb 100644 --- a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListenerTests.java +++ b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListenerTests.java @@ -23,9 +23,7 @@ public class PrometheusSeriesResponseListenerTests extends ESTestCase { - // ------------------------------------------------------------------------- // parseDimensions tests - // ------------------------------------------------------------------------- public void testParseDimensionsStripsLabelsPrefix() { String json = "{\"labels.__name__\":\"up\",\"labels.job\":\"prometheus\",\"labels.instance\":\"localhost:9090\"}"; @@ -61,9 +59,7 @@ public void testParseDimensionsSingleLabel() { assertThat(labels.size(), is(1)); } - // ------------------------------------------------------------------------- // Error response tests - // ------------------------------------------------------------------------- public void testOnFailureBadRequest() throws Exception { FakeRestRequest fakeRequest = new FakeRestRequest(); @@ -102,9 +98,7 @@ public void testOnFailureResponseBodyContainsErrorType() throws Exception { assertThat(body, containsString("invalid parameter")); } - // ------------------------------------------------------------------------- // Truncation / limit tests - // ------------------------------------------------------------------------- public void testNoWarningWhenResultsLessThanLimitPlusOne() throws Exception { // limit=2, 2 rows returned → not truncated (sentinel threshold is 3) @@ -134,9 +128,7 @@ public void testNoWarningWhenLimitIsZero() throws Exception { assertThat(body, not(containsString("warnings"))); } - // ------------------------------------------------------------------------- // buildLabelMap / metric_name fallback tests (Change 3) - // ------------------------------------------------------------------------- public void testBuildLabelMapUsesMetricNameAsFallback() { Map labels = PrometheusSeriesResponseListener.buildLabelMap("cpu_usage", "{}");