diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 2b53fc0adf..a5384b3591 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,9 @@ endif::[] [float] ===== Features +* Overhaul of the `ExecutorService` instrumentation that avoids issues like ``ClassCastException``s - {pull}1206[#1206] +* Support for `ForJoinPool` and `ScheduledExecutorService` (see <>) +* Support for `ExecutorService#invokeAny` and `ExecutorService#invokeAll` [float] ===== Bug fixes diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/ElasticApmAgent.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/ElasticApmAgent.java index e0aa32800b..f244a9924c 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/ElasticApmAgent.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/ElasticApmAgent.java @@ -34,10 +34,10 @@ import co.elastic.apm.agent.bci.bytebuddy.SoftlyReferencingTypePoolCache; import co.elastic.apm.agent.bci.methodmatching.MethodMatcher; import co.elastic.apm.agent.bci.methodmatching.TraceMethodInstrumentation; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.configuration.CoreConfiguration; import co.elastic.apm.agent.impl.ElasticApmTracer; import co.elastic.apm.agent.impl.ElasticApmTracerBuilder; -import co.elastic.apm.agent.matcher.WildcardMatcher; import co.elastic.apm.agent.util.DependencyInjectingServiceLoader; import co.elastic.apm.agent.util.ExecutorUtils; import co.elastic.apm.agent.util.ThreadUtils; @@ -82,9 +82,11 @@ import static co.elastic.apm.agent.bci.ElasticApmInstrumentation.tracer; import static co.elastic.apm.agent.bci.bytebuddy.ClassLoaderNameMatcher.classLoaderWithName; import static co.elastic.apm.agent.bci.bytebuddy.ClassLoaderNameMatcher.isReflectionClassLoader; +import static co.elastic.apm.agent.bci.bytebuddy.CustomElementMatchers.anyMatch; import static net.bytebuddy.asm.Advice.ExceptionHandler.Default.PRINTING; import static net.bytebuddy.matcher.ElementMatchers.any; import static net.bytebuddy.matcher.ElementMatchers.is; +import static net.bytebuddy.matcher.ElementMatchers.isAbstract; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.nameContains; import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith; @@ -104,7 +106,7 @@ public class ElasticApmAgent { @Nullable private static ResettableClassFileTransformer resettableClassFileTransformer; private static final List dynamicClassFileTransformers = new ArrayList<>(); - private static final WeakConcurrentMap, Set>>> dynamicallyInstrumentedClasses = new WeakConcurrentMap.WithInlinedExpunction<>(); + private static final WeakConcurrentMap, Set>>> dynamicallyInstrumentedClasses = WeakMapSupplier.createMap(); @Nullable private static File agentJarFile; @@ -249,7 +251,7 @@ private static AgentBuilder applyAdvice(final ElasticApmTracer tracer, final Age final ElementMatcher.Junction classLoaderMatcher = instrumentation.getClassLoaderMatcher(); final ElementMatcher typeMatcherPreFilter = instrumentation.getTypeMatcherPreFilter(); final ElementMatcher.Junction versionPostFilter = instrumentation.getImplementationVersionPostFilter(); - final ElementMatcher methodMatcher = instrumentation.getMethodMatcher(); + final ElementMatcher methodMatcher = new ElementMatcher.Junction.Conjunction<>(instrumentation.getMethodMatcher(), not(isAbstract())); return agentBuilder .type(new AgentBuilder.RawMatcher() { @Override @@ -411,11 +413,6 @@ private static AgentBuilder getAgentBuilder(final ByteBuddy byteBuddy, final Cor logger.warn("Failed to add ClassFileLocator for the agent jar. Some instrumentations may not work", e); } } - - // Leave these variables here instead of invoking the config methods within the matching methods, otherwise Mockito has trouble with it - final List defaultClassesExcludedFromInstrumentation = coreConfiguration.getDefaultClassesExcludedFromInstrumentation(); - final List classesExcludedFromInstrumentation = coreConfiguration.getClassesExcludedFromInstrumentation(); - return new AgentBuilder.Default(byteBuddy) .with(RedefinitionStrategy.RETRANSFORMATION) // when runtime attaching, only retransform up to 100 classes at once and sleep 100ms in-between as retransformation causes a stop-the-world pause @@ -473,18 +470,8 @@ private static AgentBuilder getAgentBuilder(final ByteBuddy byteBuddy, final Cor .or(nameStartsWith("io.sqreen.")) .or(nameContains("javassist")) .or(nameContains(".asm.")) - .or(new ElementMatcher.Junction.AbstractBase() { - @Override - public boolean matches(TypeDescription target) { - return WildcardMatcher.anyMatch(defaultClassesExcludedFromInstrumentation, target.getName()) != null; - } - }) - .or(new ElementMatcher.Junction.AbstractBase() { - @Override - public boolean matches(TypeDescription target) { - return WildcardMatcher.anyMatch(classesExcludedFromInstrumentation, target.getName()) != null; - } - }) + .or(anyMatch(coreConfiguration.getDefaultClassesExcludedFromInstrumentation())) + .or(anyMatch(coreConfiguration.getClassesExcludedFromInstrumentation())) .disableClassFormatChanges(); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/HelperClassManager.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/HelperClassManager.java index d06a6ac705..20194b54d2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/HelperClassManager.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/HelperClassManager.java @@ -209,6 +209,7 @@ public static class ForAnyClassLoader extends HelperClassManager { private ForAnyClassLoader(ElasticApmTracer tracer, String implementation, String... additionalHelpers) { super(tracer, implementation, additionalHelpers); + // deliberately doesn't use WeakMapSupplier as this class manages the cleanup manually clId2helperMap = new WeakConcurrentMap<>(false); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/CustomElementMatchers.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/CustomElementMatchers.java index 510c9bccbb..f074ef1b07 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/CustomElementMatchers.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/CustomElementMatchers.java @@ -24,6 +24,7 @@ */ package co.elastic.apm.agent.bci.bytebuddy; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.matcher.AnnotationMatcher; import co.elastic.apm.agent.matcher.WildcardMatcher; import co.elastic.apm.agent.util.Version; @@ -45,6 +46,7 @@ import java.security.CodeSource; import java.security.ProtectionDomain; import java.util.Collection; +import java.util.List; import java.util.jar.JarFile; import java.util.jar.Manifest; @@ -83,7 +85,7 @@ public static ElementMatcher.Junction classLoaderCanLoadClass(final return new ElementMatcher.Junction.AbstractBase() { private final boolean loadableByBootstrapClassLoader = canLoadClass(null, className); - private WeakConcurrentMap cache = new WeakConcurrentMap.WithInlinedExpunction<>(); + private final WeakConcurrentMap cache = WeakMapSupplier.createMap(); @Override public boolean matches(@Nullable ClassLoader target) { @@ -218,6 +220,20 @@ public String toString() { }; } + public static ElementMatcher.Junction anyMatch(final List matchers) { + return new ElementMatcher.Junction.AbstractBase() { + @Override + public boolean matches(NamedElement target) { + return WildcardMatcher.isAnyMatch(matchers, target.getActualName()); + } + + @Override + public String toString() { + return "matches(" + matchers + ")"; + } + }; + } + public static ElementMatcher.Junction annotationMatches(final String annotationWildcard) { return AnnotationMatcher.annotationMatcher(annotationWildcard); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/SoftlyReferencingTypePoolCache.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/SoftlyReferencingTypePoolCache.java index 2590df5fda..1ecfd1fa41 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/SoftlyReferencingTypePoolCache.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/SoftlyReferencingTypePoolCache.java @@ -53,6 +53,7 @@ public class SoftlyReferencingTypePoolCache extends AgentBuilder.PoolStrategy.Wi /* * Weakly referencing ClassLoaders to avoid class loader leaks * Softly referencing the type pool cache so that it does not cause OOMEs + * deliberately doesn't use WeakMapSupplier as this class manages the cleanup manually */ private final WeakConcurrentMap cacheProviders = new WeakConcurrentMap(false); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/cache/WeakKeySoftValueLoadingCache.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/cache/WeakKeySoftValueLoadingCache.java index ca5a57f861..dcca43444b 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/cache/WeakKeySoftValueLoadingCache.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/cache/WeakKeySoftValueLoadingCache.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ */ package co.elastic.apm.agent.cache; +import co.elastic.apm.agent.collections.WeakMapSupplier; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +66,7 @@ public class WeakKeySoftValueLoadingCache { private static final Logger logger = LoggerFactory.getLogger(WeakKeySoftValueLoadingCache.class); - private final WeakConcurrentMap> cache = new WeakConcurrentMap.WithInlinedExpunction<>(); + private final WeakConcurrentMap> cache = WeakMapSupplier.createMap(); private final ValueSupplier valueSupplier; public WeakKeySoftValueLoadingCache(ValueSupplier valueSupplier) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/collections/WeakMapCleaner.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/collections/WeakMapCleaner.java new file mode 100644 index 0000000000..20a5bdf80e --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/collections/WeakMapCleaner.java @@ -0,0 +1,68 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.collections; + +import co.elastic.apm.agent.context.AbstractLifecycleListener; +import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.util.ExecutorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Regularly calls {@link WeakMapSupplier#expungeStaleEntries()} + */ +public class WeakMapCleaner extends AbstractLifecycleListener implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(WeakMapCleaner.class); + + private final ScheduledThreadPoolExecutor scheduler; + + public WeakMapCleaner() { + this.scheduler = ExecutorUtils.createSingleThreadSchedulingDeamonPool("weak-map-cleaner"); + } + + @Override + public void start(ElasticApmTracer tracer) { + scheduler.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS); + } + + @Override + public void stop() throws Exception { + scheduler.shutdownNow(); + scheduler.awaitTermination(1, TimeUnit.SECONDS); + } + + @Override + public void run() { + try { + WeakMapSupplier.expungeStaleEntries(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/collections/WeakMapSupplier.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/collections/WeakMapSupplier.java new file mode 100644 index 0000000000..f8b2e53de5 --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/collections/WeakMapSupplier.java @@ -0,0 +1,54 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.collections; + +import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; +import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentSet; + +/** + * The canonical place to get a new instance of a {@link WeakConcurrentMap}. + * Do not instantiate a {@link WeakConcurrentMap} directly to benefit from the global cleanup of stale entries. + */ +public class WeakMapSupplier { + private static final WeakConcurrentSet> registeredMaps = new WeakConcurrentSet<>(WeakConcurrentSet.Cleaner.INLINE); + + public static WeakConcurrentMap createMap() { + WeakConcurrentMap result = new WeakConcurrentMap<>(false); + registeredMaps.add(result); + return result; + } + + /** + * Calls {@link WeakConcurrentMap#expungeStaleEntries()} on all registered maps, + * causing the entries of already collected keys to be removed. + * Avoids that the maps take unnecessary space for the {@link java.util.Map.Entry}, the {@link java.lang.ref.WeakReference} and the value. + * Failing to call this does not mean the keys cannot be collected. + */ + static void expungeStaleEntries() { + for (WeakConcurrentMap weakMap : registeredMaps) { + weakMap.expungeStaleEntries(); + } + } +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java index 361143880c..479ab7806f 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java @@ -24,11 +24,10 @@ */ package co.elastic.apm.agent.impl; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.configuration.CoreConfiguration; import co.elastic.apm.agent.configuration.ServiceNameUtil; import co.elastic.apm.agent.context.LifecycleListener; -import co.elastic.apm.agent.impl.async.SpanInScopeCallableWrapper; -import co.elastic.apm.agent.impl.async.SpanInScopeRunnableWrapper; import co.elastic.apm.agent.impl.error.ErrorCapture; import co.elastic.apm.agent.impl.sampling.ProbabilitySampler; import co.elastic.apm.agent.impl.sampling.Sampler; @@ -60,7 +59,6 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadPoolExecutor; @@ -74,14 +72,7 @@ public class ElasticApmTracer { private static final Logger logger = LoggerFactory.getLogger(ElasticApmTracer.class); - /** - * The number of required {@link Runnable} wrappers does not depend on the size of the disruptor - * but rather on the amount of application threads. - * The requirement increases if the application tends to wrap multiple {@link Runnable}s. - */ - private static final int MAX_POOLED_RUNNABLES = 256; - - private static final WeakConcurrentMap serviceNameByClassLoader = new WeakConcurrentMap.WithInlinedExpunction<>(); + private static final WeakConcurrentMap serviceNameByClassLoader = WeakMapSupplier.createMap(); private final ConfigurationRegistry configurationRegistry; private final StacktraceConfiguration stacktraceConfiguration; @@ -90,8 +81,6 @@ public class ElasticApmTracer { private final ObjectPool transactionPool; private final ObjectPool spanPool; private final ObjectPool errorPool; - private final ObjectPool runnableSpanWrapperObjectPool; - private final ObjectPool> callableSpanWrapperObjectPool; private final Reporter reporter; private final ObjectPoolFactory objectPoolFactory; // Maintains a stack of all the activated spans @@ -104,13 +93,6 @@ protected Deque> initialValue() { } }; - private final ThreadLocal allowWrappingOnThread = new ThreadLocal() { - @Override - protected Boolean initialValue() { - return Boolean.TRUE; - } - }; - private final CoreConfiguration coreConfiguration; private final List activationListeners; private final MetricRegistry metricRegistry; @@ -154,9 +136,6 @@ public void onChange(ConfigurationOption configurationOption, Boolean oldValu // we are assuming that we don't need as many errors as spans or transactions errorPool = poolFactory.createErrorPool(maxPooledElements / 2, this); - runnableSpanWrapperObjectPool = poolFactory.createRunnableWrapperPool(MAX_POOLED_RUNNABLES, this); - callableSpanWrapperObjectPool = poolFactory.createCallableWrapperPool(MAX_POOLED_RUNNABLES, this); - sampler = ProbabilitySampler.of(coreConfiguration.getSampleRate().get()); coreConfiguration.getSampleRate().addChangeListener(new ConfigurationOption.ChangeListener() { @Override @@ -302,18 +281,6 @@ private void afterTransactionStart(@Nullable ClassLoader initiatingClassLoader, } } - public void avoidWrappingOnThread() { - allowWrappingOnThread.set(Boolean.FALSE); - } - - public void allowWrappingOnThread() { - allowWrappingOnThread.set(Boolean.TRUE); - } - - public boolean isWrappingAllowedOnThread() { - return allowWrappingOnThread.get() == Boolean.TRUE; - } - public Transaction noopTransaction() { return createTransaction().startNoop(); } @@ -504,28 +471,6 @@ public void recycle(ErrorCapture error) { errorPool.recycle(error); } - public Runnable wrapRunnable(Runnable delegate, AbstractSpan span) { - if (delegate instanceof SpanInScopeRunnableWrapper) { - return delegate; - } - return runnableSpanWrapperObjectPool.createInstance().wrap(delegate, span); - } - - public void recycle(SpanInScopeRunnableWrapper wrapper) { - runnableSpanWrapperObjectPool.recycle(wrapper); - } - - public Callable wrapCallable(Callable delegate, AbstractSpan span) { - if (delegate instanceof SpanInScopeCallableWrapper) { - return delegate; - } - return ((SpanInScopeCallableWrapper) callableSpanWrapperObjectPool.createInstance()).wrap(delegate, span); - } - - public void recycle(SpanInScopeCallableWrapper wrapper) { - callableSpanWrapperObjectPool.recycle(wrapper); - } - /** * Called when the container shuts down. * Cleans up thread pools and other resources. @@ -746,7 +691,7 @@ public void deactivate(AbstractSpan span) { assertIsActive(span, stack.poll()); List activationListeners = getActivationListeners(); for (int i = 0, size = activationListeners.size(); i < size; i++) { - try { + try { // `this` is guaranteed to not be recycled yet as the reference count is only decremented after this method has executed activationListeners.get(i).afterDeactivate(span); } catch (Error e) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeBaseWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeBaseWrapper.java deleted file mode 100644 index 71a11c67c5..0000000000 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeBaseWrapper.java +++ /dev/null @@ -1,78 +0,0 @@ -/*- - * #%L - * Elastic APM Java agent - * %% - * Copyright (C) 2018 - 2020 Elastic and contributors - * %% - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * #L% - */ -package co.elastic.apm.agent.impl.async; - -import co.elastic.apm.agent.impl.ElasticApmTracer; -import co.elastic.apm.agent.impl.transaction.AbstractSpan; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("WeakerAccess") -abstract class SpanInScopeBaseWrapper { - private static final Logger logger = LoggerFactory.getLogger(SpanInScopeCallableWrapper.class); - protected final ElasticApmTracer tracer; - - protected SpanInScopeBaseWrapper(ElasticApmTracer tracer) { - this.tracer = tracer; - } - - protected boolean beforeDelegation(final AbstractSpan localSpan) { - boolean activated = false; - if (localSpan != null) { - try { - if (tracer.getActive() != localSpan) { - // activate only if the corresponding span is not already activated on this thread - localSpan.activate(); - activated = true; - } - } catch (Throwable t) { - try { - logger.error("Unexpected error while activating span", t); - } catch (Throwable ignore) { - } - } - } - return activated; - } - - protected void afterDelegation(final AbstractSpan localSpan, boolean activated) { - try { - if (localSpan != null) { - if (activated) { - localSpan.deactivate(); - } - localSpan.decrementReferences(); - } - doRecycle(); - } catch (Throwable t) { - try { - logger.error("Unexpected error while deactivating or recycling span", t); - } catch (Throwable ignore) { - } - } - } - - protected abstract void doRecycle(); -} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java deleted file mode 100644 index 212ba46d1a..0000000000 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java +++ /dev/null @@ -1,82 +0,0 @@ -/*- - * #%L - * Elastic APM Java agent - * %% - * Copyright (C) 2018 - 2020 Elastic and contributors - * %% - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * #L% - */ -package co.elastic.apm.agent.impl.async; - - -import co.elastic.apm.agent.bci.VisibleForAdvice; -import co.elastic.apm.agent.impl.ElasticApmTracer; -import co.elastic.apm.agent.impl.transaction.AbstractSpan; -import co.elastic.apm.agent.objectpool.Recyclable; - -import javax.annotation.Nullable; -import java.util.concurrent.Callable; - -@VisibleForAdvice -public class SpanInScopeCallableWrapper extends SpanInScopeBaseWrapper implements Callable, Recyclable { - - @Nullable - private volatile Callable delegate; - @Nullable - private volatile AbstractSpan span; - - public SpanInScopeCallableWrapper(ElasticApmTracer tracer) { - super(tracer); - } - - public SpanInScopeCallableWrapper wrap(Callable delegate, AbstractSpan span) { - this.delegate = delegate; - this.span = span; - span.incrementReferences(); - return this; - } - - // Exceptions in the agent may never affect the monitored application - // normally, advices act as the boundary of user and agent code and exceptions are handled via @Advice.OnMethodEnter(suppress = Throwable.class) - // In this case, this class acts as the boundary of user and agent code so we have to do the tedious exception handling here - @Override - public V call() throws Exception { - // minimize volatile reads - AbstractSpan localSpan = span; - boolean activated = beforeDelegation(localSpan); - try { - //noinspection ConstantConditions - return delegate.call(); - // the span may be ended at this point - } finally { - afterDelegation(localSpan, activated); - } - } - - @Override - public void resetState() { - delegate = null; - span = null; - } - - @Override - protected void doRecycle() { - tracer.recycle(this); - } -} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java deleted file mode 100644 index 87591f6001..0000000000 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java +++ /dev/null @@ -1,81 +0,0 @@ -/*- - * #%L - * Elastic APM Java agent - * %% - * Copyright (C) 2018 - 2020 Elastic and contributors - * %% - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * #L% - */ -package co.elastic.apm.agent.impl.async; - - -import co.elastic.apm.agent.bci.VisibleForAdvice; -import co.elastic.apm.agent.impl.ElasticApmTracer; -import co.elastic.apm.agent.impl.transaction.AbstractSpan; -import co.elastic.apm.agent.objectpool.Recyclable; - -import javax.annotation.Nullable; - -@VisibleForAdvice -public class SpanInScopeRunnableWrapper extends SpanInScopeBaseWrapper implements Runnable, Recyclable { - - @Nullable - private volatile Runnable delegate; - @Nullable - private volatile AbstractSpan span; - - public SpanInScopeRunnableWrapper(ElasticApmTracer tracer) { - super(tracer); - } - - public SpanInScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan span) { - this.delegate = delegate; - this.span = span; - span.incrementReferences(); - return this; - } - - // Exceptions in the agent may never affect the monitored application - // normally, advices act as the boundary of user and agent code and exceptions are handled via @Advice.OnMethodEnter(suppress = Throwable.class) - // In this case, this class acts as the boundary of user and agent code so we have to do the tedious exception handling here - @Override - public void run() { - // minimize volatile reads - AbstractSpan localSpan = span; - boolean activated = beforeDelegation(localSpan); - try { - //noinspection ConstantConditions - delegate.run(); - // the span may be ended at this point - } finally { - afterDelegation(localSpan, activated); - } - } - - @Override - public void resetState() { - delegate = null; - span = null; - } - - @Override - protected void doRecycle() { - tracer.recycle(this); - } -} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java index 680e522e2e..c05656a421 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java @@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -466,30 +465,6 @@ public void close() { }; } - /** - * Wraps the provided runnable and makes this {@link AbstractSpan} active in the {@link Runnable#run()} method. - * - *

- * Note: does activates the {@link AbstractSpan} and not only the {@link TraceContext}. - * This should only be used when the span is closed in thread the provided {@link Runnable} is executed in. - *

- */ - public Runnable withActive(Runnable runnable) { - return tracer.wrapRunnable(runnable, this); - } - - /** - * Wraps the provided runnable and makes this {@link AbstractSpan} active in the {@link Runnable#run()} method. - * - *

- * Note: does activates the {@link AbstractSpan} and not only the {@link TraceContext}. - * This should only be used when the span is closed in thread the provided {@link Runnable} is executed in. - *

- */ - public Callable withActive(Callable callable) { - return tracer.wrapCallable(callable, this); - } - /** * Set start timestamp * diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java index aead940f78..52a4371466 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java @@ -24,6 +24,7 @@ */ package co.elastic.apm.agent.impl.transaction; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.configuration.CoreConfiguration; import co.elastic.apm.agent.impl.ElasticApmTracer; import co.elastic.apm.agent.impl.sampling.Sampler; @@ -102,7 +103,7 @@ public class TraceContext implements Recyclable { /** * Helps to reduce allocations by caching {@link WeakReference}s to {@link ClassLoader}s */ - private static final WeakConcurrentMap> classLoaderWeakReferenceCache = new WeakConcurrentMap.WithInlinedExpunction<>(); + private static final WeakConcurrentMap> classLoaderWeakReferenceCache = WeakMapSupplier.createMap(); private static final ChildContextCreator FROM_PARENT_CONTEXT = new ChildContextCreator() { @Override public boolean asChildOf(TraceContext child, TraceContext parent) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/objectpool/ObjectPoolFactory.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/objectpool/ObjectPoolFactory.java index 332b8e3d4b..22b3b73d14 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/objectpool/ObjectPoolFactory.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/objectpool/ObjectPoolFactory.java @@ -25,8 +25,6 @@ package co.elastic.apm.agent.objectpool; import co.elastic.apm.agent.impl.ElasticApmTracer; -import co.elastic.apm.agent.impl.async.SpanInScopeCallableWrapper; -import co.elastic.apm.agent.impl.async.SpanInScopeRunnableWrapper; import co.elastic.apm.agent.impl.error.ErrorCapture; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; @@ -67,22 +65,4 @@ public ErrorCapture createInstance() { } }); } - - public ObjectPool createRunnableWrapperPool(int maxCapacity, final ElasticApmTracer tracer) { - return createRecyclableObjectPool(maxCapacity, new Allocator() { - @Override - public SpanInScopeRunnableWrapper createInstance() { - return new SpanInScopeRunnableWrapper(tracer); - } - }); - } - - public ObjectPool> createCallableWrapperPool(int maxCapacity, final ElasticApmTracer tracer) { - return createRecyclableObjectPool(maxCapacity, new Allocator>() { - @Override - public SpanInScopeCallableWrapper createInstance() { - return new SpanInScopeCallableWrapper(tracer); - } - }); - } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/util/DataStructures.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/util/DataStructures.java deleted file mode 100644 index a5fa7f2cf1..0000000000 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/util/DataStructures.java +++ /dev/null @@ -1,59 +0,0 @@ -/*- - * #%L - * Elastic APM Java agent - * %% - * Copyright (C) 2018 - 2020 Elastic and contributors - * %% - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * #L% - */ -package co.elastic.apm.agent.util; - -import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; -import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DataStructures { - - /** - * Use this utility method for WeakConcurrentMap creation if it is created in the context of a request processing - * thread whose context class loader is the web application ClassLoader. - * This leaks the web application ClassLoader when the application is undeployed/redeployed. - *

- * Tomcat will then stop the thread because it thinks it was created by the web application. - * That means that the map will never be cleared, creating a severe memory leak. - * - * @param map key type - * @param map value type - * @return a new WeakConcurrentMap with a cleaner thread who's context class loader is the system/bootstrap class loader - */ - public static WeakConcurrentMap createWeakConcurrentMapWithCleanerThread() { - WeakConcurrentMap map = new WeakConcurrentMap<>(true); - map.getCleanerThread().setName(ThreadUtils.addElasticApmThreadPrefix(map.getCleanerThread().getName())); - map.getCleanerThread().setContextClassLoader(null); - return map; - } - - public static WeakConcurrentSet createWeakConcurrentSetWithCleanerThread() { - WeakConcurrentSet set = new WeakConcurrentSet<>(WeakConcurrentSet.Cleaner.THREAD); - set.getCleanerThread().setName(ThreadUtils.addElasticApmThreadPrefix(set.getCleanerThread().getName())); - return set; - } - -} diff --git a/apm-agent-core/src/main/resources/META-INF/services/co.elastic.apm.agent.context.LifecycleListener b/apm-agent-core/src/main/resources/META-INF/services/co.elastic.apm.agent.context.LifecycleListener index 2148b1ead3..68a6c7e8d2 100644 --- a/apm-agent-core/src/main/resources/META-INF/services/co.elastic.apm.agent.context.LifecycleListener +++ b/apm-agent-core/src/main/resources/META-INF/services/co.elastic.apm.agent.context.LifecycleListener @@ -6,3 +6,4 @@ co.elastic.apm.agent.metrics.builtin.SystemMetrics co.elastic.apm.agent.metrics.builtin.JvmGcMetrics co.elastic.apm.agent.metrics.builtin.ThreadMetrics co.elastic.apm.agent.impl.circuitbreaker.CircuitBreaker +co.elastic.apm.agent.collections.WeakMapCleaner diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java index 77f090774a..8a34b48d19 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java @@ -381,15 +381,15 @@ void testBreakdown_singleDbSpan_exceedingParent() { void testBreakdown_spanStartedAfterParentEnded() { final Transaction transaction = tracer.startRootTransaction(ConstantSampler.of(true), 0, getClass().getClassLoader()) .withName("test") - .withType("request"); - final Runnable runnable = transaction.withActive(() -> { - final AbstractSpan active = tracer.getActive(); - assertThat(active).isSameAs(transaction); - assertThat(transaction.getTraceContext().getId().isEmpty()).isFalse(); - active.createSpan(20).withType("db").withSubtype("mysql").end(30); - }); + .withType("request") + .activate(); transaction.end(10); - runnable.run(); + + final AbstractSpan active = tracer.getActive(); + assertThat(active).isSameAs(transaction); + assertThat(transaction.getTraceContext().getId().isEmpty()).isFalse(); + active.createSpan(20).withType("db").withSubtype("mysql").end(30); + transaction.deactivate(); reporter.assertRecycledAfterDecrementingReferences(); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/TracerInternalApiUtils.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/TracerInternalApiUtils.java index cf2e4132b2..d01feb02a3 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/TracerInternalApiUtils.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/TracerInternalApiUtils.java @@ -43,4 +43,14 @@ public static void pauseTracer(ElasticApmTracer tracer) { public static void resumeTracer(ElasticApmTracer tracer) { tracer.resume(); } + + public static void runWithoutAssertions(ElasticApmTracer tracer, Runnable runnable) { + boolean assertionsEnabled = tracer.assertionsEnabled; + try { + tracer.assertionsEnabled = false; + runnable.run(); + } finally { + tracer.assertionsEnabled = assertionsEnabled; + } + } } diff --git a/apm-agent-plugins/apm-asynchttpclient-plugin/src/main/java/co/elastic/apm/agent/asynchttpclient/AbstractAsyncHttpClientInstrumentation.java b/apm-agent-plugins/apm-asynchttpclient-plugin/src/main/java/co/elastic/apm/agent/asynchttpclient/AbstractAsyncHttpClientInstrumentation.java index 968a2ca564..6a8c17383d 100644 --- a/apm-agent-plugins/apm-asynchttpclient-plugin/src/main/java/co/elastic/apm/agent/asynchttpclient/AbstractAsyncHttpClientInstrumentation.java +++ b/apm-agent-plugins/apm-asynchttpclient-plugin/src/main/java/co/elastic/apm/agent/asynchttpclient/AbstractAsyncHttpClientInstrumentation.java @@ -28,6 +28,7 @@ import co.elastic.apm.agent.bci.ElasticApmInstrumentation; import co.elastic.apm.agent.bci.HelperClassManager; import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.http.client.HttpClientHelper; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Span; @@ -63,7 +64,7 @@ public abstract class AbstractAsyncHttpClientInstrumentation extends ElasticApmI public static HelperClassManager> headerSetterManager; @VisibleForAdvice - public static final WeakConcurrentMap, Span> handlerSpanMap = new WeakConcurrentMap.WithInlinedExpunction<>(); + public static final WeakConcurrentMap, Span> handlerSpanMap = WeakMapSupplier.createMap(); @VisibleForAdvice public static final List> ASYNC_HANDLER_INSTRUMENTATIONS = Arrays.>asList( diff --git a/apm-agent-plugins/apm-dubbo-plugin/src/main/java/co/elastic/apm/agent/dubbo/AlibabaResponseFutureInstrumentation.java b/apm-agent-plugins/apm-dubbo-plugin/src/main/java/co/elastic/apm/agent/dubbo/AlibabaResponseFutureInstrumentation.java index d07f49b851..ba0e760992 100644 --- a/apm-agent-plugins/apm-dubbo-plugin/src/main/java/co/elastic/apm/agent/dubbo/AlibabaResponseFutureInstrumentation.java +++ b/apm-agent-plugins/apm-dubbo-plugin/src/main/java/co/elastic/apm/agent/dubbo/AlibabaResponseFutureInstrumentation.java @@ -27,6 +27,7 @@ import co.elastic.apm.agent.bci.ElasticApmAgent; import co.elastic.apm.agent.bci.ElasticApmInstrumentation; import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import com.alibaba.dubbo.remoting.exchange.ResponseCallback; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; @@ -44,7 +45,7 @@ public class AlibabaResponseFutureInstrumentation extends AbstractAlibabaDubboInstrumentation { @VisibleForAdvice - public static final WeakConcurrentMap> callbackSpanMap = new WeakConcurrentMap.WithInlinedExpunction<>(); + public static final WeakConcurrentMap> callbackSpanMap = WeakMapSupplier.createMap(); @Override public ElementMatcher getTypeMatcher() { diff --git a/apm-agent-plugins/apm-dubbo-plugin/src/main/java/co/elastic/apm/agent/dubbo/helper/AsyncCallbackCreatorImpl.java b/apm-agent-plugins/apm-dubbo-plugin/src/main/java/co/elastic/apm/agent/dubbo/helper/AsyncCallbackCreatorImpl.java index a4ac7a5179..1ac0561044 100644 --- a/apm-agent-plugins/apm-dubbo-plugin/src/main/java/co/elastic/apm/agent/dubbo/helper/AsyncCallbackCreatorImpl.java +++ b/apm-agent-plugins/apm-dubbo-plugin/src/main/java/co/elastic/apm/agent/dubbo/helper/AsyncCallbackCreatorImpl.java @@ -47,7 +47,10 @@ public void accept(Result result, Throwable t) { if (span != null) { try { RpcContext.getContext().remove(DubboTraceHelper.SPAN_KEY); - span.captureException(t).captureException(result.getException()); + span.captureException(t); + if (result != null) { + span.captureException(result.getException()); + } } finally { span.end(); } diff --git a/apm-agent-plugins/apm-dubbo-plugin/src/test/java/co/elastic/apm/agent/dubbo/ApacheDubboInstrumentationTest.java b/apm-agent-plugins/apm-dubbo-plugin/src/test/java/co/elastic/apm/agent/dubbo/ApacheDubboInstrumentationTest.java index c319b3aeaf..1d706ce873 100644 --- a/apm-agent-plugins/apm-dubbo-plugin/src/test/java/co/elastic/apm/agent/dubbo/ApacheDubboInstrumentationTest.java +++ b/apm-agent-plugins/apm-dubbo-plugin/src/test/java/co/elastic/apm/agent/dubbo/ApacheDubboInstrumentationTest.java @@ -37,6 +37,7 @@ import org.apache.dubbo.config.RegistryConfig; import org.apache.dubbo.config.ServiceConfig; import org.apache.dubbo.rpc.RpcContext; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.LinkedList; @@ -149,6 +150,9 @@ public void testAsyncException() throws Exception { } @Test + @Disabled("The new executor instrumentation doesn't wrap Runnables anymore. " + + "In this case, the Runnable is java.util.concurrent.CompletableFuture.AsyncSupply. " + + "Currently, we can't instrument java.* classes in unit tests (this will change with indy plugins).") public void testAsyncByFuture() throws Exception { DubboTestApi dubboTestApi = getDubboTestApi(); String arg = "hello"; @@ -164,6 +168,9 @@ public void testAsyncByFuture() throws Exception { } @Test + @Disabled("The new executor instrumentation doesn't wrap Runnables anymore. " + + "In this case, the Runnable is java.util.concurrent.CompletableFuture.AsyncSupply. " + + "Currently, we can't instrument java.* classes in unit tests (this will change with indy plugins).") public void testAsyncByFutureException() throws Exception { DubboTestApi dubboTestApi = getDubboTestApi(); String arg = "error"; diff --git a/apm-agent-plugins/apm-grpc/apm-grpc-plugin/src/main/java/co/elastic/apm/agent/grpc/helper/GrpcHelperImpl.java b/apm-agent-plugins/apm-grpc/apm-grpc-plugin/src/main/java/co/elastic/apm/agent/grpc/helper/GrpcHelperImpl.java index 1873523d9e..1d92fbedea 100644 --- a/apm-agent-plugins/apm-grpc/apm-grpc-plugin/src/main/java/co/elastic/apm/agent/grpc/helper/GrpcHelperImpl.java +++ b/apm-agent-plugins/apm-grpc/apm-grpc-plugin/src/main/java/co/elastic/apm/agent/grpc/helper/GrpcHelperImpl.java @@ -24,6 +24,7 @@ */ package co.elastic.apm.agent.grpc.helper; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.ElasticApmTracer; import co.elastic.apm.agent.impl.context.Destination; import co.elastic.apm.agent.impl.transaction.AbstractHeaderGetter; @@ -77,19 +78,19 @@ public class GrpcHelperImpl implements GrpcHelper { /** * gRPC header cache used to minimize allocations */ - private static final WeakConcurrentMap.WithInlinedExpunction> headerCache; + private static final WeakConcurrentMap> headerCache; private static final TextHeaderSetter headerSetter; private static final TextHeaderGetter headerGetter; static { - clientCallSpans = new WeakConcurrentMap.WithInlinedExpunction, Span>(); - clientCallListenerSpans = new WeakConcurrentMap.WithInlinedExpunction, Span>(); + clientCallSpans = WeakMapSupplier.createMap(); + clientCallListenerSpans = WeakMapSupplier.createMap(); - serverListenerTransactions = new WeakConcurrentMap.WithInlinedExpunction, Transaction>(); - serverCallTransactions = new WeakConcurrentMap.WithInlinedExpunction, Transaction>(); + serverListenerTransactions = WeakMapSupplier.createMap(); + serverCallTransactions = WeakMapSupplier.createMap(); - headerCache = new WeakConcurrentMap.WithInlinedExpunction>(); + headerCache = WeakMapSupplier.createMap(); headerSetter = new GrpcHeaderSetter(); headerGetter = new GrpcHeaderGetter(); diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java index fa23bc97a2..4704157ae2 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java @@ -26,9 +26,6 @@ import co.elastic.apm.agent.bci.ElasticApmInstrumentation; import co.elastic.apm.agent.bci.VisibleForAdvice; -import co.elastic.apm.agent.impl.transaction.AbstractSpan; -import co.elastic.apm.agent.util.DataStructures; -import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentSet; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.NamedElement; import net.bytebuddy.description.method.MethodDescription; @@ -42,40 +39,39 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import static co.elastic.apm.agent.bci.bytebuddy.CustomElementMatchers.isProxy; import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.is; +import static net.bytebuddy.matcher.ElementMatchers.isOverriddenFrom; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.nameContains; +import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith; import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; public abstract class ExecutorInstrumentation extends ElasticApmInstrumentation { - @VisibleForAdvice - public static final WeakConcurrentSet excluded = DataStructures.createWeakConcurrentSetWithCleanerThread(); @VisibleForAdvice public static final Set excludedClasses = new HashSet<>(); static { - // this pool relies on the task to be an instance of org.glassfish.enterprise.concurrent.internal.ManagedFutureTask - // the wrapping is done in org.glassfish.enterprise.concurrent.ManagedExecutorServiceImpl.execute - // so this pool only works when called directly from ManagedExecutorServiceImpl - // excluding this class from instrumentation does not work as it inherits the execute and submit methods - excludedClasses.add("org.glassfish.enterprise.concurrent.internal.ManagedThreadPoolExecutor"); - // Used in Tomcat 7 // Especially the wrapping of org.apache.tomcat.util.net.AprEndpoint$SocketProcessor is problematic // because that is the Runnable for the actual request processor thread. // Wrapping that leaks transactions and spans to other requests. excludedClasses.add("org.apache.tomcat.util.threads.ThreadPoolExecutor"); - - // This pool relies on the task to be an instance of com.pilotfish.eip.server.ntm.transact.StageTransactionRunner - // in its beforeExecute implementation. - excludedClasses.add("com.pilotfish.eip.server.ntm.pool.NTMThreadPool"); } @Override @@ -103,108 +99,148 @@ public Collection getInstrumentationGroupNames() { @VisibleForAdvice public static boolean isExcluded(@Advice.This Executor executor) { - return excluded.contains(executor) || excludedClasses.contains(executor.getClass().getName()); + return excludedClasses.contains(executor.getClass().getName()); } public static class ExecutorRunnableInstrumentation extends ExecutorInstrumentation { - @SuppressWarnings("Duplicates") + @Advice.OnMethodEnter(suppress = Throwable.class) public static void onExecute(@Advice.This Executor thiz, - @Advice.Argument(value = 0, readOnly = false) @Nullable Runnable runnable, - @Advice.Local("original") Runnable original) { - final AbstractSpan active = ExecutorInstrumentation.getActive(); - if (active != null && runnable != null && !isExcluded(thiz) && tracer != null && tracer.isWrappingAllowedOnThread()) { - //noinspection UnusedAssignment - original = runnable; - // Do no discard branches leading to async operations so not to break span references - active.setNonDiscardable(); - runnable = active.withActive(runnable); - tracer.avoidWrappingOnThread(); + @Advice.Argument(value = 0, readOnly = false) @Nullable Runnable runnable) { + if (ExecutorInstrumentation.isExcluded(thiz)) { + return; } + runnable = JavaConcurrent.withContext(runnable, tracer); } - // This advice detects if the Executor can't cope with our wrappers - // If so, it retries without the wrapper and adds it to a list of excluded Executor instances - // which disables context propagation for those - // There is a slight risk that retrying causes a side effect but the more likely scenario is that adding the task to the queue - // fails and noting has been executed yet. - @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Exception.class, repeatOn = Advice.OnNonDefaultValue.class) - public static boolean onError(@Advice.This Executor thiz, - @Nullable @Advice.Thrown Exception exception, - @Nullable @Advice.Argument(value = 0, readOnly = false) Runnable runnable, - @Advice.Local("original") @Nullable Runnable original) { - - try { - if (original != null && (exception instanceof ClassCastException || exception instanceof IllegalArgumentException)) { - // seems like this executor expects a specific subtype of Callable - runnable = original; - // repeat only if submitting a task fails for the first time - return excluded.add(thiz); - } else { - // don't repeat on exceptions which don't seem to be caused by wrapping the runnable - return false; - } - } finally { - if (tracer != null) { - tracer.allowWrappingOnThread(); - } - } + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + private static void onExit(@Nullable @Advice.Thrown Throwable thrown, + @Advice.Argument(value = 0) @Nullable Runnable runnable) { + JavaConcurrent.doFinally(thrown, runnable); } + /** + *

    + *
  • {@link ExecutorService#execute(Runnable)}
  • + *
  • {@link ExecutorService#submit(Runnable)}
  • + *
  • {@link ExecutorService#submit(Runnable, Object)}
  • + *
  • {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}
  • + *
+ */ @Override public ElementMatcher getMethodMatcher() { return named("execute").and(returns(void.class)).and(takesArguments(Runnable.class)) .or(named("submit").and(returns(Future.class)).and(takesArguments(Runnable.class))) - .or(named("submit").and(returns(Future.class)).and(takesArguments(Runnable.class, Object.class))); + .or(named("submit").and(returns(Future.class)).and(takesArguments(Runnable.class, Object.class))) + .or(named("schedule").and(returns(ScheduledFuture.class)).and(takesArguments(Runnable.class, long.class, TimeUnit.class))); } } public static class ExecutorCallableInstrumentation extends ExecutorInstrumentation { - @SuppressWarnings("Duplicates") + @Advice.OnMethodEnter(suppress = Throwable.class) public static void onSubmit(@Advice.This Executor thiz, - @Advice.Argument(value = 0, readOnly = false) @Nullable Callable callable, - @Advice.Local("original") Callable original) { - final AbstractSpan active = ExecutorInstrumentation.getActive(); - if (active != null && callable != null && !isExcluded(thiz) && tracer != null && tracer.isWrappingAllowedOnThread()) { - original = callable; - // Do no discard branches leading to async operations so not to break span references - active.setNonDiscardable(); - callable = active.withActive(callable); - tracer.avoidWrappingOnThread(); + @Advice.Argument(value = 0, readOnly = false) @Nullable Callable callable) { + if (ExecutorInstrumentation.isExcluded(thiz)) { + return; } + callable = JavaConcurrent.withContext(callable, tracer); } - // This advice detects if the Executor can't cope with our wrappers - // If so, it retries without the wrapper and adds it to a list of excluded Executor instances - // which disables context propagation for those - @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Exception.class, repeatOn = Advice.OnNonDefaultValue.class) - public static boolean onError(@Advice.This Executor thiz, - @Nullable @Advice.Thrown Exception exception, - @Nullable @Advice.Argument(value = 0, readOnly = false) Callable callable, - @Advice.Local("original") Callable original) { - try { - if (exception instanceof ClassCastException || exception instanceof IllegalArgumentException) { - // seems like this executor expects a specific subtype of Callable - callable = original; - // repeat only if submitting a task fails for the first time - return excluded.add(thiz); - } else { - // don't repeat on exceptions which don't seem to be caused by wrapping the runnable - return false; - } - } finally { - if (tracer != null) { - tracer.allowWrappingOnThread(); - } - } + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + private static void onExit(@Nullable @Advice.Thrown Throwable thrown, + @Advice.Argument(value = 0) @Nullable Callable callable) { + JavaConcurrent.doFinally(thrown, callable); } + /** + *
    + *
  • {@link ExecutorService#submit(Callable)}
  • + *
  • {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}
  • + *
+ */ @Override public ElementMatcher getMethodMatcher() { - return named("submit").and(returns(Future.class)).and(takesArguments(Callable.class)); + return named("submit").and(returns(Future.class)).and(takesArguments(Callable.class)) + .or(named("schedule").and(returns(ScheduledFuture.class)).and(takesArguments(Callable.class, long.class, TimeUnit.class))); + } + + } + + public static class ExecutorInvokeAnyAllInstrumentation extends ExecutorInstrumentation { + + /** + *
    + *
  • {@link ExecutorService#invokeAll}
  • + *
  • {@link ExecutorService#invokeAny}
  • + *
+ */ + @Override + public ElementMatcher getMethodMatcher() { + return nameStartsWith("invoke") + .and(nameEndsWith("Any").or(nameEndsWith("All"))) + .and(isPublic()) + .and(takesArgument(0, Collection.class)) + .and(isOverriddenFrom(ExecutorService.class)); + } + + @Advice.OnMethodEnter(suppress = Throwable.class) + private static void onEnter(@Advice.This Executor thiz, + @Nullable @Advice.Argument(value = 0, readOnly = false) Collection> callables) { + if (ExecutorInstrumentation.isExcluded(thiz)) { + return; + } + callables = JavaConcurrent.withContext(callables, tracer); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + private static void onExit(@Nullable @Advice.Thrown Throwable thrown, + @Nullable @Advice.Argument(0) Collection> callables) { + JavaConcurrent.doFinally(thrown, callables); + } + + @Override + public Collection getInstrumentationGroupNames() { + return Arrays.asList("concurrent", "executor", "executor-collection"); + } + } + + public static class ForkJoinPoolInstrumentation extends ExecutorInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return hasSuperType(is(ForkJoinPool.class)).and(super.getTypeMatcher()); } + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onExecute(@Advice.This Executor thiz, + @Advice.Argument(value = 0, readOnly = false) @Nullable ForkJoinTask task) { + if (ExecutorInstrumentation.isExcluded(thiz)) { + return; + } + task = JavaConcurrent.withContext(task, tracer); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + private static void onExit(@Nullable @Advice.Thrown Throwable thrown, + @Advice.Argument(value = 0) @Nullable ForkJoinTask task) { + JavaConcurrent.doFinally(thrown, task); + } + + /** + *
    + *
  • {@link ForkJoinPool#execute(ForkJoinTask)}
  • + *
  • {@link ForkJoinPool#submit(ForkJoinTask)}
  • + *
  • {@link ForkJoinPool#invoke(ForkJoinTask)}
  • + *
+ * @return + */ + @Override + public ElementMatcher getMethodMatcher() { + return named("execute").and(returns(void.class)).and(takesArguments(ForkJoinTask.class)) + .or(named("submit").and(returns(ForkJoinTask.class)).and(takesArguments(ForkJoinTask.class))) + .or(named("invoke").and(returns(Object.class)).and(takesArguments(ForkJoinTask.class))); + } } } diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/JavaConcurrent.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/JavaConcurrent.java new file mode 100644 index 0000000000..6745d839ab --- /dev/null +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/JavaConcurrent.java @@ -0,0 +1,220 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.concurrent; + +import co.elastic.apm.agent.bci.ElasticApmAgent; +import co.elastic.apm.agent.bci.ElasticApmInstrumentation; +import co.elastic.apm.agent.collections.WeakMapSupplier; +import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; +import co.elastic.apm.agent.impl.transaction.TraceContext; +import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ForkJoinTask; + +public class JavaConcurrent { + + private static final WeakConcurrentMap> contextMap = WeakMapSupplier.createMap(); + private static final List> RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION = Collections. + >singletonList(RunnableCallableForkJoinTaskInstrumentation.class); + private static final ThreadLocal needsContext = new ThreadLocal<>(); + + private static void removeContext(Object o) { + AbstractSpan context = contextMap.remove(o); + if (context != null) { + context.decrementReferences(); + } + } + + @Nullable + public static AbstractSpan restoreContext(Object o, @Nullable ElasticApmTracer tracer) { + if (tracer == null) { + return null; + } + // When an Executor executes directly on the current thread we need to enable this thread for context propagation again + needsContext.set(Boolean.TRUE); + AbstractSpan context = contextMap.remove(o); + if (context == null) { + return null; + } + if (tracer.getActive() != context) { + context.activate(); + context.decrementReferences(); + return context; + } else { + context.decrementReferences(); + return null; + } + } + + /** + * Instruments or wraps the provided runnable and makes this {@link AbstractSpan} active in the {@link Runnable#run()} method. + */ + @Nullable + public static Runnable withContext(@Nullable Runnable runnable, @Nullable ElasticApmTracer tracer) { + if (runnable instanceof RunnableLambdaWrapper || runnable == null || tracer == null || needsContext.get() == Boolean.FALSE) { + return runnable; + } + needsContext.set(Boolean.FALSE); + AbstractSpan active = tracer.getActive(); + if (active == null) { + return runnable; + } + if (isLambda(runnable)) { + runnable = new RunnableLambdaWrapper(runnable); + } + ElasticApmAgent.ensureInstrumented(runnable.getClass(), RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION); + contextMap.put(runnable, active); + active.incrementReferences(); + // Do no discard branches leading to async operations so not to break span references + active.setNonDiscardable(); + return runnable; + } + + /** + * Instruments or wraps the provided runnable and makes this {@link AbstractSpan} active in the {@link Runnable#run()} method. + */ + @Nullable + public static Callable withContext(@Nullable Callable callable, @Nullable ElasticApmTracer tracer) { + if (callable instanceof CallableLambdaWrapper || callable == null || tracer == null || needsContext.get() == Boolean.FALSE) { + return callable; + } + needsContext.set(Boolean.FALSE); + AbstractSpan active = tracer.getActive(); + if (active == null) { + return callable; + } + if (isLambda(callable)) { + callable = new CallableLambdaWrapper<>(callable); + } + ElasticApmAgent.ensureInstrumented(callable.getClass(), RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION); + contextMap.put(callable, active); + active.incrementReferences(); + return callable; + } + + @Nullable + public static ForkJoinTask withContext(@Nullable ForkJoinTask task, @Nullable ElasticApmTracer tracer) { + if (task == null || tracer == null || needsContext.get() == Boolean.FALSE) { + return task; + } + needsContext.set(Boolean.FALSE); + AbstractSpan active = tracer.getActive(); + if (active == null) { + return task; + } + ElasticApmAgent.ensureInstrumented(task.getClass(), RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION); + contextMap.put(task, active); + active.incrementReferences(); + return task; + } + + public static void doFinally(@Nullable Throwable thrown, @Nullable Object contextObject) { + needsContext.set(Boolean.TRUE); + if (thrown != null && contextObject != null) { + removeContext(contextObject); + } + } + + public static void doFinally(@Nullable Throwable thrown, @Nullable Collection> callables) { + needsContext.set(Boolean.TRUE); + if (thrown != null && callables != null) { + for (Callable callable : callables) { + removeContext(callable); + } + } + } + + private static boolean isLambda(Object o) { + return o.getClass().getName().indexOf('/') != -1; + } + + @Nullable + public static Collection> withContext(@Nullable Collection> callables, @Nullable ElasticApmTracer tracer) { + if (callables == null || tracer == null) { + return null; + } + if (callables.isEmpty()) { + return callables; + } + final Collection> wrapped; + if (needsWrapping(callables)) { + wrapped = new ArrayList<>(callables.size()); + } else { + wrapped = null; + } + for (Callable callable : callables) { + final Callable potentiallyWrappedCallable = withContext(callable, tracer); + needsContext.set(Boolean.TRUE); + if (wrapped != null) { + wrapped.add(potentiallyWrappedCallable); + } + } + needsContext.set(Boolean.FALSE); + return wrapped != null ? wrapped : callables; + } + + private static boolean needsWrapping(Collection> callables) { + for (Callable callable : callables) { + if (isLambda(callable)) { + return true; + } + } + return false; + } + + public static class RunnableLambdaWrapper implements Runnable { + + private final Runnable delegate; + + public RunnableLambdaWrapper(Runnable delegate) { + this.delegate = delegate; + } + + @Override + public void run() { + delegate.run(); + } + } + + public static class CallableLambdaWrapper implements Callable { + private final Callable delegate; + + public CallableLambdaWrapper(Callable delegate) { + this.delegate = delegate; + } + + @Override + public V call() throws Exception { + return delegate.call(); + } + } +} diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java new file mode 100644 index 0000000000..3f524d9843 --- /dev/null +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java @@ -0,0 +1,88 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.concurrent; + +import co.elastic.apm.agent.bci.ElasticApmInstrumentation; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.ForkJoinTask; + +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.is; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +/** + * Used only within {@link JavaConcurrent#withContext} to + * {@linkplain co.elastic.apm.agent.bci.ElasticApmAgent#ensureInstrumented(Class, Collection) ensure} + * that particular {@link Callable}, {@link Runnable} and {@link ForkJoinTask} classes are instrumented. + */ +public class RunnableCallableForkJoinTaskInstrumentation extends ElasticApmInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return hasSuperType( + is(Runnable.class) + .or(is(Callable.class)) + .or(is(ForkJoinTask.class)) + ); + } + + @Override + public ElementMatcher getMethodMatcher() { + return named("run").and(takesArguments(0)) + .or(named("call").and(takesArguments(0))) + .or(named("exec").and(takesArguments(0).and(returns(boolean.class)))); + } + + @Override + public Collection getInstrumentationGroupNames() { + return Arrays.asList("concurrent", "executor"); + } + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class) + private static AbstractSpan onEnter(@Advice.This Object thiz) { + return JavaConcurrent.restoreContext(thiz, tracer); + } + + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + private static void onExit(@Advice.Thrown Throwable thrown, + @Nullable @Advice.Enter AbstractSpan span) { + if (span != null) { + span.deactivate(); + } + } +} diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.bci.ElasticApmInstrumentation b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.bci.ElasticApmInstrumentation index f516f6dddf..9f4a511a90 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.bci.ElasticApmInstrumentation +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.bci.ElasticApmInstrumentation @@ -1,2 +1,4 @@ co.elastic.apm.agent.concurrent.ExecutorInstrumentation$ExecutorRunnableInstrumentation co.elastic.apm.agent.concurrent.ExecutorInstrumentation$ExecutorCallableInstrumentation +co.elastic.apm.agent.concurrent.ExecutorInstrumentation$ExecutorInvokeAnyAllInstrumentation +co.elastic.apm.agent.concurrent.ExecutorInstrumentation$ForkJoinPoolInstrumentation diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/CurrentThreadExecutor.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/CurrentThreadExecutor.java new file mode 100644 index 0000000000..d73a99f8bb --- /dev/null +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/CurrentThreadExecutor.java @@ -0,0 +1,34 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.concurrent; + +import java.util.concurrent.Executor; + +public class CurrentThreadExecutor implements Executor { + @Override + public void execute(Runnable command) { + command.run(); + } +} diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceDoubleWrappingTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceDoubleWrappingTest.java index 636e1b5e8c..55ea7283da 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceDoubleWrappingTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceDoubleWrappingTest.java @@ -40,7 +40,7 @@ public class ExecutorServiceDoubleWrappingTest extends AbstractInstrumentationTest { private static final Object TEST_OBJECT = new Object(); - private final RunnableWrapperExecutorService executor = RunnableWrapperExecutorService.wrap(Executors.newSingleThreadExecutor(), tracer); + private final RunnableWrapperExecutorService executor = RunnableWrapperExecutorService.wrap(ExecutorServiceWrapper.wrap(Executors.newSingleThreadExecutor()), tracer); private Transaction transaction; @Before @@ -92,11 +92,7 @@ private void createAsyncSpan() { int numWrappers = 0; StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); for (StackTraceElement stackTraceElement : stackTrace) { - if (stackTraceElement.getClassName().equals("co.elastic.apm.agent.impl.async.SpanInScopeRunnableWrapper") && - stackTraceElement.getMethodName().equals("run")) { - numWrappers++; - } else if (stackTraceElement.getClassName().equals("co.elastic.apm.agent.impl.async.SpanInScopeCallableWrapper") && - stackTraceElement.getMethodName().equals("call")) { + if (stackTraceElement.getClassName().endsWith("LambdaWrapper")) { numWrappers++; } } diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceInstrumentationTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceInstrumentationTest.java index 8926bc45c1..10da15b3f8 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceInstrumentationTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceInstrumentationTest.java @@ -25,6 +25,7 @@ package co.elastic.apm.agent.concurrent; import co.elastic.apm.agent.AbstractInstrumentationTest; +import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.After; @@ -34,10 +35,15 @@ import org.junit.runners.Parameterized; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; @@ -46,6 +52,7 @@ public class ExecutorServiceInstrumentationTest extends AbstractInstrumentationTest { private final ExecutorService executor; + private CurrentThreadExecutor currentThreadExecutor; private Transaction transaction; public ExecutorServiceInstrumentationTest(Supplier supplier) { @@ -57,11 +64,12 @@ public static Iterable> data() { return Arrays.asList(() -> ExecutorServiceWrapper.wrap(Executors.newSingleThreadExecutor()), () -> ExecutorServiceWrapper.wrap(Executors.newSingleThreadScheduledExecutor()), () -> ExecutorServiceWrapper.wrap(new ForkJoinPool()), - () -> GlobalEventExecutor.INSTANCE); + () -> ExecutorServiceWrapper.wrap(GlobalEventExecutor.INSTANCE)); } @Before public void setUp() { + currentThreadExecutor = new CurrentThreadExecutor(); transaction = tracer.startRootTransaction(null).withName("Transaction").activate(); } @@ -79,25 +87,25 @@ public void run() { } }).get(); - assertOnlySpanIsChildOfOnlyTransaction(); + assertAsyncSpans(1); } @Test public void testExecutorSubmitRunnableLambda() throws Exception { executor.submit(() -> createAsyncSpan()).get(1, TimeUnit.SECONDS); - assertOnlySpanIsChildOfOnlyTransaction(); + assertAsyncSpans(1); } @Test public void testExecutorExecute() throws Exception { executor.execute(this::createAsyncSpan); - assertOnlySpanIsChildOfOnlyTransaction(); + assertAsyncSpans(1); } @Test public void testExecutorSubmitRunnableWithResult() throws Exception { executor.submit(this::createAsyncSpan, null); - assertOnlySpanIsChildOfOnlyTransaction(); + assertAsyncSpans(1); } @Test @@ -106,10 +114,84 @@ public void testExecutorSubmitCallableMethodReference() throws Exception { createAsyncSpan(); return null; }).get(1, TimeUnit.SECONDS); - assertOnlySpanIsChildOfOnlyTransaction(); + assertAsyncSpans(1); } - private void assertOnlySpanIsChildOfOnlyTransaction() throws InterruptedException { + @Test + public void testInvokeAll() throws Exception { + final List> futures = executor.invokeAll(Arrays.>asList(this::createAsyncSpan, () -> createAsyncSpan(), new Callable() { + @Override + public Span call() throws Exception { + return createAsyncSpan(); + } + })); + futures.forEach(ThrowingConsumer.of(Future::get)); + assertAsyncSpans(3); + } + + @Test + public void testNestedExecutions() throws Exception { + currentThreadExecutor.execute(() -> executor.execute(this::createAsyncSpan)); + assertAsyncSpans(1); + } + + @Test + public void testInvokeAllTimed() throws Exception { + final List> futures = executor.invokeAll(Arrays.asList( + new Callable() { + @Override + public Span call() throws Exception { + return createAsyncSpan(); + } + }, + new Callable() { + @Override + public Span call() throws Exception { + return createAsyncSpan(); + } + }), 1, TimeUnit.SECONDS); + futures.forEach(ThrowingConsumer.of(Future::get)); + assertAsyncSpans(2); + } + + @Test + public void testInvokeAny() throws Exception { + executor.invokeAny(Collections.singletonList(new Callable() { + @Override + public Span call() { + return createAsyncSpan(); + } + })); + assertAsyncSpans(1); + } + + @Test + public void testInvokeAnyTimed() throws Exception { + executor.invokeAny(Collections.>singletonList(new Callable() { + @Override + public Span call() { + return createAsyncSpan(); + } + }), 1, TimeUnit.SECONDS); + assertAsyncSpans(1); + } + + @FunctionalInterface + public interface ThrowingConsumer { + static Consumer of(ThrowingConsumer throwingConsumer) { + return t -> { + try { + throwingConsumer.accept(t); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + } + void accept(T t) throws Exception; + } + + + private void assertAsyncSpans(int expectedSpans) throws InterruptedException { try { // wait for the async operation to end assertThat(reporter.getFirstSpan(1000)).isNotNull(); @@ -117,12 +199,15 @@ private void assertOnlySpanIsChildOfOnlyTransaction() throws InterruptedExceptio transaction.deactivate().end(); } assertThat(reporter.getTransactions()).hasSize(1); - assertThat(reporter.getSpans()).hasSize(1); - assertThat(reporter.getFirstSpan().isChildOf(reporter.getFirstTransaction())).isTrue(); + assertThat(reporter.getSpans()).hasSize(expectedSpans); + reporter.getSpans().forEach(span -> assertThat(span.isChildOf(reporter.getFirstTransaction())).isTrue()); } - private void createAsyncSpan() { + private Span createAsyncSpan() { + assertThat(tracer.getActive()).isNotNull(); assertThat(tracer.getActive().getTraceContext().getId()).isEqualTo(transaction.getTraceContext().getId()); - tracer.getActive().createSpan().withName("Async").end(); + final Span span = tracer.getActive().createSpan().withName("Async"); + span.end(); + return span; } } diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java deleted file mode 100644 index f858a84cc3..0000000000 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/*- - * #%L - * Elastic APM Java agent - * %% - * Copyright (C) 2018 - 2020 Elastic and contributors - * %% - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * #L% - */ -package co.elastic.apm.agent.concurrent; - -import co.elastic.apm.agent.AbstractInstrumentationTest; -import co.elastic.apm.agent.impl.async.SpanInScopeCallableWrapper; -import co.elastic.apm.agent.impl.async.SpanInScopeRunnableWrapper; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -class FailingExecutorInstrumentationTest extends AbstractInstrumentationTest { - - private ExecutorService executor; - private AtomicInteger runCounter; - private AtomicInteger submitWithWrapperCounter; - - @BeforeEach - void setUp() { - executor = ExecutorServiceWrapper.wrap(new ForkJoinPool() { - @Override - public ForkJoinTask submit(Runnable task) { - if (task instanceof SpanInScopeRunnableWrapper) { - submitWithWrapperCounter.incrementAndGet(); - throw new ClassCastException(); - } - return super.submit(task); - } - - @Override - public ForkJoinTask submit(Callable task) { - if (task instanceof SpanInScopeCallableWrapper) { - submitWithWrapperCounter.incrementAndGet(); - throw new IllegalArgumentException(); - } - return super.submit(task); - } - - @Override - public void execute(Runnable task) { - throw new IllegalArgumentException(); - } - - @Override - public ForkJoinTask submit(Runnable task, T result) { - throw new UnsupportedOperationException(); - } - }); - tracer.startRootTransaction(null).activate(); - runCounter = new AtomicInteger(); - submitWithWrapperCounter = new AtomicInteger(); - } - - @AfterEach - void tearDown() { - tracer.currentTransaction().deactivate().end(); - } - - @Test - void testRunnableWrappersNotSupported() throws Exception { - executor.submit(() -> { - assertThat(runCounter.incrementAndGet()).isEqualTo(1); - }).get(); - assertThat(submitWithWrapperCounter.get()).isEqualTo(1); - - assertThat(ExecutorInstrumentation.excluded.contains(executor)).isTrue(); - executor.submit(() -> { - assertThat(runCounter.incrementAndGet()).isEqualTo(2); - }).get(); - assertThat(submitWithWrapperCounter.get()).isEqualTo(1); - } - - @Test - void testCallableWrappersNotSupported() throws Exception { - executor.submit(() -> { - assertThat(runCounter.incrementAndGet()).isEqualTo(1); - return null; - }).get(); - assertThat(submitWithWrapperCounter.get()).isEqualTo(1); - - assertThat(ExecutorInstrumentation.excluded.contains(executor)).isTrue(); - executor.submit(() -> { - assertThat(runCounter.incrementAndGet()).isEqualTo(2); - }).get(); - assertThat(submitWithWrapperCounter.get()).isEqualTo(1); - } - - @Test - void testOnlyRetryOnce() { - assertThatThrownBy(() -> executor.execute(() -> { - })).isInstanceOf(IllegalArgumentException.class); - assertThat(ExecutorInstrumentation.excluded.contains(executor)).isTrue(); - } - - @Test - void testUnrelatedException() { - assertThatThrownBy(() -> executor.submit(() -> { - }, null)).isInstanceOf(UnsupportedOperationException.class); - assertThat(ExecutorInstrumentation.excluded.contains(executor)).isFalse(); - } - -} diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ForkJoinPoolTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ForkJoinPoolTest.java new file mode 100644 index 0000000000..43ea776530 --- /dev/null +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ForkJoinPoolTest.java @@ -0,0 +1,73 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.concurrent; + +import co.elastic.apm.agent.AbstractInstrumentationTest; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; +import co.elastic.apm.agent.impl.transaction.Transaction; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ForkJoinTask; + +import static co.elastic.apm.agent.concurrent.InstrumentableForkJoinPool.newTask; +import static org.assertj.core.api.Assertions.assertThat; + +public class ForkJoinPoolTest extends AbstractInstrumentationTest { + + private InstrumentableForkJoinPool pool; + private Transaction transaction; + + @BeforeEach + void setUp() { + pool = new InstrumentableForkJoinPool(); + transaction = tracer.startRootTransaction(null).withName("transaction").activate(); + } + + @AfterEach + void tearDown() { + assertThat(tracer.getActive()).isEqualTo(transaction); + transaction.deactivate().end(); + } + + @Test + void testExecute() throws Exception { + final ForkJoinTask> task = newTask(() -> tracer.getActive()); + pool.execute(task); + assertThat(task.get()).isEqualTo(transaction); + } + + @Test + void testSubmit() throws Exception { + assertThat(pool.submit(newTask(() -> tracer.getActive())).get()).isEqualTo(transaction); + } + + @Test + void testInvoke() throws Exception { + assertThat(pool.invoke(newTask(() -> tracer.getActive()))).isEqualTo(transaction); + } + +} diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/InstrumentableForkJoinPool.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/InstrumentableForkJoinPool.java new file mode 100644 index 0000000000..5e5002b7a6 --- /dev/null +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/InstrumentableForkJoinPool.java @@ -0,0 +1,114 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.concurrent; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.Future; +import java.util.function.Supplier; + +/** + * We can't instrument bootstrap classes, like {@link ForkJoinPool} in unit tests currently. + * This class makes sure the relevat methods can be instrumented. + */ +public class InstrumentableForkJoinPool extends ForkJoinPool { + + public static ForkJoinTask newTask(Supplier supplier) { + return new AdaptedSupplier<>(supplier); + } + + @Override + public T invoke(ForkJoinTask task) { + return super.invoke(task); + } + + @Override + public void execute(ForkJoinTask task) { + super.execute(task); + } + + @Override + public void execute(Runnable task) { + super.execute(task); + } + + @Override + public ForkJoinTask submit(ForkJoinTask task) { + return super.submit(task); + } + + @Override + public ForkJoinTask submit(Callable task) { + return super.submit(task); + } + + @Override + public ForkJoinTask submit(Runnable task, T result) { + return super.submit(task, result); + } + + @Override + public ForkJoinTask submit(Runnable task) { + return super.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) { + return super.invokeAll(tasks); + } + + private static class AdaptedSupplier extends ForkJoinTask implements Runnable { + + private final Supplier supplier; + private V result; + + public AdaptedSupplier(Supplier supplier) { + this.supplier = supplier; + } + + @Override + public V getRawResult() { + return result; + } + + @Override + protected void setRawResult(V value) { + result = value; + } + + @Override + protected boolean exec() { + result = supplier.get(); + return true; + } + + @Override + public final void run() { invoke(); } + + } +} diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ScheduledExecutorServiceTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ScheduledExecutorServiceTest.java new file mode 100644 index 0000000000..8a3de172bb --- /dev/null +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ScheduledExecutorServiceTest.java @@ -0,0 +1,81 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.concurrent; + +import co.elastic.apm.agent.AbstractInstrumentationTest; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; +import co.elastic.apm.agent.impl.transaction.Transaction; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ScheduledExecutorServiceTest extends AbstractInstrumentationTest { + + private ScheduledThreadPoolExecutor scheduler; + private Transaction transaction; + + @BeforeEach + void setUp() { + transaction = tracer.startRootTransaction(null).withName("transaction").activate(); + scheduler = new ScheduledThreadPoolExecutor(1) { + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return super.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return super.schedule(command, delay, unit); + } + }; + } + + @AfterEach + void tearDown() { + assertThat(tracer.getActive()).isEqualTo(transaction); + transaction.deactivate().end(); + } + + @Test + void testScheduleCallable() throws Exception { + final ScheduledFuture> future = scheduler.schedule(() -> tracer.getActive(), 0, TimeUnit.SECONDS); + assertThat(future.get()).isEqualTo(transaction); + } + + @Test + void testScheduleRunnable() throws Exception { + AtomicReference> ref = new AtomicReference<>(); + scheduler.schedule(() -> ref.set(tracer.getActive()), 0, TimeUnit.SECONDS).get(); + assertThat(ref.get()).isEqualTo(transaction); + } +} diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ScopeManagementTest.java similarity index 51% rename from apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java rename to apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ScopeManagementTest.java index 8c0670f58f..4f62931b79 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ScopeManagementTest.java @@ -22,16 +22,14 @@ * under the License. * #L% */ -package co.elastic.apm.agent.impl; +package co.elastic.apm.agent.concurrent; -import co.elastic.apm.agent.MockReporter; -import co.elastic.apm.agent.configuration.SpyConfiguration; +import co.elastic.apm.agent.AbstractInstrumentationTest; +import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.impl.TracerInternalApiUtils; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.stagemonitor.configuration.ConfigurationRegistry; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -39,39 +37,13 @@ import static org.assertj.core.api.Assertions.assertThat; -class ScopeManagementTest { - - private ElasticApmTracer tracer; - private MockReporter reporter; - private ConfigurationRegistry config; - - @BeforeEach - void setUp() { - reporter = new MockReporter(); - config = SpyConfiguration.createSpyConfig(); - tracer = new ElasticApmTracerBuilder() - .configurationRegistry(config) - .reporter(reporter) - .build(); - tracer.start(); - } - - @AfterEach - void tearDown() { - assertThat(tracer.getActive()).isNull(); - } +class ScopeManagementTest extends AbstractInstrumentationTest { /** * Disables assertions in {@link ElasticApmTracer}, runs the test and restores original setting */ void runTestWithAssertionsDisabled(Runnable test) { - boolean assertionsEnabled = tracer.assertionsEnabled; - try { - tracer.assertionsEnabled = false; - test.run(); - } finally { - tracer.assertionsEnabled = assertionsEnabled; - } + TracerInternalApiUtils.runWithoutAssertions(tracer, test); } @Test @@ -109,53 +81,13 @@ void testRedundantActivation() { }); } - @Test - void testContextAndSpanRunnableActivation() { - runTestWithAssertionsDisabled(() -> { - final Transaction transaction = tracer.startRootTransaction(null).activate(); - transaction.withActive(transaction.withActive((Runnable) () -> - assertThat(tracer.getActive()).isSameAs(transaction))).run(); - transaction.deactivate(); - - assertThat(tracer.getActive()).isNull(); - }); - } - - @Test - void testContextAndSpanCallableActivation() { - runTestWithAssertionsDisabled(() -> { - final Transaction transaction = tracer.startRootTransaction(null).activate(); - try { - assertThat(transaction.withActive(transaction.withActive(() -> tracer.currentTransaction())).call()).isSameAs(transaction); - } catch (Exception e) { - e.printStackTrace(); - } - transaction.deactivate(); - - assertThat(tracer.getActive()).isNull(); - }); - } - - @Test - void testSpanAndContextRunnableActivation() { - runTestWithAssertionsDisabled(() -> { - final Transaction transaction = tracer.startRootTransaction(null).activate(); - Runnable runnable = transaction.withActive((Runnable) () -> - assertThat(tracer.currentTransaction()).isSameAs(transaction)); - transaction.withActive(runnable).run(); - transaction.deactivate(); - - assertThat(tracer.getActive()).isNull(); - }); - } - @Test void testSpanAndContextCallableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startRootTransaction(null).activate(); - Callable callable = transaction.withActive(() -> tracer.currentTransaction()); + Callable callable = () -> tracer.currentTransaction(); try { - assertThat(transaction.withActive(callable).call()).isSameAs(transaction); + assertThat(callable.call()).isSameAs(transaction); } catch (Exception e) { e.printStackTrace(); } @@ -168,10 +100,10 @@ void testSpanAndContextCallableActivation() { @Test void testContextAndSpanRunnableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startRootTransaction(null).activate(); - Executors.newSingleThreadExecutor().submit(transaction.withActive(transaction.withActive(() -> { + ExecutorServiceWrapper.wrap(Executors.newSingleThreadExecutor()).submit(() -> { assertThat(tracer.getActive()).isSameAs(transaction); assertThat(tracer.currentTransaction()).isSameAs(transaction); - }))).get(); + }).get(); transaction.deactivate(); assertThat(tracer.getActive()).isNull(); @@ -180,10 +112,10 @@ void testContextAndSpanRunnableActivationInDifferentThread() throws Exception { @Test void testContextAndSpanCallableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startRootTransaction(null).activate(); - Future transactionFuture = Executors.newSingleThreadExecutor().submit(transaction.withActive(transaction.withActive(() -> { + Future transactionFuture = ExecutorServiceWrapper.wrap(Executors.newSingleThreadExecutor()).submit(() -> { assertThat(tracer.getActive()).isSameAs(transaction); return tracer.currentTransaction(); - }))); + }); assertThat(transactionFuture.get()).isSameAs(transaction); transaction.deactivate(); @@ -193,11 +125,11 @@ void testContextAndSpanCallableActivationInDifferentThread() throws Exception { @Test void testSpanAndContextRunnableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startRootTransaction(null).activate(); - Runnable runnable = transaction.withActive(() -> { + Runnable runnable = () -> { assertThat(tracer.currentTransaction()).isSameAs(transaction); assertThat(tracer.getActive()).isSameAs(transaction); - }); - Executors.newSingleThreadExecutor().submit(transaction.withActive(runnable)).get(); + }; + ExecutorServiceWrapper.wrap(Executors.newSingleThreadExecutor()).submit(runnable).get(); transaction.deactivate(); assertThat(tracer.getActive()).isNull(); @@ -206,32 +138,12 @@ void testSpanAndContextRunnableActivationInDifferentThread() throws Exception { @Test void testSpanAndContextCallableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startRootTransaction(null).activate(); - Callable callable = transaction.withActive(() -> { + assertThat(ExecutorServiceWrapper.wrap(Executors.newSingleThreadExecutor()).submit(() -> { assertThat(tracer.currentTransaction()).isSameAs(transaction); return tracer.currentTransaction(); - }); - assertThat(Executors.newSingleThreadExecutor().submit(transaction.withActive(callable)).get()).isSameAs(transaction); + }).get()).isSameAs(transaction); transaction.deactivate(); assertThat(tracer.getActive()).isNull(); } - - @Test - void testAsyncActivationAfterEnd() throws Exception { - final Transaction transaction = tracer.startRootTransaction(null).activate(); - Callable callable = transaction.withActive(() -> { - assertThat(tracer.getActive()).isSameAs(transaction); - return tracer.currentTransaction(); - }); - transaction.deactivate().end(); - reporter.decrementReferences(); - assertThat(transaction.isReferenced()).isTrue(); - - assertThat(Executors.newSingleThreadExecutor().submit(callable).get()).isSameAs(transaction); - assertThat(transaction.isReferenced()).isFalse(); - // recycled because the transaction is finished, reported and the reference counter is 0 - assertThat(transaction.getTraceContext().getTraceId().isEmpty()).isTrue(); - - assertThat(tracer.getActive()).isNull(); - } } diff --git a/apm-agent-plugins/apm-jaxrs-plugin/src/main/java/co/elastic/apm/agent/jaxrs/JaxRsTransactionNameInstrumentation.java b/apm-agent-plugins/apm-jaxrs-plugin/src/main/java/co/elastic/apm/agent/jaxrs/JaxRsTransactionNameInstrumentation.java index a8f0c58fbc..95ac1b2226 100644 --- a/apm-agent-plugins/apm-jaxrs-plugin/src/main/java/co/elastic/apm/agent/jaxrs/JaxRsTransactionNameInstrumentation.java +++ b/apm-agent-plugins/apm-jaxrs-plugin/src/main/java/co/elastic/apm/agent/jaxrs/JaxRsTransactionNameInstrumentation.java @@ -27,6 +27,7 @@ import co.elastic.apm.agent.bci.ElasticApmInstrumentation; import co.elastic.apm.agent.bci.VisibleForAdvice; import co.elastic.apm.agent.bci.bytebuddy.SimpleMethodSignatureOffsetMappingFactory.SimpleMethodSignature; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.ElasticApmTracer; import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration; import co.elastic.apm.agent.impl.transaction.Transaction; @@ -59,7 +60,7 @@ public class JaxRsTransactionNameInstrumentation extends ElasticApmInstrumentation { @VisibleForAdvice - public static final WeakConcurrentMap, String> versionsCache = new WeakConcurrentMap.WithInlinedExpunction<>(); + public static final WeakConcurrentMap, String> versionsCache = WeakMapSupplier.createMap(); private static final String FRAMEWORK_NAME = "JAX-RS"; private static final String GROUP_ID = "javax.ws.rs"; diff --git a/apm-agent-plugins/apm-jdbc-plugin/src/main/java/co/elastic/apm/agent/jdbc/helper/JdbcHelper.java b/apm-agent-plugins/apm-jdbc-plugin/src/main/java/co/elastic/apm/agent/jdbc/helper/JdbcHelper.java index 3ee4287e1f..0b82d6fc76 100644 --- a/apm-agent-plugins/apm-jdbc-plugin/src/main/java/co/elastic/apm/agent/jdbc/helper/JdbcHelper.java +++ b/apm-agent-plugins/apm-jdbc-plugin/src/main/java/co/elastic/apm/agent/jdbc/helper/JdbcHelper.java @@ -24,16 +24,16 @@ */ package co.elastic.apm.agent.jdbc.helper; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Span; -import co.elastic.apm.agent.util.DataStructures; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; import javax.annotation.Nullable; public abstract class JdbcHelper { - private static final WeakConcurrentMap statementSqlMap = DataStructures.createWeakConcurrentMapWithCleanerThread(); + private static final WeakConcurrentMap statementSqlMap = WeakMapSupplier.createMap(); public static final String DB_SPAN_TYPE = "db"; public static final String DB_SPAN_ACTION = "query"; diff --git a/apm-agent-plugins/apm-jdbc-plugin/src/main/java/co/elastic/apm/agent/jdbc/helper/JdbcHelperImpl.java b/apm-agent-plugins/apm-jdbc-plugin/src/main/java/co/elastic/apm/agent/jdbc/helper/JdbcHelperImpl.java index 36043436f2..0cfce52537 100644 --- a/apm-agent-plugins/apm-jdbc-plugin/src/main/java/co/elastic/apm/agent/jdbc/helper/JdbcHelperImpl.java +++ b/apm-agent-plugins/apm-jdbc-plugin/src/main/java/co/elastic/apm/agent/jdbc/helper/JdbcHelperImpl.java @@ -25,11 +25,11 @@ package co.elastic.apm.agent.jdbc.helper; import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.context.Destination; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.jdbc.signature.SignatureParser; -import co.elastic.apm.agent.util.DataStructures; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +50,9 @@ public class JdbcHelperImpl extends JdbcHelper { // because this class is potentially loaded from multiple classloaders, making those fields 'static' will not // have the expected behavior, thus, any direct reference to `JdbcHelperImpl` should only be obtained from the // HelperClassManager instance. - private final WeakConcurrentMap metaDataMap = DataStructures.createWeakConcurrentMapWithCleanerThread(); - private final WeakConcurrentMap, Boolean> metadataSupported = new WeakConcurrentMap.WithInlinedExpunction, Boolean>(); - private final WeakConcurrentMap, Boolean> connectionSupported = new WeakConcurrentMap.WithInlinedExpunction, Boolean>(); + private final WeakConcurrentMap metaDataMap = WeakMapSupplier.createMap(); + private final WeakConcurrentMap, Boolean> metadataSupported = WeakMapSupplier.createMap(); + private final WeakConcurrentMap, Boolean> connectionSupported = WeakMapSupplier.createMap(); @VisibleForAdvice public final ThreadLocal SIGNATURE_PARSER_THREAD_LOCAL = new ThreadLocal() { diff --git a/apm-agent-plugins/apm-process-plugin/pom.xml b/apm-agent-plugins/apm-process-plugin/pom.xml index f7e898aecb..c87f9fb61c 100644 --- a/apm-agent-plugins/apm-process-plugin/pom.xml +++ b/apm-agent-plugins/apm-process-plugin/pom.xml @@ -15,6 +15,11 @@ ${project.groupId}:${project.artifactId} + + ${project.groupId} + apm-java-concurrent-plugin + ${project.version} + org.apache.commons commons-exec diff --git a/apm-agent-plugins/apm-process-plugin/src/main/java/co/elastic/apm/agent/process/CommonsExecAsyncInstrumentation.java b/apm-agent-plugins/apm-process-plugin/src/main/java/co/elastic/apm/agent/process/CommonsExecAsyncInstrumentation.java index 439eeb039a..1d03843780 100644 --- a/apm-agent-plugins/apm-process-plugin/src/main/java/co/elastic/apm/agent/process/CommonsExecAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-process-plugin/src/main/java/co/elastic/apm/agent/process/CommonsExecAsyncInstrumentation.java @@ -25,6 +25,7 @@ package co.elastic.apm.agent.process; import co.elastic.apm.agent.bci.ElasticApmInstrumentation; +import co.elastic.apm.agent.concurrent.JavaConcurrent; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.NamedElement; import net.bytebuddy.description.method.MethodDescription; @@ -89,13 +90,13 @@ public static final class CommonsExecAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) private static void onEnter(@Advice.Argument(value = 0, readOnly = false) Runnable runnable) { - if (tracer == null || tracer.getActive() == null) { - return; - } - // context propagation is done by wrapping existing runnable argument + runnable = JavaConcurrent.withContext(runnable, tracer); + } - //noinspection UnusedAssignment - runnable = tracer.getActive().withActive(runnable); + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + private static void onExit(@Advice.Thrown Throwable thrown, + @Advice.Argument(value = 0) Runnable runnable) { + JavaConcurrent.doFinally(thrown, runnable); } } } diff --git a/apm-agent-plugins/apm-process-plugin/src/main/java/co/elastic/apm/agent/process/ProcessHelper.java b/apm-agent-plugins/apm-process-plugin/src/main/java/co/elastic/apm/agent/process/ProcessHelper.java index 67b248908c..79fff89a73 100644 --- a/apm-agent-plugins/apm-process-plugin/src/main/java/co/elastic/apm/agent/process/ProcessHelper.java +++ b/apm-agent-plugins/apm-process-plugin/src/main/java/co/elastic/apm/agent/process/ProcessHelper.java @@ -25,6 +25,7 @@ package co.elastic.apm.agent.process; import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Span; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; @@ -40,7 +41,7 @@ @VisibleForAdvice public class ProcessHelper { - private static final ProcessHelper INSTANCE = new ProcessHelper(new WeakConcurrentMap.WithInlinedExpunction()); + private static final ProcessHelper INSTANCE = new ProcessHelper(WeakMapSupplier.createMap()); private final WeakConcurrentMap inFlightSpans; diff --git a/apm-agent-plugins/apm-process-plugin/src/test/java/co/elastic/apm/agent/process/CommonsExecAsyncInstrumentationTest.java b/apm-agent-plugins/apm-process-plugin/src/test/java/co/elastic/apm/agent/process/CommonsExecAsyncInstrumentationTest.java index 4117047d7b..49c274cdd8 100644 --- a/apm-agent-plugins/apm-process-plugin/src/test/java/co/elastic/apm/agent/process/CommonsExecAsyncInstrumentationTest.java +++ b/apm-agent-plugins/apm-process-plugin/src/test/java/co/elastic/apm/agent/process/CommonsExecAsyncInstrumentationTest.java @@ -33,43 +33,33 @@ import org.apache.commons.exec.ExecuteException; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; public class CommonsExecAsyncInstrumentationTest extends AbstractInstrumentationTest { @Test - void asyncProcessWithinTransaction() throws IOException, InterruptedException { + void asyncProcessWithinTransaction() throws Exception { startTransaction(); - asyncProcessHasTransactionContext(true); + assertThat(asyncProcessHasTransactionContext().get()) + .describedAs("executor runnable not in the expected transaction context") + .isNotNull(); terminateTransaction(); } @Test - void asyncProcessOutsideTransaction() throws IOException, InterruptedException { - asyncProcessHasTransactionContext(false); + void asyncProcessOutsideTransaction() throws Exception { + assertThat(asyncProcessHasTransactionContext().get()) + .describedAs("executor runnable should not be in transaction context") + .isNull(); } - @Test - void customInstrumentationClassName() { - assertThat(MyExecutor.class.getSimpleName()) - .describedAs("'Executor' is required in subclass name for faster instrumentation non-matching") - .contains("Executor"); - } - - private static AbstractSpan asyncProcessHasTransactionContext(boolean expectedInTransaction) throws IOException, InterruptedException { - AtomicReference> activeTransaction = new AtomicReference<>(); - - DefaultExecutor executor = new MyExecutor(activeTransaction); - - final AtomicBoolean processProperlyCompleted = new AtomicBoolean(false); - + private static CompletableFuture> asyncProcessHasTransactionContext() throws Exception { + final CompletableFuture> future = new CompletableFuture<>(); DefaultExecuteResultHandler handler = new DefaultExecuteResultHandler() { // note: calling super is required otherwise process termination is not detected and waits forever @@ -77,36 +67,28 @@ private static AbstractSpan asyncProcessHasTransactionContext(boolean expecte @Override public void onProcessComplete(int exitValue) { super.onProcessComplete(exitValue); - processProperlyCompleted.set(exitValue == 0); + if (exitValue == 0) { + future.complete(tracer.getActive()); + } else { + future.completeExceptionally(new IllegalStateException("Exit value is not 0: " + exitValue)); + } } @Override public void onProcessFailed(ExecuteException e) { super.onProcessFailed(e); - processProperlyCompleted.set(false); + future.completeExceptionally(e); } }; - executor.execute(new CommandLine(getJavaBinaryPath()).addArgument("-version"), handler); + new DefaultExecutor().execute(new CommandLine(getJavaBinaryPath()).addArgument("-version"), handler); handler.waitFor(); - - assertThat(processProperlyCompleted.get()) + assertThat(future.isCompletedExceptionally()) .describedAs("async process should have properly executed") - .isTrue(); - - if (expectedInTransaction) { - assertThat(activeTransaction.get()) - .describedAs("executor runnable not in the expected transaction context") - .isNotNull(); - } else { - assertThat(activeTransaction.get()) - .describedAs("executor runnable should not be in transaction context") - .isNull(); - } - + .isFalse(); - return activeTransaction.get(); + return future; } private static String getJavaBinaryPath() { @@ -132,30 +114,4 @@ private static void terminateTransaction() { reporter.assertRecycledAfterDecrementingReferences(); } - /** - * Custom implementation for testing, requires to have 'Executor' in name - */ - private static class MyExecutor extends DefaultExecutor { - - private AtomicReference> activeTransaction; - - private MyExecutor(AtomicReference> activeTransaction) { - this.activeTransaction = activeTransaction; - } - - @Override - protected Thread createThread(final Runnable runnable, String name) { - Runnable wrapped = new Runnable() { - @Override - public void run() { - // we don't assert directly here as throwing an exception will wait forever - activeTransaction.set(tracer.getActive()); - - runnable.run(); - } - }; - return super.createThread(wrapped, name); - } - } - } diff --git a/apm-agent-plugins/apm-process-plugin/src/test/java/co/elastic/apm/agent/process/ProcessHelperTest.java b/apm-agent-plugins/apm-process-plugin/src/test/java/co/elastic/apm/agent/process/ProcessHelperTest.java index 2272056dbe..cb6e5c6aa1 100644 --- a/apm-agent-plugins/apm-process-plugin/src/test/java/co/elastic/apm/agent/process/ProcessHelperTest.java +++ b/apm-agent-plugins/apm-process-plugin/src/test/java/co/elastic/apm/agent/process/ProcessHelperTest.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,9 +26,9 @@ import co.elastic.apm.agent.AbstractInstrumentationTest; import co.elastic.apm.agent.TransactionUtils; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; -import co.elastic.apm.agent.util.DataStructures; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/apm-agent-plugins/apm-profiling-plugin/src/main/java/co/elastic/apm/agent/profiler/CallTree.java b/apm-agent-plugins/apm-profiling-plugin/src/main/java/co/elastic/apm/agent/profiler/CallTree.java index e1edad31e8..c82942b848 100644 --- a/apm-agent-plugins/apm-profiling-plugin/src/main/java/co/elastic/apm/agent/profiler/CallTree.java +++ b/apm-agent-plugins/apm-profiling-plugin/src/main/java/co/elastic/apm/agent/profiler/CallTree.java @@ -563,7 +563,7 @@ public static class Root extends CallTree implements Recyclable { private static final StackFrame ROOT_FRAME = new StackFrame("root", "root"); /** * The context of the thread root, - * mostly a transaction or a span which got activated by {@link co.elastic.apm.agent.impl.async.SpanInScopeRunnableWrapper} + * mostly a transaction or a span which got activated in an auxiliary thread */ protected TraceContext rootContext; /** diff --git a/apm-agent-plugins/apm-quartz-job-plugin/src/main/java/co/elastic/apm/agent/quartz/job/JobTransactionNameAdvice.java b/apm-agent-plugins/apm-quartz-job-plugin/src/main/java/co/elastic/apm/agent/quartz/job/JobTransactionNameAdvice.java index 3c8ad92fc4..1cb54651f2 100644 --- a/apm-agent-plugins/apm-quartz-job-plugin/src/main/java/co/elastic/apm/agent/quartz/job/JobTransactionNameAdvice.java +++ b/apm-agent-plugins/apm-quartz-job-plugin/src/main/java/co/elastic/apm/agent/quartz/job/JobTransactionNameAdvice.java @@ -27,6 +27,7 @@ import co.elastic.apm.agent.bci.ElasticApmInstrumentation; import co.elastic.apm.agent.bci.VisibleForAdvice; import co.elastic.apm.agent.bci.bytebuddy.SimpleMethodSignatureOffsetMappingFactory.SimpleMethodSignature; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.util.VersionUtils; @@ -42,7 +43,7 @@ public class JobTransactionNameAdvice { private static final String FRAMEWORK_NAME = "Quartz"; @VisibleForAdvice - public static final WeakConcurrentMap, String> versionsCache = new WeakConcurrentMap.WithInlinedExpunction<>(); + public static final WeakConcurrentMap, String> versionsCache = WeakMapSupplier.createMap(); @VisibleForAdvice public static final Logger logger = LoggerFactory.getLogger(JobTransactionNameInstrumentation.class); diff --git a/apm-agent-plugins/apm-redis-plugin/apm-lettuce-plugin/src/main/java/co/elastic/apm/agent/redis/lettuce/Lettuce34Instrumentation.java b/apm-agent-plugins/apm-redis-plugin/apm-lettuce-plugin/src/main/java/co/elastic/apm/agent/redis/lettuce/Lettuce34Instrumentation.java index 7b3610a490..cef7234f8e 100644 --- a/apm-agent-plugins/apm-redis-plugin/apm-lettuce-plugin/src/main/java/co/elastic/apm/agent/redis/lettuce/Lettuce34Instrumentation.java +++ b/apm-agent-plugins/apm-redis-plugin/apm-lettuce-plugin/src/main/java/co/elastic/apm/agent/redis/lettuce/Lettuce34Instrumentation.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,7 @@ import co.elastic.apm.agent.bci.ElasticApmInstrumentation; import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.transaction.Span; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; import com.lambdaworks.redis.protocol.RedisCommand; @@ -39,7 +40,7 @@ public abstract class Lettuce34Instrumentation extends ElasticApmInstrumentation { @VisibleForAdvice @SuppressWarnings("WeakerAccess") - public static final WeakConcurrentMap commandToSpan = new WeakConcurrentMap.WithInlinedExpunction(); + public static final WeakConcurrentMap, Span> commandToSpan = WeakMapSupplier.createMap(); /** * We don't support Lettuce up to version 3.3, as the {@link RedisCommand#getType()} method is missing diff --git a/apm-agent-plugins/apm-redis-plugin/apm-lettuce-plugin/src/main/java/co/elastic/apm/agent/redis/lettuce/Lettuce5StartSpanInstrumentation.java b/apm-agent-plugins/apm-redis-plugin/apm-lettuce-plugin/src/main/java/co/elastic/apm/agent/redis/lettuce/Lettuce5StartSpanInstrumentation.java index aa2b9da084..46a022c880 100644 --- a/apm-agent-plugins/apm-redis-plugin/apm-lettuce-plugin/src/main/java/co/elastic/apm/agent/redis/lettuce/Lettuce5StartSpanInstrumentation.java +++ b/apm-agent-plugins/apm-redis-plugin/apm-lettuce-plugin/src/main/java/co/elastic/apm/agent/redis/lettuce/Lettuce5StartSpanInstrumentation.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,7 @@ import co.elastic.apm.agent.bci.ElasticApmInstrumentation; import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.redis.RedisSpanUtils; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; @@ -54,7 +55,7 @@ public class Lettuce5StartSpanInstrumentation extends ElasticApmInstrumentation @VisibleForAdvice @SuppressWarnings("WeakerAccess") - public static final WeakConcurrentMap commandToSpan = new WeakConcurrentMap.WithInlinedExpunction(); + public static final WeakConcurrentMap, Span> commandToSpan = WeakMapSupplier.createMap(); @Override public ElementMatcher getTypeMatcher() { @@ -75,7 +76,7 @@ public Collection getInstrumentationGroupNames() { } @Advice.OnMethodEnter(suppress = Throwable.class) - private static void beforeDispatch(@Nullable @Advice.Argument(0) RedisCommand command, @Advice.Local("span") Span span) throws Exception { + private static void beforeDispatch(@Nullable @Advice.Argument(0) RedisCommand command, @Advice.Local("span") Span span) throws Exception { if (command != null) { span = RedisSpanUtils.createRedisSpan(command.getType().name()); if (span != null) { diff --git a/apm-agent-plugins/apm-servlet-plugin/pom.xml b/apm-agent-plugins/apm-servlet-plugin/pom.xml index 38fa93f732..e5db1c4079 100644 --- a/apm-agent-plugins/apm-servlet-plugin/pom.xml +++ b/apm-agent-plugins/apm-servlet-plugin/pom.xml @@ -16,6 +16,11 @@ + + ${project.groupId} + apm-java-concurrent-plugin + ${project.version} + javax.servlet javax.servlet-api diff --git a/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java b/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java index 2261d60c30..5a43632b32 100644 --- a/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java +++ b/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java @@ -26,8 +26,8 @@ import co.elastic.apm.agent.bci.HelperClassManager; import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.concurrent.JavaConcurrent; import co.elastic.apm.agent.impl.ElasticApmTracer; -import co.elastic.apm.agent.impl.transaction.Transaction; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.NamedElement; import net.bytebuddy.description.method.MethodDescription; @@ -181,20 +181,13 @@ public static class AsyncContextStartAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) private static void onEnterAsyncContextStart(@Advice.Argument(value = 0, readOnly = false) @Nullable Runnable runnable) { - if (tracer != null && runnable != null && tracer.isWrappingAllowedOnThread()) { - final Transaction transaction = tracer.currentTransaction(); - if (transaction != null) { - runnable = transaction.withActive(runnable); - tracer.avoidWrappingOnThread(); - } - } + runnable = JavaConcurrent.withContext(runnable, tracer); } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Exception.class) - private static void onExitAsyncContextStart() { - if (tracer != null) { - tracer.allowWrappingOnThread(); - } + private static void onExitAsyncContextStart(@Nullable @Advice.Thrown Throwable thrown, + @Advice.Argument(value = 0) @Nullable Runnable runnable) { + JavaConcurrent.doFinally(thrown, runnable); } } } diff --git a/apm-agent-plugins/apm-urlconnection-plugin/src/main/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentation.java b/apm-agent-plugins/apm-urlconnection-plugin/src/main/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentation.java index 1be3c9d006..7508d4c50a 100644 --- a/apm-agent-plugins/apm-urlconnection-plugin/src/main/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentation.java +++ b/apm-agent-plugins/apm-urlconnection-plugin/src/main/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentation.java @@ -26,10 +26,10 @@ import co.elastic.apm.agent.bci.ElasticApmInstrumentation; import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.collections.WeakMapSupplier; import co.elastic.apm.agent.http.client.HttpClientHelper; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.TraceContext; -import co.elastic.apm.agent.util.DataStructures; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.NamedElement; @@ -52,7 +52,7 @@ public abstract class HttpUrlConnectionInstrumentation extends ElasticApmInstrumentation { @VisibleForAdvice - public static final WeakConcurrentMap inFlightSpans = DataStructures.createWeakConcurrentMapWithCleanerThread(); + public static final WeakConcurrentMap inFlightSpans = WeakMapSupplier.createMap(); @Override public Collection getInstrumentationGroupNames() { diff --git a/apm-agent-plugins/apm-urlconnection-plugin/src/test/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentationTest.java b/apm-agent-plugins/apm-urlconnection-plugin/src/test/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentationTest.java index e0323ac3d7..8efb795443 100644 --- a/apm-agent-plugins/apm-urlconnection-plugin/src/test/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentationTest.java +++ b/apm-agent-plugins/apm-urlconnection-plugin/src/test/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentationTest.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,6 +25,8 @@ package co.elastic.apm.agent.urlconnection; import co.elastic.apm.agent.httpclient.AbstractHttpClientInstrumentationTest; +import co.elastic.apm.agent.impl.Scope; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; import org.junit.Ignore; import org.junit.Test; @@ -59,13 +61,14 @@ protected void performGet(String path) throws Exception { public void testEndInDifferentThread() throws Exception { final HttpURLConnection urlConnection = new HttpURLConnectionWrapper(new HttpURLConnectionWrapper((HttpURLConnection) new URL(getBaseUrl() + "/").openConnection())); urlConnection.connect(); - final Thread thread = new Thread(tracer.getActive().withActive(() -> { - try { + AbstractSpan active = tracer.getActive(); + final Thread thread = new Thread(() -> { + try (Scope scope = active.activateInScope()) { urlConnection.getInputStream(); } catch (IOException e) { throw new RuntimeException(e); } - })); + }); thread.start(); thread.join(); diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 4a38a3d359..6ee8d57525 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -671,7 +671,7 @@ you should add an additional entry to this list (make sure to also include the d ==== `disable_instrumentations` (added[1.0.0,Changing this value at runtime is possible since version 1.15.0]) A list of instrumentations which should be disabled. -Valid options are `annotations`, `apache-commons-exec`, `apache-httpclient`, `asynchttpclient`, `concurrent`, `dubbo`, `elasticsearch-restclient`, `exception-handler`, `executor`, `experimental`, `grails`, `grpc`, `hibernate-search`, `http-client`, `jax-rs`, `jax-ws`, `jdbc`, `jedis`, `jms`, `jsf`, `kafka`, `lettuce`, `log4j`, `logging`, `mongodb-client`, `mule`, `okhttp`, `opentracing`, `process`, `public-api`, `quartz`, `redis`, `redisson`, `render`, `scheduled`, `servlet-api`, `servlet-api-async`, `servlet-input-stream`, `slf4j`, `spring-mvc`, `spring-resttemplate`, `spring-service-name`, `spring-view-render`, `ssl-context`, `urlconnection`. +Valid options are `annotations`, `apache-commons-exec`, `apache-httpclient`, `asynchttpclient`, `concurrent`, `dubbo`, `elasticsearch-restclient`, `exception-handler`, `executor`, `executor-collection`, `experimental`, `grails`, `grpc`, `hibernate-search`, `http-client`, `jax-rs`, `jax-ws`, `jdbc`, `jedis`, `jms`, `jsf`, `kafka`, `lettuce`, `log4j`, `logging`, `mongodb-client`, `mule`, `okhttp`, `opentracing`, `process`, `public-api`, `quartz`, `redis`, `redisson`, `render`, `scheduled`, `servlet-api`, `servlet-api-async`, `servlet-input-stream`, `slf4j`, `spring-mvc`, `spring-resttemplate`, `spring-service-name`, `spring-view-render`, `ssl-context`, `urlconnection`. If you want to try out experimental features, set the value to an empty string. NOTE: Changing this value at runtime can slow down the application temporarily. @@ -2530,7 +2530,7 @@ The default unit for this option is `ms`. # sanitize_field_names=password,passwd,pwd,secret,*key,*token*,*session*,*credit*,*card*,authorization,set-cookie # A list of instrumentations which should be disabled. -# Valid options are `annotations`, `apache-commons-exec`, `apache-httpclient`, `asynchttpclient`, `concurrent`, `dubbo`, `elasticsearch-restclient`, `exception-handler`, `executor`, `experimental`, `grails`, `grpc`, `hibernate-search`, `http-client`, `jax-rs`, `jax-ws`, `jdbc`, `jedis`, `jms`, `jsf`, `kafka`, `lettuce`, `log4j`, `logging`, `mongodb-client`, `mule`, `okhttp`, `opentracing`, `process`, `public-api`, `quartz`, `redis`, `redisson`, `render`, `scheduled`, `servlet-api`, `servlet-api-async`, `servlet-input-stream`, `slf4j`, `spring-mvc`, `spring-resttemplate`, `spring-service-name`, `spring-view-render`, `ssl-context`, `urlconnection`. +# Valid options are `annotations`, `apache-commons-exec`, `apache-httpclient`, `asynchttpclient`, `concurrent`, `dubbo`, `elasticsearch-restclient`, `exception-handler`, `executor`, `executor-collection`, `experimental`, `grails`, `grpc`, `hibernate-search`, `http-client`, `jax-rs`, `jax-ws`, `jdbc`, `jedis`, `jms`, `jsf`, `kafka`, `lettuce`, `log4j`, `logging`, `mongodb-client`, `mule`, `okhttp`, `opentracing`, `process`, `public-api`, `quartz`, `redis`, `redisson`, `render`, `scheduled`, `servlet-api`, `servlet-api-async`, `servlet-input-stream`, `slf4j`, `spring-mvc`, `spring-resttemplate`, `spring-service-name`, `spring-view-render`, `ssl-context`, `urlconnection`. # If you want to try out experimental features, set the value to an empty string. # # NOTE: Changing this value at runtime can slow down the application temporarily. diff --git a/docs/supported-technologies.asciidoc b/docs/supported-technologies.asciidoc index 409369707d..0367161717 100644 --- a/docs/supported-technologies.asciidoc +++ b/docs/supported-technologies.asciidoc @@ -286,11 +286,21 @@ This section lists all supported asynchronous frameworks. |=== |Framework |Supported versions | Description | Since -|ExecutorService +|`ExecutorService` | -|The agent propagates the context when using the `java.util.concurrent.ExecutorService` methods of any `ExecutorService` implementation. +|The agent propagates the context for ``ExecutorService``s. |1.4.0 +|`ScheduledExecutorService` +| +|The agent propagates the context for `ScheduledExecutorService#schedule` (this does not include `scheduleAtFixedRate` or `scheduleWithFixedDelay`. +|1.17.0 + +|`ForkJoinPool` +| +|The agent propagates the context for ``ForkJoinPool``s. +|1.17.0 + |===