diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index bd67578bf7..743cf413f2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -34,6 +34,7 @@ endif::[] * Added support for `java.util.TimerTask` - {pull}1235[#1235] * Add capturing of request body in Elasticsearch queries: `_msearch`, `_count`, `_msearch/template`, `_search/template`, `_rollup_search` - {pull}1222[#1222] * Add <> flag +* Add experimental support for Scala Futures [float] ===== Bug fixes diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml new file mode 100644 index 0000000000..5868b89d3b --- /dev/null +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml @@ -0,0 +1,56 @@ + + + 4.0.0 + + + apm-agent-plugins + co.elastic.apm + 1.17.1-SNAPSHOT + + + apm-scala-concurrent-plugin + ${project.groupId}:${project.artifactId} + + + ${project.basedir}/../.. + + + + + ${project.groupId} + apm-java-concurrent-plugin + ${project.version} + test + + + org.scalameta + munit_2.13 + 0.7.2 + test + + + + + src/test/scala + + + net.alchim31.maven + scala-maven-plugin + 4.3.1 + + 2.13.2 + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + + + diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scala/concurrent/FutureInstrumentation.java b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scala/concurrent/FutureInstrumentation.java new file mode 100644 index 0000000000..d40e56c57f --- /dev/null +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scala/concurrent/FutureInstrumentation.java @@ -0,0 +1,114 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2020 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.scala.concurrent; + +import co.elastic.apm.agent.bci.ElasticApmInstrumentation; +import co.elastic.apm.agent.bci.VisibleForAdvice; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; +import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Collection; + +import static net.bytebuddy.matcher.ElementMatchers.*; + +public abstract class FutureInstrumentation extends ElasticApmInstrumentation { + + @VisibleForAdvice + @SuppressWarnings("WeakerAccess") + public static final WeakConcurrentMap> promisesToContext = + new WeakConcurrentMap.WithInlinedExpunction<>(); + + @Nonnull + @Override + public Collection getInstrumentationGroupNames() { + return Arrays.asList("scala-future", "experimental"); + } + + public static class ConstructorInstrumentation extends FutureInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return named("scala.concurrent.impl.Promise$Transformation"); + } + + @Override + public ElementMatcher getMethodMatcher() { + return isConstructor(); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.This Object thiz) { + final AbstractSpan context = getActive(); + if (context != null) { + promisesToContext.put(thiz, context); + // this span might be ended before the Promise$Transformation#run method starts + // we have to avoid that this span gets recycled, even in the above mentioned case + context.incrementReferences(); + } + } + + } + + public static class RunInstrumentation extends FutureInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return named("scala.concurrent.impl.Promise$Transformation"); + } + + @Override + public ElementMatcher getMethodMatcher() { + return named("run").and(returns(void.class)); + } + + @VisibleForAdvice + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.This Object thiz, @Nullable @Advice.Local("context") AbstractSpan context) { + context = promisesToContext.remove(thiz); + if (tracer != null && context != null) { + tracer.activate(context); + // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run + // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice + context.decrementReferences(); + } + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Nullable @Advice.Local("context") AbstractSpan context) { + if (tracer != null && context != null) { + tracer.deactivate(context); + } + } + + } + +} diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.bci.ElasticApmInstrumentation b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.bci.ElasticApmInstrumentation new file mode 100644 index 0000000000..3637dc8e54 --- /dev/null +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.bci.ElasticApmInstrumentation @@ -0,0 +1,2 @@ +co.elastic.apm.agent.scala.concurrent.FutureInstrumentation$ConstructorInstrumentation +co.elastic.apm.agent.scala.concurrent.FutureInstrumentation$RunInstrumentation diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scala/concurrent/FutureInstrumentationSpec.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scala/concurrent/FutureInstrumentationSpec.scala new file mode 100644 index 0000000000..88107ca507 --- /dev/null +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scala/concurrent/FutureInstrumentationSpec.scala @@ -0,0 +1,238 @@ +package co.elastic.apm.agent.scala.concurrent + +import java.util.concurrent.Executors + +import co.elastic.apm.agent.MockReporter +import co.elastic.apm.agent.bci.ElasticApmAgent +import co.elastic.apm.agent.configuration.{CoreConfiguration, SpyConfiguration} +import co.elastic.apm.agent.impl.transaction.Transaction +import co.elastic.apm.agent.impl.{ElasticApmTracer, ElasticApmTracerBuilder} +import munit.FunSuite +import net.bytebuddy.agent.ByteBuddyAgent +import org.stagemonitor.configuration.ConfigurationRegistry + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future, Promise} +import scala.util.{Failure, Success} + +class FutureInstrumentationSpec extends FunSuite { + + private var reporter: MockReporter = _ + private var tracer: ElasticApmTracer = _ + private var coreConfiguration: CoreConfiguration = _ + private var transaction: Transaction = _ + + override def beforeEach(context: BeforeEach): Unit = { + reporter = new MockReporter + val config: ConfigurationRegistry = SpyConfiguration.createSpyConfig + coreConfiguration = config.getConfig(classOf[CoreConfiguration]) + tracer = new ElasticApmTracerBuilder().configurationRegistry(config).reporter(reporter).build + ElasticApmAgent.initInstrumentation(tracer, ByteBuddyAgent.install) + tracer.start() + transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + } + + override def afterEach(context: AfterEach): Unit = ElasticApmAgent.reset() + + test("Scala Future should propagate the tracing-context correctly across different threads") { + implicit val executionContext: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + + val future = Future("Test") + .map(_.length) + .flatMap(l => Future(l * 2)) + .map(_.toString) + .flatMap(s => Future(s"$s-$s")) + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + + Await.ready(future, 10.seconds) + transaction.deactivate().end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + + + } + + test("Worker thread should correctly set context on the current transaction") { + implicit val executionContext: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + + new TestFutureTraceMethods().invokeAsync(tracer) + transaction.deactivate().end() + assertEquals(reporter.getTransactions.size(), 1) + assertEquals(reporter.getSpans.size(), 0) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + } + + test("Multiple async operations should be able to set context on the current transaction") { + + implicit val multiPoolEc: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + + val future = Future + .traverse(1 to 100) { _ => + Future.sequence(List( + Future { + Thread.sleep(25) + tracer.currentTransaction().addCustomContext("future1", true) + }, + Future { + Thread.sleep(50) + tracer.currentTransaction().addCustomContext("future2", true) + }, + Future { + Thread.sleep(10) + tracer.currentTransaction().addCustomContext("future3", true) + } + )) + } + + Await.ready(future, 10.seconds) + transaction.deactivate().end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], + true + ) + + } + + test("Handle a combination of Promises and Futures correctly") { + + implicit val multiPoolEc: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + + val promise = Promise[Int]() + + Future { Thread.sleep(100) } + .map(_ => 42) + .onComplete { + case Success(value) => promise.success(value) + case Failure(exception) => promise.failure(exception) + } + + val future = promise + .future + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + + Await.ready(future, 10.seconds) + transaction.deactivate().end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + + } + + test("Handle a Future.sequence correctly") { + + implicit val multiPoolEc: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + + val future = Future + .sequence(List( + Future(Thread.sleep(25)) + )) + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + + Await.ready(future, 10.seconds) + transaction.deactivate().end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + + } + + test("Handle a combination of Promises and complex Futures correctly") { + + implicit val multiPoolEc: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + + val promise = Promise[Int]() + + Future + .sequence(List( + Future(Thread.sleep(25)) + )) + .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) + .map(_ => 42) + .onComplete { + case Success(value) => promise.success(value) + case Failure(exception) => promise.failure(exception) + } + + val future = promise + .future + .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) + + Await.ready(future, 10.seconds) + transaction.deactivate().end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + true + ) + + } + +} + + private class TestFutureTraceMethods { + + /** + * Calling this method results in this method call tree: + * + * main thread | worker thread + * ------------------------------------------------------------------------------------------- + * invokeAsync | + * | | + * --- blockingMethodOnMainThread | + * | | + * --- nonBlockingMethodOnMainThread | + * | | + * --------------------------> methodOnWorkerThread + * | | + * | --- longMethod + * | + */ + def invokeAsync(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = blockingMethodOnMainThread(tracer) + + private def blockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = { + try { + Await.result(nonBlockingMethodOnMainThread(tracer), 10.seconds) + } catch { + case e: Exception => e.printStackTrace() + } + } + + private def nonBlockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Future[Unit] = + Future(methodOnWorkerThread(tracer)) + + private def methodOnWorkerThread(tracer: ElasticApmTracer): Unit = longMethod(tracer) + + private def longMethod(tracer: ElasticApmTracer): Unit = { + try { + Thread.sleep(100) + tracer.currentTransaction().addCustomContext("future", true) + } catch { + case e: InterruptedException => e.printStackTrace() + } + } + +} diff --git a/apm-agent-plugins/pom.xml b/apm-agent-plugins/pom.xml index 16a42351f6..21b39d5a7e 100644 --- a/apm-agent-plugins/pom.xml +++ b/apm-agent-plugins/pom.xml @@ -41,6 +41,7 @@ apm-jms-plugin apm-hibernate-search-plugin apm-redis-plugin + apm-scala-concurrent-plugin apm-error-logging-plugin apm-jmx-plugin apm-mule4-plugin diff --git a/docs/supported-technologies.asciidoc b/docs/supported-technologies.asciidoc index 0ed917ccd4..2eae341252 100644 --- a/docs/supported-technologies.asciidoc +++ b/docs/supported-technologies.asciidoc @@ -301,6 +301,14 @@ This section lists all supported asynchronous frameworks. |The agent propagates the context for ``ForkJoinPool``s. |1.17.0 +|Scala Future +|2.13.x +|The agent propagates the context when using the `scala.concurrent.Future` or `scala.concurrent.Promise`. +It will propagate the context when using chaining methods such as `map`, `flatMap`, `traverse`, ... + +NOTE: To enable Scala Future support, you need to enable experimental plugins. +|1.18.0 + |=== diff --git a/elastic-apm-agent/pom.xml b/elastic-apm-agent/pom.xml index dd1def9caa..168cf5692a 100644 --- a/elastic-apm-agent/pom.xml +++ b/elastic-apm-agent/pom.xml @@ -224,6 +224,11 @@ apm-dubbo-plugin ${project.version} + + ${project.groupId} + apm-scala-concurrent-plugin + ${project.version} + diff --git a/pom.xml b/pom.xml index 67991d7374..a038ef00dc 100644 --- a/pom.xml +++ b/pom.xml @@ -85,9 +85,9 @@ 2.2.0 1.4.196 [2.10.0,) - 5.4.2 - 4.12 - 5.3.2 + 5.6.2 + 4.13 + 5.6.2 1.2.3 3.9.1 1.7.25 @@ -426,13 +426,8 @@ maven-surefire-plugin - 2.19.1 + 2.22.2 - - org.junit.platform - junit-platform-surefire-provider - 1.1.1 - org.junit.vintage junit-vintage-engine @@ -444,6 +439,11 @@ ${version.junit-jupiter} + + + **/*Spec.* + + maven-failsafe-plugin