Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit package events #9301

Merged
merged 14 commits into from
Oct 11, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ plugins {

dependencies {
// Library-level runtime telemetry instrumentation used by this javaagent module.
implementation(project(":instrumentation:runtime-telemetry:runtime-telemetry-java8:library"))
// Events API: JarAnalyzer obtains its EventEmitter through GlobalEventEmitterProvider.
implementation("io.opentelemetry:opentelemetry-api-events")

// Compile-only: JarAnalyzerInstaller references ConfigProperties and
// AutoConfiguredOpenTelemetrySdk from the autoconfigure extension.
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
}

tasks {
test {
// JarAnalyzerInstaller reads this flag with a default of false, so the test JVM has to
// opt in explicitly for the package emitter to be configured.
jvmArgs("-Dotel.instrumentation.runtime-telemetry.package-emitter.enabled=true")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8;

import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackageChecksum;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackageDescription;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackageNameAndVersion;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackagePath;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarAnalyzerUtil.addPackageType;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.events.EventEmitter;
import io.opentelemetry.api.events.GlobalEventEmitterProvider;
import io.opentelemetry.instrumentation.api.internal.GuardedBy;
import io.opentelemetry.instrumentation.runtimemetrics.java8.internal.JmxRuntimeMetricsUtil;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* {@link JarAnalyzer} processes the {@link ProtectionDomain} of each class loaded and emits an
* event with metadata about each distinct archive location identified.
*/
final class JarAnalyzer {

private static final Logger logger = Logger.getLogger(JarAnalyzer.class.getName());

private static final JarAnalyzer INSTANCE = new JarAnalyzer();
private static final String JAR_EXTENSION = ".jar";
private static final String WAR_EXTENSION = ".war";
private static final String EVENT_DOMAIN_PACKAGE = "package";
private static final String EVENT_NAME_INFO = "info";

/**
 * Code locations that have already been examined.
 *
 * <p>Entries are added from {@link #handle(ProtectionDomain)}, which is driven by a class file
 * transformer and can therefore run on any thread that is loading a class. A plain {@link
 * HashSet} is not safe for concurrent modification, so it is wrapped in a synchronized view.
 */
private final Set<URI> seenUris = Collections.synchronizedSet(new HashSet<>());

/** Archive URLs waiting to be processed by the {@link Worker}. */
private final BlockingQueue<URL> toProcess = new LinkedBlockingDeque<>();

/**
 * {@link #handle(ProtectionDomain)} does nothing until {@link #configure(int)} is called.
 *
 * <p>Declared {@code volatile} because it is written while holding this object's monitor in
 * {@link #configure(int)} but read without synchronization in {@link #handle(ProtectionDomain)}.
 */
private volatile Consumer<ProtectionDomain> handler = unused -> {};

/** Processing rate passed to {@link #configure(int)}; {@code null} until then. */
@GuardedBy("this")
private Integer jarsPerSecond;

/** The running worker; {@code null} until {@link #maybeInstall(OpenTelemetry)} starts one. */
@GuardedBy("this")
private Worker worker;

/** Not instantiable from outside; use {@link #getInstance()}. */
private JarAnalyzer() {}

/**
 * Returns the shared {@link JarAnalyzer} instance.
 *
 * @return the single instance held by this class
 */
public static JarAnalyzer getInstance() {
  return INSTANCE;
}

/**
 * Enables the analyzer. Until this has been called, {@link #handle(ProtectionDomain)} does
 * nothing.
 *
 * @param jarsPerSecond the rate later handed to the worker that processes queued archives
 * @throws IllegalStateException if the analyzer was configured before
 */
public synchronized void configure(int jarsPerSecond) {
  boolean alreadyConfigured = this.jarsPerSecond != null;
  if (alreadyConfigured) {
    throw new IllegalStateException("JarAnalyzer has already been configured");
  }
  this.jarsPerSecond = jarsPerSecond;
  // From here on, handle(...) forwards to the real implementation instead of the no-op.
  this.handler = this::handleInternal;
}

/**
 * Starts the background worker that processes queued archives, provided {@link #configure(int)}
 * has been called. If it has not, this method returns without doing anything.
 *
 * @param unused currently ignored; the event emitter is obtained from the global provider
 * @throws IllegalStateException if a worker has already been started
 */
public synchronized void maybeInstall(OpenTelemetry unused) {
  if (worker != null) {
    throw new IllegalStateException("JarAnalyzer has already been installed");
  }
  if (this.jarsPerSecond == null) {
    // Never configured, so there is nothing to start.
    return;
  }

  // TODO(jack-berg): Use OpenTelemetry to obtain EventEmitter when event API is stable
  EventEmitter packageEventEmitter =
      GlobalEventEmitterProvider.get()
          .eventEmitterBuilder(JmxRuntimeMetricsUtil.getInstrumentationName())
          .setInstrumentationVersion(JmxRuntimeMetricsUtil.getInstrumentationVersion())
          .setEventDomain(EVENT_DOMAIN_PACKAGE)
          .build();
  this.worker = new Worker(packageEventEmitter, toProcess, jarsPerSecond);

  String threadNamePrefix = JarAnalyzer.class.getSimpleName() + "_WorkerThread";
  DaemonThreadFactory threadFactory = new DaemonThreadFactory(threadNamePrefix);
  threadFactory.newThread(worker).start();
}

/**
 * Identify the archive (JAR or WAR) associated with the {@code protectionDomain} and queue it to
 * be processed if it's the first time we've seen it.
 *
 * <p>NOTE: this is a no-op until {@link #configure(int)} has been called, because the handler
 * installed by default ignores its argument.
 *
 * @param protectionDomain the protection domain of a class; forwarded unchanged to the handler
 */
void handle(ProtectionDomain protectionDomain) {
  Consumer<ProtectionDomain> currentHandler = this.handler;
  currentHandler.accept(protectionDomain);
}

/**
 * Resolves the code location of {@code protectionDomain} and, the first time a given location is
 * seen, adds it to {@code toProcess} if its file name ends in {@code .jar} or {@code .war}.
 *
 * <p>Returns silently when the protection domain, its code source, or the code source location
 * is {@code null}. Logs and returns when the location cannot be converted to a {@link URI}, uses
 * the {@code jrt} protocol, ends with {@code /}, or has an unrecognized extension.
 *
 * <p>NOTE(review): this is reached from a class file transformer, so it presumably runs on
 * whichever thread is loading a class; confirm that {@code seenUris} tolerates concurrent
 * {@code add} calls.
 */
private void handleInternal(ProtectionDomain protectionDomain) {
if (protectionDomain == null) {
return;
}
CodeSource codeSource = protectionDomain.getCodeSource();
if (codeSource == null) {
return;
}
URL archiveUrl = codeSource.getLocation();
if (archiveUrl == null) {
return;
}
// The URI form is what gets stored in seenUris for de-duplication.
URI locationUri;
try {
locationUri = archiveUrl.toURI();
} catch (URISyntaxException e) {
logger.log(Level.WARNING, "Unable to get URI for code location URL: " + archiveUrl, e);
return;
}

// Recorded before any filtering below, so a location that ends up being skipped is also
// remembered and is not examined (or logged) again on later calls.
if (!seenUris.add(locationUri)) {
return;
}
if ("jrt".equals(archiveUrl.getProtocol())) {
logger.log(Level.FINEST, "Skipping processing for java runtime module: " + archiveUrl);
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
return;
}
String file = archiveUrl.getFile();
if (file.endsWith("/")) {
logger.log(Level.FINEST, "Skipping processing non-archive code location: " + archiveUrl);
return;
}
if (!file.endsWith(JAR_EXTENSION) && !file.endsWith(WAR_EXTENSION)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about ear? Some people might still use that

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh didn't know about ear. Let me see if I can't get the test module to build an .ear so I can confirm the logic works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ear isn't usually an issue because app servers will extract them during the deploy. The exception is jboss that uses a custom url protocol that can represent nested archives. Similarly war files are almost always extracted to support ServletContext.getRealPath, I think only weblogic may deploy (there is a checkbox in the weblogic admin console) from packaged war, but that won't affect class loading as they extract WEB-INF/lib and also package classes from WEB-INF/classes into a jar.

logger.log(Level.INFO, "Skipping processing unrecognized code location: " + archiveUrl);
return;
}

// Only code locations with .jar and .war extension should make it here
toProcess.add(archiveUrl);
}

/**
 * Background task that takes archive {@link URL}s off a queue and emits an event for each one,
 * consulting a rate limiter before every poll.
 */
private static final class Worker implements Runnable {

  private final EventEmitter eventEmitter;
  private final BlockingQueue<URL> toProcess;
  private final io.opentelemetry.sdk.internal.RateLimiter rateLimiter;

  /**
   * Creates a worker.
   *
   * @param eventEmitter emitter used for every processed archive
   * @param toProcess queue of archive URLs to drain
   * @param jarsPerSecond value passed as both numeric arguments of the rate limiter
   */
  private Worker(EventEmitter eventEmitter, BlockingQueue<URL> toProcess, int jarsPerSecond) {
    this.eventEmitter = eventEmitter;
    this.toProcess = toProcess;
    this.rateLimiter =
        new io.opentelemetry.sdk.internal.RateLimiter(
            jarsPerSecond, jarsPerSecond, Clock.getDefault());
  }

  /**
   * Continuously polls the queue for archive {@link URL}s and processes each with {@link
   * #processUrl(EventEmitter, URL)} until the running thread is interrupted.
   */
  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      URL nextArchive = awaitNextUrl();
      if (nextArchive != null) {
        // TODO(jack-berg): add ability to optionally re-process urls periodically to re-emit
        // events
        processUrl(eventEmitter, nextArchive);
      }
    }
    logger.warning("JarAnalyzer stopped");
  }

  /**
   * Makes one attempt to obtain the next archive URL.
   *
   * @return the next URL, or {@code null} if the rate limiter refused, the queue stayed empty for
   *     100 ms, or the thread was interrupted (in which case the interrupt flag is restored)
   */
  private URL awaitNextUrl() {
    try {
      if (rateLimiter.trySpend(1.0)) {
        return toProcess.poll(100, TimeUnit.MILLISECONDS);
      }
      // No budget available right now; back off briefly before the caller retries.
      Thread.sleep(100);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return null;
  }
}

/**
 * Process the {@code archiveUrl}, extracting metadata from it and emitting an event with the
 * content.
 *
 * <p>Each piece of metadata is collected independently: if one step throws, the failure is logged
 * at {@code WARNING} and the remaining steps still run, so the emitted event may carry only a
 * subset of the attributes.
 *
 * @param eventEmitter emitter used to publish the {@code info} event
 * @param archiveUrl location of the archive to describe
 */
static void processUrl(EventEmitter eventEmitter, URL archiveUrl) {
  AttributesBuilder builder = Attributes.builder();

  try {
    addPackageType(builder, archiveUrl);
  } catch (Exception e) {
    logger.log(Level.WARNING, "Error adding package type for archive URL: " + archiveUrl, e);
  }

  try {
    addPackageChecksum(builder, archiveUrl);
  } catch (Exception e) {
    logger.log(Level.WARNING, "Error adding package checksum for archive URL: " + archiveUrl, e);
  }

  try {
    addPackagePath(builder, archiveUrl);
  } catch (Exception e) {
    // Message now reads "... path for archive URL", matching the sibling messages.
    logger.log(Level.WARNING, "Error adding package path for archive URL: " + archiveUrl, e);
  }

  try {
    addPackageDescription(builder, archiveUrl);
  } catch (Exception e) {
    logger.log(
        Level.WARNING, "Error adding package description for archive URL: " + archiveUrl, e);
  }

  try {
    addPackageNameAndVersion(builder, archiveUrl);
  } catch (Exception e) {
    logger.log(
        Level.WARNING, "Error adding package name and version for archive URL: " + archiveUrl, e);
  }

  eventEmitter.emit(EVENT_NAME_INFO, builder.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8;

import com.google.auto.service.AutoService;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.javaagent.bootstrap.InstrumentationHolder;
import io.opentelemetry.javaagent.extension.AgentListener;
import io.opentelemetry.javaagent.tooling.AgentExtension;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;
import net.bytebuddy.agent.builder.AgentBuilder;

/** Installs the {@link JarAnalyzer}. */
@AutoService({AgentExtension.class, AgentListener.class})
public class JarAnalyzerInstaller implements AgentExtension, AgentListener {

/**
 * Hands the SDK produced by auto-configuration to the {@link JarAnalyzer} singleton, which
 * decides for itself whether to start processing.
 */
@Override
public void afterAgent(AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {
  JarAnalyzer analyzer = JarAnalyzer.getInstance();
  analyzer.maybeInstall(autoConfiguredOpenTelemetrySdk.getOpenTelemetrySdk());
}

/**
 * Configures the {@link JarAnalyzer} and registers a {@link ClassFileTransformer} that forwards
 * the {@link ProtectionDomain} of each class it is given to {@link
 * JarAnalyzer#handle(ProtectionDomain)}.
 *
 * <p>Nothing is configured or registered unless {@code
 * otel.instrumentation.runtime-telemetry.package-emitter.enabled} is {@code true}. The {@code
 * agentBuilder} is returned unmodified on every path.
 */
@Override
@CanIgnoreReturnValue
public AgentBuilder extend(AgentBuilder agentBuilder, ConfigProperties config) {
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
boolean enabled =
config.getBoolean("otel.instrumentation.runtime-telemetry.package-emitter.enabled", false);
// Disabled is the default: the JarAnalyzer is left unconfigured and no transformer is added.
if (!enabled) {
return agentBuilder;
}
JarAnalyzer jarAnalyzer = JarAnalyzer.getInstance();
// Processing rate; falls back to 10 when the property is not set.
int jarsPerSecond =
config.getInt("otel.instrumentation.runtime-telemetry.package-emitter.jars-per-second", 10);
jarAnalyzer.configure(jarsPerSecond);
Instrumentation inst = InstrumentationHolder.getInstrumentation();
// Without an Instrumentation instance there is nothing to attach the transformer to.
if (inst == null) {
return agentBuilder;
}
inst.addTransformer(
new ClassFileTransformer() {
@Override
public byte[] transform(
ClassLoader loader,
String className,
Class<?> classBeingRedefined,
ProtectionDomain protectionDomain,
byte[] classfileBuffer) {
// Observation only: the protection domain is reported and the class bytes are untouched.
jarAnalyzer.handle(protectionDomain);
// Per the ClassFileTransformer contract, null means no transformation was performed.
return null;
}
});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could use some advice on whether this is the best way to get a hook to inspect every class that's loaded.

The idea is basically:

  • For each class, determine the URL of the containing JAR
  • Add each unique JAR to a processing queue
  • Process each JAR by finding / extracting various metadata and emitting an event with event.domain=jvm, event.name=info
  • If disabled, dont add the ClassFileTransformer, and don't start the processing thread

return agentBuilder;
}

/**
 * Returns the name identifying this agent extension.
 *
 * @return the constant {@code "jar-analyzer"}
 */
@Override
public String extensionName() {
  return "jar-analyzer";
}
}
Loading