Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.prometheus.rest;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Converts an {@link EsqlQueryResponse} into the
 * Prometheus {@code /api/v1/label/{name}/values} JSON response format.
 *
 * <p>ESQL has already sorted and deduplicated the values via {@code STATS BY} and {@code ORDER BY}.
 * This listener iterates the rows and strips storage prefixes before returning values:
 * <ul>
 *   <li>{@code labels.} prefix — ESQL resolves the passthrough field to its stored path
 *       {@code labels.job} for Prometheus data; OTel stores without prefix so no stripping needed.</li>
 *   <li>{@code metrics.} prefix — the {@code metric_name} column returned by MetricsInfo for the
 *       {@code __name__} plan branch carries the raw ES field path, e.g. {@code metrics.up}.</li>
 * </ul>
 * Null values, rows without any column, and values that are empty after stripping are skipped.
 *
 * <p>Truncation detection relies on a {@code limit + 1} sentinel: the caller is expected to ask
 * ESQL for one row more than the requested limit. Whenever more than {@code limit} values are
 * collected, the output is cut down to {@code limit} entries and a warning is emitted. A
 * {@code limit} of zero (or less) disables truncation.
 *
 * <p>When ESQL reports an {@code "Unknown column"} error (label name absent from all index
 * mappings), the listener converts it into an empty {@code data:[]} success response — the
 * correct Prometheus behaviour for a label with no values.
 */
public final class PrometheusLabelValuesResponseListener {

    private static final Logger logger = LogManager.getLogger(PrometheusLabelValuesResponseListener.class);

    private static final String LABELS_PREFIX = "labels.";
    private static final String METRICS_PREFIX = "metrics.";
    private static final String CONTENT_TYPE = "application/json";

    /** Fragment of the ESQL analysis error message that identifies an unresolved column. */
    private static final String UNKNOWN_COLUMN_MARKER = "Unknown column [";

    /** Upper bound on how many causes are inspected, so a malformed (cyclic) cause chain cannot loop forever. */
    private static final int MAX_CAUSE_DEPTH = 16;

    private PrometheusLabelValuesResponseListener() {}

    /**
     * Creates the listener that renders the ESQL result (or failure) onto {@code channel}.
     *
     * @param channel the REST channel the Prometheus-formatted response is sent to
     * @param limit   maximum number of values to return; values of zero or less disable truncation
     * @return a listener that sends exactly one response for either outcome
     */
    public static ActionListener<EsqlQueryResponse> create(RestChannel channel, int limit) {
        // Do NOT close/decRef the response here: the framework (via respondAndRelease) calls
        // decRef() after this method returns, which is the correct single release.
        return ActionListener.<Void>wrap(ignored -> {}, e -> handleFailure(channel, e)).delegateFailureAndWrap((l, response) -> {
            List<String> values = collectValues(response.rows());
            channel.sendResponse(buildSuccessResponse(values, limit));
        });
    }

    /**
     * Sends the response for a failed query: an empty success body for "Unknown column" errors,
     * the regular Prometheus error body for everything else.
     */
    private static void handleFailure(RestChannel channel, Exception e) {
        logger.debug("Label values query failed", e);
        // When ESQL cannot find the label name in any index mapping it throws a BAD_REQUEST
        // "Unknown column [<name>]" error. The correct Prometheus response for a label that has
        // no values is an empty data array, not an error.
        if (isUnknownColumn(e) == false) {
            PrometheusErrorResponse.send(channel, e, logger);
            return;
        }
        try {
            channel.sendResponse(buildSuccessResponse(List.of(), 0));
        } catch (Exception sendEx) {
            // Nothing more can be sent on this channel; keep the original failure attached for diagnosis.
            sendEx.addSuppressed(e);
            logger.warn("Failed to send empty-data response for unknown column", sendEx);
        }
    }

    /**
     * Collects label values from the single-column ESQL result rows, stripping storage prefixes:
     * {@code "labels."} (Prometheus passthrough field) and {@code "metrics."} (raw ES field path
     * returned by MetricsInfo for the {@code __name__} plan branch). OTel values carry neither
     * prefix and are returned as-is. Package-private for testing.
     *
     * <p>NOTE(review): a genuine value that itself starts with {@code "labels."} or
     * {@code "metrics."} is stripped as well — confirm this cannot occur for the queries issued.
     *
     * @param rows result rows; only the first column of each row is read
     * @return the non-empty values in row order, without storage prefixes
     */
    static List<String> collectValues(Iterable<? extends Iterable<Object>> rows) {
        List<String> result = new ArrayList<>();
        for (Iterable<Object> row : rows) {
            Object value = firstColumn(row);
            if (value == null) {
                continue;
            }
            String labelValue = stripStoragePrefix(value.toString());
            if (labelValue.isEmpty() == false) {
                result.add(labelValue);
            }
        }
        return result;
    }

