Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugin/otel-data/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ tasks.named("thirdPartyAudit").configure {

tasks.named("licenseHeaders").configure {
excludes << 'io/opentelemetry/proto/**/*'
excludes << 'com/google/rpc/**/*'
}

tasks.named("javaRestTest").configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.oteldata.otlp;

import com.google.protobuf.GeneratedMessage;
import com.google.rpc.Status;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand All @@ -26,7 +27,6 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestResponseListener;

import java.io.IOException;

Expand Down Expand Up @@ -90,22 +90,50 @@ public void onComplete(RestChannel channel, ReleasableBytesReference content, Re
}
var transportRequest = new OTLPActionRequest(content);
var release = Releasables.wrap(content, indexingPressureRelease);
client.execute(type, transportRequest, ActionListener.releaseBefore(release, new RestResponseListener<>(channel) {
@Override
public RestResponse buildResponse(OTLPActionResponse r) {
return new RestResponse(r.getStatus(), CONTENT_TYPE_PROTOBUF, r.getResponse());
}
}));
client.execute(
type,
transportRequest,
ActionListener.releaseBefore(
release,
ActionListener.wrap(
r -> channel.sendResponse(new RestResponse(RestStatus.OK, CONTENT_TYPE_PROTOBUF, r.getResponse())),
e -> sendFailureResponse(channel, e)
)
)
);
}

@Override
public void onFailure(RestChannel channel, Exception e) {
logger.debug("OTLP request failed during content aggregation", e);
channel.sendResponse(new RestResponse(ExceptionsHelper.status(e), CONTENT_TYPE_PROTOBUF, BytesArray.EMPTY));
sendFailureResponse(channel, e);
}
},
IndexingPressureAwareContentAggregator.BodyPostProcessor.NOOP
);
}

