Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions x-pack/plugin/prometheus/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def protobufVersion = "4.32.0"
dependencies {
compileOnly project(path: xpackModule('core'))
compileOnly project(path: xpackModule('esql'))
compileOnly project(path: xpackModule('esql-core'))
compileOnly project(':x-pack:plugin:esql:compute')
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(path: xpackModule('esql'))
testImplementation project(path: xpackModule('esql-core'))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.prometheus.rest;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;

/**
* Utility for building and sending Prometheus-format error responses.
*
* <p>Error types follow the Prometheus HTTP API specification:
* <a href="https://github.com/prometheus/prometheus/blob/main/web/api/v1/api.go">api.go</a>
*/
class PrometheusErrorResponse {

private PrometheusErrorResponse() {}

/**
* Sends a Prometheus-format error response derived from the given exception.
* If sending fails, logs a warning and attempts a plain-text fallback response.
*/
static void send(RestChannel channel, Exception e, Logger logger) {
try {
RestStatus status = ExceptionsHelper.status(e);
channel.sendResponse(new RestResponse(status, build(status, e.getMessage())));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("Failed to send error response", inner);
try {
channel.sendResponse(
new RestResponse(
RestStatus.INTERNAL_SERVER_ERROR,
RestResponse.TEXT_CONTENT_TYPE,
new BytesArray("Internal server error")
)
);
} catch (Exception ignored) {}
}
}

/**
* Builds a Prometheus-format error JSON response body:
* {@code {"status":"error","errorType":"<type>","error":"<message>"}}
*/
static XContentBuilder build(RestStatus status, String message) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder.field("status", "error");
builder.field("errorType", mapErrorType(status));
builder.field("error", message != null ? message : "unknown error");
builder.endObject();
return builder;
}

/**
* Maps an HTTP status to a Prometheus error type string.
*/
static String mapErrorType(RestStatus status) {
return switch (status) {
case BAD_REQUEST -> "bad_data";
case SERVICE_UNAVAILABLE, REQUEST_TIMEOUT, GATEWAY_TIMEOUT -> "timeout";
default -> "execution";
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

package org.elasticsearch.xpack.prometheus.rest;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestChannel;
Expand Down Expand Up @@ -40,7 +38,6 @@
class PrometheusQueryRangeResponseListener implements ActionListener<EsqlQueryResponse> {

private static final Logger logger = LogManager.getLogger(PrometheusQueryRangeResponseListener.class);
private static final String JSON_CONTENT_TYPE = XContentType.JSON.mediaType();

// Column names expected in the ES|QL PROMQL response.
static final String VALUE_COLUMN = "value";
Expand Down Expand Up @@ -78,16 +75,7 @@ public void onFailure(Exception e) {

private void sendErrorResponse(Exception e) {
logger.debug("PromQL query_range request failed", e);
try {
RestStatus status = ExceptionsHelper.status(e);
XContentBuilder builder = buildErrorJson(status, e.getMessage());
channel.sendResponse(new RestResponse(status, builder));
} catch (Exception inner) {
logger.error("failed to send error response for PromQL query_range", inner);
try {
channel.sendResponse(new RestResponse(RestStatus.INTERNAL_SERVER_ERROR, JSON_CONTENT_TYPE, new BytesArray("{}")));
} catch (Exception ignored) {}
}
PrometheusErrorResponse.send(channel, e, logger);
}

/**
Expand Down Expand Up @@ -263,24 +251,6 @@ private static void writeMetricFields(XContentBuilder builder, String prefix, Ma
}
}

static XContentBuilder buildErrorJson(RestStatus status, String message) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder.field("status", "error");
builder.field("errorType", mapErrorType(status));
builder.field("error", message != null ? message : "unknown error");
builder.endObject();
return builder;
}

private static String mapErrorType(RestStatus status) {
return switch (status) {
case BAD_REQUEST -> "bad_data";
case SERVICE_UNAVAILABLE, REQUEST_TIMEOUT, GATEWAY_TIMEOUT -> "timeout";
default -> "execution";
};
}

static class SeriesData {
final String rawSeriesJson;
final Map<String, String> labels;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.prometheus.rest;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
* Converts an {@link EsqlQueryResponse} from a {@link org.elasticsearch.xpack.esql.plan.logical.TsInfo} plan into the
* Prometheus {@code /api/v1/series} JSON response format.
*/
public class PrometheusSeriesResponseListener implements ActionListener<EsqlQueryResponse> {

private static final Logger logger = LogManager.getLogger(PrometheusSeriesResponseListener.class);

static final String COL_METRIC_NAME = "metric_name";
static final String COL_DIMENSIONS = "dimensions";
private static final String LABELS_PREFIX = "labels.";
private static final String CONTENT_TYPE = "application/json";

private final RestChannel channel;

public PrometheusSeriesResponseListener(RestChannel channel) {
this.channel = channel;
}

@Override
public void onResponse(EsqlQueryResponse response) {
// Do NOT close/decRef the response here: the framework (via respondAndRelease) calls
// decRef() after this method returns, which is the correct single release.
try {
List<Map<String, String>> seriesList = extractSeries(response);
sendSuccess(seriesList);
} catch (Exception e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse org/elasticsearch/action/ActionListener.java:250 ?

logger.debug("Failed to build series response", e);
PrometheusErrorResponse.send(channel, e, logger);
}
}

@Override
public void onFailure(Exception e) {
logger.debug("Series query failed", e);
PrometheusErrorResponse.send(channel, e, logger);
}

private static List<Map<String, String>> extractSeries(EsqlQueryResponse response) {
var columns = response.columns();
int metricNameCol = -1;
int dimensionsCol = -1;
for (int i = 0; i < columns.size(); i++) {
String name = columns.get(i).name();
if (COL_METRIC_NAME.equals(name)) {
metricNameCol = i;
} else if (COL_DIMENSIONS.equals(name)) {
dimensionsCol = i;
}
}
if (metricNameCol == -1 || dimensionsCol == -1) {
throw new IllegalArgumentException(
"TsInfo response is missing required columns [" + COL_METRIC_NAME + ", " + COL_DIMENSIONS + "]"
);
}
final int metricNameIdx = metricNameCol;
final int dimensionsIdx = dimensionsCol;
List<Map<String, String>> result = new ArrayList<>();
for (Iterable<Object> row : response.rows()) {
String metricName = null;
String dimensionsJson = null;
int col = 0;
for (Object value : row) {
if (col == metricNameIdx) {
metricName = value != null ? value.toString() : null;
} else if (col == dimensionsIdx) {
dimensionsJson = value != null ? value.toString() : null;
}
col++;
}
Map<String, String> labels = buildLabelMap(metricName, dimensionsJson);
if (labels.isEmpty() == false) {
result.add(labels);
}
}
return result;
}

/**
* Builds the label map for one TsInfo row. Parses {@code dimensionsJson} and, when
* {@code dimensions} carries no {@code __name__} entry (OTel metrics), synthesises it
* from the {@code metric_name} column.
*/
static Map<String, String> buildLabelMap(String metricName, String dimensionsJson) {
Map<String, String> labels = parseDimensions(dimensionsJson);
// OTel metrics have no labels.__name__ in dimensions — synthesise it from metric_name
if (labels.containsKey("__name__") == false && metricName != null) {
labels.put("__name__", metricName);
}
assert labels.isEmpty() == false
: "label map must not be empty for metric_name=[" + metricName + "] dimensions=[" + dimensionsJson + "]";
return labels;
}

/**
* Parses the {@code dimensions} JSON object and strips the {@code labels.} prefix from keys.
* Example: {@code {"labels.__name__":"up","labels.job":"prometheus"}}
* → {@code {"__name__":"up","job":"prometheus"}}
*/
static Map<String, String> parseDimensions(String json) {
Map<String, String> labels = new LinkedHashMap<>();
if (json == null || json.isBlank()) {
return labels;
}
// Simple JSON object parser for {"key":"value",...} – all string-typed values
// Use xcontent for robust parsing
try (var parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, json)) {
parser.nextToken(); // START_OBJECT
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String rawKey = parser.currentName();
parser.nextToken();
String value = parser.text();
String labelName = rawKey.startsWith(LABELS_PREFIX) ? rawKey.substring(LABELS_PREFIX.length()) : rawKey;
labels.put(labelName, value);
}
} catch (IOException e) {
logger.debug("Failed to parse dimensions JSON [{}]", json, e);
}
return labels;
}

private void sendSuccess(List<Map<String, String>> seriesList) throws IOException {
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
builder.field("status", "success");
builder.startArray("data");
for (Map<String, String> labels : seriesList) {
builder.startObject();
for (Map.Entry<String, String> entry : labels.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
}
builder.endArray();
builder.endObject();
channel.sendResponse(new RestResponse(RestStatus.OK, CONTENT_TYPE, Strings.toString(builder)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testFormatSampleValueNull() {
}

public void testBuildErrorJson() throws IOException {
try (XContentBuilder builder = PrometheusQueryRangeResponseListener.buildErrorJson(RestStatus.BAD_REQUEST, "test error")) {
try (XContentBuilder builder = PrometheusErrorResponse.build(RestStatus.BAD_REQUEST, "test error")) {
ObjectPath path = toObjectPath(builder);
assertThat(path.evaluate("status"), equalTo("error"));
assertThat(path.evaluate("errorType"), equalTo("bad_data"));
Expand All @@ -194,7 +194,7 @@ public void testBuildErrorJson() throws IOException {
}

public void testBuildErrorJsonTimeout() throws IOException {
try (XContentBuilder builder = PrometheusQueryRangeResponseListener.buildErrorJson(RestStatus.SERVICE_UNAVAILABLE, "timeout")) {
try (XContentBuilder builder = PrometheusErrorResponse.build(RestStatus.SERVICE_UNAVAILABLE, "timeout")) {
ObjectPath path = toObjectPath(builder);
assertThat(path.evaluate("errorType"), equalTo("timeout"));
}
Expand Down
Loading
Loading