diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMMeterService.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMMeterService.java
index 2d19139ca426f..03caa32717a69 100644
--- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMMeterService.java
+++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMMeterService.java
@@ -11,6 +11,7 @@
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.Meter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -18,6 +19,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Booleans;
+import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.apm.APMMeterRegistry;
import static org.elasticsearch.telemetry.TelemetryProvider.OTEL_METRICS_ENABLED_SYSTEM_PROPERTY;
@@ -26,6 +28,11 @@ public class APMMeterService extends AbstractLifecycleComponent {
private static final Logger LOGGER = LogManager.getLogger(APMMeterService.class);
+ /**
+ * Assumed APM agent reporting interval, used to size the flush wait when {@code telemetry.agent.metrics_interval} is not set.
+ */
+ public static final TimeValue DEFAULT_AGENT_INTERVAL = TimeValue.timeValueSeconds(10);
+
private final APMMeterRegistry meterRegistry;
private final MeterSupplier otelMeterSupplier;
private final MeterSupplier noopMeterSupplier;
@@ -33,31 +40,81 @@ public class APMMeterService extends AbstractLifecycleComponent {
protected volatile boolean enabled;
+ /**
+ * Creates the service from node settings, pairing the supplier chosen by {@code createOtelMeterSupplier(settings)}
+ * with a {@code NoOpMeterSupplier}.
+ */
public APMMeterService(Settings settings) {
- this(settings, createOtelMeterSupplier(settings), () -> OpenTelemetry.noop().getMeter("noop"));
+ this(settings, createOtelMeterSupplier(settings), new NoOpMeterSupplier());
}
+ /**
+ * Creates the service with explicit suppliers. The initial enabled state is read from
+ * {@code APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING}; the registry is created with the meter from
+ * {@code otelMeterSupplier} when enabled, and with the meter from {@code noopMeterSupplier} otherwise.
+ */
public APMMeterService(Settings settings, MeterSupplier otelMeterSupplier, MeterSupplier noopMeterSupplier) {
- this(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.get(settings), otelMeterSupplier, noopMeterSupplier);
- }
-
- public APMMeterService(boolean enabled, MeterSupplier otelMeterSupplier, MeterSupplier noopMeterSupplier) {
- this.enabled = enabled;
+ this.enabled = APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.get(settings);
this.otelMeterSupplier = otelMeterSupplier;
this.noopMeterSupplier = noopMeterSupplier;
this.meterRegistry = new APMMeterRegistry(enabled ? otelMeterSupplier.get() : noopMeterSupplier.get());
}
private static MeterSupplier createOtelMeterSupplier(Settings settings) {
- if (Booleans.parseBoolean(System.getProperty(OTEL_METRICS_ENABLED_SYSTEM_PROPERTY, "false")) == false) {
- return () -> GlobalOpenTelemetry.get().getMeter("elasticsearch");
+ boolean otelMetricsEnabled = Booleans.parseBoolean(System.getProperty(OTEL_METRICS_ENABLED_SYSTEM_PROPERTY, "false"));
+ if (otelMetricsEnabled) {
+ return new OTelSdkMeterSupplier(settings);
+ } else {
+ long agentFlushWaitMs = 2 * agentMetricsInterval(settings).millis();
+ return new MeterSupplier() {
+ @Override
+ public Meter get() {
+ // CONFUSION ALERT: When we do `GlobalOpenTelemetry.get()`, we're actually getting an OpenTelemetry
+ // object that routes telemetry to the APM agent; that is, we're still using OTel to report telemetry
+ // from the code, but we're using the APM agent (instead of the OTel SDK) to export it.
+ // That's why this "else" branch, where otelMetricsEnabled is false, is still using OpenTelemetry.
+
+ return GlobalOpenTelemetry.get().getMeter("elasticsearch");
+ }
+
+                /**
+                 * Best-effort flush for agent-exported metrics: pauses for the precomputed wait
+                 * (twice the agent reporting interval). Returns immediately when that wait is not positive.
+                 */
+                @Override
+                public void attemptFlushMetrics() {
+                    // A zero or negative interval leaves nothing to wait for, and Thread.sleep rejects
+                    // negative values with an IllegalArgumentException.
+                    if (agentFlushWaitMs <= 0) {
+                        return;
+                    }
+                    try {
+                        // The agent offers no flush API, so we do a best-effort pause that exceeds
+                        // the agent reporting interval, making it extremely likely that all telemetry
+                        // has been exported.
+                        //
+                        // Note that the first intake request to the APM server can still be delayed beyond this window:
+                        // the APM agent checks for configuration changes only periodically,
+                        // so the setting changes we made during initialization don't take effect immediately.
+
+                        LOGGER.info("Waiting {} ms for APM agent to flush metrics", agentFlushWaitMs);
+                        Thread.sleep(agentFlushWaitMs);
+                    } catch (InterruptedException e) {
+                        // Flush is best-effort. We can reestablish the interrupt flag and proceed.
+                        Thread.currentThread().interrupt();
+                    }
+                }
+ };
}
- return new OTelSdkMeterSupplier(settings);
+ }
+
+    /**
+     * Resolves the agent reporting interval from {@code telemetry.agent.metrics_interval}.
+     *
+     * @param settings settings to read the raw interval string from
+     * @return the parsed interval, or {@link #DEFAULT_AGENT_INTERVAL} when the setting is absent or empty
+     */
+    private static TimeValue agentMetricsInterval(Settings settings) {
+        final String configured = settings.get("telemetry.agent.metrics_interval");
+        if (configured == null || configured.isEmpty()) {
+            return DEFAULT_AGENT_INTERVAL;
+        }
+        return TimeValue.parseTimeValue(configured, "telemetry.agent.metrics_interval");
     }
public APMMeterRegistry getMeterRegistry() {
return meterRegistry;
}
+    /**
+     * Makes a best-effort attempt to export metrics that are still buffered; does nothing while metrics are disabled.
+     *
+     * With the OpenTelemetry SDK, buffered data is pushed to the exporter. With the Elastic APM agent, which has no
+     * programmatic flush, this sleeps for {@code 2 * telemetry.agent.metrics_interval}; observable export
+     * (e.g. the first HTTP request to {@code telemetry.agent.server_url}) may still take substantially longer
+     * than this sleep.
+     */
+    public void attemptFlushMetrics() {
+        if (enabled == false) {
+            return;
+        }
+        otelMeterSupplier.attemptFlushMetrics();
+    }
+
/**
* @see APMAgentSettings#addClusterSettingsListeners(ClusterService, APMTelemetryProvider)
*/
@@ -81,4 +138,16 @@ protected void doStop() {
@Override
protected void doClose() {}
+
+ /** Supplies the OpenTelemetry no-op meter; its flush does nothing. */
+ private static final class NoOpMeterSupplier implements MeterSupplier {
+ @Override
+ public Meter get() {
+ return OpenTelemetry.noop().getMeter("noop");
+ }
+
+ @Override
+ public void attemptFlushMetrics() {
+ // No-op
+ }
+ }
}
diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java
index d0957aaed6744..21e8ee6540742 100644
--- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java
+++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java
@@ -36,4 +36,14 @@ public APMMeterService getMeterService() {
public APMMeterRegistry getMeterRegistry() {
return apmMeterService.getMeterRegistry();
}
+
+ /** Delegates the best-effort metrics flush to the meter service. */
+ @Override
+ public void attemptFlushMetrics() {
+ apmMeterService.attemptFlushMetrics();
+ }
+
+ /** Delegates the best-effort trace flush to the tracer. */
+ @Override
+ public void attemptFlushTraces() {
+ apmTracer.attemptFlushTraces();
+ }
}
diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MeterSupplier.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MeterSupplier.java
index 78899e08c7ecf..de413d2533148 100644
--- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MeterSupplier.java
+++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MeterSupplier.java
@@ -15,6 +15,13 @@
public interface MeterSupplier extends Supplier, AutoCloseable {
+ /**
+ * Export any buffered metrics on a best-effort basis.
+ *
+ * This defaults to a no-op just to support the fairly widespread practice of using a lambda for this in tests.
+ */
+ default void attemptFlushMetrics() {}
+
+ /** No-op by default. */
@Override
default void close() {}
}
diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/OTelSdkMeterSupplier.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/OTelSdkMeterSupplier.java
index d04203c202304..442b509443b94 100644
--- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/OTelSdkMeterSupplier.java
+++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/OTelSdkMeterSupplier.java
@@ -24,6 +24,7 @@
import org.elasticsearch.core.TimeValue;
import java.time.Duration;
+import java.util.concurrent.TimeUnit;
import static org.elasticsearch.telemetry.TelemetryProvider.OTEL_METRICS_ENABLED_SYSTEM_PROPERTY;
@@ -91,6 +92,16 @@ private String getAuthorizationHeader() {
return null;
}
+ /**
+ * Asks the current meter provider, if there is one, to flush, and waits up to 10 seconds for that to finish.
+ */
+ @Override
+ public void attemptFlushMetrics() {
+ // NOTE(review): the wait below happens while holding mutex, so anything else synchronizing on it
+ // (close() does) can be blocked for the full timeout - confirm this is intended.
+ synchronized (mutex) {
+ if (meterProvider != null) {
+ // If the timeout expires, this quietly returns, which is ok in this context.
+ meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
+ }
+ }
+ }
+
@Override
public void close() {
synchronized (mutex) {
diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java
index e0d0a48b70375..a10c8b77ca2db 100644
--- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java
+++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java
@@ -33,6 +33,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.TimeValue;
import org.elasticsearch.lucene.util.automaton.MinimizationOperations;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.apm.internal.APMAgentSettings;
@@ -56,9 +57,15 @@ public class APMTracer extends AbstractLifecycleComponent implements org.elastic
private static final Logger logger = LogManager.getLogger(APMTracer.class);
+ /** Default interval when agent export timing is unknown; same semantics as APMMeterService. */
+ private static final TimeValue DEFAULT_AGENT_INTERVAL = TimeValue.timeValueSeconds(10);
+
/** Holds in-flight span information. */
private final Map spans = ConcurrentCollections.newConcurrentMap();
+ /** Time to wait in attemptFlushTraces when using the agent (2× export interval). */
+ private final long agentFlushWaitMs;
+
private volatile boolean enabled;
private volatile APMServices services;
@@ -92,6 +99,34 @@ public APMTracer(Settings settings) {
this.filterAutomaton = buildAutomaton(includeNames, excludeNames);
this.labelFilterAutomaton = buildAutomaton(labelFilters, List.of());
this.enabled = APMAgentSettings.TELEMETRY_TRACING_ENABLED_SETTING.get(settings);
+ this.agentFlushWaitMs = 2 * agentExportIntervalMs(settings);
+ }
+
+    /**
+     * Resolves the agent export interval from {@code telemetry.agent.metrics_interval}.
+     *
+     * @param settings settings to read the raw interval string from
+     * @return the configured interval in milliseconds, or the default interval when the setting is absent,
+     *         empty, or cannot be parsed as a time value
+     */
+    private static long agentExportIntervalMs(Settings settings) {
+        String intervalStr = settings.get("telemetry.agent.metrics_interval");
+        if (intervalStr != null && intervalStr.isEmpty() == false) {
+            try {
+                return TimeValue.parseTimeValue(intervalStr, "telemetry.agent.metrics_interval").millis();
+            } catch (IllegalArgumentException e) {
+                // Flushing is best-effort, so a bad value must not fail construction; surface it with its cause
+                // instead of hiding it at debug level.
+                logger.warn("Could not parse telemetry.agent.metrics_interval [{}], using default", intervalStr, e);
+            }
+        }
+        return DEFAULT_AGENT_INTERVAL.millis();
+    }
+
+    /**
+     * Attempts to get buffered traces exported on a best-effort basis. When using the APM agent (no ES-owned
+     * tracer provider), this waits for 2× the agent export interval. Returns immediately when tracing is
+     * disabled or when the computed wait is not positive.
+     */
+    public void attemptFlushTraces() {
+        // Thread.sleep throws IllegalArgumentException for negative values, so skip the pause entirely
+        // when there is nothing sensible to wait for.
+        if (enabled == false || agentFlushWaitMs <= 0) {
+            return;
+        }
+        try {
+            Thread.sleep(agentFlushWaitMs);
+        } catch (InterruptedException e) {
+            // Best-effort: restore the interrupt flag and let the caller carry on.
+            Thread.currentThread().interrupt();
+        }
+    }
public void setEnabled(boolean enabled) {
diff --git a/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java b/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java
index 3ac6184669abc..8bb189054daff 100644
--- a/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java
+++ b/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java
@@ -20,7 +20,26 @@ public interface TelemetryProvider {
MeterRegistry getMeterRegistry();
- TelemetryProvider NOOP = new TelemetryProvider() {
+ /**
+ * Attempts to export buffered metrics on a best-effort basis. Implementations should flush the meter provider
+ * they own (e.g. OTel SdkMeterProvider) or wait for the next Elastic APM Java agent export cycle.
+ *
+ * When metrics are backed by the Elastic APM agent, there is no flush API: the implementation only waits
+ * a bounded interval derived from {@code telemetry.agent.metrics_interval}. The first HTTP request to the
+ * configured APM server can still arrive much later (agent reporter scheduling), so callers that need
+ * observable export must allow additional wall-clock time beyond this method.
+ */
+ void attemptFlushMetrics();
+
+ /**
+ * Attempts to export buffered traces on a best-effort basis. Implementations should flush the tracer provider
+ * they own (e.g. OTel SdkTracerProvider) or wait for the next agent export cycle.
+ */
+ void attemptFlushTraces();
+
+ TelemetryProvider NOOP = new NoopTelemetryProvider();
+
+ class NoopTelemetryProvider implements TelemetryProvider {
@Override
public Tracer getTracer() {
@@ -31,5 +50,11 @@ public Tracer getTracer() {
public MeterRegistry getMeterRegistry() {
return MeterRegistry.NOOP;
}
- };
+
+ @Override
+ public void attemptFlushMetrics() {}
+
+ @Override
+ public void attemptFlushTraces() {}
+ }
}
diff --git a/test/external-modules/apm-integration/build.gradle b/test/external-modules/apm-integration/build.gradle
index 74a9993c16bdf..7a663708737c8 100644
--- a/test/external-modules/apm-integration/build.gradle
+++ b/test/external-modules/apm-integration/build.gradle
@@ -25,6 +25,7 @@ tasks.named("test").configure {
}
tasks.named('javaRestTest').configure {
+ // Look up the buildParams extension on the root project so the onlyIf closure below can read snapshotBuild.
+ def buildParams = project.rootProject.extensions.getByName('buildParams')
it.onlyIf("snapshot build") { buildParams.snapshotBuild }
}
diff --git a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/AbstractMetricsIT.java b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/AbstractMetricsIT.java
new file mode 100644
index 0000000000000..7f44d655ff155
--- /dev/null
+++ b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/AbstractMetricsIT.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.test.apmintegration;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.local.LocalClusterSpecBuilder;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.junit.runners.model.Statement;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Map.entry;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+/**
+ * Ensures metrics are being exported as expected.
+ */
+public abstract class AbstractMetricsIT extends ESRestTestCase {
+ private static final Logger logger = LogManager.getLogger(AbstractMetricsIT.class);
+
+ /**
+ * The APM agent is reconfigured dynamically by the APM module after booting,
+ * and the agent only reloads its configuration every 30 seconds.
+ * The first telemetry can be blocked waiting for this, so let's give it
+ * a good long time before giving up.
+ *
+ * This should be unnecessary when the APM agent is no longer used.
+ */
+ static final int TELEMETRY_TIMEOUT = 40;
+
+ protected static RecordingApmServer recordingApmServer = new RecordingApmServer();
+
+ /**
+ * Returns a builder with common cluster settings (distribution, modules, telemetry.metrics.enabled).
+ * Subclasses add mode-specific settings and call {@code .build()}.
+ *
+ * @return a local cluster builder using the INTEG_TEST distribution with the {@code test-apm-integration}
+ *         and {@code apm} modules and {@code telemetry.metrics.enabled} set to {@code true}
+ */
+ protected static LocalClusterSpecBuilder baseClusterBuilder() {
+ return ElasticsearchCluster.local()
+ .distribution(DistributionType.INTEG_TEST)
+ .module("test-apm-integration")
+ .module("apm")
+ .setting("telemetry.metrics.enabled", "true");
+ }
+
+ /**
+ * Builds the rule chain for a subclass: recording server first, then cluster, then closeClients in finally.
+ *
+ * @param server the recording APM server, used as the outermost rule
+ * @param cluster the cluster rule wrapped inside the server rule
+ * @return a rule whose innermost statement runs the tests and then closes the REST clients
+ */
+ protected static org.junit.rules.TestRule buildRuleChain(RecordingApmServer server, ElasticsearchCluster cluster) {
+ return org.junit.rules.RuleChain.outerRule(server).around(cluster).around((base, description) -> new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ try {
+ base.evaluate();
+ } finally {
+ try {
+ closeClients();
+ } catch (IOException e) {
+ // Logged rather than rethrown, so a close failure does not replace the outcome of base.evaluate().
+ logger.error("failed to close REST clients after test", e);
+ }
+ }
+ }
+ });
+ }
+
+    /**
+     * Hits the test metrics endpoint, requests a telemetry flush, and waits until the recording server has
+     * received every expected counter, gauge, and histogram sample from the {@code elasticsearch} scope.
+     */
+    public void testExplicitMetrics() throws Exception {
+        Map<String, Predicate<Number>> valueAssertions = new HashMap<>(
+            Map.ofEntries(
+                entry("es.test.long_counter.total", n -> closeTo(1.0, 0.001).matches(n.doubleValue())),
+                entry("es.test.double_counter.total", n -> closeTo(1.0, 0.001).matches(n.doubleValue())),
+                entry("es.test.async_double_counter.total", n -> closeTo(1.0, 0.001).matches(n.doubleValue())),
+                entry("es.test.async_long_counter.total", n -> equalTo(1).matches(n.intValue())),
+                entry("es.test.double_gauge.current", n -> closeTo(1.0, 0.001).matches(n.doubleValue())),
+                entry("es.test.long_gauge.current", n -> equalTo(1).matches(n.intValue()))
+            )
+        );
+
+        // Expected number of histogram observations still outstanding, per metric name.
+        Map<String, Integer> histogramAssertions = new HashMap<>(
+            Map.ofEntries(entry("es.test.double_histogram.histogram", 2), entry("es.test.long_histogram.histogram", 2))
+        );
+
+        // Guards both maps. The consumer has to run on a different thread (this one blocks in await below),
+        // and after a timeout this thread reads the maps while messages may still be arriving.
+        final Object lock = new Object();
+        CountDownLatch finished = new CountDownLatch(1);
+
+        Consumer<ReceivedTelemetry> messageConsumer = (ReceivedTelemetry msg) -> {
+            synchronized (lock) {
+                if (msg instanceof ReceivedTelemetry.ReceivedMetricSet m && "elasticsearch".equals(m.instrumentationScopeName())) {
+                    logger.info("Apm metric message received: {}", m);
+
+                    for (Map.Entry<String, ReceivedTelemetry.ReceivedMetricValue> entry : m.samples().entrySet()) {
+                        String key = entry.getKey();
+                        ReceivedTelemetry.ReceivedMetricValue sampleValue = entry.getValue();
+
+                        var valuePredicate = valueAssertions.get(key);
+                        if (valuePredicate != null && sampleValue instanceof ReceivedTelemetry.ValueSample(Number value)) {
+                            if (valuePredicate.test(value)) {
+                                logger.info("{} assertion PASSED", key);
+                                valueAssertions.remove(key);
+                            } else {
+                                logger.error("{} assertion FAILED", key);
+                            }
+                        }
+
+                        var histogramExpected = histogramAssertions.get(key);
+                        if (histogramExpected != null && sampleValue instanceof ReceivedTelemetry.HistogramSample(var counts)) {
+                            int total = counts.stream().mapToInt(Integer::intValue).sum();
+                            int remaining = histogramExpected - total;
+                            if (remaining == 0) {
+                                logger.info("{} assertion PASSED", key);
+                                histogramAssertions.remove(key);
+                            } else {
+                                // Observations can be spread over several messages; keep the outstanding count.
+                                histogramAssertions.put(key, remaining);
+                            }
+                        }
+                    }
+                }
+
+                if (valueAssertions.isEmpty() && histogramAssertions.isEmpty()) {
+                    finished.countDown();
+                }
+            }
+        };
+
+        recordingApmServer.addMessageConsumer(messageConsumer);
+
+        client().performRequest(new Request("GET", "/_use_apm_metrics"));
+        client().performRequest(new Request("GET", "/_flush_telemetry"));
+        boolean completed = finished.await(TELEMETRY_TIMEOUT, TimeUnit.SECONDS);
+
+        // Snapshot the leftovers under the lock so a late message cannot break the failure report.
+        String remainingAssertions;
+        synchronized (lock) {
+            remainingAssertions = Stream.concat(valueAssertions.keySet().stream(), histogramAssertions.keySet().stream())
+                .collect(Collectors.joining(","));
+        }
+        assertTrue("Timeout when waiting for assertions to complete. Remaining assertions to match: " + remainingAssertions, completed);
+    }
+
+    /**
+     * Requests a telemetry flush and waits until the recording server has received a plausible value for
+     * each of the expected system and JVM metrics, regardless of instrumentation scope.
+     */
+    public void testJvmMetrics() throws Exception {
+        Map<String, Predicate<Number>> valueAssertions = new HashMap<>(
+            Map.ofEntries(
+                entry("system.cpu.total.norm.pct", n -> closeTo(0.0, 1.0).matches(n.doubleValue())),
+                entry("system.process.cpu.total.norm.pct", n -> closeTo(0.0, 1.0).matches(n.doubleValue())),
+                entry("system.memory.total", n -> greaterThan(0L).matches(n.longValue())),
+                entry("system.memory.actual.free", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("system.process.memory.size", n -> greaterThan(0L).matches(n.longValue())),
+                entry("jvm.memory.heap.used", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.memory.heap.committed", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.memory.heap.max", n -> greaterThan(0L).matches(n.longValue())),
+                entry("jvm.memory.non_heap.used", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.memory.non_heap.committed", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.gc.count", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.gc.time", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.gc.alloc", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.thread.count", n -> greaterThanOrEqualTo(1L).matches(n.longValue())),
+                entry("jvm.fd.used", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.fd.max", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.memory.heap.pool.used", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.memory.heap.pool.committed", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.memory.non_heap.pool.used", n -> greaterThanOrEqualTo(0L).matches(n.longValue())),
+                entry("jvm.memory.non_heap.pool.committed", n -> greaterThanOrEqualTo(0L).matches(n.longValue()))
+            )
+        );
+
+        // Guards valueAssertions. The consumer has to run on a different thread (this one blocks in await
+        // below), and after a timeout this thread reads the map while messages may still be arriving.
+        final Object lock = new Object();
+        CountDownLatch finished = new CountDownLatch(1);
+
+        Consumer<ReceivedTelemetry> messageConsumer = (ReceivedTelemetry msg) -> {
+            synchronized (lock) {
+                if (msg instanceof ReceivedTelemetry.ReceivedMetricSet m) {
+                    for (Map.Entry<String, ReceivedTelemetry.ReceivedMetricValue> e : m.samples().entrySet()) {
+                        String key = e.getKey();
+                        var valueAssertion = valueAssertions.get(key);
+                        if (valueAssertion != null && e.getValue() instanceof ReceivedTelemetry.ValueSample(Number value)) {
+                            if (valueAssertion.test(value)) {
+                                logger.info("{} assertion PASSED", key);
+                                valueAssertions.remove(key);
+                            }
+                        }
+                    }
+                }
+                if (valueAssertions.isEmpty()) {
+                    finished.countDown();
+                }
+            }
+        };
+
+        recordingApmServer.addMessageConsumer(messageConsumer);
+
+        client().performRequest(new Request("GET", "/_flush_telemetry"));
+        logger.debug("About to wait for telemetry");
+        var completed = finished.await(TELEMETRY_TIMEOUT, TimeUnit.SECONDS);
+
+        // Snapshot the leftovers under the lock so a late message cannot break the failure report.
+        String remaining;
+        synchronized (lock) {
+            remaining = valueAssertions.keySet().stream().collect(Collectors.joining(", "));
+        }
+        assertTrue("Timeout waiting for JVM metrics. Missing: " + remaining, completed);
+    }
+}
diff --git a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/ApmAgentMetricsIT.java b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/ApmAgentMetricsIT.java
new file mode 100644
index 0000000000000..96860ac4f6ded
--- /dev/null
+++ b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/ApmAgentMetricsIT.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.test.apmintegration;
+
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.junit.ClassRule;
+import org.junit.rules.TestRule;
+
+/**
+ * Tests metrics exported by the APM agent
+ */
+public class ApmAgentMetricsIT extends AbstractMetricsIT {
+
+ public static ElasticsearchCluster cluster = AbstractMetricsIT.baseClusterBuilder()
+ .systemProperty("telemetry.otel.metrics.enabled", "false")
+ .setting("telemetry.agent.server_url", () -> "http://127.0.0.1:" + recordingApmServer.getPort())
+ .setting("telemetry.agent.metrics_interval", "1s")
+ .build();
+
+ @ClassRule
+ public static TestRule ruleChain = AbstractMetricsIT.buildRuleChain(recordingApmServer, cluster);
+
+ @Override
+ protected String getTestRestCluster() {
+ return cluster.getHttpAddresses();
+ }
+}
diff --git a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/ApmIntakeMessageParser.java b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/ApmIntakeMessageParser.java
new file mode 100644
index 0000000000000..a8d54ed7db684
--- /dev/null
+++ b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/ApmIntakeMessageParser.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.test.apmintegration;
+
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.json.JsonXContent;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Parses a single line of APM intake NDJSON into a protocol-neutral {@link ReceivedTelemetry} event.
+ * Intake-specific; a future OTLP decoder will produce the same ADT from OTLP payloads.
+ */
+public final class ApmIntakeMessageParser {
+
+ static final Set IGNORED_EVENT_NAMES = Set.of("metadata");
+
+ private ApmIntakeMessageParser() {}
+
+    /**
+     * Converts one NDJSON intake line into a received telemetry event.
+     *
+     * @param line one line of NDJSON; {@code null} and blank lines are skipped
+     * @return the parsed metric set, transaction, or span, or {@link Optional#empty() empty} for skipped lines
+     *         and for events whose keys are all listed in {@code IGNORED_EVENT_NAMES}
+     * @throws IOException if the line is malformed or carries an unexpected event type
+     */
+    public static Optional<ReceivedTelemetry> parseLine(String line) throws IOException {
+        if (line == null || line.isBlank()) {
+            return Optional.empty();
+        }
+        try (XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, line)) {
+            final Map<String, Object> event = parser.map();
+            if (event.containsKey("metricset")) {
+                return Optional.of(parseMetricSet(event));
+            }
+            if (event.containsKey("transaction")) {
+                return Optional.of(parseTransaction(event));
+            }
+            if (event.containsKey("span")) {
+                return Optional.of(parseSpan(event));
+            }
+            if (IGNORED_EVENT_NAMES.containsAll(event.keySet()) == false) {
+                throw new IOException("Unexpected event type: " + event.keySet());
+            }
+            // Only events we deliberately ignore are left at this point.
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Builds a metric set event from a parsed intake line.
+     *
+     * @param root the parsed line; its {@code metricset} entry must be an object
+     * @return a metric set carrying the {@code otel_instrumentation_scope_name} tag (empty string when absent)
+     *         and one entry per sample; the sample map is empty when {@code samples} is missing
+     * @throws IOException if {@code metricset}, {@code tags}, {@code samples}, or any sample has the wrong shape
+     */
+    @SuppressWarnings("unchecked")
+    private static ReceivedTelemetry parseMetricSet(Map<String, Object> root) throws IOException {
+        Object metricsetObj = root.get("metricset");
+        if ((metricsetObj instanceof Map<?, ?> == false)) {
+            throw new IOException("metricset missing or not an object");
+        }
+        Map<String, Object> metricset = (Map<String, Object>) metricsetObj;
+
+        // tags is optional. An explicit JSON null is treated like a missing object, and any other
+        // non-object value is reported as a malformed line rather than failing with an NPE or CCE.
+        Object tagsObj = metricset.get("tags");
+        if (tagsObj != null && tagsObj instanceof Map<?, ?> == false) {
+            throw new IOException("metricset.tags is not an object");
+        }
+        Map<String, Object> tags = tagsObj == null ? Collections.emptyMap() : (Map<String, Object>) tagsObj;
+        Object scopeTag = tags.get("otel_instrumentation_scope_name");
+        String scopeName = scopeTag != null ? scopeTag.toString() : "";
+
+        Object samplesObj = metricset.get("samples");
+        if (samplesObj == null) {
+            return new ReceivedTelemetry.ReceivedMetricSet(scopeName, Map.of());
+        }
+        if (samplesObj instanceof Map<?, ?> == false) {
+            throw new IOException("metricset.samples is not an object");
+        }
+        Map<String, Object> samplesMap = (Map<String, Object>) samplesObj;
+
+        Map<String, ReceivedTelemetry.ReceivedMetricValue> samples = new HashMap<>();
+        for (Map.Entry<String, Object> entry : samplesMap.entrySet()) {
+            if (entry.getValue() instanceof Map<?, ?> sampleObj) {
+                samples.put(entry.getKey(), parseSample((Map<String, Object>) sampleObj));
+            } else {
+                throw new IOException("metricset.samples entry [" + entry.getKey() + "] is not an object");
+            }
+        }
+        return new ReceivedTelemetry.ReceivedMetricSet(scopeName, Map.copyOf(samples));
+    }
+
+    /**
+     * Converts one sample object into either a single-value sample or a histogram sample.
+     * A {@code value} key takes precedence over {@code counts}.
+     *
+     * @param sample the sample object from {@code metricset.samples}
+     * @return a value sample for a numeric {@code value}, or a histogram sample for a numeric {@code counts} list
+     * @throws IOException if neither key is present or the present one has the wrong type
+     */
+    private static ReceivedTelemetry.ReceivedMetricValue parseSample(Map<String, Object> sample) throws IOException {
+        if (sample.containsKey("value")) {
+            if (sample.get("value") instanceof Number number) {
+                return new ReceivedTelemetry.ValueSample(number);
+            }
+            throw new IOException("metric sample has value that is not a number");
+        }
+        if (sample.containsKey("counts") == false) {
+            throw new IOException("metric sample has no value or counts");
+        }
+        Object rawCounts = sample.get("counts");
+        if (rawCounts instanceof List<?> == false) {
+            throw new IOException("metric sample counts is not a list");
+        }
+        List<Integer> counts = new ArrayList<>();
+        for (Object element : (List<?>) rawCounts) {
+            if (element instanceof Number number) {
+                counts.add(number.intValue());
+                continue;
+            }
+            throw new IOException("metric sample counts element is not a number");
+        }
+        return new ReceivedTelemetry.HistogramSample(List.copyOf(counts));
+    }
+
+    /**
+     * Builds a span event from an intake {@code transaction}. The result never carries a parent id, and a
+     * missing transaction {@code id} becomes an empty span id.
+     *
+     * @param root the parsed line; its {@code transaction} entry must be an object
+     * @throws IOException if {@code transaction} is not an object or lacks {@code name} or {@code trace_id}
+     */
+    @SuppressWarnings("unchecked")
+    private static ReceivedTelemetry parseTransaction(Map<String, Object> root) throws IOException {
+        Object raw = root.get("transaction");
+        if (raw instanceof Map<?, ?> == false) {
+            throw new IOException("transaction missing or not an object");
+        }
+        Map<String, Object> fields = (Map<String, Object>) raw;
+
+        String name = getString(fields, "name");
+        String traceId = getString(fields, "trace_id");
+        if (name == null || traceId == null) {
+            throw new IOException("transaction missing name or trace_id");
+        }
+
+        String id = getString(fields, "id");
+        return new ReceivedTelemetry.ReceivedSpan(name, traceId, id == null ? "" : id, Optional.empty());
+    }
+
+    /**
+     * Builds a span event from an intake {@code span}. The parent is taken from {@code parent_id}, falling
+     * back to {@code transaction_id}; it is empty when neither is present.
+     *
+     * @param root the parsed line; its {@code span} entry must be an object
+     * @throws IOException if {@code span} is not an object or lacks {@code name}, {@code trace_id}, or {@code id}
+     */
+    @SuppressWarnings("unchecked")
+    private static ReceivedTelemetry parseSpan(Map<String, Object> root) throws IOException {
+        Object raw = root.get("span");
+        // instanceof is already false for null, so no separate null check is needed.
+        if (raw instanceof Map<?, ?> == false) {
+            throw new IOException("span missing or not an object");
+        }
+        Map<String, Object> fields = (Map<String, Object>) raw;
+
+        String name = getString(fields, "name");
+        String traceId = getString(fields, "trace_id");
+        String id = getString(fields, "id");
+        if (name == null || traceId == null || id == null) {
+            throw new IOException("span missing name, trace_id, or id");
+        }
+
+        String directParent = getString(fields, "parent_id");
+        String parentId = directParent != null ? directParent : getString(fields, "transaction_id");
+        return new ReceivedTelemetry.ReceivedSpan(name, traceId, id, Optional.ofNullable(parentId));
+    }
+
+    /** Returns the string form of the value stored under {@code key}, or {@code null} when there is none. */
+    private static String getString(Map<String, Object> map, String key) {
+        final Object value = map.get(key);
+        if (value == null) {
+            return null;
+        }
+        return value.toString();
+    }
+}
diff --git a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java
deleted file mode 100644
index 2442447d93a89..0000000000000
--- a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-
-package org.elasticsearch.test.apmintegration;
-
-import com.carrotsearch.randomizedtesting.annotations.Name;
-import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
-
-import org.elasticsearch.client.Request;
-import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.cluster.MutableSettingsProvider;
-import org.elasticsearch.test.cluster.MutableSystemPropertyProvider;
-import org.elasticsearch.test.cluster.local.distribution.DistributionType;
-import org.elasticsearch.test.rest.ESRestTestCase;
-import org.elasticsearch.xcontent.XContentParser;
-import org.elasticsearch.xcontent.XContentParserConfiguration;
-import org.elasticsearch.xcontent.spi.XContentProvider;
-import org.hamcrest.Matcher;
-import org.hamcrest.StringDescription;
-import org.junit.Before;
-import org.junit.ClassRule;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.Map.entry;
-import static org.hamcrest.Matchers.closeTo;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-
-public class MetricsApmIT extends ESRestTestCase {
- private static final XContentProvider.FormatProvider XCONTENT = XContentProvider.provider().getJsonXContent();
- private static final MutableSettingsProvider clusterSettings = new MutableSettingsProvider();
- private static final MutableSystemPropertyProvider systemProperties = new MutableSystemPropertyProvider();
- private final boolean withOTel;
-
- @ClassRule
- public static RecordingApmServer recordingApmServer = new RecordingApmServer();
-
- public MetricsApmIT(@Name("withOTel") boolean withOTel) {
- this.withOTel = withOTel;
- }
-
- @ParametersFactory
- public static Iterable