/**
* Sends a failure response using a Protobuf-encoded {@link Status google.rpc.Status} message.
* <p>
* From the OTLP spec:
* "The response body for all HTTP 4xx and HTTP 5xx responses MUST be a Protobuf-encoded Status message
* that describes the problem."
*
* @see <a href="https://opentelemetry.io/docs/specs/otlp/#failures-1">OTLP Failures</a>
*/
private static void sendFailureResponse(RestChannel channel, Exception e) {
logger.debug("OTLP request failed", e);
try {
// Per the OTLP spec, Status.code is not used over HTTP: "the server MAY omit Status.code field.
// The clients are not expected to alter their behavior based on Status.code field".
// The HTTP status code in the response is what drives client retry behavior.
String message = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
Status status = Status.newBuilder().setMessage(message).build();
channel.sendResponse(new RestResponse(ExceptionsHelper.status(e), CONTENT_TYPE_PROTOBUF, new BytesArray(status.toByteArray())));
} catch (Exception sendException) {
sendException.addSuppressed(e);
logger.warn("failed to send failure response", sendException);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand Down Expand Up @@ -51,40 +51,35 @@ public AbstractOTLPTransportAction(

@Override
protected void doExecute(Task task, OTLPActionRequest request, ActionListener<OTLPActionResponse> listener) {
ProcessingContext context = ProcessingContext.EMPTY;
try {
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
context = prepareBulkRequest(request, bulkRequestBuilder);
ProcessingContext context = prepareBulkRequest(request, bulkRequestBuilder);
if (bulkRequestBuilder.numberOfActions() == 0) {
if (context.getIgnoredDataPoints() == 0) {
handleEmptyRequest(listener);
listener.onResponse(new OTLPActionResponse(BytesArray.EMPTY));
} else {
// all data points were ignored
handlePartialSuccess(listener, context);
listener.onFailure(
new ElasticsearchStatusException(
context.getIgnoredDataPointsMessage(IGNORED_DATA_POINTS_MESSAGE_LIMIT),
RestStatus.BAD_REQUEST
)
);
}
return;
}

ProcessingContext finalContext = context;
bulkRequestBuilder.execute(new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (bulkItemResponses.hasFailures() || finalContext.getIgnoredDataPoints() > 0) {
handlePartialSuccess(bulkItemResponses, finalContext, listener);
} else {
handleSuccess(listener);
}
}

@Override
public void onFailure(Exception e) {
handleFailure(listener, e, finalContext);
bulkRequestBuilder.execute(listener.delegateFailure((delegate, bulkResponse) -> {
if (bulkResponse.hasFailures() || finalContext.getIgnoredDataPoints() > 0) {
handlePartialSuccess(bulkResponse, finalContext, delegate);
} else {
delegate.onResponse(new OTLPActionResponse(BytesArray.EMPTY));
}
});
}));

} catch (Exception e) {
logger.error("failed to execute otlp request", e);
handleFailure(listener, e, context);
listener.onFailure(e);
}
}

Expand Down Expand Up @@ -138,30 +133,6 @@ record WithTotalDataPoints(int totalDataPoints) implements ProcessingContext {}
protected abstract ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRequestBuilder bulkRequestBuilder)
throws IOException;

public void handleSuccess(ActionListener<OTLPActionResponse> listener) {
listener.onResponse(new OTLPActionResponse(RestStatus.OK, BytesArray.EMPTY));
}

public void handleEmptyRequest(ActionListener<OTLPActionResponse> listener) {
// If the server receives an empty request
// (a request that does not carry any telemetry data)
// the server SHOULD respond with success.
// https://opentelemetry.io/docs/specs/otlp/#full-success-1
handleSuccess(listener);
}

public void handlePartialSuccess(ActionListener<OTLPActionResponse> listener, ProcessingContext context) {
// If the request is only partially accepted
// (i.e. when the server accepts only parts of the data and rejects the rest),
// the server MUST respond with HTTP 200 OK.
// https://opentelemetry.io/docs/specs/otlp/#partial-success-1
MessageLite response = responseWithRejectedDataPoints(
context.getIgnoredDataPoints(),
context.getIgnoredDataPointsMessage(IGNORED_DATA_POINTS_MESSAGE_LIMIT)
);
listener.onResponse(new OTLPActionResponse(RestStatus.BAD_REQUEST, response));
}

private void handlePartialSuccess(
BulkResponse bulkItemResponses,
ProcessingContext context,
Expand All @@ -184,7 +155,7 @@ private void handlePartialSuccess(
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
// If the server receives more requests than the client is allowed or the server is overloaded,
// the server SHOULD respond with HTTP 429 Too Many Requests or HTTP 503 Service Unavailable
// and MAY include Retry-After header with a recommended time interval in seconds to wait before retrying.
// and MAY include "Retry-After" header with a recommended time interval in seconds to wait before retrying.
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
status = RestStatus.TOO_MANY_REQUESTS;
}
Expand Down Expand Up @@ -215,21 +186,17 @@ private void handlePartialSuccess(
}
}
failureMessageBuilder.append(context.getIgnoredDataPointsMessage(10));
MessageLite response = responseWithRejectedDataPoints(failures + context.getIgnoredDataPoints(), failureMessageBuilder.toString());
listener.onResponse(new OTLPActionResponse(status, response));
String message = failureMessageBuilder.toString();
if (status == RestStatus.TOO_MANY_REQUESTS) {
listener.onFailure(new ElasticsearchStatusException(message, RestStatus.TOO_MANY_REQUESTS));
} else {
MessageLite response = responseWithRejectedDataPoints(failures + context.getIgnoredDataPoints(), message);
listener.onResponse(new OTLPActionResponse(response));
}
}

record FailureGroup(AtomicInteger failureCount, String failureMessageSample) {}

public void handleFailure(ActionListener<OTLPActionResponse> listener, Exception e, ProcessingContext context) {
// https://opentelemetry.io/docs/specs/otlp/#failures-1
// If the processing of the request fails,
// the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code.
listener.onResponse(
new OTLPActionResponse(ExceptionsHelper.status(e), responseWithRejectedDataPoints(context.totalDataPoints(), e.getMessage()))
);
}

