diff --git a/build-tools/build.gradle b/build-tools/build.gradle index b35a505a7ff89..5a2b2d12c9eaa 100644 --- a/build-tools/build.gradle +++ b/build-tools/build.gradle @@ -143,6 +143,7 @@ dependencies { implementation buildLibs.asm implementation buildLibs.jackson.core implementation buildLibs.jackson.databind + implementation "io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha" testFixturesApi buildLibs.commmons.io testFixturesApi gradleApi() diff --git a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java index cc98d6d139121..d38ea3bd7fe1b 100644 --- a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java +++ b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java @@ -9,6 +9,8 @@ package org.elasticsearch.gradle.testclusters; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -89,6 +91,7 @@ public void start() throws IOException { } InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 0); HttpServer server = HttpServer.create(addr, 10); + server.createContext("/v1/metrics", new OtlpMetricsHandler()); server.createContext("/", new RootHandler()); server.start(); instance = server; @@ -209,4 +212,30 @@ private void logFiltered(InputStream body) throws IOException { } } } + + class OtlpMetricsHandler implements HttpHandler { + @Override + public void handle(HttpExchange t) throws IOException { + byte[] bytes = t.getRequestBody().readAllBytes(); + ExportMetricsServiceRequest metrics = ExportMetricsServiceRequest.parseFrom(bytes); + for (var resourceMetrics : metrics.getResourceMetricsList()) { + var samples = new ArrayList(); + for (var scopeMetrics : resourceMetrics.getScopeMetricsList()) { + for 
(var metric : scopeMetrics.getMetricsList()) { + String name = metric.getName(); + if (metricFilter != null && metricFilter.matcher(name).matches() == false) { + continue; + } + samples.add(metric.toString()); + } + } + if (samples.isEmpty() == false) { + logger.lifecycle("OTLP Metricset:\n{}", String.join("\n", samples)); + } + } + + t.sendResponseHeaders(200, 0); + t.getResponseBody().close(); + } + } } diff --git a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java index 0a450dcfb6e4d..136d9b7fe0c64 100644 --- a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java +++ b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java @@ -48,6 +48,8 @@ public abstract class RunTask extends DefaultTestClustersTask { private Boolean apmServerEnabled = false; + private Boolean usingOtelSdk = false; + private String apmServerMetrics = null; private String apmServerTransactions = null; @@ -130,6 +132,19 @@ public void setApmServerEnabled(Boolean apmServerEnabled) { this.apmServerEnabled = apmServerEnabled; } + @Input + public Boolean getUsingOtelSdk() { + return usingOtelSdk; + } + + @Option( + option = "using-otel-sdk", + description = "Use the OTel SDK for metrics export instead of the APM agent (requires --with-apm-server)" + ) + public void setUsingOtelSdk(Boolean usingOtelSdk) { + this.usingOtelSdk = usingOtelSdk; + } + @Option(option = "apm-metrics", description = "Metric wildcard filter for APM server") public void setApmServerMetrics(String apmServerMetrics) { this.apmServerMetrics = apmServerMetrics; @@ -245,6 +260,10 @@ public void beforeStart() { getDataPath = n -> dataDir.resolve(n.getName()); } + if (usingOtelSdk && apmServerEnabled == false) { + throw new GradleException("--using-otel-sdk requires --with-apm-server"); + } + if (apmServerEnabled) { try { mockServer = new MockApmServer(apmServerMetrics, 
apmServerTransactions, apmServerTransactionsExcludes); @@ -285,10 +304,16 @@ public void beforeStart() { if (mockServer != null) { node.setting("telemetry.metrics.enabled", "true"); node.setting("telemetry.tracing.enabled", "true"); - node.setting("telemetry.agent.transaction_sample_rate", "1.0"); - node.setting("telemetry.agent.transaction_max_spans", "100"); - node.setting("telemetry.agent.metrics_interval", "10s"); node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort()); + if (usingOtelSdk) { + node.systemProperty("telemetry.otel.metrics.enabled", "true"); + node.setting("telemetry.otel.metrics.endpoint", "http://127.0.0.1:" + mockServer.getPort() + "/v1/metrics"); + node.setting("telemetry.otel.metrics.interval", "10s"); + } else { + node.setting("telemetry.agent.transaction_sample_rate", "1.0"); + node.setting("telemetry.agent.transaction_max_spans", "100"); + node.setting("telemetry.agent.metrics_interval", "10s"); + } } // in serverless metrics are enabled by default // if metrics were not enabled explicitly for gradlew run we should disable them diff --git a/distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/APMJvmOptions.java b/distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/APMJvmOptions.java index 844c0bba651d5..96a39ce9a2907 100644 --- a/distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/APMJvmOptions.java +++ b/distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/APMJvmOptions.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Nullable; import java.io.IOException; @@ -31,6 +32,8 @@ import java.util.Set; import java.util.StringJoiner; +import static org.elasticsearch.telemetry.TelemetryProvider.OTEL_METRICS_ENABLED_SYSTEM_PROPERTY; + 
/** * This class is responsible for working out if APM telemetry is configured and if so, preparing * a temporary config file for the APM Java agent and CLI options to the JVM to configure APM. @@ -38,6 +41,7 @@ * server URL and secret key can only be provided when Elasticsearch starts. */ class APMJvmOptions { + /** * Contains agent configuration that must always be applied, and cannot be overridden. */ @@ -137,14 +141,24 @@ class APMJvmOptions { */ static List apmJvmOptions(Settings settings, @Nullable SecureSettings secrets, Path logsDir, Path tmpdir) throws UserException, IOException { + boolean tracingEnabled = settings.getAsBoolean("telemetry.tracing.enabled", false); + boolean metricsEnabled = settings.getAsBoolean("telemetry.metrics.enabled", false); + boolean agentMetricsEnabled = Booleans.parseBoolean(System.getProperty(OTEL_METRICS_ENABLED_SYSTEM_PROPERTY, "false")) == false; + boolean attachAgent = tracingEnabled || (metricsEnabled && agentMetricsEnabled); + final Path agentJar = findAgentJar(); - if (agentJar == null) { + if (attachAgent == false || agentJar == null) { return List.of(); } final Map propertiesMap = extractApmSettings(settings); + if (metricsEnabled == false || agentMetricsEnabled == false) { + propertiesMap.put("metrics_interval", "0s"); + propertiesMap.put("disable_metrics", "*"); + } + // Configures a log file to write to. Don't disable writing to a log file, // as the agent will then require extra Security Manager permissions when // it tries to do something else, and it's just painful. 
diff --git a/gradle/verification-metadata.xml b/gradle/verification-metadata.xml index 9f2ae4d1f0c89..1282766203e15 100644 --- a/gradle/verification-metadata.xml +++ b/gradle/verification-metadata.xml @@ -417,16 +417,16 @@ + + + + + - - - - - @@ -602,11 +602,6 @@ - - - - - @@ -687,21 +682,11 @@ - - - - - - - - - - @@ -832,11 +817,6 @@ - - - - - @@ -847,11 +827,6 @@ - - - - - @@ -1062,16 +1037,31 @@ + + + + + + + + + + + + + + + @@ -1829,21 +1819,11 @@ - - - - - - - - - - @@ -1854,146 +1834,61 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -2014,11 +1909,6 @@ - - - - - @@ -2059,31 +1949,16 @@ - - - - - - - - - - - - - - - @@ -2109,11 +1984,6 @@ - - - - - @@ -2124,11 +1994,6 @@ - - - - - @@ -2154,6 +2019,11 @@ + + + + + @@ -2164,6 +2034,11 @@ + + + + + @@ -2174,11 +2049,21 @@ + + + + + + + + + + @@ -2189,66 +2074,141 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -2259,6 +2219,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -2647,11 +2644,6 @@ - - - - - @@ -3062,11 +3054,6 @@ - - - - - @@ -3103,11 +3090,6 @@ - - - - - @@ -3695,6 +3677,11 @@ + + + + + @@ -4000,51 +3987,11 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -4333,11 +4280,6 @@ - - - - - @@ -5648,11 +5590,6 @@ - - - - - @@ -5808,21 +5745,11 @@ - - - - - - - - - - @@ -5864,11 +5791,6 @@ - - - - - @@ -5879,21 +5801,11 @@ - - - - - - - - - - @@ -5919,11 +5831,6 @@ - - - - - @@ -5934,11 +5841,6 @@ - - - - - @@ -5949,11 +5851,6 @@ - - - - - @@ -5974,21 +5871,11 @@ - - - - - - - - - - @@ -5999,11 +5886,6 @@ - - - - - @@ -6014,11 +5896,6 @@ - - - - - @@ -6029,11 +5906,6 @@ - - - - - @@ -6044,11 +5916,6 @@ - - - - - @@ -6064,11 +5931,6 @@ - - - - - @@ -6079,21 +5941,11 @@ - - - - - - - - - - @@ -6109,11 
+5961,6 @@ - - - - - diff --git a/modules/apm/build.gradle b/modules/apm/build.gradle index 91a75bf4926cc..ccca6030898f3 100644 --- a/modules/apm/build.gradle +++ b/modules/apm/build.gradle @@ -14,19 +14,59 @@ esplugin { } def otelVersion = '1.57.0' -def otelSemconvVersion = '1.21.0-alpha' +def otelInstrumentationVersion = '2.23.0-alpha' dependencies { implementation "io.opentelemetry:opentelemetry-api:${otelVersion}" implementation "io.opentelemetry:opentelemetry-context:${otelVersion}" - implementation "io.opentelemetry:opentelemetry-semconv:${otelSemconvVersion}" - runtimeOnly "io.opentelemetry:opentelemetry-common:${otelVersion}" - runtimeOnly "co.elastic.apm:elastic-apm-agent-java8:1.55.0" + implementation "io.opentelemetry:opentelemetry-sdk:${otelVersion}" + implementation "io.opentelemetry:opentelemetry-sdk-metrics:${otelVersion}" + implementation("io.opentelemetry:opentelemetry-exporter-otlp:${otelVersion}") { + exclude group: 'io.opentelemetry', module: 'opentelemetry-exporter-sender-okhttp' + } + implementation "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java17:${otelInstrumentationVersion}" + + implementation "io.opentelemetry:opentelemetry-sdk-common:${otelVersion}" + runtimeOnly "io.opentelemetry:opentelemetry-common:${otelVersion}" + runtimeOnly "io.opentelemetry:opentelemetry-sdk-trace:${otelVersion}" + runtimeOnly "io.opentelemetry:opentelemetry-sdk-logs:${otelVersion}" + runtimeOnly "io.opentelemetry:opentelemetry-exporter-common:${otelVersion}" + runtimeOnly "io.opentelemetry:opentelemetry-exporter-otlp-common:${otelVersion}" + runtimeOnly "io.opentelemetry:opentelemetry-exporter-sender-jdk:${otelVersion}" + runtimeOnly "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:${otelVersion}" + runtimeOnly "co.elastic.apm:elastic-apm-agent-java8:1.55.0" + + testImplementation "io.opentelemetry:opentelemetry-sdk-testing:${otelVersion}" javaRestTestImplementation project(':modules:apm') javaRestTestImplementation 
project(':test:framework') } +tasks.named("thirdPartyAudit").configure { + ignoreViolations( + 'io.opentelemetry.exporter.internal.marshal.UnsafeStringEncoder', + 'io.opentelemetry.exporter.internal.marshal.UnsafeStringEncoder$UnsafeHolder' + ) + ignoreMissingClasses( + 'com.fasterxml.jackson.core.JsonFactory', + 'com.fasterxml.jackson.core.JsonGenerator', + 'com.google.common.io.ByteStreams', + 'com.google.common.util.concurrent.ListenableFuture', + 'io.grpc.CallOptions', + 'io.grpc.Channel', + 'io.grpc.Drainable', + 'io.grpc.KnownLength', + 'io.grpc.ManagedChannel', + 'io.grpc.MethodDescriptor', + 'io.grpc.MethodDescriptor$Builder', + 'io.grpc.MethodDescriptor$Marshaller', + 'io.grpc.MethodDescriptor$MethodType', + 'io.grpc.stub.AbstractFutureStub', + 'io.grpc.stub.AbstractStub', + 'io.grpc.stub.ClientCalls' + ) +} + tasks.named("dependencyLicenses").configure { mapping from: /opentelemetry-.*/, to: 'opentelemetry' } diff --git a/modules/apm/src/main/java/module-info.java b/modules/apm/src/main/java/module-info.java index f179c4c7bbe0a..111db2794d24d 100644 --- a/modules/apm/src/main/java/module-info.java +++ b/modules/apm/src/main/java/module-info.java @@ -13,8 +13,13 @@ requires org.elasticsearch.xcontent; requires org.apache.logging.log4j; requires org.apache.lucene.core; - requires io.opentelemetry.context; requires io.opentelemetry.api; + requires io.opentelemetry.context; + requires io.opentelemetry.sdk; + requires io.opentelemetry.sdk.metrics; + requires io.opentelemetry.exporter.otlp; + requires io.opentelemetry.instrumentation.runtime_telemetry_java17; + requires io.opentelemetry.sdk.common; exports org.elasticsearch.telemetry.apm; exports org.elasticsearch.telemetry.apm.metrics; diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java index 43447cfa21a62..b2e2b17a8a93a 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java +++ 
b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java @@ -21,6 +21,7 @@ import org.elasticsearch.telemetry.apm.internal.APMAgentSettings; import org.elasticsearch.telemetry.apm.internal.APMMeterService; import org.elasticsearch.telemetry.apm.internal.APMTelemetryProvider; +import org.elasticsearch.telemetry.apm.internal.OTelSdkSettings; import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer; import java.util.Collection; @@ -88,6 +89,8 @@ public List> getSettings() { APMAgentSettings.TELEMETRY_API_KEY_SETTING, // Metrics APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING, + OTelSdkSettings.TELEMETRY_OTEL_METRICS_ENDPOINT, + OTelSdkSettings.TELEMETRY_OTEL_METRICS_INTERVAL, // Tracing APMAgentSettings.TELEMETRY_TRACING_ENABLED_SETTING, APMAgentSettings.TELEMETRY_TRACING_NAMES_INCLUDE_SETTING, diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMMeterService.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMMeterService.java index 5a8978074d164..2d19139ca426f 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMMeterService.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMMeterService.java @@ -11,36 +11,47 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.metrics.Meter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Booleans; import org.elasticsearch.telemetry.apm.APMMeterRegistry; -import java.util.function.Supplier; +import static org.elasticsearch.telemetry.TelemetryProvider.OTEL_METRICS_ENABLED_SYSTEM_PROPERTY; public class APMMeterService extends AbstractLifecycleComponent { - private final 
APMMeterRegistry meterRegistry; - private final Supplier otelMeterSupplier; - private final Supplier noopMeterSupplier; + private static final Logger LOGGER = LogManager.getLogger(APMMeterService.class); + + private final APMMeterRegistry meterRegistry; + private final MeterSupplier otelMeterSupplier; + private final MeterSupplier noopMeterSupplier; protected volatile boolean enabled; public APMMeterService(Settings settings) { - this(settings, APMMeterService.otelMeter(), APMMeterService.noopMeter()); + this(settings, createOtelMeterSupplier(settings), () -> OpenTelemetry.noop().getMeter("noop")); } - public APMMeterService(Settings settings, Supplier otelMeterSupplier, Supplier noopMeterSupplier) { + public APMMeterService(Settings settings, MeterSupplier otelMeterSupplier, MeterSupplier noopMeterSupplier) { this(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.get(settings), otelMeterSupplier, noopMeterSupplier); } - public APMMeterService(boolean enabled, Supplier otelMeterSupplier, Supplier noopMeterSupplier) { + public APMMeterService(boolean enabled, MeterSupplier otelMeterSupplier, MeterSupplier noopMeterSupplier) { this.enabled = enabled; this.otelMeterSupplier = otelMeterSupplier; this.noopMeterSupplier = noopMeterSupplier; - this.meterRegistry = new APMMeterRegistry(enabled ? createOtelMeter() : createNoopMeter()); + this.meterRegistry = new APMMeterRegistry(enabled ? 
otelMeterSupplier.get() : noopMeterSupplier.get()); + } + + private static MeterSupplier createOtelMeterSupplier(Settings settings) { + if (Booleans.parseBoolean(System.getProperty(OTEL_METRICS_ENABLED_SYSTEM_PROPERTY, "false")) == false) { + return () -> GlobalOpenTelemetry.get().getMeter("elasticsearch"); + } + return new OTelSdkMeterSupplier(settings); } public APMMeterRegistry getMeterRegistry() { @@ -52,11 +63,7 @@ public APMMeterRegistry getMeterRegistry() { */ void setEnabled(boolean enabled) { this.enabled = enabled; - if (enabled) { - meterRegistry.setProvider(createOtelMeter()); - } else { - meterRegistry.setProvider(createNoopMeter()); - } + meterRegistry.setProvider(enabled ? otelMeterSupplier.get() : noopMeterSupplier.get()); } @Override @@ -64,29 +71,14 @@ protected void doStart() {} @Override protected void doStop() { - meterRegistry.setProvider(createNoopMeter()); + try { + otelMeterSupplier.close(); + } catch (Exception e) { + LOGGER.warn("Exception closing OTel MeterSupplier", e); + } + meterRegistry.setProvider(noopMeterSupplier.get()); } @Override protected void doClose() {} - - protected Meter createOtelMeter() { - assert this.enabled; - return otelMeterSupplier.get(); - } - - protected Meter createNoopMeter() { - return noopMeterSupplier.get(); - } - - protected static Supplier noopMeter() { - return () -> OpenTelemetry.noop().getMeter("noop"); - } - - // to be used within doPrivileged block - private static Supplier otelMeter() { - var openTelemetry = GlobalOpenTelemetry.get(); - var meter = openTelemetry.getMeter("elasticsearch"); - return () -> meter; - } } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MeterSupplier.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MeterSupplier.java new file mode 100644 index 0000000000000..78899e08c7ecf --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MeterSupplier.java @@ -0,0 +1,20 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.telemetry.apm.internal; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.function.Supplier; + +public interface MeterSupplier extends Supplier, AutoCloseable { + + @Override + default void close() {} +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MetricValidator.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MetricValidator.java index 24586d4f60f6d..b82a5ba48c0f0 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MetricValidator.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/MetricValidator.java @@ -67,7 +67,16 @@ public class MetricValidator { "es.thread_pool.searchable_snapshots_cache_fetch_async.*", "es.thread_pool.searchable_snapshots_cache_prewarming.*", "es.thread_pool.security-crypto.*", - "es.thread_pool.security-token-key.*" + "es.thread_pool.security-token-key.*", + // APM Java agent-compatible metric names (see https://www.elastic.co/docs/reference/apm/agents/java/metrics#metrics-jvm) + "system.cpu.*", + "system.memory.*", + "system.process.*", + "jvm.fd.*", + "jvm.file_descriptor.*", + "jvm.gc.*", + "jvm.memory.*", + "jvm.thread.*" ); /** @@ -241,7 +250,16 @@ private static class Attributes { Map.entry("es.tsdb.downsample.actions.shard.total", DOWNSAMPLE_ATTRIBUTES), Map.entry("es.tsdb.downsample.actions.total", DOWNSAMPLE_ATTRIBUTES), Map.entry("es.tsdb.downsample.latency.shard.histogram", DOWNSAMPLE_ATTRIBUTES), - 
Map.entry("es.tsdb.downsample.latency.total.histogram", DOWNSAMPLE_ATTRIBUTES) + Map.entry("es.tsdb.downsample.latency.total.histogram", DOWNSAMPLE_ATTRIBUTES), + // APM Java agent-compatible metrics (see https://www.elastic.co/docs/reference/apm/agents/java/metrics#metrics-jvm) + Map.entry("jvm.gc.count", Set.of("name")), + Map.entry("jvm.gc.time", Set.of("name")), + Map.entry("jvm.memory.heap.pool.used", Set.of("name")), + Map.entry("jvm.memory.heap.pool.committed", Set.of("name")), + Map.entry("jvm.memory.heap.pool.max", Set.of("name")), + Map.entry("jvm.memory.non_heap.pool.used", Set.of("name")), + Map.entry("jvm.memory.non_heap.pool.committed", Set.of("name")), + Map.entry("jvm.memory.non_heap.pool.max", Set.of("name")) ); // forbidden attributes known to cause issues due to mapping conflicts or high cardinality @@ -317,6 +335,11 @@ public static void assertValidAttributeNames(String metricName, Map TELEMETRY_OTEL_METRICS_ENDPOINT = Setting.simpleString( + "telemetry.otel.metrics.endpoint", + "", + NodeScope + ); + + public static final Setting TELEMETRY_OTEL_METRICS_INTERVAL = Setting.timeSetting( + "telemetry.otel.metrics.interval", + TimeValue.timeValueSeconds(10), + NodeScope + ); +} diff --git a/modules/apm/src/main/plugin-metadata/entitlement-policy.yaml b/modules/apm/src/main/plugin-metadata/entitlement-policy.yaml index 216c67c492260..c65183e939687 100644 --- a/modules/apm/src/main/plugin-metadata/entitlement-policy.yaml +++ b/modules/apm/src/main/plugin-metadata/entitlement-policy.yaml @@ -1,3 +1,4 @@ +# These can be removed once the APM Agent is fully deprecated org.elasticsearch.telemetry.apm: - create_class_loader - write_system_properties: @@ -83,3 +84,17 @@ org.elasticsearch.telemetry.apm: - elastic.apm.application_packages - elastic.apm.stack_trace_limit - elastic.apm.span_stack_trace_min_duration +io.opentelemetry.sdk.metrics: + - manage_threads + +io.opentelemetry.exporter.sender.jdk.internal: + - outbound_network + 
+io.opentelemetry.instrumentation.runtime_telemetry_java17: + - manage_threads + +io.opentelemetry.instrumentation.runtime_telemetry_java8: + - manage_threads + +io.opentelemetry.sdk.common: + - manage_threads diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/TestAPMMeterService.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/TestAPMMeterService.java index 0ef49cbb748ab..7eced1bbe971a 100644 --- a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/TestAPMMeterService.java +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/TestAPMMeterService.java @@ -9,14 +9,11 @@ package org.elasticsearch.telemetry.apm.internal; -import io.opentelemetry.api.metrics.Meter; - import org.elasticsearch.common.settings.Settings; -import java.util.function.Supplier; - public class TestAPMMeterService extends APMMeterService { - public TestAPMMeterService(Settings settings, Supplier otelMeterSupplier, Supplier noopMeterSupplier) { + + public TestAPMMeterService(Settings settings, MeterSupplier otelMeterSupplier, MeterSupplier noopMeterSupplier) { super(settings, otelMeterSupplier, noopMeterSupplier); } diff --git a/server/src/main/java/org/elasticsearch/monitor/jvm/SunThreadInfo.java b/server/src/main/java/org/elasticsearch/monitor/jvm/SunThreadInfo.java index b62e58c04b278..33f650366d67d 100644 --- a/server/src/main/java/org/elasticsearch/monitor/jvm/SunThreadInfo.java +++ b/server/src/main/java/org/elasticsearch/monitor/jvm/SunThreadInfo.java @@ -14,12 +14,14 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; import java.lang.reflect.Method; +import java.util.Arrays; import static org.apache.logging.log4j.LogManager.getLogger; public class SunThreadInfo { private static final ThreadMXBean threadMXBean; + private static final Method getAllThreadAllocatedBytes; private static final Method getThreadAllocatedBytes; private static final Method 
isThreadAllocatedMemorySupported; private static final Method isThreadAllocatedMemoryEnabled; @@ -29,6 +31,7 @@ public class SunThreadInfo { static { threadMXBean = ManagementFactory.getThreadMXBean(); + getAllThreadAllocatedBytes = getMethod("getThreadAllocatedBytes", long[].class); getThreadAllocatedBytes = getMethod("getThreadAllocatedBytes", long.class); isThreadAllocatedMemorySupported = getMethod("isThreadAllocatedMemorySupported"); isThreadAllocatedMemoryEnabled = getMethod("isThreadAllocatedMemoryEnabled"); @@ -60,6 +63,28 @@ public boolean isThreadAllocatedMemoryEnabled() { } } + public long[] getAllThreadAllocatedBytes(long[] ids) { + if (getAllThreadAllocatedBytes == null) { + return new long[0]; + } + + if (isThreadAllocatedMemorySupported() == false || isThreadAllocatedMemoryEnabled() == false) { + return new long[0]; + } + + /* keep only positive ids: ThreadMXBean#getThreadAllocatedBytes(long[]) rejects any id <= 0 */ long[] validIds = Arrays.stream(ids).filter(id -> id > 0).toArray(); + if (validIds.length == 0) { + return new long[0]; + } + + try { + return (long[]) getAllThreadAllocatedBytes.invoke(threadMXBean, (Object) validIds); + } catch (Exception e) { + logger.warn("exception retrieving all threads allocated memory", e); + return new long[0]; + } + } + public long getThreadAllocatedBytes(long id) { if (getThreadAllocatedBytes == null) { return 0; diff --git a/server/src/main/java/org/elasticsearch/monitor/metrics/SystemMetrics.java b/server/src/main/java/org/elasticsearch/monitor/metrics/SystemMetrics.java new file mode 100644 index 0000000000000..6ad56610aadc5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/monitor/metrics/SystemMetrics.java @@ -0,0 +1,375 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.monitor.metrics; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.core.Booleans; +import org.elasticsearch.monitor.jvm.SunThreadInfo; +import org.elasticsearch.monitor.os.OsProbe; +import org.elasticsearch.monitor.process.ProcessProbe; +import org.elasticsearch.telemetry.metric.DoubleWithAttributes; +import org.elasticsearch.telemetry.metric.LongWithAttributes; +import org.elasticsearch.telemetry.metric.MeterRegistry; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryType; +import java.lang.management.ThreadMXBean; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.LongSupplier; + +import static org.elasticsearch.telemetry.TelemetryProvider.OTEL_METRICS_ENABLED_SYSTEM_PROPERTY; + +/** + * Emits system and JVM metrics compatible with the Elastic APM Java agent, complementing + * the OTel SDK's JMX auto-instrumentation. 
+ */ +public class SystemMetrics extends AbstractLifecycleComponent { + private static final Logger logger = LogManager.getLogger(SystemMetrics.class); + + private static final MemoryMXBean MEMORY_BEAN = ManagementFactory.getMemoryMXBean(); + private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean(); + private static final List MEMORY_POOL_MX_BEANS = ManagementFactory.getMemoryPoolMXBeans(); + private static final AllocatedBytesMetrics ALLOCATED_BYTES_METRICS = new AllocatedBytesMetrics(); + + private final MeterRegistry registry; + private final List metrics = new ArrayList<>(); + + public SystemMetrics(MeterRegistry registry) { + this.registry = registry; + } + + @Override + protected void doStart() { + if (Booleans.parseBoolean(System.getProperty(OTEL_METRICS_ENABLED_SYSTEM_PROPERTY, "false")) == false) { + return; + } + registerJvmMemoryMetrics(); + registerJvmGcMetrics(); + registerJvmThreadMetrics(); + registerFileDescriptorMetrics(); + registerSystemMemoryMetrics(); + registerCgroupMemoryMetrics(); + registerSystemCpuMetrics(); + } + + @Override + protected void doStop() {} + + @Override + protected void doClose() { + for (AutoCloseable metric : metrics) { + try { + metric.close(); + } catch (Exception e) { + logger.warn("metrics close() method should not throw Exception", e); + } + } + } + + // TODO: remove when dashboards are migrated to OTel SDK auto-emitted jvm.memory.used, jvm.memory.committed, + // and jvm.memory.limit (per-pool, with jvm.memory.pool.name and jvm.memory.type attributes) + private void registerJvmMemoryMetrics() { + metrics.add( + registry.registerLongGauge( + "jvm.memory.heap.used", + "The amount of used heap memory in bytes.", + "By", + () -> new LongWithAttributes(MEMORY_BEAN.getHeapMemoryUsage().getUsed()) + ) + ); + metrics.add( + registry.registerLongGauge( + "jvm.memory.heap.committed", + "The amount of heap memory in bytes that is committed for the JVM to use.", + "By", + () -> new 
LongWithAttributes(MEMORY_BEAN.getHeapMemoryUsage().getCommitted()) + ) + ); + registerLongGaugeUnlessNegative( + "jvm.memory.heap.max", + "The maximum amount of heap memory in bytes that can be used for memory management.", + "By", + () -> MEMORY_BEAN.getHeapMemoryUsage().getMax() + ); + metrics.add( + registry.registerLongGauge( + "jvm.memory.non_heap.used", + "The amount of used non-heap memory in bytes.", + "By", + () -> new LongWithAttributes(MEMORY_BEAN.getNonHeapMemoryUsage().getUsed()) + ) + ); + metrics.add( + registry.registerLongGauge( + "jvm.memory.non_heap.committed", + "The amount of non-heap memory in bytes that is committed for the JVM to use.", + "By", + () -> new LongWithAttributes(MEMORY_BEAN.getNonHeapMemoryUsage().getCommitted()) + ) + ); + registerLongGaugeUnlessNegative( + "jvm.memory.non_heap.max", + "The maximum amount of non-heap memory in bytes that can be used for memory management.", + "By", + () -> MEMORY_BEAN.getNonHeapMemoryUsage().getMax() + ); + registerPoolGauges("jvm.memory.heap.pool", MemoryType.HEAP); + registerPoolGauges("jvm.memory.non_heap.pool", MemoryType.NON_HEAP); + } + + private void registerPoolGauges(String prefix, MemoryType type) { + List pools = MEMORY_POOL_MX_BEANS.stream().filter(p -> p.getType() == type).toList(); + if (pools.isEmpty()) { + return; + } + metrics.add(registry.registerLongsGauge(prefix + ".used", "The amount of memory in bytes used by this pool.", "By", () -> { + var result = new ArrayList(pools.size()); + for (MemoryPoolMXBean pool : pools) { + result.add(new LongWithAttributes(pool.getUsage().getUsed(), Map.of("name", pool.getName()))); + } + return result; + })); + metrics.add( + registry.registerLongsGauge( + prefix + ".committed", + "The amount of memory in bytes committed for the JVM to use in this pool.", + "By", + () -> { + var result = new ArrayList(pools.size()); + for (MemoryPoolMXBean pool : pools) { + result.add(new LongWithAttributes(pool.getUsage().getCommitted(), Map.of("name", 
pool.getName()))); + } + return result; + } + ) + ); + metrics.add( + registry.registerLongsGauge( + prefix + ".max", + "The maximum amount of memory in bytes that can be used by this pool.", + "By", + () -> { + var result = new ArrayList(pools.size()); + for (MemoryPoolMXBean pool : pools) { + long max = pool.getUsage().getMax(); + if (max >= 0) { + result.add(new LongWithAttributes(max, Map.of("name", pool.getName()))); + } + } + return result; + } + ) + ); + } + + // TODO: jvm.gc.count and jvm.gc.time can be removed when dashboards are migrated to OTel SDK auto-emitted + // jvm.gc.duration histogram (per-collector, with jvm.gc.name and jvm.gc.action attributes). + // jvm.gc.alloc has no OTel SDK equivalent and must be kept. + private void registerJvmGcMetrics() { + List beans = ManagementFactory.getGarbageCollectorMXBeans(); + if (beans.isEmpty()) { + return; + } + + metrics.add(registry.registerLongsAsyncCounter("jvm.gc.count", "The total number of collections that have occurred.", "1", () -> { + var measurements = new ArrayList(beans.size()); + for (GarbageCollectorMXBean bean : beans) { + long count = bean.getCollectionCount(); + if (count >= 0) { + measurements.add(new LongWithAttributes(count, Map.of("name", bean.getName()))); + } + } + return measurements; + })); + + metrics.add( + registry.registerLongsAsyncCounter( + "jvm.gc.time", + "The approximate accumulated collection elapsed time in milliseconds.", + "ms", + () -> { + var measurements = new ArrayList(beans.size()); + for (GarbageCollectorMXBean bean : beans) { + long timeMs = bean.getCollectionTime(); + if (timeMs >= 0) { + measurements.add(new LongWithAttributes(timeMs, Map.of("name", bean.getName()))); + } + } + return measurements; + } + ) + ); + + metrics.add( + registry.registerLongGauge( + "jvm.gc.alloc", + "An approximation of the total amount of memory, in bytes, allocated in heap memory.", + "By", + ALLOCATED_BYTES_METRICS::readAllocatedBytes + ) + ); + } + + // TODO: remove when 
dashboards are migrated to OTel SDK auto-emitted jvm.thread.count (ES-14386) + // (with jvm.thread.daemon and jvm.thread.state attributes) + private void registerJvmThreadMetrics() { + metrics.add( + registry.registerLongGauge( + "jvm.thread.count", + "The current number of live threads including both daemon and non-daemon threads.", + "{thread}", + () -> new LongWithAttributes(THREAD_BEAN.getThreadCount()) + ) + ); + } + + // TODO: remove jvm.fd.used / jvm.fd.max once dashboards are migrated to the OTel semantic convention names (ES-14386) + // (jvm.file_descriptor.count / jvm.file_descriptor.limit) + private void registerFileDescriptorMetrics() { + registerLongGaugeUnlessNegative( + "jvm.fd.used", + "The number of opened file descriptors. As previously emitted by APM Agent.", + "{file_descriptor}", + ProcessProbe::getOpenFileDescriptorCount + ); + registerLongGaugeUnlessNegative( + "jvm.fd.max", + "The maximum number of opened file descriptors. As previously emitted by APM Agent.", + "{file_descriptor}", + ProcessProbe::getMaxFileDescriptorCount + ); + registerLongGaugeUnlessNegative( + "jvm.file_descriptor.count", + "The number of opened file descriptors. As per the OTel Semantic Convention.", + "{file_descriptor}", + ProcessProbe::getOpenFileDescriptorCount + ); + registerLongGaugeUnlessNegative( + "jvm.file_descriptor.limit", + "The maximum number of opened file descriptors. 
As per the OTel Semantic Convention.", + "{file_descriptor}", + ProcessProbe::getMaxFileDescriptorCount + ); + } + + private void registerSystemMemoryMetrics() { + if (OsProbe.getInstance().getTotalPhysicalMemorySize() <= 0) { + return; + } + metrics.add( + registry.registerLongGauge( + "system.memory.actual.free", + "Actual free memory in bytes.", + "By", + () -> new LongWithAttributes(OsProbe.getInstance().getActualFreePhysicalMemorySize()) + ) + ); + metrics.add( + registry.registerLongGauge( + "system.memory.total", + "Total memory.", + "By", + () -> new LongWithAttributes(OsProbe.getInstance().getTotalPhysicalMemorySize()) + ) + ); + registerLongGaugeUnlessNegative( + "system.process.memory.size", + "The total virtual memory the process has.", + "By", + ProcessProbe::getTotalVirtualMemorySize + ); + } + + private void registerCgroupMemoryMetrics() { + metrics.add( + registry.registerLongGauge( + "system.process.cgroup.memory.mem.usage.bytes", + "Memory usage in current cgroup slice.", + "By", + () -> new LongWithAttributes(OsProbe.getInstance().getCgroupMemoryUsageInBytes().orElse(0L)) + ) + ); + metrics.add( + registry.registerLongGauge( + "system.process.cgroup.memory.mem.limit.bytes", + "Memory limit for current cgroup slice.", + "By", + () -> new LongWithAttributes(OsProbe.getInstance().getCgroupMemoryLimitInBytes().orElse(0L)) + ) + ); + } + + // TODO: system.process.cpu.total.norm.pct can be removed when dashboards are migrated to OTel SDK + // auto-emitted jvm.cpu.recent_utilization. system.cpu.total.norm.pct has no OTel SDK equivalent and must be kept. 
+ private void registerSystemCpuMetrics() { + short initialSystemCpu = OsProbe.getSystemCpuPercent(); + if (initialSystemCpu >= 0) { + metrics.add( + registry.registerDoubleGauge( + "system.cpu.total.norm.pct", + "System-wide CPU usage as a ratio.", + "1", + () -> new DoubleWithAttributes(Math.max(0, OsProbe.getSystemCpuPercent() / 100.0)) + ) + ); + } + short initialProcessCpu = ProcessProbe.getProcessCpuPercent(); + if (initialProcessCpu >= 0) { + metrics.add( + registry.registerDoubleGauge( + "system.process.cpu.total.norm.pct", + "Process CPU usage as a ratio.", + "1", + () -> new DoubleWithAttributes(Math.max(0, ProcessProbe.getProcessCpuPercent() / 100.0)) + ) + ); + } + } + + private void registerLongGaugeUnlessNegative(String name, String description, String unit, LongSupplier supplier) { + long initial = supplier.getAsLong(); + if (initial < 0) { + return; + } + metrics.add(registry.registerLongGauge(name, description, unit, () -> new LongWithAttributes(supplier.getAsLong()))); + } + + private static final class AllocatedBytesMetrics { + private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean(); + private volatile SunThreadInfo sunThreadInfo; + + private AllocatedBytesMetrics() {} + + LongWithAttributes readAllocatedBytes() { + if (sunThreadInfo == null) { + if (SunThreadInfo.INSTANCE.isThreadAllocatedMemorySupported() == false + || SunThreadInfo.INSTANCE.isThreadAllocatedMemoryEnabled() == false) { + return new LongWithAttributes(0); + } + this.sunThreadInfo = SunThreadInfo.INSTANCE; + } + + long total = 0; + for (long allocated : sunThreadInfo.getAllThreadAllocatedBytes(THREAD_MX_BEAN.getAllThreadIds())) { + if (allocated > 0) { + total += allocated; + } + } + return new LongWithAttributes(total); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java b/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java index ff4b92769bc96..0e96538f68113 100644 --- 
a/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java +++ b/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -105,6 +106,47 @@ public long getFreePhysicalMemorySize() { } } + /** + * Returns the amount of actual free physical memory in bytes. + *

+ * This method accounts for memory that can be reclaimed under pressure, reading {@code MemAvailable} from {@code /proc/meminfo} + * when available, or otherwise falling back to {@code MemFree + Buffers + Cached}. + * When {@code /proc/meminfo} can't be read or parsed, this falls back to {@link #getFreePhysicalMemorySize()}. + */ + public long getActualFreePhysicalMemorySize() { + if (Constants.LINUX == false) { + return getFreePhysicalMemorySize(); + } + try { + List meminfoLines = readProcMeminfo(); + long available = parseMeminfoKbToBytes(meminfoLines, "MemAvailable:"); + if (available >= 0) { + return available; + } + long free = parseMeminfoKbToBytes(meminfoLines, "MemFree:"); + long buffers = parseMeminfoKbToBytes(meminfoLines, "Buffers:"); + long cached = parseMeminfoKbToBytes(meminfoLines, "Cached:"); + if (free >= 0 && buffers >= 0 && cached >= 0) { + return free + buffers + cached; + } + } catch (Exception e) { + logger.warn("exception retrieving actual free physical memory", e); + } + return getFreePhysicalMemorySize(); + } + + private static long parseMeminfoKbToBytes(List meminfoLines, String prefix) { + for (String line : meminfoLines) { + if (line.startsWith(prefix) == false) { + continue; + } + int end = line.lastIndexOf(" kB"); + String number = line.substring(prefix.length(), end).trim(); + return Long.parseLong(number) * 1024; + } + return -1; + } + /** * Returns the total amount of physical memory in bytes. */ @@ -795,6 +837,31 @@ public static OsProbe getInstance() { private static final Logger logger = LogManager.getLogger(OsProbe.class); + public OptionalLong getCgroupMemoryUsageInBytes() { + OsStats.Cgroup cgroup = getCgroup(Constants.LINUX); + return cgroup == null ? OptionalLong.empty() : parseCgroupBytes(cgroup.getMemoryUsageInBytes()); + } + + public OptionalLong getCgroupMemoryLimitInBytes() { + OsStats.Cgroup cgroup = getCgroup(Constants.LINUX); + return cgroup == null ? 
OptionalLong.empty() : parseCgroupBytes(cgroup.getMemoryLimitInBytes()); + } + + private static OptionalLong parseCgroupBytes(String value) { + if (value == null || "max".equals(value)) { + return OptionalLong.empty(); + } + try { + BigInteger bigInt = new BigInteger(value); + if (bigInt.signum() < 0) { + return OptionalLong.empty(); + } + return OptionalLong.of(bigInt.longValueExact()); + } catch (NumberFormatException | ArithmeticException e) { + return OptionalLong.empty(); + } + } + OsInfo osInfo(long refreshInterval, Processors processors) throws IOException { return new OsInfo( refreshInterval, @@ -875,6 +942,9 @@ List readOsRelease() throws IOException { @SuppressForbidden(reason = "access /proc/meminfo") List readProcMeminfo() throws IOException { final List lines; + if (Constants.LINUX == false) { + return Collections.emptyList(); + } if (Files.exists(PathUtils.get("/proc/meminfo"))) { lines = Files.readAllLines(PathUtils.get("/proc/meminfo")); assert lines != null && lines.isEmpty() == false; @@ -889,28 +959,7 @@ List readProcMeminfo() throws IOException { */ long getTotalMemFromProcMeminfo() throws IOException { List meminfoLines = readProcMeminfo(); - final List memTotalLines = meminfoLines.stream().filter(line -> line.startsWith("MemTotal")).toList(); - assert memTotalLines.size() <= 1 : memTotalLines; - if (memTotalLines.size() == 1) { - final String memTotalLine = memTotalLines.get(0); - int beginIdx = memTotalLine.indexOf("MemTotal:"); - int endIdx = memTotalLine.lastIndexOf(" kB"); - if (beginIdx + 9 < endIdx) { - final String memTotalString = memTotalLine.substring(beginIdx + 9, endIdx).trim(); - try { - long memTotalInKb = Long.parseLong(memTotalString); - return memTotalInKb * 1024; - } catch (NumberFormatException e) { - logger.warn("Unable to retrieve total memory from meminfo line [" + memTotalLine + "]"); - return 0; - } - } else { - logger.warn("Unable to retrieve total memory from meminfo line [" + memTotalLine + "]"); - return 0; - } - } 
else { - return 0; - } + return Math.max(0, parseMeminfoKbToBytes(meminfoLines, "MemTotal:")); } boolean isDebian8() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 3bdc3f4c38e3d..d47c766d7d5da 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -65,6 +65,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.metrics.IndicesMetrics; import org.elasticsearch.monitor.metrics.NodeMetrics; +import org.elasticsearch.monitor.metrics.SystemMetrics; import org.elasticsearch.node.internal.TerminationHandler; import org.elasticsearch.plugins.ClusterCoordinationPlugin; import org.elasticsearch.plugins.ClusterPlugin; @@ -289,6 +290,7 @@ public Node start() throws NodeValidationException { injector.getInstance(FsHealthService.class).start(); injector.getInstance(NodeMetrics.class).start(); injector.getInstance(IndicesMetrics.class).start(); + injector.getInstance(SystemMetrics.class).start(); injector.getInstance(HealthPeriodicLogger.class).start(); nodeService.getMonitorService().start(); @@ -487,6 +489,7 @@ private void stop() { stopIfStarted(TransportService.class); stopIfStarted(NodeMetrics.class); stopIfStarted(IndicesMetrics.class); + stopIfStarted(SystemMetrics.class); pluginLifecycleComponents.forEach(Node::stopIfStarted); // we should stop this last since it waits for resources to get released @@ -558,6 +561,7 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(TransportService.class)); toClose.add(injector.getInstance(NodeMetrics.class)); toClose.add(injector.getInstance(IndicesMetrics.class)); + toClose.add(injector.getInstance(SystemMetrics.class)); if (ReadinessService.enabled(environment)) { toClose.add(injector.getInstance(ReadinessService.class)); } diff --git 
a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index d5bdec6031bcf..92c629cbbfa78 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -160,6 +160,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.metrics.IndicesMetrics; import org.elasticsearch.monitor.metrics.NodeMetrics; +import org.elasticsearch.monitor.metrics.SystemMetrics; import org.elasticsearch.node.internal.TerminationHandler; import org.elasticsearch.node.internal.TerminationHandlerProvider; import org.elasticsearch.persistent.PersistentTasksClusterService; @@ -1259,6 +1260,7 @@ public void sendRequest( final TimeValue metricsInterval = settings.getAsTime("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(10)); final NodeMetrics nodeMetrics = new NodeMetrics(telemetryProvider.getMeterRegistry(), nodeService, metricsInterval); final IndicesMetrics indicesMetrics = new IndicesMetrics(telemetryProvider.getMeterRegistry(), indicesService, metricsInterval); + final SystemMetrics systemMetrics = new SystemMetrics(telemetryProvider.getMeterRegistry()); OnlinePrewarmingService onlinePrewarmingService = pluginsService.loadSingletonServiceProvider( OnlinePrewarmingServiceProvider.class, @@ -1364,6 +1366,7 @@ public void sendRequest( b.bind(TransportService.class).toInstance(transportService); b.bind(NodeMetrics.class).toInstance(nodeMetrics); b.bind(IndicesMetrics.class).toInstance(indicesMetrics); + b.bind(SystemMetrics.class).toInstance(systemMetrics); b.bind(NetworkService.class).toInstance(networkService); b.bind(IndexMetadataVerifier.class).toInstance(indexMetadataVerifier); b.bind(ClusterInfoService.class).toInstance(clusterInfoService); diff --git a/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java 
b/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java index 5fc6b39f438a6..3ac6184669abc 100644 --- a/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java +++ b/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java @@ -14,6 +14,8 @@ public interface TelemetryProvider { + String OTEL_METRICS_ENABLED_SYSTEM_PROPERTY = "telemetry.otel.metrics.enabled"; + Tracer getTracer(); MeterRegistry getMeterRegistry(); diff --git a/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java b/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java index 3b259497d066d..0995eb51f3802 100644 --- a/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java +++ b/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java @@ -291,7 +291,7 @@ public void testGetTotalMemFromProcMeminfo() throws Exception { "Inactive: 8130280 kB" ); probe = buildStubOsProbe(cgroupsVersion, "", List.of(), meminfoLines); - assertThat(probe.getTotalMemFromProcMeminfo(), equalTo(0L)); + expectThrows(NumberFormatException.class, probe::getTotalMemFromProcMeminfo); // MemTotal line with invalid unit meminfoLines = Arrays.asList( @@ -305,7 +305,7 @@ public void testGetTotalMemFromProcMeminfo() throws Exception { "Inactive: 8130280 kB" ); probe = buildStubOsProbe(cgroupsVersion, "", List.of(), meminfoLines); - assertThat(probe.getTotalMemFromProcMeminfo(), equalTo(0L)); + expectThrows(StringIndexOutOfBoundsException.class, probe::getTotalMemFromProcMeminfo); // MemTotal line with random valid value long memTotalInKb = randomLongBetween(1, Long.MAX_VALUE / 1024L); @@ -323,6 +323,23 @@ public void testGetTotalMemFromProcMeminfo() throws Exception { assertThat(probe.getTotalMemFromProcMeminfo(), equalTo(memTotalInKb * 1024L)); } + public void testGetActualFreePhysicalMemory() { + assumeTrue("meminfo parsing is Linux-specific", Constants.LINUX); + int cgroupsVersion = 1; + var meminfoLines = Arrays.asList( + "MemFree: 
8467692 kB", + "MemAvailable: 39646240 kB", + "Buffers: 4699504 kB", + "Cached: 23290380 kB" + ); + OsProbe probe = buildStubOsProbe(cgroupsVersion, "", List.of(), meminfoLines); + assertThat(probe.getActualFreePhysicalMemorySize(), equalTo(39646240 * 1024L)); + + meminfoLines = Arrays.asList("MemFree: 10 kB", "Buffers: 20 kB", "Cached: 30 kB"); + probe = buildStubOsProbe(cgroupsVersion, "", List.of(), meminfoLines); + assertThat(probe.getActualFreePhysicalMemorySize(), equalTo((10 + 20 + 30) * 1024L)); + } + public void testTotalMemoryOverride() { assertThat(OsProbe.getTotalMemoryOverride("123456789"), is(123456789L)); assertThat(OsProbe.getTotalMemoryOverride("123456789123456789"), is(123456789123456789L)); diff --git a/test/external-modules/apm-integration/build.gradle b/test/external-modules/apm-integration/build.gradle index 63e149490a9a4..74a9993c16bdf 100644 --- a/test/external-modules/apm-integration/build.gradle +++ b/test/external-modules/apm-integration/build.gradle @@ -31,4 +31,5 @@ tasks.named('javaRestTest').configure { dependencies { clusterModules project(':modules:apm') implementation project(':libs:logging') + javaRestTestImplementation "io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha" } diff --git a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java index 07141846dde4d..3f1222814ac72 100644 --- a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java +++ b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java @@ -9,8 +9,13 @@ package org.elasticsearch.test.apmintegration; +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import 
org.elasticsearch.client.Request; import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.MutableSettingsProvider; +import org.elasticsearch.test.cluster.MutableSystemPropertyProvider; import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentParser; @@ -18,13 +23,14 @@ import org.elasticsearch.xcontent.spi.XContentProvider; import org.hamcrest.Matcher; import org.hamcrest.StringDescription; +import org.junit.Before; import org.junit.ClassRule; -import org.junit.Rule; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,21 +43,35 @@ import static java.util.Map.entry; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class MetricsApmIT extends ESRestTestCase { private static final XContentProvider.FormatProvider XCONTENT = XContentProvider.provider().getJsonXContent(); + private static final MutableSettingsProvider clusterSettings = new MutableSettingsProvider(); + private static final MutableSystemPropertyProvider systemProperties = new MutableSystemPropertyProvider(); + private final boolean withOTel; @ClassRule - public static RecordingApmServer mockApmServer = new RecordingApmServer(); + public static RecordingApmServer recordingApmServer = new RecordingApmServer(); + + public MetricsApmIT(@Name("withOTel") boolean withOTel) { + this.withOTel = withOTel; + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return List.of(new Object[] { true }, new Object[] { false }); + } - @Rule - public ElasticsearchCluster cluster = 
ElasticsearchCluster.local() + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() .distribution(DistributionType.INTEG_TEST) .module("test-apm-integration") .module("apm") .setting("telemetry.metrics.enabled", "true") - .setting("telemetry.agent.metrics_interval", "1s") - .setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockApmServer.getPort()) + .settings(clusterSettings) + .systemProperties(systemProperties) .build(); @Override @@ -59,6 +79,29 @@ protected String getTestRestCluster() { return cluster.getHttpAddresses(); } + /** + * Restarts the shared test cluster when needed so the parameterized cluster settings and system properties + * for the current test instance take effect. This follows the same pattern used in {@code AbstractNetty4IT}. + */ + @Before + public void maybeRestart() throws IOException { + String current = systemProperties.get(null).get("telemetry.otel.metrics.enabled"); + if (current == null || current.equals(Boolean.toString(withOTel)) == false) { + systemProperties.get(null).put("telemetry.otel.metrics.enabled", String.valueOf(withOTel)); + if (withOTel) { + clusterSettings.get(null).put("telemetry.otel.metrics.interval", "1s"); + clusterSettings.get(null) + .put("telemetry.otel.metrics.endpoint", "http://127.0.0.1:" + recordingApmServer.getPort() + "/v1/metrics"); + } else { + clusterSettings.get(null).put("telemetry.agent.metrics_interval", "1s"); + clusterSettings.get(null).put("telemetry.agent.server_url", "http://127.0.0.1:" + recordingApmServer.getPort()); + } + cluster.restart(false); + closeClients(); + initClient(); + } + } + @SuppressWarnings("unchecked") public void testApmIntegration() throws Exception { Map>> valueAssertions = new HashMap<>( @@ -116,12 +159,12 @@ public void testApmIntegration() throws Exception { }); } - if (valueAssertions.isEmpty()) { + if (valueAssertions.isEmpty() && histogramAssertions.isEmpty()) { finished.countDown(); } }; - 
mockApmServer.addMessageConsumer(messageConsumer); + recordingApmServer.addMessageConsumer(messageConsumer); client().performRequest(new Request("GET", "/_use_apm_metrics")); @@ -131,6 +174,63 @@ public void testApmIntegration() throws Exception { assertTrue("Timeout when waiting for assertions to complete. Remaining assertions to match: " + remainingAssertions, completed); } + @SuppressWarnings("unchecked") + public void testJvmMetrics() throws Exception { + Map>> valueAssertions = new HashMap<>( + Map.ofEntries( + assertion("system.cpu.total.norm.pct", m -> (Double) m.get("value"), closeTo(0.0, 1.0)), + assertion("system.process.cpu.total.norm.pct", m -> (Double) m.get("value"), closeTo(0.0, 1.0)), + assertion("system.memory.total", m -> ((Number) m.get("value")).longValue(), greaterThan(0L)), + assertion("system.memory.actual.free", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("system.process.memory.size", m -> ((Number) m.get("value")).longValue(), greaterThan(0L)), + assertion("jvm.memory.heap.used", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.memory.heap.committed", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.memory.heap.max", m -> ((Number) m.get("value")).longValue(), greaterThan(0L)), + assertion("jvm.memory.non_heap.used", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.memory.non_heap.committed", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.gc.count", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.gc.time", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.gc.alloc", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.thread.count", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(1L)), + assertion("jvm.fd.used", m -> 
((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.fd.max", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.memory.heap.pool.used", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.memory.heap.pool.committed", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.memory.non_heap.pool.used", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)), + assertion("jvm.memory.non_heap.pool.committed", m -> ((Number) m.get("value")).longValue(), greaterThanOrEqualTo(0L)) + ) + ); + + CountDownLatch finished = new CountDownLatch(1); + + Consumer messageConsumer = (String message) -> { + var apmMessage = parseMap(message); + var metricset = (Map) apmMessage.getOrDefault("metricset", Collections.emptyMap()); + var samples = (Map) metricset.getOrDefault("samples", Collections.emptyMap()); + + samples.forEach((key, value) -> { + var valueAssertion = valueAssertions.get(key); + if (valueAssertion != null) { + var sampleObject = (Map) value; + if (valueAssertion.test(sampleObject)) { + logger.info("{} assertion PASSED", key); + valueAssertions.remove(key); + } + } + }); + + if (valueAssertions.isEmpty()) { + finished.countDown(); + } + }; + + recordingApmServer.addMessageConsumer(messageConsumer); + + var completed = finished.await(30, TimeUnit.SECONDS); + var remaining = valueAssertions.keySet().stream().collect(Collectors.joining(", ")); + assertTrue("Timeout waiting for JVM metrics. 
Missing: " + remaining, completed); + } + private Map.Entry>> assertion( String sampleKeyName, Function, T> accessor, diff --git a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java index d20f2d08719e7..d9ae659a94d65 100644 --- a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java +++ b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java @@ -9,12 +9,23 @@ package org.elasticsearch.test.apmintegration; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.Strings; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; import org.junit.rules.ExternalResource; import java.io.BufferedReader; @@ -52,16 +63,16 @@ protected void before() throws Throwable { private Thread consumerThread() { return new Thread(() -> { - while (running) { + while (running && Thread.currentThread().isInterrupted() == false) { if (consumer != null) { try { String msg = received.poll(1L, TimeUnit.SECONDS); if (msg != null && msg.isEmpty() == false) { consumer.accept(msg); } - } catch (InterruptedException e) { - 
throw new RuntimeException(e); + Thread.currentThread().interrupt(); + return; } } } @@ -71,21 +82,30 @@ private Thread consumerThread() { @Override protected void after() { running = false; - server.stop(1); + messageConsumerThread.interrupt(); + if (server != null) { + server.stop(1); + } consumer = null; + try { + messageConsumerThread.join(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } private void handle(HttpExchange exchange) throws IOException { try (exchange) { + String path = exchange.getRequestURI().getPath(); if (running) { - try { - try (InputStream requestBody = exchange.getRequestBody()) { - if (requestBody != null) { - var read = readJsonMessages(requestBody); - received.addAll(read); + try (InputStream requestBody = exchange.getRequestBody()) { + if (requestBody != null) { + if ("/v1/metrics".equals(path)) { + parseOtlpMetrics(requestBody); + } else { + received.addAll(readJsonMessages(requestBody)); } } - } catch (Throwable t) { // The lifetime of HttpServer makes message handling "brittle": we need to start handling and recording received // messages before the test starts running. We should also stop handling them before the test ends (and the test @@ -101,6 +121,70 @@ private void handle(HttpExchange exchange) throws IOException { } } + /** + * Parses OTLP protobuf metrics and normalizes them into the same JSON shape that the APM agent produces. + */ + private void parseOtlpMetrics(InputStream input) throws IOException { + ExportMetricsServiceRequest request = ExportMetricsServiceRequest.parseFrom(input); + for (ResourceMetrics resourceMetrics : request.getResourceMetricsList()) { + for (ScopeMetrics scopeMetrics : resourceMetrics.getScopeMetricsList()) { + String scopeName = scopeMetrics.getScope().getName(); + for (Metric metric : scopeMetrics.getMetricsList()) { + switch (metric.getDataCase()) { + case SUM, GAUGE -> { + var dataPoints = metric.getDataCase() == Metric.DataCase.SUM + ? 
metric.getSum().getDataPointsList() + : metric.getGauge().getDataPointsList(); + for (NumberDataPoint dp : dataPoints) { + var builder = XContentFactory.jsonBuilder().startObject().startObject("metricset"); + writeTags(builder, scopeName, dp.getAttributesList()); + builder.startObject("samples").startObject(metric.getName()); + switch (dp.getValueCase()) { + case AS_DOUBLE -> builder.field("value", dp.getAsDouble()); + case AS_INT -> builder.field("value", dp.getAsInt()); + } + builder.endObject().endObject(); + received.offer(Strings.toString(builder.endObject().endObject())); + } + } + case HISTOGRAM -> { + for (HistogramDataPoint dp : metric.getHistogram().getDataPointsList()) { + var builder = XContentFactory.jsonBuilder().startObject().startObject("metricset"); + writeTags(builder, scopeName, dp.getAttributesList()); + builder.startObject("samples").startObject(metric.getName()); + builder.field("counts", dp.getBucketCountsList()); + builder.endObject().endObject(); + received.offer(Strings.toString(builder.endObject().endObject())); + } + } + default -> { + var builder = XContentFactory.jsonBuilder().startObject().startObject("metricset"); + writeTags(builder, scopeName, List.of()); + builder.startObject("samples").startObject(metric.getName()).endObject().endObject(); + received.offer(Strings.toString(builder.endObject().endObject())); + } + } + } + } + } + } + + private static void writeTags(XContentBuilder builder, String scopeName, List attributes) throws IOException { + builder.startObject("tags"); + builder.field("otel_instrumentation_scope_name", scopeName); + for (KeyValue kv : attributes) { + switch (kv.getValue().getValueCase()) { + case STRING_VALUE -> builder.field(kv.getKey(), kv.getValue().getStringValue()); + case INT_VALUE -> builder.field(kv.getKey(), kv.getValue().getIntValue()); + case DOUBLE_VALUE -> builder.field(kv.getKey(), kv.getValue().getDoubleValue()); + case BOOL_VALUE -> builder.field(kv.getKey(), kv.getValue().getBoolValue()); + 
default -> { + } + } + } + builder.endObject(); + } + private List readJsonMessages(InputStream input) { // parse NDJSON return new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)).lines().toList();