Skip to content

Commit

Permalink
Emit package events (#9301)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Oct 11, 2023
1 parent ebcaec6 commit b7df46d
Show file tree
Hide file tree
Showing 11 changed files with 830 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ plugins {

dependencies {
implementation(project(":instrumentation:runtime-telemetry:runtime-telemetry-java8:library"))
implementation("io.opentelemetry:opentelemetry-api-events")

compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
}

tasks {
test {
jvmArgs("-Dotel.instrumentation.runtime-telemetry.package-emitter.enabled=true")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8;

import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarDetails.EAR_EXTENSION;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarDetails.JAR_EXTENSION;
import static io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8.JarDetails.WAR_EXTENSION;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.events.EventEmitter;
import io.opentelemetry.api.events.GlobalEventEmitterProvider;
import io.opentelemetry.instrumentation.runtimemetrics.java8.internal.JmxRuntimeMetricsUtil;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import java.io.IOException;
import java.lang.instrument.ClassFileTransformer;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* {@link JarAnalyzer} is a {@link ClassFileTransformer} which processes the {@link
* ProtectionDomain} of each class loaded and emits an event with metadata about each distinct
* archive location identified.
*/
final class JarAnalyzer implements ClassFileTransformer {

private static final Logger logger = Logger.getLogger(JarAnalyzer.class.getName());

private static final String EVENT_DOMAIN_PACKAGE = "package";
private static final String EVENT_NAME_INFO = "info";
static final AttributeKey<String> PACKAGE_NAME = AttributeKey.stringKey("package.name");
static final AttributeKey<String> PACKAGE_VERSION = AttributeKey.stringKey("package.version");
static final AttributeKey<String> PACKAGE_TYPE = AttributeKey.stringKey("package.type");
static final AttributeKey<String> PACKAGE_DESCRIPTION =
AttributeKey.stringKey("package.description");
static final AttributeKey<String> PACKAGE_CHECKSUM = AttributeKey.stringKey("package.checksum");
static final AttributeKey<String> PACKAGE_CHECKSUM_ALGORITHM =
AttributeKey.stringKey("package.checksum_algorithm");
static final AttributeKey<String> PACKAGE_PATH = AttributeKey.stringKey("package.path");

private final Set<URI> seenUris = new HashSet<>();
private final BlockingQueue<URL> toProcess = new LinkedBlockingDeque<>();

private JarAnalyzer(OpenTelemetry unused, int jarsPerSecond) {
// TODO(jack-berg): Use OpenTelemetry to obtain EventEmitter when event API is stable
EventEmitter eventEmitter =
GlobalEventEmitterProvider.get()
.eventEmitterBuilder(JmxRuntimeMetricsUtil.getInstrumentationName())
.setInstrumentationVersion(JmxRuntimeMetricsUtil.getInstrumentationVersion())
.setEventDomain(EVENT_DOMAIN_PACKAGE)
.build();
Worker worker = new Worker(eventEmitter, toProcess, jarsPerSecond);
Thread workerThread =
new DaemonThreadFactory(JarAnalyzer.class.getSimpleName() + "_WorkerThread")
.newThread(worker);
workerThread.start();
}

/** Create {@link JarAnalyzer} and start the worker thread. */
public static JarAnalyzer create(OpenTelemetry unused, int jarsPerSecond) {
return new JarAnalyzer(unused, jarsPerSecond);
}

/**
* Identify the archive (JAR or WAR) associated with the {@code protectionDomain} and queue it to
* be processed if its the first time we've seen it.
*/
@Override
public byte[] transform(
ClassLoader loader,
String className,
Class<?> classBeingRedefined,
ProtectionDomain protectionDomain,
byte[] classfileBuffer) {
handle(protectionDomain);
return null;
}

private void handle(ProtectionDomain protectionDomain) {
if (protectionDomain == null) {
return;
}
CodeSource codeSource = protectionDomain.getCodeSource();
if (codeSource == null) {
return;
}
URL archiveUrl = codeSource.getLocation();
if (archiveUrl == null) {
return;
}
URI locationUri;
try {
locationUri = archiveUrl.toURI();
} catch (URISyntaxException e) {
logger.log(Level.WARNING, "Unable to get URI for code location URL: " + archiveUrl, e);
return;
}

if (!seenUris.add(locationUri)) {
return;
}
if ("jrt".equals(archiveUrl.getProtocol())) {
logger.log(Level.FINEST, "Skipping processing for java runtime module: {0}", archiveUrl);
return;
}
String file = archiveUrl.getFile();
if (file.endsWith("/")) {
logger.log(Level.FINEST, "Skipping processing non-archive code location: {0}", archiveUrl);
return;
}
if (!file.endsWith(JAR_EXTENSION)
&& !file.endsWith(WAR_EXTENSION)
&& !file.endsWith(EAR_EXTENSION)) {
logger.log(Level.INFO, "Skipping processing unrecognized code location: {0}", archiveUrl);
return;
}

// Only code locations with .jar and .war extension should make it here
toProcess.add(archiveUrl);
}

private static final class Worker implements Runnable {

private final EventEmitter eventEmitter;
private final BlockingQueue<URL> toProcess;
private final io.opentelemetry.sdk.internal.RateLimiter rateLimiter;

private Worker(EventEmitter eventEmitter, BlockingQueue<URL> toProcess, int jarsPerSecond) {
this.eventEmitter = eventEmitter;
this.toProcess = toProcess;
this.rateLimiter =
new io.opentelemetry.sdk.internal.RateLimiter(
jarsPerSecond, jarsPerSecond, Clock.getDefault());
}

/**
* Continuously poll the {@link #toProcess} for archive {@link URL}s, and process each wit
* {@link #processUrl(EventEmitter, URL)}.
*/
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
URL archiveUrl = null;
try {
if (!rateLimiter.trySpend(1.0)) {
Thread.sleep(100);
continue;
}
archiveUrl = toProcess.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (archiveUrl == null) {
continue;
}
try {
// TODO(jack-berg): add ability to optionally re-process urls periodically to re-emit
// events
processUrl(eventEmitter, archiveUrl);
} catch (Throwable e) {
logger.log(Level.WARNING, "Unexpected error processing archive URL: " + archiveUrl, e);
}
}
logger.warning("JarAnalyzer stopped");
}
}

/**
* Process the {@code archiveUrl}, extracting metadata from it and emitting an event with the
* content.
*/
static void processUrl(EventEmitter eventEmitter, URL archiveUrl) {
JarDetails jarDetails;
try {
jarDetails = JarDetails.forUrl(archiveUrl);
} catch (IOException e) {
logger.log(Level.WARNING, "Error reading package for archive URL: " + archiveUrl, e);
return;
}
AttributesBuilder builder = Attributes.builder();

String packagePath = jarDetails.packagePath();
if (packagePath != null) {
builder.put(PACKAGE_PATH, packagePath);
}

String packageType = jarDetails.packageType();
if (packageType != null) {
builder.put(PACKAGE_TYPE, packageType);
}

String packageName = jarDetails.packageName();
if (packageName != null) {
builder.put(PACKAGE_NAME, packageName);
}

String packageVersion = jarDetails.version();
if (packageVersion != null) {
builder.put(PACKAGE_VERSION, packageVersion);
}

String packageDescription = jarDetails.packageDescription();
if (packageDescription != null) {
builder.put(PACKAGE_DESCRIPTION, packageDescription);
}

String packageChecksum = jarDetails.computeSha1();
builder.put(PACKAGE_CHECKSUM, packageChecksum);
builder.put(PACKAGE_CHECKSUM_ALGORITHM, "SHA1");

eventEmitter.emit(EVENT_NAME_INFO, builder.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.javaagent.runtimemetrics.java8;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.bootstrap.InstrumentationHolder;
import io.opentelemetry.javaagent.tooling.BeforeAgentListener;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.internal.AutoConfigureUtil;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import java.lang.instrument.Instrumentation;

/** Installs the {@link JarAnalyzer}. */
@AutoService(BeforeAgentListener.class)
public class JarAnalyzerInstaller implements BeforeAgentListener {

@Override
public void beforeAgent(AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {
ConfigProperties config = AutoConfigureUtil.getConfig(autoConfiguredOpenTelemetrySdk);
boolean enabled =
config.getBoolean("otel.instrumentation.runtime-telemetry.package-emitter.enabled", false);
if (!enabled) {
return;
}
Instrumentation inst = InstrumentationHolder.getInstrumentation();
if (inst == null) {
return;
}
int jarsPerSecond =
config.getInt("otel.instrumentation.runtime-telemetry.package-emitter.jars-per-second", 10);
JarAnalyzer jarAnalyzer =
JarAnalyzer.create(autoConfiguredOpenTelemetrySdk.getOpenTelemetrySdk(), jarsPerSecond);
inst.addTransformer(jarAnalyzer);
}
}
Loading

0 comments on commit b7df46d

Please sign in to comment.