Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/144009.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: TSDB
issues: []
pr: 144009
summary: Add indexing pressure tracking to OTLP endpoints
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
* which must be fully accumulated before processing. It provides backpressure by reserving memory
* up front and rejecting oversized requests with a 413 status.
* <p>
* The caller must reserve memory via {@link IndexingPressure#markCoordinatingOperationStarted} before
* constructing this aggregator. If the reservation fails ({@code EsRejectedExecutionException}),
* the caller should let it propagate to produce a 429 response.
* When {@link #accept(RestChannel)} is called, the aggregator reserves memory via
* {@link IndexingPressure#markCoordinatingOperationStarted}. If the reservation fails
* (e.g. {@code EsRejectedExecutionException} under heavy load), the {@link CompletionHandler#onFailure}
* callback is invoked so the caller can produce a format-appropriate error response (e.g. protobuf).
* <p>
* Once all chunks are accumulated, the reservation is lowered to the actual size and the
* {@link CompletionHandler} is invoked with the aggregated content and the pressure reservation
Expand Down Expand Up @@ -73,7 +74,8 @@ public interface CompletionHandler {
void onComplete(RestChannel channel, ReleasableBytesReference content, Releasable indexingPressureRelease);

/**
* Called when the request body exceeds the maximum allowed size.
* Called when a failure occurs during content accumulation, such as the request body
* exceeding the maximum allowed size or the indexing pressure reservation being rejected.
*
* @param channel the REST channel for sending the error response
* @param e the exception describing the failure
Expand All @@ -82,31 +84,39 @@ public interface CompletionHandler {
}

private final RestRequest request;
private final IndexingPressure.Coordinating coordinating;
private final IndexingPressure indexingPressure;
private final long maxRequestSize;
private final CompletionHandler completionHandler;
private final BodyPostProcessor bodyPostProcessor;

private IndexingPressure.Coordinating coordinating;
private ArrayList<ReleasableBytesReference> chunks;
private long accumulatedSize;
private boolean closed;

/**
 * Creates an aggregator that reserves indexing-pressure memory lazily, when
 * {@link #accept(RestChannel)} is invoked, rather than requiring the caller to
 * reserve it up front.
 *
 * @param request           the streamed REST request whose body will be accumulated
 * @param indexingPressure  used to reserve coordinating memory before reading the body
 * @param maxRequestSize    number of bytes reserved up front and enforced on the body size
 * @param completionHandler invoked with the aggregated content, or with the failure
 * @param bodyPostProcessor transforms the aggregated body before completion
 */
public IndexingPressureAwareContentAggregator(
    RestRequest request,
    IndexingPressure indexingPressure,
    long maxRequestSize,
    CompletionHandler completionHandler,
    BodyPostProcessor bodyPostProcessor
) {
    // Validate all required collaborators eagerly — not just the post-processor —
    // so a mis-wired caller fails at construction instead of mid-stream.
    this.request = Objects.requireNonNull(request, "request");
    this.indexingPressure = Objects.requireNonNull(indexingPressure, "indexingPressure");
    this.maxRequestSize = maxRequestSize;
    this.completionHandler = Objects.requireNonNull(completionHandler, "completionHandler");
    this.bodyPostProcessor = Objects.requireNonNull(bodyPostProcessor);
}

/**
 * Starts consuming the request body. First reserves {@code maxRequestSize} bytes of
 * coordinating indexing-pressure memory; if that reservation is rejected (e.g. under
 * heavy load), this aggregator is marked closed and the failure is routed to the
 * {@link CompletionHandler} instead of propagating. Only after a successful
 * reservation is the first body chunk requested from the stream.
 */
@Override
public void accept(RestChannel channel) {
    final IndexingPressure.Coordinating reservation;
    try {
        reservation = indexingPressure.markCoordinatingOperationStarted(1, maxRequestSize, false);
    } catch (Exception e) {
        // Reservation rejected: let the handler render a format-appropriate error response.
        closed = true;
        completionHandler.onFailure(channel, e);
        return;
    }
    coordinating = reservation;
    request.contentStream().next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.index.IndexingPressure;
Expand Down Expand Up @@ -163,6 +164,53 @@ public void testBodyExactlyAtMaxSizeSucceeds() {
assertEquals(0, indexingPressure.stats().getCurrentCoordinatingBytes());
}

/**
 * When the coordinating indexing-pressure limit is smaller than the upfront reservation
 * ({@code maxRequestSize}), {@code accept()} must fail fast: {@code onFailure} is invoked,
 * {@code onComplete} never fires, and no coordinating bytes remain reserved afterwards.
 */
public void testRejectsWhenIndexingPressureLimitExceeded() {
    long limitBytes = 1024;
    // Pressure instance whose coordinating limit is deliberately below the reservation size.
    var tightPressure = new IndexingPressure(
        Settings.builder().put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), ByteSizeValue.ofBytes(limitBytes)).build()
    );
    var request = newStreamedRequest(stream);
    channel = new FakeRestChannel(request, true, 1);
    var failureRef = new AtomicReference<Exception>();
    aggregator = new IndexingPressureAwareContentAggregator(
        request,
        tightPressure,
        // maxRequestSize exceeds the limit, so the upfront reservation must be rejected.
        limitBytes + 1,
        new IndexingPressureAwareContentAggregator.CompletionHandler() {
            @Override
            public void onComplete(RestChannel ch, ReleasableBytesReference content, Releasable pressure) {
                fail("should not complete");
            }

            @Override
            public void onFailure(RestChannel ch, Exception e) {
                // Capture the rejection so the assertions below can inspect it.
                failureRef.set(e);
            }
        },
        IndexingPressureAwareContentAggregator.BodyPostProcessor.NOOP
    );
    // Wire the stream to the aggregator before accept(), mirroring production chunk delivery.
    stream.setHandler(new HttpBody.ChunkHandler() {
        @Override
        public void onNext(ReleasableBytesReference chunk, boolean isLast) {
            aggregator.handleChunk(channel, chunk, isLast);
        }

        @Override
        public void close() {
            aggregator.streamClose();
        }
    });
    try {
        // accept() must route the rejection to onFailure rather than throwing.
        aggregator.accept(channel);
    } catch (Exception e) {
        throw new AssertionError(e);
    }

    assertNotNull("onFailure should have been called", failureRef.get());
    assertNull(contentRef.get());
    // The failed reservation must not leak any coordinating bytes.
    assertEquals(0, tightPressure.stats().getCurrentCoordinatingBytes());
}

public void testReducesPressureToActualSize() {
long maxSize = 1024;
int actualSize = 100;
Expand Down Expand Up @@ -257,10 +305,9 @@ private void initAggregator(long maxSize) {
private void initAggregator(long maxSize, IndexingPressureAwareContentAggregator.BodyPostProcessor postProcessor) {
var request = newStreamedRequest(stream);
channel = new FakeRestChannel(request, true, 1);
var coordinating = indexingPressure.markCoordinatingOperationStarted(1, maxSize, false);
aggregator = new IndexingPressureAwareContentAggregator(
request,
coordinating,
indexingPressure,
maxSize,
new IndexingPressureAwareContentAggregator.CompletionHandler() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;

import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand All @@ -26,6 +29,7 @@
import java.io.IOException;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.hamcrest.Matchers.equalTo;

public abstract class AbstractOTLPIndexingRestIT extends ESRestTestCase {

Expand Down Expand Up @@ -55,6 +59,9 @@ protected Settings restClientSettings() {
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
}

/** The OTLP endpoint path for this signal type, e.g. {@code "/_otlp/v1/metrics"}. */
protected abstract String otlpEndpointPath();

@Before
@Override
public void setUp() throws Exception {
Expand All @@ -68,6 +75,20 @@ public void tearDown() throws Exception {
super.tearDown();
}

/**
 * A body larger than the default {@code http.max_protobuf_content_length} (8MiB) must be
 * rejected with HTTP 413. Runs on the main {@link #cluster}, whose coordinating limit is
 * generous enough that the upfront 8MiB reservation succeeds — the rejection therefore
 * comes from the body-size check, not from indexing pressure.
 */
public void testOversizedRequestReturns413() throws Exception {
    // One MiB over the 8MiB default http.max_protobuf_content_length cap.
    byte[] payload = new byte[9 * 1024 * 1024];
    var post = new Request("POST", otlpEndpointPath());
    post.setEntity(new ByteArrayEntity(payload, ContentType.create("application/x-protobuf")));
    var rejection = expectThrows(ResponseException.class, () -> client().performRequest(post));
    assertThat(rejection.getResponse().getStatusLine().getStatusCode(), equalTo(413));
}

protected static String createApiKey(String indexPattern) throws IOException {
Request createApiKeyRequest = new Request("POST", "/_security/api_key");
createApiKeyRequest.setJsonEntity("""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.oteldata.otlp;

import org.apache.http.HttpHost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.ClassRule;

import static org.hamcrest.Matchers.equalTo;

/**
 * Standalone IT covering the 429 indexing-pressure rejection path. Runs against a single
 * cluster configured with a coordinating limit (4MiB) below the OTLP handler's upfront
 * reservation of {@code http.max_protobuf_content_length} (8MiB), so every OTLP request
 * is turned away with 429 before any of the body is read.
 *
 * Deliberately does NOT extend {@link AbstractOTLPIndexingRestIT}: signal-specific ITs
 * (e.g. {@link OTLPMetricsIndexingRestIT}) should never pay the startup cost of this
 * pressure-constrained cluster.
 */
public class OTLPIndexingPressureRestIT extends ESRestTestCase {

    private static final String USER = "test_admin";
    private static final String PASS = "x-pack-test-password";

    @ClassRule
    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
        .distribution(DistributionType.DEFAULT)
        .user(USER, PASS, "superuser", false)
        .setting("xpack.security.enabled", "true")
        .setting("xpack.security.autoconfiguration.enabled", "false")
        .setting("xpack.license.self_generated.type", "trial")
        .setting("xpack.ml.enabled", "false")
        .setting("xpack.watcher.enabled", "false")
        .setting("indexing_pressure.memory.coordinating.limit", "4mb")
        .build();

    @Override
    protected String getTestRestCluster() {
        return cluster.getHttpAddresses();
    }

    @Override
    protected Settings restClientSettings() {
        var token = basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray()));
        return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
    }

    /**
     * Every OTLP request gets a 429: the handler's upfront reservation of
     * {@code http.max_protobuf_content_length} (8MiB) cannot fit under the 4MiB
     * coordinating limit, so the rejection fires before any body bytes are consumed
     * and the request body size is irrelevant.
     */
    public void testIndexingPressureLimitReturns429() throws Exception {
        var post = new Request("POST", "/_otlp/v1/metrics");
        post.setEntity(new ByteArrayEntity(new byte[1024], ContentType.create("application/x-protobuf")));
        var hostList = parseClusterHosts(cluster.getHttpAddresses());
        try (RestClient pressureClient = buildClient(restClientSettings(), hostList.toArray(new HttpHost[0]))) {
            var rejection = expectThrows(ResponseException.class, () -> pressureClient.performRequest(post));
            assertThat(rejection.getResponse().getStatusLine().getStatusCode(), equalTo(429));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public class OTLPMetricsIndexingRestIT extends AbstractOTLPIndexingRestIT {
private OtlpHttpMetricExporter exporter;
private SdkMeterProvider meterProvider;

@Override
protected String otlpEndpointPath() {
    // Metrics signal endpoint, consumed by the shared tests in AbstractOTLPIndexingRestIT.
    return "/_otlp/v1/metrics";
}

@Before
@Override
public void setUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestHandler;
Expand Down Expand Up @@ -55,10 +57,13 @@ public enum HistogramMappingSettingValues {
private static final Logger logger = LogManager.getLogger(OTelPlugin.class);

private final SetOnce<OTelIndexTemplateRegistry> registry = new SetOnce<>();
private final SetOnce<IndexingPressure> indexingPressure = new SetOnce<>();
private final boolean enabled;
private final long maxProtobufContentLengthBytes;

public OTelPlugin(Settings settings) {
    // Whether the OTel data plugin is enabled for this node.
    this.enabled = XPackSettings.OTEL_DATA_ENABLED.get(settings);
    // Snapshot the protobuf body-size cap once at construction; it is later passed
    // to the OTLP REST handlers in getRestHandlers().
    this.maxProtobufContentLengthBytes = HttpTransportSettings.SETTING_HTTP_MAX_PROTOBUF_CONTENT_LENGTH.get(settings).getBytes();
}

@Override
Expand All @@ -67,14 +72,16 @@ public Collection<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster,
Predicate<NodeFeature> clusterSupportsFeature
) {
return List.of(new OTLPMetricsRestAction());
assert indexingPressure.get() != null : "indexing pressure must be set";
return List.of(new OTLPMetricsRestAction(indexingPressure.get(), maxProtobufContentLengthBytes));
}

@Override
public Collection<?> createComponents(PluginServices services) {
logger.info("OTel ingest plugin is {}", enabled ? "enabled" : "disabled");
Settings settings = services.environment().settings();
ClusterService clusterService = services.clusterService();
indexingPressure.set(services.indexingPressure());
registry.set(
new OTelIndexTemplateRegistry(settings, clusterService, services.threadPool(), services.client(), services.xContentRegistry())
);
Expand Down
Loading
Loading