diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml index 41e0835091..d4804160fc 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml @@ -20,12 +20,17 @@ ${project.groupId} apm-java-concurrent-plugin ${project.version} + + + org.scala-lang + scala-library + 2.13.6 test org.scalameta munit_2.13 - 0.7.9 + 0.7.29 test @@ -36,9 +41,9 @@ net.alchim31.maven scala-maven-plugin - 4.3.1 + 4.5.4 - 2.13.2 + 2.13.6 diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java index a1b7903c77..f7f78159a3 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java @@ -19,7 +19,9 @@ package co.elastic.apm.agent.scalaconcurrent; import co.elastic.apm.agent.bci.TracerAwareInstrumentation; +import co.elastic.apm.agent.concurrent.JavaConcurrent; import co.elastic.apm.agent.impl.transaction.AbstractSpan; +import co.elastic.apm.agent.sdk.advice.AssignTo; import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent; import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap; import net.bytebuddy.asm.Advice; @@ -32,9 +34,11 @@ import java.util.Arrays; import java.util.Collection; +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; public abstract class FutureInstrumentation extends TracerAwareInstrumentation { @@ -48,7 +52,34 @@ public Collection getInstrumentationGroupNames() { return Arrays.asList("scala-future", "experimental"); } - public static class ConstructorInstrumentation extends FutureInstrumentation { + public static class BatchedExecutionContextInstrumentation extends FutureInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return hasSuperType(named("scala.concurrent.BatchingExecutor")); + } + + @Override + public ElementMatcher getMethodMatcher() { + return named("submitForExecution").and(returns(void.class)).and(takesArguments(Runnable.class)); + } + + @Nullable + @AssignTo.Argument(0) + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + public static Runnable onExecute(@Advice.Argument(0) @Nullable Runnable runnable) { + return JavaConcurrent.withContext(runnable, tracer); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false) + public static void onExit(@Nullable @Advice.Thrown Throwable thrown, + @Advice.Argument(value = 0) @Nullable Runnable runnable) { + JavaConcurrent.doFinally(thrown, runnable); + } + } + + + public static class TransformationConstructorInstrumentation extends FutureInstrumentation { @Override public ElementMatcher getTypeMatcher() { @@ -69,13 +100,15 @@ public static void onExit(@Advice.This Object thiz) { // this span might be ended before the Promise$Transformation#run method starts // we have to avoid that this span gets recycled, even in the above mentioned case context.incrementReferences(); + // Do no discard branches leading to async operations so not to break span references + context.setNonDiscardable(); } } } } - public static class RunInstrumentation extends FutureInstrumentation { + public static class TransformationRunInstrumentation extends FutureInstrumentation { @Override public ElementMatcher getTypeMatcher() { diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation index fc420a060d..a850a9e3fe 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation @@ -1,2 +1,3 @@ -co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$ConstructorInstrumentation -co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$RunInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$BatchedExecutionContextInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationConstructorInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationRunInstrumentation diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala index 847f3f11ed..b38c03909c 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala @@ -1,7 +1,5 @@ package co.elastic.apm.agent.scalaconcurrent -import java.util.concurrent.Executors - import co.elastic.apm.agent.MockReporter import co.elastic.apm.agent.bci.ElasticApmAgent import co.elastic.apm.agent.configuration.{CoreConfiguration, SpyConfiguration} @@ -11,8 +9,9 @@ import munit.FunSuite import net.bytebuddy.agent.ByteBuddyAgent import org.stagemonitor.configuration.ConfigurationRegistry +import java.util.concurrent.{Executors, ForkJoinPool} import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future, Promise} +import scala.concurrent._ import scala.util.{Failure, Success} class FutureInstrumentationSpec extends FunSuite { @@ -20,7 +19,6 @@ class FutureInstrumentationSpec extends FunSuite { private var reporter: MockReporter = _ private var tracer: ElasticApmTracer = _ private var coreConfiguration: CoreConfiguration = _ - private var transaction: Transaction = _ override def beforeEach(context: BeforeEach): Unit = { reporter = new MockReporter @@ -29,139 +27,189 @@ class FutureInstrumentationSpec extends FunSuite { tracer = new ElasticApmTracerBuilder().configurationRegistry(config).reporter(reporter).build ElasticApmAgent.initInstrumentation(tracer, ByteBuddyAgent.install) tracer.start(false) - transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() } override def afterEach(context: AfterEach): Unit = ElasticApmAgent.reset() - test("Scala Future should propagate the tracing-context correctly across different threads") { - implicit val executionContext: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + override def munitExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(5)) - val future = Future("Test") - .map(_.length) - .flatMap(l => Future(l * 2)) - .map(_.toString) - .flatMap(s => Future(s"$s-$s")) - .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + test("Scala Future should propagate the tracing-context correctly across different threads") { + implicit val executionContext: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - true - ) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + val future = Future("Test") + .map(_.length) + .flatMap(l => Future(l * 2)) + .map(_.toString) + .flatMap(s => Future(s"$s-$s")) + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) - } + Await.ready(future, 10.seconds) + transaction.deactivate().end() - test("Worker thread should correctly set context on the current transaction") { - implicit val executionContext: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - - new TestFutureTraceMethods().invokeAsync(tracer) - transaction.deactivate().end() - assertEquals(reporter.getTransactions.size(), 1) - assertEquals(reporter.getSpans.size(), 0) - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - true - ) - } + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + } - test("Multiple async operations should be able to set context on the current transaction") { + test("Worker thread should correctly set context on the current transaction") { + implicit val executionContext: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - implicit val multiPoolEc: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - val future = Future - .traverse(1 to 100) { _ => - Future.sequence(List( - Future { - Thread.sleep(25) - tracer.currentTransaction().addCustomContext("future1", true) - }, - Future { - Thread.sleep(50) - tracer.currentTransaction().addCustomContext("future2", true) - }, - Future { - Thread.sleep(10) - tracer.currentTransaction().addCustomContext("future3", true) - } - )) - } + new TestFutureTraceMethods().invokeAsync(tracer) + transaction.deactivate().end() - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], - true - ) - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], - true - ) - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], - true - ) + assertEquals(reporter.getTransactions.size(), 1) + assertEquals(reporter.getSpans.size(), 0) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + } - } + test("Multiple async operations should be able to set context on the current transaction") { + implicit val multiPoolEc: ExecutionContext = munitExecutionContext + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val future = Future + .traverse(1 to 100) { _ => + Future.sequence(List( + Future { + Thread.sleep(25) + tracer.currentTransaction().addCustomContext("future1", true) + }, + Future { + Thread.sleep(50) + tracer.currentTransaction().addCustomContext("future2", true) + }, + Future { + Thread.sleep(10) + tracer.currentTransaction().addCustomContext("future3", true) + } + )) + } + + Await.ready(future, 10.seconds) + + transaction.deactivate().end() + + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], + true + ) - test("Handle a combination of Promises and Futures correctly") { + } - implicit val multiPoolEc: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + test("Multiple async operations should be able to set context on the current transaction with global EC") { + implicit val multiPoolEc: ExecutionContext = ExecutionContext.global + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val future = Future + .traverse(1 to 100) { _ => + Future.sequence(List( + Future { + Thread.sleep(25) + tracer.currentTransaction().addCustomContext("future1", true) + }, + Future { + Thread.sleep(50) + tracer.currentTransaction().addCustomContext("future2", true) + }, + Future { + Thread.sleep(10) + tracer.currentTransaction().addCustomContext("future3", true) + } + )) + } + + Await.ready(future, 10.seconds) + + transaction.deactivate().end() + + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], + true + ) - val promise = Promise[Int]() + } - Future { Thread.sleep(100) } - .map(_ => 42) - .onComplete { - case Success(value) => promise.success(value) - case Failure(exception) => promise.failure(exception) - } + test("Handle a combination of Promises and Futures correctly") { + implicit val multiPoolEc: ExecutionContext = munitExecutionContext - val future = promise - .future - .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - true - ) + val promise = Promise[Int]() - } + Future { Thread.sleep(100) } + .map(_ => 42) + .onComplete { + case Success(value) => promise.success(value) + case Failure(exception) => promise.failure(exception) + } - test("Handle a Future.sequence correctly") { + val future = promise + .future + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) - implicit val multiPoolEc: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + Await.ready(future, 10.seconds) + transaction.deactivate().end() - val future = Future - .sequence(List( - Future(Thread.sleep(25)) - )) - .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - true - ) + } - } + test("Handle a Future.sequence correctly") { + implicit val multiPoolEc: ExecutionContext = munitExecutionContext + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val future = Future + .sequence(List( + Future(Thread.sleep(25)) + )) + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + + Await.ready(future, 10.seconds) + transaction.deactivate().end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + + } - test("Handle a combination of Promises and complex Futures correctly") { + test("Handle a combination of Promises and complex Futures correctly") { + implicit val multiPoolEc: ExecutionContext = munitExecutionContext - implicit val multiPoolEc: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - val promise = Promise[Int]() + val promise = Promise[Int]() Future .sequence(List( @@ -174,65 +222,161 @@ class FutureInstrumentationSpec extends FunSuite { case Failure(exception) => promise.failure(exception) } - val future = promise - .future - .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) - - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], - true - ) - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], - true - ) + val future = promise + .future + .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) + + Await.ready(future, 10.seconds) + transaction.deactivate().end() + + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + true + ) + + } + + test("Scala Future should not propagate the tracing-context to unrelated threads with async deactivation") { + implicit val executionContext: ExecutionContext = munitExecutionContext + + val fs = (1 to 10).map(transactionNumber => Future { + Thread.sleep(10) + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate() + val futures = (1 to 10) + .map { futureNumber => + Future { + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + (transaction, futureNumber) + } + .map { case (transaction: Transaction, futureNumber: Int) => + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + transaction + } + } + + val future = Future.sequence(futures) + + future.onComplete(_ => transaction.deactivate().end()) + + future + + }) + + Await.result(Future.sequence(fs), 10.seconds) + + assert(tracer.currentTransaction() == null) } -} + test("Scala Future should not propagate the tracing-context to unrelated threads with deadlock") { + // See https://github.com/scala/bug/issues/12089 + implicit val executionContext: ExecutionContext = ExecutionContext.global + + val fs = (1 to 10).map { transactionNumber => + Future { + Thread.sleep(10) + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate() + + val futures = (1 to 10) + .map { futureNumber => + Future { + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + (transaction, futureNumber) + } + .map { case (transaction: Transaction, futureNumber: Int) => + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + transaction + } + } + + Await.result(Future.sequence(futures), Duration.Inf) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + transaction.deactivate().end() - private class TestFutureTraceMethods { - - /** - * Calling this method results in this method call tree: - * - * main thread | worker thread - * ------------------------------------------------------------------------------------------- - * invokeAsync | - * | | - * --- blockingMethodOnMainThread | - * | | - * --- nonBlockingMethodOnMainThread | - * | | - * --------------------------> methodOnWorkerThread - * | | - * | --- longMethod - * | - */ - def invokeAsync(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = blockingMethodOnMainThread(tracer) - - private def blockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = { - try { - Await.result(nonBlockingMethodOnMainThread(tracer), 10.seconds) - } catch { - case e: Exception => e.printStackTrace() } } - private def nonBlockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Future[Unit] = - Future(methodOnWorkerThread(tracer)) + Await.result(Future.sequence(fs), Duration.Inf) - private def methodOnWorkerThread(tracer: ElasticApmTracer): Unit = longMethod(tracer) + assert(tracer.currentTransaction() == null) + } - private def longMethod(tracer: ElasticApmTracer): Unit = { - try { - Thread.sleep(100) - tracer.currentTransaction().addCustomContext("future", true) - } catch { - case e: InterruptedException => e.printStackTrace() - } +} + +private class TestFutureTraceMethods { + + /** + * Calling this method results in this method call tree: + * + * main thread | worker thread + * ------------------------------------------------------------------------------------------- + * invokeAsync | + * | | + * --- blockingMethodOnMainThread | + * | | + * --- nonBlockingMethodOnMainThread | + * | | + * --------------------------> methodOnWorkerThread + * | | + * | --- longMethod + * | + */ + def invokeAsync(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = blockingMethodOnMainThread(tracer) + + private def blockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = { + try { + Await.result(nonBlockingMethodOnMainThread(tracer), 10.seconds) + } catch { + case e: Exception => e.printStackTrace() } + } + + private def nonBlockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Future[Unit] = + Future(methodOnWorkerThread(tracer)) + + private def methodOnWorkerThread(tracer: ElasticApmTracer): Unit = longMethod(tracer) + + private def longMethod(tracer: ElasticApmTracer): Unit = { + try { + Thread.sleep(100) + tracer.currentTransaction().addCustomContext("future", true) + } catch { + case e: InterruptedException => e.printStackTrace() + } + } }