    /** Returns the first column of {@code row}, or {@code null} when the row has no columns at all. */
    private static Object firstColumn(Iterable<Object> row) {
        for (Object column : row) {
            return column;
        }
        return null;
    }

    /** Removes a leading {@code "labels."} or {@code "metrics."} prefix; other values are returned unchanged. */
    private static String stripStoragePrefix(String raw) {
        if (raw.startsWith(LABELS_PREFIX)) {
            return raw.substring(LABELS_PREFIX.length());
        }
        if (raw.startsWith(METRICS_PREFIX)) {
            return raw.substring(METRICS_PREFIX.length());
        }
        return raw;
    }

    /**
     * Builds the success response. Uses the {@code limit + 1} sentinel to detect truncation:
     * if more than {@code limit} values were collected, the output is cut to {@code limit}
     * entries and a {@code warnings} array is added. Package-private for testing.
     *
     * @param values the collected label values
     * @param limit  maximum number of values to emit; zero or less means "no limit"
     * @return a {@code 200 OK} JSON response in the Prometheus success format
     * @throws IOException if the JSON body cannot be built
     */
    static RestResponse buildSuccessResponse(List<String> values, int limit) throws IOException {
        // Compare against limit itself instead of computing limit + 1, which overflows for
        // Integer.MAX_VALUE; this also keeps the limit enforced if more than limit + 1 values arrive.
        boolean truncated = limit > 0 && values.size() > limit;
        List<String> output = truncated ? values.subList(0, limit) : values;

        XContentBuilder builder = JsonXContent.contentBuilder();
        builder.startObject();
        builder.field("status", "success");
        builder.startArray("data");
        for (String v : output) {
            builder.value(v);
        }
        builder.endArray();
        if (truncated) {
            builder.startArray("warnings");
            builder.value("results truncated due to limit");
            builder.endArray();
        }
        builder.endObject();
        return new RestResponse(RestStatus.OK, CONTENT_TYPE, Strings.toString(builder));
    }