/**
* Builds the response for a request that had some rejected data points.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,28 @@
import com.google.protobuf.MessageLite;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

public class OTLPActionResponse extends ActionResponse {
private final BytesReference response;
private final RestStatus status;

public OTLPActionResponse(RestStatus status, MessageLite response) {
this(status, new BytesArray(response.toByteArray()));
public OTLPActionResponse(MessageLite response) {
this(new BytesArray(response.toByteArray()));
}

public OTLPActionResponse(RestStatus status, BytesReference response) {
public OTLPActionResponse(BytesReference response) {
this.response = response;
this.status = status;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesReference(response);
out.writeEnum(status);
public void writeTo(StreamOutput out) {
TransportAction.localOnly();
}

public BytesReference getResponse() {
return response;
}

public RestStatus getStatus() {
return status;
}
}
47 changes: 47 additions & 0 deletions x-pack/plugin/otel-data/src/main/proto/google/rpc/status.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Source: googleapis/googleapis
// Original file:
// https://github.com/googleapis/googleapis/blob/master/google/rpc/status.proto

syntax = "proto3";

package google.rpc;

import "google/protobuf/any.proto";

option java_multiple_files = true;
option java_outer_classname = "StatusProto";
option java_package = "com.google.rpc";

// The `Status` type defines a logical error model that is suitable for
// different programming environments, including REST APIs and RPC APIs. It is
// used by [gRPC](https://github.com/grpc). Each `Status` message contains
// three pieces of data: error code, error message, and error details.
//
// You can find out more about this error model and how to work with it in the
// [API Design Guide](https://cloud.google.com/apis/design/errors).
message Status {
// The status code, which should be an enum value of
// [google.rpc.Code][google.rpc.Code].
int32 code = 1;

// A developer-facing error message, which should be in English.
string message = 2;

// A list of messages that carry the error details. There is a common set of
// message types for APIs to use.
repeated google.protobuf.Any details = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.oteldata.otlp;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Status;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

/**
Expand Down Expand Up @@ -111,7 +113,7 @@ public void testEmptyBodyReturnsSuccess() throws Exception {
}

@SuppressWarnings("unchecked")
public void testTransportActionFailure() {
public void testTransportActionFailure() throws Exception {
client = new NoOpNodeClient(threadPool) {
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
Expand All @@ -124,26 +126,30 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
};
try (var response = execute(1024, 64)) {
assertThat(response.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(response.contentType(), equalTo(AbstractOTLPRestAction.CONTENT_TYPE_PROTOBUF));
assertThat(Status.parseFrom(response.content().array()).getMessage(), equalTo("ingest failed"));
}
}

public void testOversizedBodyReturns413() {
public void testOversizedBodyReturns413() throws Exception {
try (var response = execute(100, 101)) {
assertThat(response.status(), equalTo(RestStatus.REQUEST_ENTITY_TOO_LARGE));
assertThat(response.contentType(), equalTo(AbstractOTLPRestAction.CONTENT_TYPE_PROTOBUF));
assertThat(response.content().length(), equalTo(0));
Status status = Status.parseFrom(response.content().array());
assertThat(status.getMessage(), containsString("request body too large"));
}
}

public void testIndexingPressureRejectionReturns429() {
public void testIndexingPressureRejectionReturns429() throws Exception {
long limitBytes = 1024;
var tightPressure = new IndexingPressure(
Settings.builder().put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), ByteSizeValue.ofBytes(limitBytes)).build()
);
try (var response = execute(tightPressure, limitBytes + 1, 64)) {
assertThat(response.status(), equalTo(RestStatus.TOO_MANY_REQUESTS));
assertThat(response.contentType(), equalTo(AbstractOTLPRestAction.CONTENT_TYPE_PROTOBUF));
assertThat(response.content().length(), equalTo(0));
Status status = Status.parseFrom(response.content().array());
assertThat(status.getMessage(), containsString("rejected execution of coordinating operation"));
}
}

Expand Down
Loading
Loading