diff --git a/x-pack/plugin/prometheus/build.gradle b/x-pack/plugin/prometheus/build.gradle index 4e557cf535b46..aca6c0f543ba7 100644 --- a/x-pack/plugin/prometheus/build.gradle +++ b/x-pack/plugin/prometheus/build.gradle @@ -18,6 +18,8 @@ def protobufVersion = "4.32.0" dependencies { compileOnly project(path: xpackModule('core')) compileOnly project(path: xpackModule('esql')) + compileOnly project(path: xpackModule('esql-core')) + compileOnly project(':x-pack:plugin:esql:compute') testImplementation(testArtifact(project(xpackModule('core')))) testImplementation project(path: xpackModule('esql')) testImplementation project(path: xpackModule('esql-core')) diff --git a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusErrorResponse.java b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusErrorResponse.java new file mode 100644 index 0000000000000..f828a6360a6e9 --- /dev/null +++ b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusErrorResponse.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus.rest; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; + +import java.io.IOException; + +/** + * Utility for building and sending Prometheus-format error responses. + * + *

Error types follow the Prometheus HTTP API specification: + * api.go + */ +class PrometheusErrorResponse { + + private PrometheusErrorResponse() {} + + /** + * Sends a Prometheus-format error response derived from the given exception. + * If sending fails, logs a warning and attempts a plain-text fallback response. + */ + static void send(RestChannel channel, Exception e, Logger logger) { + try { + RestStatus status = ExceptionsHelper.status(e); + channel.sendResponse(new RestResponse(status, build(status, e.getMessage()))); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn("Failed to send error response", inner); + try { + channel.sendResponse( + new RestResponse( + RestStatus.INTERNAL_SERVER_ERROR, + RestResponse.TEXT_CONTENT_TYPE, + new BytesArray("Internal server error") + ) + ); + } catch (Exception ignored) {} + } + } + + /** + * Builds a Prometheus-format error JSON response body: + * {@code {"status":"error","errorType":"","error":""}} + */ + static XContentBuilder build(RestStatus status, String message) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + builder.field("status", "error"); + builder.field("errorType", mapErrorType(status)); + builder.field("error", message != null ? message : "unknown error"); + builder.endObject(); + return builder; + } + + /** + * Maps an HTTP status to a Prometheus error type string. + */ + static String mapErrorType(RestStatus status) { + return switch (status) { + case BAD_REQUEST -> "bad_data"; + case SERVICE_UNAVAILABLE, REQUEST_TIMEOUT, GATEWAY_TIMEOUT -> "timeout"; + default -> "execution"; + }; + } +} diff --git a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java index 393409a3ad443..25f2901ac4e4d 100644 --- a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java +++ b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java @@ -7,9 +7,7 @@ package org.elasticsearch.xpack.prometheus.rest; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.rest.RestChannel; @@ -40,7 +38,6 @@ class PrometheusQueryRangeResponseListener implements ActionListener { private static final Logger logger = LogManager.getLogger(PrometheusQueryRangeResponseListener.class); - private static final String JSON_CONTENT_TYPE = XContentType.JSON.mediaType(); // Column names expected in the ES|QL PROMQL response. static final String VALUE_COLUMN = "value"; @@ -78,16 +75,7 @@ public void onFailure(Exception e) { private void sendErrorResponse(Exception e) { logger.debug("PromQL query_range request failed", e); - try { - RestStatus status = ExceptionsHelper.status(e); - XContentBuilder builder = buildErrorJson(status, e.getMessage()); - channel.sendResponse(new RestResponse(status, builder)); - } catch (Exception inner) { - logger.error("failed to send error response for PromQL query_range", inner); - try { - channel.sendResponse(new RestResponse(RestStatus.INTERNAL_SERVER_ERROR, JSON_CONTENT_TYPE, new BytesArray("{}"))); - } catch (Exception ignored) {} - } + PrometheusErrorResponse.send(channel, e, logger); } /** @@ -263,24 +251,6 @@ private static void writeMetricFields(XContentBuilder builder, String prefix, Ma } } - static XContentBuilder buildErrorJson(RestStatus status, String message) throws IOException { - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.startObject(); - builder.field("status", "error"); - builder.field("errorType", mapErrorType(status)); - builder.field("error", message != null ? message : "unknown error"); - builder.endObject(); - return builder; - } - - private static String mapErrorType(RestStatus status) { - return switch (status) { - case BAD_REQUEST -> "bad_data"; - case SERVICE_UNAVAILABLE, REQUEST_TIMEOUT, GATEWAY_TIMEOUT -> "timeout"; - default -> "execution"; - }; - } - static class SeriesData { final String rawSeriesJson; final Map labels; diff --git a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListener.java b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListener.java new file mode 100644 index 0000000000000..140e938eb2d3f --- /dev/null +++ b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListener.java @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus.rest; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.Strings; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Converts an {@link EsqlQueryResponse} from a {@link org.elasticsearch.xpack.esql.plan.logical.TsInfo} plan into the + * Prometheus {@code /api/v1/series} JSON response format. + */ +public class PrometheusSeriesResponseListener implements ActionListener { + + private static final Logger logger = LogManager.getLogger(PrometheusSeriesResponseListener.class); + + static final String COL_METRIC_NAME = "metric_name"; + static final String COL_DIMENSIONS = "dimensions"; + private static final String LABELS_PREFIX = "labels."; + private static final String CONTENT_TYPE = "application/json"; + + private final RestChannel channel; + + public PrometheusSeriesResponseListener(RestChannel channel) { + this.channel = channel; + } + + @Override + public void onResponse(EsqlQueryResponse response) { + // Do NOT close/decRef the response here: the framework (via respondAndRelease) calls + // decRef() after this method returns, which is the correct single release. + try { + List> seriesList = extractSeries(response); + sendSuccess(seriesList); + } catch (Exception e) { + logger.debug("Failed to build series response", e); + PrometheusErrorResponse.send(channel, e, logger); + } + } + + @Override + public void onFailure(Exception e) { + logger.debug("Series query failed", e); + PrometheusErrorResponse.send(channel, e, logger); + } + + private static List> extractSeries(EsqlQueryResponse response) { + var columns = response.columns(); + int metricNameCol = -1; + int dimensionsCol = -1; + for (int i = 0; i < columns.size(); i++) { + String name = columns.get(i).name(); + if (COL_METRIC_NAME.equals(name)) { + metricNameCol = i; + } else if (COL_DIMENSIONS.equals(name)) { + dimensionsCol = i; + } + } + if (metricNameCol == -1 || dimensionsCol == -1) { + throw new IllegalArgumentException( + "TsInfo response is missing required columns [" + COL_METRIC_NAME + ", " + COL_DIMENSIONS + "]" + ); + } + final int metricNameIdx = metricNameCol; + final int dimensionsIdx = dimensionsCol; + List> result = new ArrayList<>(); + for (Iterable row : response.rows()) { + String metricName = null; + String dimensionsJson = null; + int col = 0; + for (Object value : row) { + if (col == metricNameIdx) { + metricName = value != null ? value.toString() : null; + } else if (col == dimensionsIdx) { + dimensionsJson = value != null ? value.toString() : null; + } + col++; + } + Map labels = buildLabelMap(metricName, dimensionsJson); + if (labels.isEmpty() == false) { + result.add(labels); + } + } + return result; + } + + /** + * Builds the label map for one TsInfo row. Parses {@code dimensionsJson} and, when + * {@code dimensions} carries no {@code __name__} entry (OTel metrics), synthesises it + * from the {@code metric_name} column. + */ + static Map buildLabelMap(String metricName, String dimensionsJson) { + Map labels = parseDimensions(dimensionsJson); + // OTel metrics have no labels.__name__ in dimensions — synthesise it from metric_name + if (labels.containsKey("__name__") == false && metricName != null) { + labels.put("__name__", metricName); + } + assert labels.isEmpty() == false + : "label map must not be empty for metric_name=[" + metricName + "] dimensions=[" + dimensionsJson + "]"; + return labels; + } + + /** + * Parses the {@code dimensions} JSON object and strips the {@code labels.} prefix from keys. + * Example: {@code {"labels.__name__":"up","labels.job":"prometheus"}} + * → {@code {"__name__":"up","job":"prometheus"}} + */ + static Map parseDimensions(String json) { + Map labels = new LinkedHashMap<>(); + if (json == null || json.isBlank()) { + return labels; + } + // Simple JSON object parser for {"key":"value",...} – all string-typed values + // Use xcontent for robust parsing + try (var parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, json)) { + parser.nextToken(); // START_OBJECT + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String rawKey = parser.currentName(); + parser.nextToken(); + String value = parser.text(); + String labelName = rawKey.startsWith(LABELS_PREFIX) ? rawKey.substring(LABELS_PREFIX.length()) : rawKey; + labels.put(labelName, value); + } + } catch (IOException e) { + logger.debug("Failed to parse dimensions JSON [{}]", json, e); + } + return labels; + } + + private void sendSuccess(List> seriesList) throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + builder.field("status", "success"); + builder.startArray("data"); + for (Map labels : seriesList) { + builder.startObject(); + for (Map.Entry entry : labels.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + channel.sendResponse(new RestResponse(RestStatus.OK, CONTENT_TYPE, Strings.toString(builder))); + } + +} diff --git a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java index e13ff01522323..be2cbe2ed6de4 100644 --- a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java +++ b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java @@ -185,7 +185,7 @@ public void testFormatSampleValueNull() { } public void testBuildErrorJson() throws IOException { - try (XContentBuilder builder = PrometheusQueryRangeResponseListener.buildErrorJson(RestStatus.BAD_REQUEST, "test error")) { + try (XContentBuilder builder = PrometheusErrorResponse.build(RestStatus.BAD_REQUEST, "test error")) { ObjectPath path = toObjectPath(builder); assertThat(path.evaluate("status"), equalTo("error")); assertThat(path.evaluate("errorType"), equalTo("bad_data")); @@ -194,7 +194,7 @@ public void testBuildErrorJson() throws IOException { } public void testBuildErrorJsonTimeout() throws IOException { - try (XContentBuilder builder = PrometheusQueryRangeResponseListener.buildErrorJson(RestStatus.SERVICE_UNAVAILABLE, "timeout")) { + try (XContentBuilder builder = PrometheusErrorResponse.build(RestStatus.SERVICE_UNAVAILABLE, "timeout")) { ObjectPath path = toObjectPath(builder); assertThat(path.evaluate("errorType"), equalTo("timeout")); } diff --git a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListenerTests.java b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListenerTests.java new file mode 100644 index 0000000000000..807fa84230320 --- /dev/null +++ b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusSeriesResponseListenerTests.java @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus.rest; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestChannel; +import org.elasticsearch.test.rest.FakeRestRequest; + +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class PrometheusSeriesResponseListenerTests extends ESTestCase { + + // ------------------------------------------------------------------------- + // parseDimensions tests + // ------------------------------------------------------------------------- + + public void testParseDimensionsStripsLabelsPrefix() { + String json = "{\"labels.__name__\":\"up\",\"labels.job\":\"prometheus\",\"labels.instance\":\"localhost:9090\"}"; + Map labels = PrometheusSeriesResponseListener.parseDimensions(json); + assertThat(labels.get("__name__"), equalTo("up")); + assertThat(labels.get("job"), equalTo("prometheus")); + assertThat(labels.get("instance"), equalTo("localhost:9090")); + assertThat(labels.size(), is(3)); + } + + public void testParseDimensionsKeepsNonLabelsPrefixKeys() { + String json = "{\"metric_name\":\"cpu_usage\",\"labels.env\":\"prod\"}"; + Map labels = PrometheusSeriesResponseListener.parseDimensions(json); + assertThat(labels.get("metric_name"), equalTo("cpu_usage")); + assertThat(labels.get("env"), equalTo("prod")); + assertThat(labels.size(), is(2)); + } + + public void testParseDimensionsEmptyObject() { + assertThat(PrometheusSeriesResponseListener.parseDimensions("{}").isEmpty(), is(true)); + } + + public void testParseDimensionsNullOrBlank() { + assertThat(PrometheusSeriesResponseListener.parseDimensions(null).isEmpty(), is(true)); + assertThat(PrometheusSeriesResponseListener.parseDimensions("").isEmpty(), is(true)); + assertThat(PrometheusSeriesResponseListener.parseDimensions(" ").isEmpty(), is(true)); + } + + public void testParseDimensionsSingleLabel() { + String json = "{\"labels.__name__\":\"http_requests_total\"}"; + Map labels = PrometheusSeriesResponseListener.parseDimensions(json); + assertThat(labels.get("__name__"), equalTo("http_requests_total")); + assertThat(labels.size(), is(1)); + } + + // ------------------------------------------------------------------------- + // Error response tests + // ------------------------------------------------------------------------- + + public void testOnFailureBadRequest() throws Exception { + FakeRestRequest fakeRequest = new FakeRestRequest(); + FakeRestChannel channel = new FakeRestChannel(fakeRequest, true, 1); + PrometheusSeriesResponseListener listener = new PrometheusSeriesResponseListener(channel); + + ElasticsearchStatusException ex = new ElasticsearchStatusException("bad selector syntax", RestStatus.BAD_REQUEST); + listener.onFailure(ex); + + assertThat(channel.errors().get(), is(1)); + assertThat(channel.capturedResponse().status(), equalTo(RestStatus.BAD_REQUEST)); + } + + public void testOnFailureInternalError() throws Exception { + FakeRestRequest fakeRequest = new FakeRestRequest(); + FakeRestChannel channel = new FakeRestChannel(fakeRequest, true, 1); + PrometheusSeriesResponseListener listener = new PrometheusSeriesResponseListener(channel); + + listener.onFailure(new RuntimeException("something went wrong")); + + assertThat(channel.errors().get(), is(1)); + assertThat(channel.capturedResponse().status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testOnFailureResponseBodyContainsErrorType() throws Exception { + FakeRestRequest fakeRequest = new FakeRestRequest(); + FakeRestChannel channel = new FakeRestChannel(fakeRequest, true, 1); + PrometheusSeriesResponseListener listener = new PrometheusSeriesResponseListener(channel); + + ElasticsearchStatusException ex = new ElasticsearchStatusException("invalid parameter", RestStatus.BAD_REQUEST); + listener.onFailure(ex); + + String body = channel.capturedResponse().content().utf8ToString(); + assertThat(body, containsString("\"status\":\"error\"")); + assertThat(body, containsString("\"errorType\":\"bad_data\"")); + assertThat(body, containsString("invalid parameter")); + } + + // ------------------------------------------------------------------------- + // buildLabelMap / metric_name fallback tests (Change 3) + // ------------------------------------------------------------------------- + + public void testBuildLabelMapUsesMetricNameAsFallback() { + Map labels = PrometheusSeriesResponseListener.buildLabelMap("cpu_usage", "{}"); + assertThat(labels.get("__name__"), equalTo("cpu_usage")); + } + + public void testBuildLabelMapDoesNotOverrideExistingName() { + Map labels = PrometheusSeriesResponseListener.buildLabelMap("something_else", "{\"labels.__name__\":\"up\"}"); + assertThat(labels.get("__name__"), equalTo("up")); + } + + public void testBuildLabelMapNullMetricNameAndEmptyDimensionsTripsAssertion() { + expectThrows(AssertionError.class, () -> PrometheusSeriesResponseListener.buildLabelMap(null, "{}")); + } + + public void testBuildLabelMapWithDimensionsAndFallback() { + // dimensions has other labels but no __name__; metric_name must fill in __name__ + Map labels = PrometheusSeriesResponseListener.buildLabelMap( + "otel_metric", + "{\"labels.job\":\"myservice\",\"labels.env\":\"prod\"}" + ); + assertThat(labels.get("__name__"), equalTo("otel_metric")); + assertThat(labels.get("job"), equalTo("myservice")); + assertThat(labels.get("env"), equalTo("prod")); + } + +}