diff --git a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusLabelValuesResponseListener.java b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusLabelValuesResponseListener.java new file mode 100644 index 0000000000000..faab4b5468670 --- /dev/null +++ b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusLabelValuesResponseListener.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus.rest; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.Strings; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Converts an {@link EsqlQueryResponse} into the + * Prometheus {@code /api/v1/label/{name}/values} JSON response format. + * + *

ESQL has already sorted and deduplicated the values via {@code STATS BY} and {@code ORDER BY}. + * This listener iterates the rows and strips storage prefixes before returning values: + *

+ * + *

Truncation detection uses a {@code limit + 1} sentinel: if the result contains exactly + * {@code limit + 1} rows, the last row is dropped and a warning is emitted. + * + *

When ESQL returns a {@code "Unknown column"} BAD_REQUEST error (label name absent from all + * index mappings), the listener converts it into an empty {@code data:[]} success response — the + * correct Prometheus behaviour for a label with no values. + */ +public class PrometheusLabelValuesResponseListener { + + private static final Logger logger = LogManager.getLogger(PrometheusLabelValuesResponseListener.class); + + private static final String LABELS_PREFIX = "labels."; + private static final String METRICS_PREFIX = "metrics."; + private static final String CONTENT_TYPE = "application/json"; + + private PrometheusLabelValuesResponseListener() {} + + public static ActionListener create(RestChannel channel, int limit) { + // Do NOT close/decRef the response here: the framework (via respondAndRelease) calls + // decRef() after this method returns, which is the correct single release. + return ActionListener.wrap(ignored -> {}, e -> { + logger.debug("Label values query failed", e); + // When ESQL cannot find the label name in any index mapping it throws a BAD_REQUEST + // "Unknown column []" error. The correct Prometheus response for a label that has + // no values is an empty data array, not an error. + if (isUnknownColumn(e)) { + try { + channel.sendResponse(buildSuccessResponse(List.of(), 0)); + } catch (Exception sendEx) { + sendEx.addSuppressed(e); + logger.warn("Failed to send empty-data response for unknown column", sendEx); + } + } else { + PrometheusErrorResponse.send(channel, e, logger); + } + }).delegateFailureAndWrap((l, response) -> { + List values = collectValues(response.rows()); + channel.sendResponse(buildSuccessResponse(values, limit)); + }); + } + + /** + * Collects label values from the single-column ESQL result rows, stripping storage prefixes: + * {@code "labels."} (Prometheus passthrough field) and {@code "metrics."} (raw ES field path + * returned by MetricsInfo for the {@code __name__} plan branch). OTel values carry neither + * prefix and are returned as-is. Package-private for testing. + */ + static List collectValues(Iterable> rows) { + List result = new ArrayList<>(); + for (Iterable row : rows) { + Object value = row.iterator().next(); + if (value == null) { + continue; + } + String raw = value.toString(); + String labelValue; + if (raw.startsWith(LABELS_PREFIX)) { + labelValue = raw.substring(LABELS_PREFIX.length()); + } else if (raw.startsWith(METRICS_PREFIX)) { + labelValue = raw.substring(METRICS_PREFIX.length()); + } else { + labelValue = raw; + } + if (labelValue.isEmpty() == false) { + result.add(labelValue); + } + } + return result; + } + + /** + * Builds the success response. Uses the {@code limit + 1} sentinel to detect truncation: + * if collected size equals {@code limit + 1}, truncates to {@code limit} and adds a warning. + * Package-private for testing. + */ + static RestResponse buildSuccessResponse(List values, int limit) throws IOException { + boolean truncated = false; + List output = values; + if (limit > 0 && values.size() == limit + 1) { + output = values.subList(0, limit); + truncated = true; + } + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + builder.field("status", "success"); + builder.startArray("data"); + for (String v : output) { + builder.value(v); + } + builder.endArray(); + if (truncated) { + builder.startArray("warnings"); + builder.value("results truncated due to limit"); + builder.endArray(); + } + builder.endObject(); + return new RestResponse(RestStatus.OK, CONTENT_TYPE, Strings.toString(builder)); + } + + /** + * Returns {@code true} if {@code e} is an ESQL analysis error caused by a label name that is + * absent from all index mappings. ESQL produces a {@code "Unknown column [name]"} message in + * its {@code VerificationException}. When no TS indices exist at all, the error may be + * combined with an {@code "@timestamp"} unresolved message and arrive as a 500; both cases + * mean no data exists for the requested label, so an empty {@code data:[]} response is correct. + */ + static boolean isUnknownColumn(Exception e) { + return e != null && e.getMessage() != null && e.getMessage().contains("Unknown column ["); + } +} diff --git a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusLabelValuesResponseListenerTests.java b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusLabelValuesResponseListenerTests.java new file mode 100644 index 0000000000000..96dff801f0287 --- /dev/null +++ b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusLabelValuesResponseListenerTests.java @@ -0,0 +1,201 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus.rest; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestChannel; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; + +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +public class PrometheusLabelValuesResponseListenerTests extends ESTestCase { + + public void testCollectValuesStripsLabelsPrefix() { + List values = PrometheusLabelValuesResponseListener.collectValues( + List.of(List.of("labels.prometheus"), List.of("labels.node_exporter")) + ); + assertThat(values, equalTo(List.of("prometheus", "node_exporter"))); + } + + public void testCollectValuesSingleEntry() { + List values = PrometheusLabelValuesResponseListener.collectValues(List.of(List.of("labels.production"))); + assertThat(values, equalTo(List.of("production"))); + } + + public void testCollectValuesSkipsNullValues() { + List> rows = new java.util.ArrayList<>(); + rows.add(Arrays.asList((Object) null)); + rows.add(List.of("labels.staging")); + assertThat(PrometheusLabelValuesResponseListener.collectValues(rows), equalTo(List.of("staging"))); + } + + public void testCollectValuesEmptyRowsReturnsEmptyList() { + assertThat(PrometheusLabelValuesResponseListener.collectValues(List.of()).isEmpty(), is(true)); + } + + public void testCollectValuesStripsMetricsPrefix() { + List values = PrometheusLabelValuesResponseListener.collectValues( + List.of(List.of("metrics.up"), List.of("metrics.http_requests_total")) + ); + assertThat(values, equalTo(List.of("up", "http_requests_total"))); + } + + public void testCollectValuesOtelValuesReturnedAsIs() { + List values = PrometheusLabelValuesResponseListener.collectValues(List.of(List.of("http_server"), List.of("grpc_server"))); + assertThat(values, equalTo(List.of("http_server", "grpc_server"))); + } + + public void testBuildSuccessResponseBodyShape() throws Exception { + String body = PrometheusLabelValuesResponseListener.buildSuccessResponse(List.of("prometheus", "node_exporter"), 0) + .content() + .utf8ToString(); + assertThat(body, containsString("\"status\":\"success\"")); + assertThat(body, containsString("\"prometheus\"")); + assertThat(body, containsString("\"node_exporter\"")); + } + + public void testBuildSuccessResponseEmptyData() throws Exception { + String body = PrometheusLabelValuesResponseListener.buildSuccessResponse(List.of(), 0).content().utf8ToString(); + assertThat(body, containsString("\"status\":\"success\"")); + assertThat(body, containsString("\"data\":[]")); + } + + public void testNoTruncationWarningWhenRowsBelowLimitPlusOne() throws Exception { + // 2 rows, limit=5 → limit+1=6, no truncation + String body = PrometheusLabelValuesResponseListener.buildSuccessResponse(List.of("a", "b"), 5).content().utf8ToString(); + assertThat(body, not(containsString("warnings"))); + } + + public void testTruncationWarningWhenRowsEqualLimitPlusOne() throws Exception { + // 6 rows returned (limit+1 sentinel), limit=5 → truncated + List values = List.of("a", "b", "c", "d", "e", "f"); + String body = PrometheusLabelValuesResponseListener.buildSuccessResponse(values, 5).content().utf8ToString(); + assertThat(body, containsString("warnings")); + assertThat(body, containsString("results truncated due to limit")); + // last value should be dropped + assertThat(body, not(containsString("\"f\""))); + assertThat(body, containsString("\"e\"")); + } + + public void testTruncationOutputContainsExactlyLimitValues() throws Exception { + List values = List.of("a", "b", "c", "d", "e", "f"); // 6 = limit+1 + String body = PrometheusLabelValuesResponseListener.buildSuccessResponse(values, 5).content().utf8ToString(); + // "a" through "e" should be present, "f" absent + assertThat(body, containsString("\"a\"")); + assertThat(body, containsString("\"e\"")); + assertThat(body, not(containsString("\"f\""))); + } + + public void testNoTruncationWhenLimitIsZero() throws Exception { + List values = List.of("a", "b", "c"); + String body = PrometheusLabelValuesResponseListener.buildSuccessResponse(values, 0).content().utf8ToString(); + assertThat(body, not(containsString("warnings"))); + assertThat(body, containsString("\"c\"")); + } + + public void testOnFailureBadRequest() { + FakeRestRequest fakeRequest = new FakeRestRequest(); + FakeRestChannel channel = new FakeRestChannel(fakeRequest, true); + ActionListener listener = PrometheusLabelValuesResponseListener.create(channel, 0); + + ElasticsearchStatusException ex = new ElasticsearchStatusException("bad selector syntax", RestStatus.BAD_REQUEST); + listener.onFailure(ex); + + assertThat(channel.errors().get(), is(1)); + assertThat(channel.capturedResponse().status(), equalTo(RestStatus.BAD_REQUEST)); + } + + public void testOnFailureInternalError() { + FakeRestRequest fakeRequest = new FakeRestRequest(); + FakeRestChannel channel = new FakeRestChannel(fakeRequest, true); + ActionListener listener = PrometheusLabelValuesResponseListener.create(channel, 0); + + listener.onFailure(new RuntimeException("something went wrong")); + + assertThat(channel.errors().get(), is(1)); + assertThat(channel.capturedResponse().status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testOnFailureResponseBodyContainsBadDataErrorType() { + FakeRestRequest fakeRequest = new FakeRestRequest(); + FakeRestChannel channel = new FakeRestChannel(fakeRequest, true); + ActionListener listener = PrometheusLabelValuesResponseListener.create(channel, 0); + + ElasticsearchStatusException ex = new ElasticsearchStatusException("invalid parameter", RestStatus.BAD_REQUEST); + listener.onFailure(ex); + + String body = channel.capturedResponse().content().utf8ToString(); + assertThat(body, containsString("\"status\":\"error\"")); + assertThat(body, containsString("\"errorType\":\"bad_data\"")); + assertThat(body, containsString("invalid parameter")); + } + + public void testOnFailureTimeoutReturnsExecutionErrorType() { + FakeRestRequest fakeRequest = new FakeRestRequest(); + FakeRestChannel channel = new FakeRestChannel(fakeRequest, true); + ActionListener listener = PrometheusLabelValuesResponseListener.create(channel, 0); + + // Generic RuntimeException does not trigger timeout — errorType is "execution" + listener.onFailure(new RuntimeException("query failed")); + + String body = channel.capturedResponse().content().utf8ToString(); + assertThat(body, containsString("\"errorType\":\"execution\"")); + } + + public void testIsUnknownColumnReturnsTrueForBadRequest() { + ElasticsearchStatusException ex = new ElasticsearchStatusException("Unknown column [my_label]", RestStatus.BAD_REQUEST); + assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(ex), is(true)); + } + + public void testIsUnknownColumnReturnsTrueForInternalError() { + // When no TS indices exist at all, @timestamp is also unresolved — the combined error + // arrives as a 500 but still contains "Unknown column [...]" + ElasticsearchStatusException ex = new ElasticsearchStatusException( + "Found 2 problems\nline -1:-1: [] requires the [@timestamp] field...\nline -1:-1: Unknown column [my_label]", + RestStatus.INTERNAL_SERVER_ERROR + ); + assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(ex), is(true)); + } + + public void testIsUnknownColumnReturnsFalseForUnrelatedError() { + ElasticsearchStatusException ex = new ElasticsearchStatusException( + "match[] selector must be an instant vector selector", + RestStatus.BAD_REQUEST + ); + assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(ex), is(false)); + } + + public void testIsUnknownColumnReturnsFalseForNull() { + assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(null), is(false)); + } + + public void testOnFailureUnknownColumnReturnsEmptyData() throws Exception { + FakeRestRequest fakeRequest = new FakeRestRequest(); + FakeRestChannel channel = new FakeRestChannel(fakeRequest, true); + ActionListener listener = PrometheusLabelValuesResponseListener.create(channel, 0); + + ElasticsearchStatusException ex = new ElasticsearchStatusException("Unknown column [nonexistent_label]", RestStatus.BAD_REQUEST); + listener.onFailure(ex); + + assertThat(channel.responses().get(), is(1)); + assertThat(channel.capturedResponse().status(), equalTo(RestStatus.OK)); + String body = channel.capturedResponse().content().utf8ToString(); + assertThat(body, containsString("\"status\":\"success\"")); + assertThat(body, containsString("\"data\":[]")); + } +}