From 63bdd0f4284f0b6765c6aebdd0ae8b57a79957e3 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Mon, 16 Mar 2026 10:41:42 +0100 Subject: [PATCH 1/3] Add PrometheusQueryRangeResponseListener and unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the ES|QL → Prometheus matrix JSON conversion layer: - `PrometheusQueryRangeResponseListener` converts an `EsqlQueryResponse` (columnar PROMQL output) into the Prometheus range query JSON format. Handles two column modes: * `_timeseries` JSON column — unwraps the `labels` namespace and dot-flattens other namespaces. * Individual label columns — maps each column name to a metric label. Also converts epoch-ms timestamps to epoch-second doubles and formats NaN / ±Inf sample values as Prometheus expects. - `PrometheusQueryRangeResponseListenerTests` covers every conversion path, edge cases (empty result, missing columns), error JSON building, and special float formatting. --- .../PrometheusQueryRangeResponseListener.java | 295 ++++++++++++++++++ ...etheusQueryRangeResponseListenerTests.java | 289 +++++++++++++++++ 2 files changed, 584 insertions(+) create mode 100644 x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java create mode 100644 x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java diff --git a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java new file mode 100644 index 0000000000000..d0b6827cccdf5 --- /dev/null +++ b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java @@ -0,0 +1,295 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus.rest; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.esql.action.ColumnInfo; +import org.elasticsearch.xpack.core.esql.action.EsqlResponse; +import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; +import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Listens for an {@link EsqlQueryResponse}, converts its columnar result into the + * Prometheus range query JSON format, and sends it as a {@link RestResponse}. + * + * @see Prometheus Range Vectors + */ +class PrometheusQueryRangeResponseListener implements ActionListener { + + private static final Logger logger = LogManager.getLogger(PrometheusQueryRangeResponseListener.class); + private static final String JSON_CONTENT_TYPE = XContentType.JSON.mediaType(); + + // Column names expected in the ES|QL PROMQL response. + static final String VALUE_COLUMN = "value"; + static final String STEP_PARAM = "step"; + + // Fixed column indices produced by the PROMQL command + EVAL step = TO_LONG(step). + // EVAL appends the new step column at the end, so dimension columns occupy indices 1..N-2. + private static final int VALUE_COL_IDX = 0; + private static final int DIMENSION_COL_START_IDX = 1; + + private final RestChannel channel; + + PrometheusQueryRangeResponseListener(RestChannel channel) { + this.channel = channel; + } + + @Override + public void onResponse(EsqlQueryResponse queryResponse) { + // Do not close queryResponse here - the transport framework's respondAndRelease handles decRef. + // If we close it manually, it will cause an AssertionError ("invalid decRef call: already closed") + // and crash the node. + try { + EsqlResponse response = queryResponse.response(); + XContentBuilder builder = convertToPrometheusJson(response); + channel.sendResponse(new RestResponse(RestStatus.OK, builder)); + } catch (Exception e) { + sendErrorResponse(e); + } + } + + @Override + public void onFailure(Exception e) { + sendErrorResponse(e); + } + + private void sendErrorResponse(Exception e) { + logger.debug("PromQL query_range request failed", e); + try { + RestStatus status = ExceptionsHelper.status(e); + XContentBuilder builder = buildErrorJson(status, e.getMessage()); + channel.sendResponse(new RestResponse(status, builder)); + } catch (Exception inner) { + logger.error("failed to send error response for PromQL query_range", inner); + try { + channel.sendResponse(new RestResponse(RestStatus.INTERNAL_SERVER_ERROR, JSON_CONTENT_TYPE, new BytesArray("{}"))); + } catch (Exception ignored) {} + } + } + + /** + * Converts an ES|QL response into a Prometheus-compatible JSON response. + * + *

The ES|QL PROMQL command, combined with {@code | EVAL step = TO_LONG(step)}, produces + * rows with the following column order (EVAL appends the converted step at the end): + *

    + *
  1. Column 0: value ({@code double})
  2. + *
  3. Columns 1..N-2: either a single {@code _timeseries} keyword column (JSON labels) + * or individual dimension/label columns
  4. + *
  5. Column N-1 (last): step ({@code long}, epoch milliseconds)
  6. + *
+ */ + static XContentBuilder convertToPrometheusJson(EsqlResponse response) throws IOException { + List columns = response.columns(); + if (columns.size() < 1 || VALUE_COLUMN.equals(columns.get(VALUE_COL_IDX).name()) == false) { + throw new IllegalStateException("PROMQL response is missing required 'value' column at index " + VALUE_COL_IDX); + } + final int stepColIdx = columns.size() - 1; + if (columns.size() < 2 || STEP_PARAM.equals(columns.get(stepColIdx).name()) == false) { + throw new IllegalStateException("PROMQL response is missing required 'step' column at last index " + stepColIdx); + } + // Column 1 is either _timeseries (a JSON blob) or the first of the individual dimension columns + final boolean useSeriesCol = columns.size() > 2 && MetadataAttribute.TIMESERIES.equals(columns.get(DIMENSION_COL_START_IDX).name()); + + Map seriesMap = new LinkedHashMap<>(); + + for (Iterable row : response.rows()) { + Object[] values = toArray(row, columns.size()); + + String seriesKey; + Map metric; + + if (useSeriesCol) { + seriesKey = values[DIMENSION_COL_START_IDX] != null ? values[DIMENSION_COL_START_IDX].toString() : "{}"; + metric = null; + } else { + StringBuilder keyBuilder = new StringBuilder(); + metric = new LinkedHashMap<>(); + for (int i = DIMENSION_COL_START_IDX; i < stepColIdx; i++) { + String label = columns.get(i).name(); + String value = values[i] != null ? values[i].toString() : ""; + metric.put(label, value); + keyBuilder.append(label).append('\0').append(value).append('\0'); + } + seriesKey = keyBuilder.toString(); + } + + String sampleValue = formatSampleValue(values[VALUE_COL_IDX]); + double timestamp = parseTimestamp(values[stepColIdx]); + + SeriesData series = seriesMap.get(seriesKey); + if (series == null) { + series = new SeriesData(useSeriesCol ? seriesKey : null, metric); + seriesMap.put(seriesKey, series); + } + series.values.add(new double[] { timestamp }); + series.stringValues.add(sampleValue); + } + + return buildSuccessJson(seriesMap); + } + + private static Object[] toArray(Iterable row, int size) { + Object[] arr = new Object[size]; + int i = 0; + for (Object val : row) { + if (i < size) { + arr[i++] = val; + } + } + return arr; + } + + /** + * Converts a timestamp from the ES|QL response into Unix epoch seconds. + * The step column is cast to {@code LONG} (epoch milliseconds) via {@code TO_LONG(step)} in the ES|QL query. + */ + private static double parseTimestamp(Object value) { + if (value instanceof Number n) { + return n.doubleValue() / 1000.0; + } + return 0; + } + + /** + * Formats a sample value for the Prometheus JSON response. + * Prometheus represents values as strings, with special handling for NaN and Infinity. + */ + static String formatSampleValue(Object value) { + if (value == null) { + return "NaN"; + } + if (value instanceof Double d) { + if (Double.isNaN(d)) { + return "NaN"; + } else if (Double.isInfinite(d)) { + return d > 0 ? "+Inf" : "-Inf"; + } + return d.toString(); + } + return value.toString(); + } + + private static XContentBuilder buildSuccessJson(Map seriesMap) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + builder.field("status", "success"); + builder.startObject("data"); + builder.field("resultType", "matrix"); + builder.startArray("result"); + + for (SeriesData series : seriesMap.values()) { + builder.startObject(); + + builder.startObject("metric"); + if (series.rawSeriesJson != null) { + writeMetricFromSeriesJson(builder, series.rawSeriesJson); + } else if (series.labels != null) { + for (Map.Entry entry : series.labels.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + } + builder.endObject(); + + builder.startArray("values"); + for (int i = 0; i < series.values.size(); i++) { + builder.startArray(); + builder.value(series.values.get(i)[0]); + builder.value(series.stringValues.get(i)); + builder.endArray(); + } + builder.endArray(); + + builder.endObject(); + } + + builder.endArray(); + builder.endObject(); + builder.endObject(); + return builder; + } + + /** + * Writes metric labels from a {@code _timeseries} JSON value. + *
    + *
  • The {@code labels} namespace is unwrapped without a prefix: + * {@code {"labels":{"__name__":"up","job":"prometheus"}}} → fields {@code __name__}, {@code job}.
  • + *
  • All other namespaces are flattened recursively with dot-separated paths: + * {@code {"attributes":{"resource":{"service.name":"foo"}}}} → field {@code attributes.resource.service.name}.
  • + *
+ */ + private static void writeMetricFromSeriesJson(XContentBuilder builder, String seriesJson) throws IOException { + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, seriesJson)) { + Map root = parser.map(); + Object labelsObj = root.remove("labels"); + if (labelsObj instanceof Map labels) { + writeMetricFields(builder, "", labels); + } + writeMetricFields(builder, "", root); + } + } + + private static void writeMetricFields(XContentBuilder builder, String prefix, Map map) throws IOException { + for (Map.Entry entry : map.entrySet()) { + String key = prefix + entry.getKey(); + if (entry.getValue() instanceof Map nested) { + writeMetricFields(builder, key + ".", nested); + } else if (entry.getValue() != null) { + builder.field(key, entry.getValue().toString()); + } + } + } + + static XContentBuilder buildErrorJson(RestStatus status, String message) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + builder.field("status", "error"); + builder.field("errorType", mapErrorType(status)); + builder.field("error", message != null ? message : "unknown error"); + builder.endObject(); + return builder; + } + + private static String mapErrorType(RestStatus status) { + return switch (status) { + case BAD_REQUEST -> "bad_data"; + case SERVICE_UNAVAILABLE, REQUEST_TIMEOUT, GATEWAY_TIMEOUT -> "timeout"; + default -> "execution"; + }; + } + + static class SeriesData { + final String rawSeriesJson; + final Map labels; + final List values = new ArrayList<>(); + final List stringValues = new ArrayList<>(); + + SeriesData(String rawSeriesJson, Map labels) { + this.rawSeriesJson = rawSeriesJson; + this.labels = labels; + } + } +} diff --git a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java new file mode 100644 index 0000000000000..96e90b0ddff96 --- /dev/null +++ b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java @@ -0,0 +1,289 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.prometheus.rest; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.esql.action.ColumnInfo; +import org.elasticsearch.xpack.core.esql.action.EsqlResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class PrometheusQueryRangeResponseListenerTests extends ESTestCase { + + // Column names are bare label names (e.g. "job", "instance") — no "labels." prefix. + // The Prometheus data stream maps labels with type: passthrough, so PROMQL resolves + // labels.job → column "job", labels.instance → column "instance", etc. + public void testConvertRangeQueryWithIndividualLabels() throws IOException { + List columns = List.of( + new TestColumnInfo("value", "double"), + new TestColumnInfo("__name__", "keyword"), + new TestColumnInfo("instance", "keyword"), + new TestColumnInfo("job", "keyword"), + new TestColumnInfo("step", "long") + ); + + List> rows = List.of( + List.of(1.5, "http_requests_total", "localhost:9090", "prometheus", 1735689600000L), + List.of(2.0, "http_requests_total", "localhost:9090", "prometheus", 1735689660000L), + List.of(3.0, "http_requests_total", "localhost:9091", "prometheus", 1735689600000L), + List.of(4.0, "http_requests_total", "localhost:9091", "prometheus", 1735689660000L) + ); + + EsqlResponse response = new TestEsqlResponse(columns, rows); + try (XContentBuilder builder = PrometheusQueryRangeResponseListener.convertToPrometheusJson(response)) { + ObjectPath path = toObjectPath(builder); + assertSuccessMatrix(path); + + assertThat(path.evaluate("data.result"), hasSize(2)); + assertThat(path.evaluate("data.result.0.metric.__name__"), equalTo("http_requests_total")); + assertThat(path.evaluate("data.result.0.metric.instance"), equalTo("localhost:9090")); + assertThat(path.evaluate("data.result.0.metric.job"), equalTo("prometheus")); + assertThat(path.evaluate("data.result.0.values.0"), equalTo(List.of(1735689600.0, "1.5"))); + assertThat(path.evaluate("data.result.0.values.1"), equalTo(List.of(1735689660.0, "2.0"))); + assertThat(path.evaluate("data.result.1.metric.__name__"), equalTo("http_requests_total")); + assertThat(path.evaluate("data.result.1.metric.instance"), equalTo("localhost:9091")); + assertThat(path.evaluate("data.result.1.metric.job"), equalTo("prometheus")); + assertThat(path.evaluate("data.result.1.values.0"), equalTo(List.of(1735689600.0, "3.0"))); + assertThat(path.evaluate("data.result.1.values.1"), equalTo(List.of(1735689660.0, "4.0"))); + } + } + + public void testConvertRangeQueryWithTimeseriesColumn() throws IOException { + // The PROMQL command returns a _timeseries column with JSON format {"labels":{...}} + // The listener extracts the inner labels as bare metric keys (no "labels." prefix) + List columns = List.of( + new TestColumnInfo("value", "double"), + new TestColumnInfo("_timeseries", "keyword"), + new TestColumnInfo("step", "long") + ); + + List> rows = List.of( + List.of(1.5, "{\"labels\":{\"__name__\":\"http_requests_total\",\"job\":\"prometheus\"}}", 1735689600000L), + List.of(2.0, "{\"labels\":{\"__name__\":\"http_requests_total\",\"job\":\"prometheus\"}}", 1735689660000L) + ); + + EsqlResponse response = new TestEsqlResponse(columns, rows); + try (XContentBuilder builder = PrometheusQueryRangeResponseListener.convertToPrometheusJson(response)) { + ObjectPath path = toObjectPath(builder); + assertSuccessMatrix(path); + + assertThat(path.evaluate("data.result"), hasSize(1)); + assertThat(path.evaluate("data.result.0.metric.__name__"), equalTo("http_requests_total")); + assertThat(path.evaluate("data.result.0.metric.job"), equalTo("prometheus")); + assertThat(path.evaluate("data.result.0.values.0"), equalTo(List.of(1735689600.0, "1.5"))); + assertThat(path.evaluate("data.result.0.values.1"), equalTo(List.of(1735689660.0, "2.0"))); + } + } + + public void testConvertRangeQueryWithTimeseriesColumnAttributesNamespace() throws IOException { + // _timeseries JSON with an "attributes" namespace containing a nested object. + // All non-"labels" namespaces are flattened recursively with dot-separated paths, so + // {"attributes":{"resource":{"service.name":"my-service"}}} → metric key "attributes.resource.service.name". + List columns = List.of( + new TestColumnInfo("value", "double"), + new TestColumnInfo("_timeseries", "keyword"), + new TestColumnInfo("step", "long") + ); + + List> rows = List.of( + List.of(1.5, "{\"attributes\":{\"resource\":{\"service.name\":\"my-service\"}}}", 1735689600000L), + List.of(2.0, "{\"attributes\":{\"resource\":{\"service.name\":\"my-service\"}}}", 1735689660000L) + ); + + EsqlResponse response = new TestEsqlResponse(columns, rows); + try (XContentBuilder builder = PrometheusQueryRangeResponseListener.convertToPrometheusJson(response)) { + ObjectPath path = toObjectPath(builder); + assertSuccessMatrix(path); + + assertThat(path.evaluate("data.result"), hasSize(1)); + assertThat(path.evaluate("data.result.0.metric.attributes\\.resource\\.service\\.name"), equalTo("my-service")); + assertThat(path.evaluate("data.result.0.values.0"), equalTo(List.of(1735689600.0, "1.5"))); + assertThat(path.evaluate("data.result.0.values.1"), equalTo(List.of(1735689660.0, "2.0"))); + } + } + + public void testConvertRangeQueryWithTimeseriesColumnTopLevelScalar() throws IOException { + // _timeseries JSON with a top-level scalar field (no namespace object). + // {"host":"my-host"} → metric key "host". + List columns = List.of( + new TestColumnInfo("value", "double"), + new TestColumnInfo("_timeseries", "keyword"), + new TestColumnInfo("step", "long") + ); + + List> rows = List.of( + List.of(1.5, "{\"host\":\"my-host\"}", 1735689600000L), + List.of(2.0, "{\"host\":\"my-host\"}", 1735689660000L) + ); + + EsqlResponse response = new TestEsqlResponse(columns, rows); + try (XContentBuilder builder = PrometheusQueryRangeResponseListener.convertToPrometheusJson(response)) { + ObjectPath path = toObjectPath(builder); + assertSuccessMatrix(path); + + assertThat(path.evaluate("data.result"), hasSize(1)); + assertThat(path.evaluate("data.result.0.metric.host"), equalTo("my-host")); + assertThat(path.evaluate("data.result.0.values.0"), equalTo(List.of(1735689600.0, "1.5"))); + assertThat(path.evaluate("data.result.0.values.1"), equalTo(List.of(1735689660.0, "2.0"))); + } + } + + public void testConvertEmptyResult() throws IOException { + List columns = List.of(new TestColumnInfo("value", "double"), new TestColumnInfo("step", "long")); + + EsqlResponse response = new TestEsqlResponse(columns, List.of()); + try (XContentBuilder builder = PrometheusQueryRangeResponseListener.convertToPrometheusJson(response)) { + ObjectPath path = toObjectPath(builder); + assertSuccessMatrix(path); + + assertThat(path.evaluate("data.result"), empty()); + } + } + + public void testTimestampConversion() throws IOException { + List columns = List.of(new TestColumnInfo("value", "double"), new TestColumnInfo("step", "long")); + List> rows = List.of(List.of(1.0, 1735689600000L)); + + EsqlResponse response = new TestEsqlResponse(columns, rows); + try (XContentBuilder builder = PrometheusQueryRangeResponseListener.convertToPrometheusJson(response)) { + ObjectPath path = toObjectPath(builder); + // 2025-01-01T00:00:00.000Z = 1735689600 epoch seconds + assertThat(path.evaluate("data.result.0.values.0"), equalTo(List.of(1735689600.0, "1.0"))); + } + } + + public void testFormatSampleValueNaN() { + assertThat(PrometheusQueryRangeResponseListener.formatSampleValue(Double.NaN), equalTo("NaN")); + } + + public void testFormatSampleValueInfinity() { + assertThat(PrometheusQueryRangeResponseListener.formatSampleValue(Double.POSITIVE_INFINITY), equalTo("+Inf")); + assertThat(PrometheusQueryRangeResponseListener.formatSampleValue(Double.NEGATIVE_INFINITY), equalTo("-Inf")); + } + + public void testFormatSampleValueNull() { + assertThat(PrometheusQueryRangeResponseListener.formatSampleValue(null), equalTo("NaN")); + } + + public void testBuildErrorJson() throws IOException { + try (XContentBuilder builder = PrometheusQueryRangeResponseListener.buildErrorJson(RestStatus.BAD_REQUEST, "test error")) { + ObjectPath path = toObjectPath(builder); + assertThat(path.evaluate("status"), equalTo("error")); + assertThat(path.evaluate("errorType"), equalTo("bad_data")); + assertThat(path.evaluate("error"), equalTo("test error")); + } + } + + public void testBuildErrorJsonTimeout() throws IOException { + try (XContentBuilder builder = PrometheusQueryRangeResponseListener.buildErrorJson(RestStatus.SERVICE_UNAVAILABLE, "timeout")) { + ObjectPath path = toObjectPath(builder); + assertThat(path.evaluate("errorType"), equalTo("timeout")); + } + } + + public void testMissingValueColumnThrows() { + List columns = List.of(new TestColumnInfo("step", "date")); + EsqlResponse response = new TestEsqlResponse(columns, List.of()); + expectThrows(IllegalStateException.class, () -> PrometheusQueryRangeResponseListener.convertToPrometheusJson(response)); + } + + public void testMissingStepColumnThrows() { + // Only value column — step is missing (would need to be last) + List columns = List.of(new TestColumnInfo("value", "double")); + EsqlResponse response = new TestEsqlResponse(columns, List.of()); + expectThrows(IllegalStateException.class, () -> PrometheusQueryRangeResponseListener.convertToPrometheusJson(response)); + } + + public void testWrongLastColumnNameThrows() { + // Last column is not named "step" + List columns = List.of(new TestColumnInfo("value", "double"), new TestColumnInfo("timestamp", "long")); + EsqlResponse response = new TestEsqlResponse(columns, List.of()); + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> PrometheusQueryRangeResponseListener.convertToPrometheusJson(response) + ); + assertThat(e.getMessage(), containsString("missing required 'step' column at last index")); + } + + private static void assertSuccessMatrix(ObjectPath path) throws IOException { + assertThat(path.evaluate("status"), equalTo("success")); + assertThat(path.evaluate("data.resultType"), equalTo("matrix")); + } + + private static ObjectPath toObjectPath(XContentBuilder builder) throws IOException { + try ( + XContentParser parser = XContentType.JSON.xContent() + .createParser(XContentParserConfiguration.EMPTY, BytesReference.bytes(builder).streamInput()) + ) { + return new ObjectPath(parser.map()); + } + } + + record TestColumnInfo(String name, String outputType) implements ColumnInfo { + @Override + public org.elasticsearch.xcontent.XContentBuilder toXContent( + org.elasticsearch.xcontent.XContentBuilder builder, + org.elasticsearch.xcontent.ToXContent.Params params + ) { + return builder; + } + + @Override + public void writeTo(org.elasticsearch.common.io.stream.StreamOutput out) {} + } + + static class TestEsqlResponse implements EsqlResponse { + private final List columns; + private final List> rows; + + TestEsqlResponse(List columns, List> rows) { + this.columns = columns; + this.rows = rows; + } + + @Override + public List columns() { + return columns; + } + + @Override + public Iterable> rows() { + List> result = new ArrayList<>(); + for (List row : rows) { + result.add(row); + } + return result; + } + + @Override + public Iterable column(int columnIndex) { + List col = new ArrayList<>(); + for (List row : rows) { + col.add(row.get(columnIndex)); + } + return col; + } + + @Override + public void close() {} + } +} From fb2242e42378a5a63f3d97649964d1f71418c9c6 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 19 Mar 2026 07:42:04 +0100 Subject: [PATCH 2/3] Update x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java Co-authored-by: Sergey Sidorov --- .../rest/PrometheusQueryRangeResponseListenerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java index 96e90b0ddff96..e13ff01522323 100644 --- a/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java +++ b/x-pack/plugin/prometheus/src/test/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListenerTests.java @@ -97,7 +97,7 @@ public void testConvertRangeQueryWithTimeseriesColumn() throws IOException { public void testConvertRangeQueryWithTimeseriesColumnAttributesNamespace() throws IOException { // _timeseries JSON with an "attributes" namespace containing a nested object. // All non-"labels" namespaces are flattened recursively with dot-separated paths, so - // {"attributes":{"resource":{"service.name":"my-service"}}} → metric key "attributes.resource.service.name". + // {"attributes":{"resource":{"service.name":"my-service"}}} -> metric key "attributes.resource.service.name". List columns = List.of( new TestColumnInfo("value", "double"), new TestColumnInfo("_timeseries", "keyword"), From fee0b51814f0cd957f5f7d0cc0ce4afd29926719 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 19 Mar 2026 11:49:47 +0100 Subject: [PATCH 3/3] Add comments to end* calls in buildSuccessJson --- .../rest/PrometheusQueryRangeResponseListener.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java index d0b6827cccdf5..393409a3ad443 100644 --- a/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java +++ b/x-pack/plugin/prometheus/src/main/java/org/elasticsearch/xpack/prometheus/rest/PrometheusQueryRangeResponseListener.java @@ -212,7 +212,7 @@ private static XContentBuilder buildSuccessJson(Map seriesMa builder.field(entry.getKey(), entry.getValue()); } } - builder.endObject(); + builder.endObject(); // metric builder.startArray("values"); for (int i = 0; i < series.values.size(); i++) { @@ -221,14 +221,14 @@ private static XContentBuilder buildSuccessJson(Map seriesMa builder.value(series.stringValues.get(i)); builder.endArray(); } - builder.endArray(); + builder.endArray(); // values - builder.endObject(); + builder.endObject(); // result entry } - builder.endArray(); - builder.endObject(); - builder.endObject(); + builder.endArray(); // result + builder.endObject(); // data + builder.endObject(); // root return builder; }