Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.apm.APMMeterRegistry;

import static org.elasticsearch.telemetry.TelemetryProvider.OTEL_METRICS_ENABLED_SYSTEM_PROPERTY;
Expand All @@ -26,38 +28,93 @@ public class APMMeterService extends AbstractLifecycleComponent {

private static final Logger LOGGER = LogManager.getLogger(APMMeterService.class);

/**
* Time to wait for the APM agent to export telemetry if we don't have access to the settings to check.
*/
public static final TimeValue DEFAULT_AGENT_INTERVAL = TimeValue.timeValueSeconds(10);

private final APMMeterRegistry meterRegistry;
private final MeterSupplier otelMeterSupplier;
private final MeterSupplier noopMeterSupplier;

protected volatile boolean enabled;

public APMMeterService(Settings settings) {
this(settings, createOtelMeterSupplier(settings), () -> OpenTelemetry.noop().getMeter("noop"));
this(settings, createOtelMeterSupplier(settings), new NoOpMeterSupplier());
}

public APMMeterService(Settings settings, MeterSupplier otelMeterSupplier, MeterSupplier noopMeterSupplier) {
this(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.get(settings), otelMeterSupplier, noopMeterSupplier);
}

public APMMeterService(boolean enabled, MeterSupplier otelMeterSupplier, MeterSupplier noopMeterSupplier) {
this.enabled = enabled;
this.enabled = APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.get(settings);
this.otelMeterSupplier = otelMeterSupplier;
this.noopMeterSupplier = noopMeterSupplier;
this.meterRegistry = new APMMeterRegistry(enabled ? otelMeterSupplier.get() : noopMeterSupplier.get());
}

private static MeterSupplier createOtelMeterSupplier(Settings settings) {
if (Booleans.parseBoolean(System.getProperty(OTEL_METRICS_ENABLED_SYSTEM_PROPERTY, "false")) == false) {
return () -> GlobalOpenTelemetry.get().getMeter("elasticsearch");
boolean otelMetricsEnabled = Booleans.parseBoolean(System.getProperty(OTEL_METRICS_ENABLED_SYSTEM_PROPERTY, "false"));
if (otelMetricsEnabled) {
return new OTelSdkMeterSupplier(settings);
} else {
long agentFlushWaitMs = 2 * agentMetricsInterval(settings).millis();
return new MeterSupplier() {
@Override
public Meter get() {
// CONFUSION ALERT: When we do `GlobalOpenTelemetry.get()`, we're actually getting an OpenTelemetry
// object that routes telemetry to the APM agent; that is, we're still using OTel to report telemetry
// from the code, but we're using the APM agent (instead of the OTel SDK) to export it.
// That's why this "else" branch, where otelMetricsEnabled is false, is still using OpenTelemetry.

return GlobalOpenTelemetry.get().getMeter("elasticsearch");
}

@Override
public void attemptFlushMetrics() {
try {
// The agent offers no flush API, so we do a best-effort pause that exceeds
// the agent reporting interval, making it extremely likely that all telemetry
// has been exported.
//
// Note that the first intake request to the APM server can still be delayed beyond this window:
// the APM agent checks for configuration changes only periodically,
// so the setting changes we made during initialization don't take effect immediately.

LOGGER.info("Waiting {} ms for APM agent to flush metrics", agentFlushWaitMs);
Thread.sleep(agentFlushWaitMs);
} catch (InterruptedException e) {
// Flush is best-effort. We can reestablish the interrupt flag and proceed.
Thread.currentThread().interrupt();
}
}
};
}
return new OTelSdkMeterSupplier(settings);
}

private static TimeValue agentMetricsInterval(Settings settings) {
String intervalStr = settings.get("telemetry.agent.metrics_interval");
if (intervalStr != null && intervalStr.isEmpty() == false) {
return TimeValue.parseTimeValue(intervalStr, "telemetry.agent.metrics_interval");
}
return DEFAULT_AGENT_INTERVAL;
}

public APMMeterRegistry getMeterRegistry() {
return meterRegistry;
}

/**
* Export buffered metrics on a best-effort basis.
* <p>
* For OpenTelemetry SDK metrics, pushes buffered data to the exporter. For Elastic APM agent metrics,
* sleeps for {@code 2 * telemetry.agent.metrics_interval} because the agent has no
* programmatic flush; observable export (e.g. first HTTP to {@code telemetry.agent.server_url}) may still
* take substantially longer than this sleep.
*/
public void attemptFlushMetrics() {
if (enabled) {
otelMeterSupplier.attemptFlushMetrics();
}
}

/**
* @see APMAgentSettings#addClusterSettingsListeners(ClusterService, APMTelemetryProvider)
*/
Expand All @@ -81,4 +138,16 @@ protected void doStop() {

@Override
protected void doClose() {}

private static final class NoOpMeterSupplier implements MeterSupplier {
@Override
public Meter get() {
return OpenTelemetry.noop().getMeter("noop");
}

@Override
public void attemptFlushMetrics() {
// No-op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,14 @@ public APMMeterService getMeterService() {
public APMMeterRegistry getMeterRegistry() {
return apmMeterService.getMeterRegistry();
}

@Override
public void attemptFlushMetrics() {
apmMeterService.attemptFlushMetrics();
}

@Override
public void attemptFlushTraces() {
apmTracer.attemptFlushTraces();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@

public interface MeterSupplier extends Supplier<Meter>, AutoCloseable {

/**
* Export any buffered metrics on a best-effort basis.
* <p>
* This defaults to a no-op just to support the fairly widespread practice of using a lambda for this in tests.
*/
default void attemptFlushMetrics() {}

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.core.TimeValue;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.telemetry.TelemetryProvider.OTEL_METRICS_ENABLED_SYSTEM_PROPERTY;

Expand Down Expand Up @@ -91,6 +92,16 @@ private String getAuthorizationHeader() {
return null;
}

@Override
public void attemptFlushMetrics() {
synchronized (mutex) {
if (meterProvider != null) {
// If the timeout expires, this quietly returns, which is ok in this context.
meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
}
}
}

@Override
public void close() {
synchronized (mutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.lucene.util.automaton.MinimizationOperations;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.apm.internal.APMAgentSettings;
Expand All @@ -56,9 +57,15 @@ public class APMTracer extends AbstractLifecycleComponent implements org.elastic

private static final Logger logger = LogManager.getLogger(APMTracer.class);

/** Default interval when agent export timing is unknown; same semantics as APMMeterService. */
private static final TimeValue DEFAULT_AGENT_INTERVAL = TimeValue.timeValueSeconds(10);

/** Holds in-flight span information. */
private final Map<String, Context> spans = ConcurrentCollections.newConcurrentMap();

/** Time to wait in attemptFlushTraces when using the agent (2× export interval). */
private final long agentFlushWaitMs;

private volatile boolean enabled;
private volatile APMServices services;

Expand Down Expand Up @@ -92,6 +99,34 @@ public APMTracer(Settings settings) {
this.filterAutomaton = buildAutomaton(includeNames, excludeNames);
this.labelFilterAutomaton = buildAutomaton(labelFilters, List.of());
this.enabled = APMAgentSettings.TELEMETRY_TRACING_ENABLED_SETTING.get(settings);
this.agentFlushWaitMs = 2 * agentExportIntervalMs(settings);
}

private static long agentExportIntervalMs(Settings settings) {
String intervalStr = settings.get("telemetry.agent.metrics_interval");
if (intervalStr != null && intervalStr.isEmpty() == false) {
try {
return TimeValue.parseTimeValue(intervalStr, "telemetry.agent.metrics_interval").millis();
} catch (Exception e) {
logger.debug("Could not parse telemetry.agent.metrics_interval [{}], using default", intervalStr);
}
}
return DEFAULT_AGENT_INTERVAL.millis();
}

/**
* Ensures buffered traces are exported on a best-effort basis. When using the APM agent (no ES-owned
* tracer provider), this waits for 2× the agent export interval.
*/
public void attemptFlushTraces() {
if (enabled == false) {
return;
}
try {
Thread.sleep(agentFlushWaitMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public void setEnabled(boolean enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,26 @@ public interface TelemetryProvider {

MeterRegistry getMeterRegistry();

TelemetryProvider NOOP = new TelemetryProvider() {
/**
* Ensures buffered metrics are exported. Implementations should flush the meter provider they own
* (e.g. OTel SdkMeterProvider) or wait for the next Elastic APM Java agent export cycle.
* <p>
* When metrics are backed by the Elastic APM agent, there is no flush API: the implementation only waits
* a bounded interval derived from {@code telemetry.agent.metrics_interval}. The first HTTP request to the
* configured APM server can still arrive much later (agent reporter scheduling), so callers that need
* observable export must allow additional wall-clock time beyond this method.
*/
void attemptFlushMetrics();

/**
* Ensures buffered traces are exported. Implementations should flush the tracer provider they own
* (e.g. OTel SdkTracerProvider) or wait for the next agent export cycle.
*/
void attemptFlushTraces();

TelemetryProvider NOOP = new NoopTelemetryProvider();

class NoopTelemetryProvider implements TelemetryProvider {

@Override
public Tracer getTracer() {
Expand All @@ -31,5 +50,11 @@ public Tracer getTracer() {
public MeterRegistry getMeterRegistry() {
return MeterRegistry.NOOP;
}
};

@Override
public void attemptFlushMetrics() {}

@Override
public void attemptFlushTraces() {}
}
}
1 change: 1 addition & 0 deletions test/external-modules/apm-integration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tasks.named("test").configure {
}

tasks.named('javaRestTest').configure {
def buildParams = project.rootProject.extensions.getByName('buildParams')
it.onlyIf("snapshot build") { buildParams.snapshotBuild }
}

Expand Down
Loading
Loading