Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
75b857f
Bump OTel to get health metrics
mamazzol Mar 26, 2026
713645b
Add health metrics to OTel SDK
mamazzol Mar 30, 2026
a215113
Small refactor
mamazzol Mar 30, 2026
0fb8b2f
Merge branch 'main' into otel-health
mamazzol Mar 30, 2026
5a1de63
Merge from main
mamazzol Mar 30, 2026
66006de
Merge branch 'main' into otel-health
mamazzol Mar 30, 2026
d35976a
Merge branch 'main' into otel-health
mamazzol Mar 31, 2026
67bc726
Merge branch 'main' into otel-health
mamazzol Mar 31, 2026
d6ecec0
Adding test for health metrics exported
mamazzol Apr 1, 2026
1b83ae1
Add toDuration method to TimeValue
mamazzol Apr 1, 2026
987fa0a
Change RunTask to run ES with OTEL SDK enabled without mock APM server
mamazzol Apr 1, 2026
9d6a989
PR Feedback
mamazzol Apr 1, 2026
843f705
Merge branch 'main' into otel-health
mamazzol Apr 1, 2026
716124b
simplify disable APM Agent logic
mamazzol Apr 1, 2026
0950d19
Remove duplicated flush
mamazzol Apr 1, 2026
18d12d7
Merge branch 'main' into otel-health
prdoyle Apr 1, 2026
13e9b5f
Merge branch 'main' into otel-health
prdoyle Apr 1, 2026
814da9c
Reinstate double flush to ensure flushing of health metrics
mamazzol Apr 2, 2026
65882c3
Merge branch 'main' into otel-health
mamazzol Apr 2, 2026
365d364
Merge branch 'main' into otel-health
mamazzol Apr 2, 2026
538db6a
Merge branch 'main' into otel-health
mamazzol Apr 2, 2026
40f61bb
Merge branch 'main' into otel-health
mamazzol Apr 2, 2026
d5abad8
Merge branch 'main' into otel-health
mamazzol Apr 2, 2026
b44c92b
Merge branch 'main' into otel-health
prdoyle Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ public Boolean getUsingOtelSdk() {

@Option(
option = "using-otel-sdk",
description = "Use the OTel SDK for metrics export instead of the APM agent (requires --with-apm-server)"
description = "Use the OTel SDK for metrics export instead of the APM agent. "
+ "Can be combined with --with-apm-server (uses built-in mock server) or alone, manually "
+ "setting telemetry.otel.metrics.endpoint."
)
public void setUsingOtelSdk(Boolean usingOtelSdk) {
this.usingOtelSdk = usingOtelSdk;
Expand Down Expand Up @@ -260,10 +262,6 @@ public void beforeStart() {
getDataPath = n -> dataDir.resolve(n.getName());
}

if (usingOtelSdk && apmServerEnabled == false) {
throw new GradleException("--using-otel-sdk requires --with-apm-server");
}

if (apmServerEnabled) {
try {
mockServer = new MockApmServer(apmServerMetrics, apmServerTransactions, apmServerTransactionsExcludes);
Expand Down Expand Up @@ -301,14 +299,16 @@ public void beforeStart() {
node.setting("xpack.security.transport.ssl.keystore.path", "transport.keystore");
node.setting("xpack.security.transport.ssl.certificate_authorities", "transport.ca");
}
if (usingOtelSdk) {
node.systemProperty("telemetry.otel.metrics.enabled", "true");
node.setting("telemetry.metrics.enabled", "true");
}
if (mockServer != null) {
node.setting("telemetry.metrics.enabled", "true");
node.setting("telemetry.tracing.enabled", "true");
node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort());
if (usingOtelSdk) {
node.systemProperty("telemetry.otel.metrics.enabled", "true");
node.setting("telemetry.otel.metrics.endpoint", "http://127.0.0.1:" + mockServer.getPort() + "/v1/metrics");
node.setting("telemetry.otel.metrics.interval", "10s");
} else {
node.setting("telemetry.agent.transaction_sample_rate", "1.0");
node.setting("telemetry.agent.transaction_max_spans", "100");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ static List<String> apmJvmOptions(Settings settings, @Nullable SecureSettings se
if (agentMetricsEnabled == false) {
propertiesMap.put("metrics_interval", "0s");
propertiesMap.put("disable_metrics", "*");
disableMetricInstrumentation(propertiesMap);
}

// Configures a log file to write to. Don't disable writing to a log file,
Expand Down Expand Up @@ -263,6 +264,21 @@ static Map<String, String> extractApmSettings(Settings settings) throws UserExce
return propertiesMap;
}

/**
 * Disables the APM Agent hook that adds an exporter to application {@code SdkMeterProvider} instances.
 * When Elasticsearch uses the OTel SDK for metrics, that hook is redundant and can break export;
 * traces still use the APM Agent through {@code GlobalOpenTelemetry}.
 * <p>
 * The {@code opentelemetry-metrics} entry is appended to whatever comma-separated value is already stored
 * under {@code disable_instrumentations}. The call is idempotent: if the entry is already listed
 * (ignoring surrounding whitespace), the map is left untouched instead of gaining a duplicate.
 *
 * @param propertiesMap mutable agent properties; updated in place
 */
static void disableMetricInstrumentation(Map<String, String> propertiesMap) {
    final String key = "disable_instrumentations";
    final String otelMetrics = "opentelemetry-metrics";
    final String existing = propertiesMap.get(key);
    if (existing == null || existing.isBlank()) {
        propertiesMap.put(key, otelMetrics);
        return;
    }
    // Avoid producing "...,opentelemetry-metrics,opentelemetry-metrics" when the entry is already present.
    for (String entry : existing.split(",")) {
        if (otelMetrics.equals(entry.strip())) {
            return;
        }
    }
    propertiesMap.put(key, existing + "," + otelMetrics);
}

private static StringJoiner extractGlobalLabels(String prefix, Map<String, String> propertiesMap, Settings settings) {
// special handling of global labels, the agent expects them in format: key1=value1,key2=value2
final Settings globalLabelsSettings = settings.getByPrefix(prefix + "global_labels.");
Expand Down
172 changes: 65 additions & 107 deletions gradle/verification-metadata.xml

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions libs/core/src/main/java/org/elasticsearch/core/TimeValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.core;

import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -455,4 +456,8 @@ public int compareTo(TimeValue timeValue) {
double otherValue = ((double) timeValue.duration) * timeValue.timeUnit.toNanos(1);
return Double.compare(thisValue, otherValue);
}

/**
 * Converts this time value to a {@link Duration}.
 *
 * @return a {@code Duration} built from the nanosecond count reported by {@code nanos()}
 */
public Duration toDuration() {
    final long asNanos = nanos();
    return Duration.ofNanos(asNanos);
}
}
18 changes: 4 additions & 14 deletions modules/apm/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ esplugin {
classname ='org.elasticsearch.telemetry.apm.APM'
}

def otelVersion = '1.57.0'
def otelInstrumentationVersion = '2.23.0-alpha'
def otelVersion = '1.60.1'
def otelInstrumentationVersion = '2.26.1-alpha'

dependencies {
implementation "io.opentelemetry:opentelemetry-api:${otelVersion}"
Expand All @@ -24,7 +24,7 @@ dependencies {
implementation("io.opentelemetry:opentelemetry-exporter-otlp:${otelVersion}") {
exclude group: 'io.opentelemetry', module: 'opentelemetry-exporter-sender-okhttp'
}
implementation "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java17:${otelInstrumentationVersion}"
implementation "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry:${otelInstrumentationVersion}"

implementation "io.opentelemetry:opentelemetry-sdk-common:${otelVersion}"
runtimeOnly "io.opentelemetry:opentelemetry-common:${otelVersion}"
Expand All @@ -51,19 +51,9 @@ tasks.named("thirdPartyAudit").configure {
'com.fasterxml.jackson.core.JsonFactory',
'com.fasterxml.jackson.core.JsonGenerator',
'com.google.common.io.ByteStreams',
'com.google.common.util.concurrent.ListenableFuture',
'io.grpc.CallOptions',
'io.grpc.Channel',
'io.grpc.Drainable',
'io.grpc.KnownLength',
'io.grpc.ManagedChannel',
'io.grpc.MethodDescriptor',
'io.grpc.MethodDescriptor$Builder',
'io.grpc.MethodDescriptor$Marshaller',
'io.grpc.MethodDescriptor$MethodType',
'io.grpc.stub.AbstractFutureStub',
'io.grpc.stub.AbstractStub',
'io.grpc.stub.ClientCalls'
Comment thread
mamazzol marked this conversation as resolved.
'io.grpc.ManagedChannel'
)
}

Expand Down
2 changes: 1 addition & 1 deletion modules/apm/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
requires io.opentelemetry.sdk;
requires io.opentelemetry.sdk.metrics;
requires io.opentelemetry.exporter.otlp;
requires io.opentelemetry.instrumentation.runtime_telemetry_java17;
requires io.opentelemetry.instrumentation.runtime_telemetry;
requires io.opentelemetry.sdk.common;

exports org.elasticsearch.telemetry.apm;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
package org.elasticsearch.telemetry.apm.internal;

import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics;
import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.InternalTelemetryVersion;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
Expand All @@ -23,15 +25,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.telemetry.TelemetryProvider.OTEL_METRICS_ENABLED_SYSTEM_PROPERTY;

public class OTelSdkMeterSupplier implements MeterSupplier {
private final Settings settings;
private volatile SdkMeterProvider meterProvider;
private volatile RuntimeMetrics runtimeMetrics;
private volatile OTelMetricsResources resources;
private static final Object mutex = new Object();

OTelSdkMeterSupplier(Settings settings) {
Expand All @@ -41,27 +42,41 @@ public class OTelSdkMeterSupplier implements MeterSupplier {
@Override
public Meter get() {
synchronized (mutex) {
if (meterProvider == null) {
var exporter = createOTLPExporter();
TimeValue intervalTimeValue = OTelSdkSettings.TELEMETRY_OTEL_METRICS_INTERVAL.get(settings);
var reader = PeriodicMetricReader.builder(exporter).setInterval(Duration.ofMillis(intervalTimeValue.millis())).build();
meterProvider = SdkMeterProvider.builder()
.setResource(Resource.builder().put("service.name", "elasticsearch").build())
.registerMetricReader(reader)
.build();
if (OTelSdkSettings.TELEMETRY_OTEL_METRICS_ENABLED.get(settings)) {
var otelSdk = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
// RuntimeMetrics uses two underlying implementations to gather the full set of metric data, JFR and JMX.
// The metrics gathered by the two implementations are mutually exclusive and the union of them produces the full
// set of available metrics. See more at: https://ela.st/otel-runtime-telemetry
runtimeMetrics = RuntimeMetrics.builder(otelSdk).build();
}
if (resources == null) {
resources = createMeteringResources();
}
return meterProvider.get("elasticsearch");
return resources.systemMeterProvider().get("elasticsearch");
}
}

private OtlpHttpMetricExporter createOTLPExporter() {
/**
 * Builds the two meter providers (and, when enabled, the JVM runtime telemetry) used by this supplier.
 * <p>
 * Both providers export through their own OTLP exporter and {@link PeriodicMetricReader}, using the
 * interval configured in {@code TELEMETRY_OTEL_METRICS_INTERVAL}.
 *
 * @return the bundle of resources; its {@code runtimeTelemetry} component is {@code null} when
 *         {@code TELEMETRY_OTEL_METRICS_ENABLED} is false
 */
private OTelMetricsResources createMeteringResources() {
    final TimeValue configuredInterval = OTelSdkSettings.TELEMETRY_OTEL_METRICS_INTERVAL.get(settings);

    // Health pipeline: collects the metrics that the system exporter reports about itself.
    // Its own exporter is handed a no-op MeterProvider.
    // NOTE(review): presumably so that exporting health metrics does not generate further health metrics - confirm.
    final OtlpHttpMetricExporter healthExporter = createOTLPExporter(MeterProvider.noop());
    final PeriodicMetricReader healthReader = PeriodicMetricReader.builder(healthExporter)
        .setInterval(configuredInterval.toDuration())
        .build();
    final SdkMeterProvider healthProvider = sdkMeterProvider(healthReader);

    // System pipeline: the provider handed out to Elasticsearch; its exporter records into the health provider.
    final OtlpHttpMetricExporter systemExporter = createOTLPExporter(healthProvider);
    final PeriodicMetricReader systemReader = PeriodicMetricReader.builder(systemExporter)
        .setInterval(configuredInterval.toDuration())
        .build();
    final SdkMeterProvider systemProvider = sdkMeterProvider(systemReader);
    final OpenTelemetrySdk sdk = OpenTelemetrySdk.builder().setMeterProvider(systemProvider).build();

    // RuntimeTelemetry uses JMX (Java 8+) and JFR (Java 17+) to collect JVM metrics. See https://ela.st/otel-runtime-telemetry
    final RuntimeTelemetry jvmTelemetry;
    if (OTelSdkSettings.TELEMETRY_OTEL_METRICS_ENABLED.get(settings)) {
        jvmTelemetry = RuntimeTelemetry.create(sdk);
    } else {
        jvmTelemetry = null;
    }
    return new OTelMetricsResources(systemProvider, healthProvider, jvmTelemetry);
}

/**
 * Creates an {@link SdkMeterProvider} tagged with {@code service.name=elasticsearch} that is read by the given reader.
 *
 * @param reader the metric reader to register on the new provider
 * @return the newly built provider
 */
private static SdkMeterProvider sdkMeterProvider(PeriodicMetricReader reader) {
    final Resource serviceResource = Resource.builder().put("service.name", "elasticsearch").build();
    return SdkMeterProvider.builder().setResource(serviceResource).registerMetricReader(reader).build();
}

private OtlpHttpMetricExporter createOTLPExporter(MeterProvider healthExportMeterProvider) {
String endpoint = OTelSdkSettings.TELEMETRY_OTEL_METRICS_ENDPOINT.get(settings);
if (endpoint == null || endpoint.isEmpty()) {
throw new IllegalStateException(
Expand All @@ -70,7 +85,9 @@ private OtlpHttpMetricExporter createOTLPExporter() {
}
OtlpHttpMetricExporterBuilder builder = OtlpHttpMetricExporter.builder()
.setEndpoint(endpoint)
.setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred());
.setMeterProvider(() -> healthExportMeterProvider)
.setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred())
.setInternalTelemetryVersion(InternalTelemetryVersion.LATEST);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This line here seems to be the real change in this commit; the rest is refactoring.)

String authHeader = getAuthorizationHeader();
if (authHeader != null) {
builder.addHeader("Authorization", authHeader);
Expand All @@ -95,24 +112,45 @@ private String getAuthorizationHeader() {
@Override
public void attemptFlushMetrics() {
synchronized (mutex) {
if (meterProvider != null) {
// If the timeout expires, this quietly returns, which is ok in this context.
meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
if (resources != null) {
resources.systemMeterProvider.forceFlush().join(10, TimeUnit.SECONDS);
resources.meterHealthMeterProvider.forceFlush().join(10, TimeUnit.SECONDS);
// PeriodicMetricReader records collection.duration after
// each collection, so a second cycle is required to collect and export it.
resources.systemMeterProvider.forceFlush().join(10, TimeUnit.SECONDS);
resources.meterHealthMeterProvider.forceFlush().join(10, TimeUnit.SECONDS);
Comment thread
mamazzol marked this conversation as resolved.
}
}
}

@Override
public void close() {
synchronized (mutex) {
if (runtimeMetrics != null) {
runtimeMetrics.close();
runtimeMetrics = null;
if (resources != null) {
resources.close();
resources = null;
}
if (meterProvider != null) {
meterProvider.close();
meterProvider = null;
}
}

private record OTelMetricsResources(
SdkMeterProvider systemMeterProvider,
SdkMeterProvider meterHealthMeterProvider,
RuntimeTelemetry runtimeTelemetry
) implements AutoCloseable {
Comment thread
mamazzol marked this conversation as resolved.

// Compact constructor: both meter providers are mandatory and rejected when null.
// runtimeTelemetry is not null-checked here; close() skips it when it is null.
OTelMetricsResources {
Objects.requireNonNull(systemMeterProvider, "systemMeterProvider");
Objects.requireNonNull(meterHealthMeterProvider, "meterHealthMeterProvider");
}

/**
 * Releases everything held by this bundle: the runtime telemetry (when present), then the system
 * meter provider, then the health meter provider.
 */
@Override
public void close() {
    final RuntimeTelemetry jvmTelemetry = runtimeTelemetry;
    if (jvmTelemetry != null) {
        jvmTelemetry.close();
    }
    // The system provider is closed before the health provider.
    // NOTE(review): presumably so the health provider can still observe the system provider's shutdown - confirm.
    systemMeterProvider.close();
    meterHealthMeterProvider.close();
}
}
}
5 changes: 1 addition & 4 deletions modules/apm/src/main/plugin-metadata/entitlement-policy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,7 @@ io.opentelemetry.sdk.metrics:
io.opentelemetry.exporter.sender.jdk.internal:
- outbound_network

io.opentelemetry.instrumentation.runtime_telemetry_java17:
- manage_threads

io.opentelemetry.instrumentation.runtime_telemetry_java8:
io.opentelemetry.instrumentation.runtime_telemetry:
- manage_threads

io.opentelemetry.sdk.common:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,19 @@

package org.elasticsearch.test.apmintegration;

import org.elasticsearch.client.Request;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.TestRule;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* Test metrics exported by Elasticsearch directly using the OTel SDK
*/
Expand All @@ -31,4 +40,37 @@ public class OtelMetricsIT extends AbstractMetricsIT {
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

/**
 * Checks that the OTel SDK health metrics listed below are all received by {@code recordingApmServer}
 * after requesting {@code /_use_apm_metrics} and {@code /_flush_telemetry}, within {@code TELEMETRY_TIMEOUT} seconds.
 */
public void testOTelHealthMetrics() throws Exception {
    // Shared between the telemetry consumer callback and this test thread. HashSet is not thread-safe,
    // so every access after registration is guarded by synchronizing on the set itself.
    final Set<String> remaining = new HashSet<>(
        Set.of(
            "otel.sdk.metric_reader.collection.duration",
            "otel.sdk.exporter.metric_data_point.exported",
            "otel.sdk.exporter.metric_data_point.inflight",
            "otel.sdk.exporter.operation.duration"
        )
    );

    CountDownLatch finished = new CountDownLatch(1);

    Consumer<ReceivedTelemetry> messageConsumer = (ReceivedTelemetry msg) -> {
        synchronized (remaining) {
            if (msg instanceof ReceivedTelemetry.ReceivedMetricSet m) {
                for (Map.Entry<String, ReceivedTelemetry.ReceivedMetricValue> e : m.samples().entrySet()) {
                    remaining.remove(e.getKey());
                }
            }
            if (remaining.isEmpty()) {
                finished.countDown();
            }
        }
    };

    recordingApmServer.addMessageConsumer(messageConsumer);

    client().performRequest(new Request("GET", "/_use_apm_metrics"));
    client().performRequest(new Request("GET", "/_flush_telemetry"));

    boolean completed = finished.await(TELEMETRY_TIMEOUT, TimeUnit.SECONDS);

    // On timeout the consumer may still be running; take a consistent snapshot under the same lock
    // so the failure message is built without racing against concurrent removals.
    final String missing;
    final boolean allSeen;
    synchronized (remaining) {
        missing = remaining.stream().sorted().collect(Collectors.joining(", "));
        allSeen = remaining.isEmpty();
    }
    assertTrue("Timeout waiting for OTel SDK health metrics. Missing: " + missing, completed && allSeen);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.xpack.core.security.support.CancellableRunnable;
import org.elasticsearch.xpack.core.security.user.User;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -164,9 +163,7 @@ private static GraphServiceClient buildClient(RealmConfig config) {
final var clientSecret = config.getSetting(MicrosoftGraphAuthzRealmSettings.CLIENT_SECRET);

final var timeout = config.getSetting(MicrosoftGraphAuthzRealmSettings.HTTP_REQUEST_TIMEOUT);
final var httpClient = new OkHttpClient.Builder().callTimeout(Duration.ofSeconds(timeout.seconds()))
.addInterceptor(new RetryHandler())
.build();
final var httpClient = new OkHttpClient.Builder().callTimeout(timeout.toDuration()).addInterceptor(new RetryHandler()).build();

final var credentialProviderBuilder = new ClientSecretCredentialBuilder().clientId(
config.getSetting(MicrosoftGraphAuthzRealmSettings.CLIENT_ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public NativeAnalyticsProcessFactory(
}

void setProcessConnectTimeout(TimeValue processConnectTimeout) {
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
this.processConnectTimeout = processConnectTimeout.toDuration();
}

@Override
Expand Down
Loading
Loading