Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit package events #9301

Merged
merged 14 commits into from
Oct 11, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ plugins {

dependencies {
implementation(project(":instrumentation:runtime-telemetry:runtime-telemetry-java8:library"))
implementation("io.opentelemetry:opentelemetry-api-events")
implementation("com.google.guava:guava")
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
}

tasks {
test {
jvmArgs("-Dotel.instrumentation.runtime-telemetry.package-emitter.enabled=true")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8;

import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackageChecksum;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackageDescription;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackageNameAndVersion;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackagePath;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackageType;

import com.google.common.util.concurrent.RateLimiter;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.events.EventEmitter;
import io.opentelemetry.api.events.GlobalEventEmitterProvider;
import io.opentelemetry.instrumentation.api.internal.GuardedBy;
import io.opentelemetry.instrumentation.runtimemetrics.java8.internal.JmxRuntimeMetricsUtil;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* {@link JarAnalyzer} processes the {@link ProtectionDomain} of each class loaded and emits an
* event with metadata about each distinct JAR location identified.
*/
final class JarAnalyzer {

private static final Logger LOGGER = Logger.getLogger(JarAnalyzer.class.getName());
private static final JarAnalyzer INSTANCE = new JarAnalyzer();

private final Set<URI> seenUris = new HashSet<>();
private final BlockingQueue<URL> toProcess = new LinkedBlockingDeque<>();

/** {@link #handle(ProtectionDomain)} does nothing until {@link #configure(int)} is called. */
private Consumer<ProtectionDomain> handler = unused -> {};

@GuardedBy("this")
private Integer jarsPerSecond;

@GuardedBy("this")
private Worker worker;

private JarAnalyzer() {}

/** Get the {@link JarAnalyzer} singleton. */
public static JarAnalyzer getInstance() {
return INSTANCE;
}

/**
* Configure the {@link JarAnalyzer}. If not called, {@link #handle(ProtectionDomain)} is a noop.
*
* @throws IllegalStateException if called multiple times
*/
public synchronized void configure(int jarsPerSecond) {
if (this.jarsPerSecond != null) {
throw new IllegalStateException("JarAnalyzer has already been configured");
}
this.jarsPerSecond = jarsPerSecond;
this.handler = this::handleInternal;
}

/**
* Install {@link OpenTelemetry} and start processing jars if {@link #configure(int)} was called.
*
* @throws IllegalStateException if called multiple times
*/
public synchronized void maybeInstall(OpenTelemetry unused) {
if (worker != null) {
throw new IllegalStateException("JarAnalyzer has already been installed");
}
if (this.jarsPerSecond == null) {
return;
}
// TODO(jack-berg): Use OpenTelemetry to obtain EventEmitter when event API is stable
EventEmitter eventEmitter =
GlobalEventEmitterProvider.get()
.eventEmitterBuilder(JmxRuntimeMetricsUtil.getInstrumentationName())
.setInstrumentationVersion(JmxRuntimeMetricsUtil.getInstrumentationVersion())
.setEventDomain("jvm")
.build();
this.worker = new Worker(eventEmitter, toProcess, jarsPerSecond);
Thread workerThread =
new DaemonThreadFactory(JarAnalyzer.class.getSimpleName() + "_WorkerThread")
.newThread(worker);
workerThread.start();
}

/**
* Identify the JAR associated with the {@code protectionDomain} and queue it to be processed if
* its the first time we've seen it.
*
* <p>NOTE: does nothing if {@link #configure(int)} has not been called.
*/
void handle(ProtectionDomain protectionDomain) {
this.handler.accept(protectionDomain);
}

private void handleInternal(ProtectionDomain protectionDomain) {
if (protectionDomain == null) {
return;
}
CodeSource codeSource = protectionDomain.getCodeSource();
if (codeSource == null) {
return;
}
URL jarUrl = codeSource.getLocation();
if (jarUrl == null) {
return;
}
URI locationUri;
try {
locationUri = jarUrl.toURI();
} catch (URISyntaxException e) {
LOGGER.log(Level.WARNING, "Unable to get URI for jar URL: " + jarUrl, e);
return;
}

if (!seenUris.add(locationUri)) {
return;
}
if ("jrt".equals(jarUrl.getProtocol())) {
LOGGER.log(Level.FINEST, "Skipping processing jar for java runtime module: " + jarUrl);
return;
}
if (!jarUrl.getFile().endsWith(JarAnalyzerUtil.JAR_EXTENSION)) {
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.log(Level.INFO, "Skipping processing jar with unrecognized code location: " + jarUrl);
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
return;
}

toProcess.add(jarUrl);
}

private static final class Worker implements Runnable {

private final EventEmitter eventEmitter;
private final BlockingQueue<URL> toProcess;
private final RateLimiter rateLimiter;

private Worker(EventEmitter eventEmitter, BlockingQueue<URL> toProcess, int jarsPerSecond) {
this.eventEmitter = eventEmitter;
this.toProcess = toProcess;
this.rateLimiter = RateLimiter.create(jarsPerSecond);
}

/**
* Continuously poll the {@link #toProcess} for JAR {@link URL}s, and process each wit {@link
* #processUrl(URL)}.
*/
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
URL jarUrl = null;
try {
jarUrl = toProcess.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (jarUrl == null) {
continue;
}
rateLimiter.acquire();
// TODO(jack-berg): add ability to optionally re-process urls periodically to re-emit events
processUrl(jarUrl);
}
LOGGER.warning("JarAnalyzer stopped");
}

/**
* Process the {@code jarUrl}, extracting metadata from it and emitting an event with the
* content.
*/
private void processUrl(URL jarUrl) {
AttributesBuilder builder = Attributes.builder();

addPackageType(builder);

try {
addPackageChecksum(builder, jarUrl);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Error adding package checksum for jar URL: " + jarUrl, e);
}

try {
addPackagePath(builder, jarUrl);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Error adding package path jar URL: " + jarUrl, e);
}

try {
addPackageDescription(builder, jarUrl);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Error adding package description for jar URL: " + jarUrl, e);
}

try {
addPackageNameAndVersion(builder, jarUrl);
} catch (Exception e) {
LOGGER.log(
Level.WARNING, "Error adding package name and version for jar URL: " + jarUrl, e);
}

eventEmitter.emit("info", builder.build());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8;

import com.google.auto.service.AutoService;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.javaagent.bootstrap.InstrumentationHolder;
import io.opentelemetry.javaagent.extension.AgentListener;
import io.opentelemetry.javaagent.tooling.AgentExtension;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;
import net.bytebuddy.agent.builder.AgentBuilder;

/** Installs the {@link JarAnalyzer}. */
@AutoService({AgentExtension.class, AgentListener.class})
public class JarAnalyzerInstaller implements AgentExtension, AgentListener {

@Override
public void afterAgent(AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {
JarAnalyzer.getInstance().maybeInstall(autoConfiguredOpenTelemetrySdk.getOpenTelemetrySdk());
}

@Override
@CanIgnoreReturnValue
public AgentBuilder extend(AgentBuilder agentBuilder, ConfigProperties config) {
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
boolean enabled =
config.getBoolean("otel.instrumentation.runtime-telemetry.package-emitter.enabled", false);
if (!enabled) {
return agentBuilder;
}
JarAnalyzer jarAnalyzer = JarAnalyzer.getInstance();
int jarsPerSecond =
config.getInt("otel.instrumentation.runtime-telemetry.package-emitter.jars-per-second", 10);
jarAnalyzer.configure(jarsPerSecond);
Instrumentation inst = InstrumentationHolder.getInstrumentation();
if (inst == null) {
return agentBuilder;
}
inst.addTransformer(
new ClassFileTransformer() {
@Override
public byte[] transform(
ClassLoader loader,
String className,
Class<?> classBeingRedefined,
ProtectionDomain protectionDomain,
byte[] classfileBuffer) {
jarAnalyzer.handle(protectionDomain);
return null;
}
});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could use some advice on whether this is the best way to get a hook to inspect every class that's loaded.

The idea is basically:

  • For each class, determine the URL of the containing JAR
  • Add each unique JAR to a processing queue
  • Process each JAR by finding / extracting various metadata and emitting an event with event.domain=jvm, event.name=info
  • If disabled, dont add the ClassFileTransformer, and don't start the processing thread

return agentBuilder;
}

@Override
public String extensionName() {
return "jar-analyzer";
}
}
Loading