    /**
     * Returns {@code true} if {@code e} is an ESQL analysis error caused by a label name that is
     * absent from all index mappings. ESQL produces a {@code "Unknown column [name]"} message in
     * its {@code VerificationException}. When no TS indices exist at all, the error may be
     * combined with an {@code "@timestamp"} unresolved message and arrive as a 500; both cases
     * mean no data exists for the requested label, so an empty {@code data:[]} response is correct.
     *
     * <p>The message of {@code e} and of its causes (up to a fixed depth) is inspected, so an
     * analysis error that was wrapped by another exception is still recognised.
     *
     * <p>NOTE(review): the check matches any unresolved column, not only the requested label —
     * confirm the generated query cannot reference other columns that may be missing.
     *
     * @param e the failure to inspect; may be {@code null}
     * @return whether the failure (or one of its causes) carries the unknown-column marker
     */
    static boolean isUnknownColumn(Exception e) {
        Throwable current = e;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            String message = current.getMessage();
            if (message != null && message.contains(UNKNOWN_COLUMN_MARKER)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.prometheus.rest;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.util.Arrays;
import java.util.List;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

/**
 * Unit tests for {@link PrometheusLabelValuesResponseListener}: value collection and prefix
 * stripping, success-body rendering including limit truncation, unknown-column detection, and
 * the responses sent on failure.
 */
public class PrometheusLabelValuesResponseListenerTests extends ESTestCase {

    /** Creates a fresh fake channel that records the response sent by the listener under test. */
    private static FakeRestChannel newChannel() {
        return new FakeRestChannel(new FakeRestRequest(), true);
    }

    /** Renders the success body for the given values and limit as a string. */
    private static String successBody(List<String> values, int limit) throws Exception {
        return PrometheusLabelValuesResponseListener.buildSuccessResponse(values, limit).content().utf8ToString();
    }

    public void testCollectValuesStripsLabelsPrefix() {
        List<String> values = PrometheusLabelValuesResponseListener.collectValues(
            List.of(List.of("labels.prometheus"), List.of("labels.node_exporter"))
        );
        assertThat(values, equalTo(List.of("prometheus", "node_exporter")));
    }

    public void testCollectValuesSingleEntry() {
        List<String> values = PrometheusLabelValuesResponseListener.collectValues(List.of(List.of("labels.production")));
        assertThat(values, equalTo(List.of("production")));
    }

    public void testCollectValuesSkipsNullValues() {
        // List.of rejects null elements, so the row holding the null column is built with Arrays.asList.
        List<List<Object>> rows = Arrays.<List<Object>>asList(Arrays.asList((Object) null), List.of("labels.staging"));
        assertThat(PrometheusLabelValuesResponseListener.collectValues(rows), equalTo(List.of("staging")));
    }

    public void testCollectValuesSkipsValuesEmptyAfterStripping() {
        // A value consisting only of the storage prefix leaves nothing behind and must be dropped.
        List<String> values = PrometheusLabelValuesResponseListener.collectValues(List.of(List.of("labels."), List.of("labels.kept")));
        assertThat(values, equalTo(List.of("kept")));
    }

    public void testCollectValuesEmptyRowsReturnsEmptyList() {
        assertThat(PrometheusLabelValuesResponseListener.collectValues(List.of()).isEmpty(), is(true));
    }

    public void testCollectValuesStripsMetricsPrefix() {
        List<String> values = PrometheusLabelValuesResponseListener.collectValues(
            List.of(List.of("metrics.up"), List.of("metrics.http_requests_total"))
        );
        assertThat(values, equalTo(List.of("up", "http_requests_total")));
    }

    public void testCollectValuesOtelValuesReturnedAsIs() {
        List<String> values = PrometheusLabelValuesResponseListener.collectValues(List.of(List.of("http_server"), List.of("grpc_server")));
        assertThat(values, equalTo(List.of("http_server", "grpc_server")));
    }

    public void testBuildSuccessResponseBodyShape() throws Exception {
        String body = successBody(List.of("prometheus", "node_exporter"), 0);
        assertThat(body, containsString("\"status\":\"success\""));
        // Values must appear inside the data array, in input order.
        assertThat(body, containsString("\"data\":[\"prometheus\",\"node_exporter\"]"));
    }

    public void testBuildSuccessResponseEmptyData() throws Exception {
        String body = successBody(List.of(), 0);
        assertThat(body, containsString("\"status\":\"success\""));
        assertThat(body, containsString("\"data\":[]"));
    }

    public void testNoTruncationWarningWhenRowsBelowLimitPlusOne() throws Exception {
        // 2 rows, limit=5 → limit+1=6, no truncation
        String body = successBody(List.of("a", "b"), 5);
        assertThat(body, not(containsString("warnings")));
        assertThat(body, containsString("\"data\":[\"a\",\"b\"]"));
    }

    public void testNoTruncationWarningWhenRowsEqualLimit() throws Exception {
        // Exactly limit rows means the sentinel row was never produced, so nothing was cut off.
        String body = successBody(List.of("a", "b", "c"), 3);
        assertThat(body, not(containsString("warnings")));
        assertThat(body, containsString("\"data\":[\"a\",\"b\",\"c\"]"));
    }

    public void testTruncationWarningWhenRowsEqualLimitPlusOne() throws Exception {
        // 6 rows returned (limit+1 sentinel), limit=5 → truncated
        String body = successBody(List.of("a", "b", "c", "d", "e", "f"), 5);
        assertThat(body, containsString("\"warnings\":[\"results truncated due to limit\"]"));
        // last value should be dropped
        assertThat(body, not(containsString("\"f\"")));
        assertThat(body, containsString("\"e\""));
    }

    public void testTruncationOutputContainsExactlyLimitValues() throws Exception {
        // 6 = limit+1: the data array must hold precisely the first five values, in order.
        String body = successBody(List.of("a", "b", "c", "d", "e", "f"), 5);
        assertThat(body, containsString("\"data\":[\"a\",\"b\",\"c\",\"d\",\"e\"]"));
        assertThat(body, not(containsString("\"f\"")));
    }

    public void testNoTruncationWhenLimitIsZero() throws Exception {
        // limit=0 means "no limit": every value is emitted and no warning is added.
        String body = successBody(List.of("a", "b", "c"), 0);
        assertThat(body, not(containsString("warnings")));
        assertThat(body, containsString("\"data\":[\"a\",\"b\",\"c\"]"));
    }

    public void testOnFailureBadRequest() {
        FakeRestChannel channel = newChannel();
        ActionListener<EsqlQueryResponse> listener = PrometheusLabelValuesResponseListener.create(channel, 0);

        listener.onFailure(new ElasticsearchStatusException("bad selector syntax", RestStatus.BAD_REQUEST));

        assertThat(channel.errors().get(), is(1));
        assertThat(channel.capturedResponse().status(), equalTo(RestStatus.BAD_REQUEST));
    }

    public void testOnFailureInternalError() {
        FakeRestChannel channel = newChannel();
        ActionListener<EsqlQueryResponse> listener = PrometheusLabelValuesResponseListener.create(channel, 0);

        listener.onFailure(new RuntimeException("something went wrong"));

        assertThat(channel.errors().get(), is(1));
        assertThat(channel.capturedResponse().status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
    }

    public void testOnFailureResponseBodyContainsBadDataErrorType() {
        FakeRestChannel channel = newChannel();
        ActionListener<EsqlQueryResponse> listener = PrometheusLabelValuesResponseListener.create(channel, 0);

        listener.onFailure(new ElasticsearchStatusException("invalid parameter", RestStatus.BAD_REQUEST));

        String body = channel.capturedResponse().content().utf8ToString();
        assertThat(body, containsString("\"status\":\"error\""));
        assertThat(body, containsString("\"errorType\":\"bad_data\""));
        assertThat(body, containsString("invalid parameter"));
    }

    public void testOnFailureTimeoutReturnsExecutionErrorType() {
        FakeRestChannel channel = newChannel();
        ActionListener<EsqlQueryResponse> listener = PrometheusLabelValuesResponseListener.create(channel, 0);

        // Despite the method name, no timeout is simulated here: a generic RuntimeException is not
        // classified as a timeout, so the reported errorType is "execution".
        listener.onFailure(new RuntimeException("query failed"));

        String body = channel.capturedResponse().content().utf8ToString();
        assertThat(body, containsString("\"errorType\":\"execution\""));
    }

    public void testIsUnknownColumnReturnsTrueForBadRequest() {
        ElasticsearchStatusException ex = new ElasticsearchStatusException("Unknown column [my_label]", RestStatus.BAD_REQUEST);
        assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(ex), is(true));
    }

    public void testIsUnknownColumnReturnsTrueForInternalError() {
        // When no TS indices exist at all, @timestamp is also unresolved — the combined error
        // arrives as a 500 but still contains "Unknown column [...]"
        ElasticsearchStatusException ex = new ElasticsearchStatusException(
            "Found 2 problems\nline -1:-1: [] requires the [@timestamp] field...\nline -1:-1: Unknown column [my_label]",
            RestStatus.INTERNAL_SERVER_ERROR
        );
        assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(ex), is(true));
    }

    public void testIsUnknownColumnReturnsFalseForUnrelatedError() {
        ElasticsearchStatusException ex = new ElasticsearchStatusException(
            "match[] selector must be an instant vector selector",
            RestStatus.BAD_REQUEST
        );
        assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(ex), is(false));
    }

    public void testIsUnknownColumnReturnsFalseForNull() {
        assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(null), is(false));
    }

    public void testIsUnknownColumnReturnsFalseForNullMessage() {
        // An exception without any message (and without a cause) carries nothing to match on.
        assertThat(PrometheusLabelValuesResponseListener.isUnknownColumn(new RuntimeException()), is(false));
    }

    public void testOnFailureUnknownColumnReturnsEmptyData() {
        FakeRestChannel channel = newChannel();
        ActionListener<EsqlQueryResponse> listener = PrometheusLabelValuesResponseListener.create(channel, 0);

        listener.onFailure(new ElasticsearchStatusException("Unknown column [nonexistent_label]", RestStatus.BAD_REQUEST));

        assertThat(channel.responses().get(), is(1));
        assertThat(channel.capturedResponse().status(), equalTo(RestStatus.OK));
        String body = channel.capturedResponse().content().utf8ToString();
        assertThat(body, containsString("\"status\":\"success\""));
        assertThat(body, containsString("\"data\":[]"));
    }
}
Loading