-
Notifications
You must be signed in to change notification settings - Fork 332
Add Scala concurrent plugin #1048
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
404f6fb
9ce7f3f
e0a4335
e57e2d8
3ef57c0
2dd336d
167ecde
90d0b6f
1b5b369
f20a929
b2d2bb7
2feb98f
0af62ce
f97b0b1
649207b
324ce67
9d0eb39
7ab2834
ee6da7d
ec2b7eb
d9e0762
84417b0
199e514
ad42993
49bca5f
e41f63c
a0375b7
accd469
7843966
0f7b257
f9b8e56
063ed39
6c638d9
40033fa
5ee2b3b
1721794
737bb0e
1885c8e
c893ac0
f844049
0f530a0
c6a6581
b68d3d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| 9 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| <?xml version="1.0" encoding="UTF-8"?> | ||
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
| <modelVersion>4.0.0</modelVersion> | ||
|
|
||
| <parent> | ||
| <artifactId>apm-agent-plugins</artifactId> | ||
| <groupId>co.elastic.apm</groupId> | ||
| <version>1.15.1-SNAPSHOT</version> | ||
| </parent> | ||
|
|
||
| <artifactId>apm-scala-concurrent-plugin</artifactId> | ||
| <name>${project.groupId}:${project.artifactId}</name> | ||
|
|
||
| <properties> | ||
| <apm-agent-parent.base.dir>${project.basedir}/../..</apm-agent-parent.base.dir> | ||
| <maven.compiler.source>1.9</maven.compiler.source> | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this be removed?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me check since Scala does not support all Java version.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tested Scala 2.12 and it doesn't work with that version. Not sure how to prevent a plugin to run with a specific version though. |
||
| <maven.compiler.target>1.9</maven.compiler.target> | ||
| <maven.compiler.testTarget>1.9</maven.compiler.testTarget> | ||
| </properties> | ||
|
|
||
| <dependencies> | ||
| <dependency> | ||
| <groupId>org.scala-lang</groupId> | ||
| <artifactId>scala-library</artifactId> | ||
| <version>2.13.1</version> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.scalameta</groupId> | ||
| <artifactId>munit_2.13</artifactId> | ||
| <version>0.7.2</version> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
felixbarny marked this conversation as resolved.
|
||
| </dependencies> | ||
|
|
||
| <build> | ||
| <testSourceDirectory>src/test/scala</testSourceDirectory> | ||
| <plugins> | ||
| <plugin> | ||
| <groupId>net.alchim31.maven</groupId> | ||
| <artifactId>scala-maven-plugin</artifactId> | ||
| <version>4.3.1</version> | ||
| <configuration> | ||
| <scalaVersion>2.13.1</scalaVersion> | ||
| </configuration> | ||
| <executions> | ||
| <execution> | ||
| <id>scala-test-compile</id> | ||
| <phase>process-test-resources</phase> | ||
| <goals> | ||
| <goal>testCompile</goal> | ||
| </goals> | ||
| </execution> | ||
| </executions> | ||
| </plugin> | ||
| </plugins> | ||
| </build> | ||
|
|
||
|
milanvdm marked this conversation as resolved.
|
||
| </project> | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,101 @@ | ||||||
| /*- | ||||||
| * #%L | ||||||
| * Elastic APM Java agent | ||||||
| * %% | ||||||
| * Copyright (C) 2018 - 2020 Elastic and contributors | ||||||
| * %% | ||||||
| * Licensed to Elasticsearch B.V. under one or more contributor | ||||||
| * license agreements. See the NOTICE file distributed with | ||||||
| * this work for additional information regarding copyright | ||||||
| * ownership. Elasticsearch B.V. licenses this file to you under | ||||||
| * the Apache License, Version 2.0 (the "License"); you may | ||||||
| * not use this file except in compliance with the License. | ||||||
| * You may obtain a copy of the License at | ||||||
| * | ||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| * | ||||||
| * Unless required by applicable law or agreed to in writing, | ||||||
| * software distributed under the License is distributed on an | ||||||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||
| * KIND, either express or implied. See the License for the | ||||||
| * specific language governing permissions and limitations | ||||||
| * under the License. | ||||||
| * #L% | ||||||
| */ | ||||||
| package co.elastic.apm.agent.scala.concurrent; | ||||||
|
|
||||||
| import co.elastic.apm.agent.bci.ElasticApmInstrumentation; | ||||||
| import co.elastic.apm.agent.bci.VisibleForAdvice; | ||||||
| import co.elastic.apm.agent.impl.transaction.TraceContextHolder; | ||||||
| import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; | ||||||
| import net.bytebuddy.asm.Advice; | ||||||
| import net.bytebuddy.description.method.MethodDescription; | ||||||
| import net.bytebuddy.description.type.TypeDescription; | ||||||
| import net.bytebuddy.matcher.ElementMatcher; | ||||||
| import scala.concurrent.Promise; | ||||||
|
|
||||||
| import javax.annotation.Nonnull; | ||||||
| import java.util.Arrays; | ||||||
| import java.util.Collection; | ||||||
|
|
||||||
| import static net.bytebuddy.matcher.ElementMatchers.*; | ||||||
|
|
||||||
| public abstract class FutureInstrumentation extends ElasticApmInstrumentation { | ||||||
|
|
||||||
| @VisibleForAdvice | ||||||
| @SuppressWarnings("WeakerAccess") | ||||||
| public static final WeakConcurrentMap<Promise<?>, TraceContextHolder<?>> promisesToContext = | ||||||
| new WeakConcurrentMap.WithInlinedExpunction<>(); | ||||||
|
|
||||||
| @Nonnull | ||||||
| @Override | ||||||
| public Collection<String> getInstrumentationGroupNames() { | ||||||
| return Arrays.asList("concurrent", "future"); | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Marking as experimental disables this instrumentation by default.
Suggested change
|
||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public ElementMatcher<? super TypeDescription> getTypeMatcher() { | ||||||
| return hasSuperType(named("scala.concurrent.impl.Promise$Transformation")); | ||||||
| } | ||||||
|
|
||||||
| public static class ConstructorInstrumentation extends FutureInstrumentation { | ||||||
|
|
||||||
| @Override | ||||||
| public ElementMatcher<? super MethodDescription> getMethodMatcher() { | ||||||
| return isConstructor(); | ||||||
|
felixbarny marked this conversation as resolved.
|
||||||
| } | ||||||
|
|
||||||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||||||
| public static void onExit(@Advice.This Promise<?> thiz) { | ||||||
| final TraceContextHolder<?> active = getActive(); | ||||||
| if (active != null) { | ||||||
| promisesToContext.put(thiz, active); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| } | ||||||
|
|
||||||
| public static class RunInstrumentation extends FutureInstrumentation { | ||||||
|
|
||||||
| @Override | ||||||
| public ElementMatcher<? super MethodDescription> getMethodMatcher() { | ||||||
| return named("run").and(returns(void.class)); | ||||||
| } | ||||||
|
|
||||||
| @VisibleForAdvice | ||||||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||||||
| public static void onEnter(@Advice.This Promise<?> thiz) { | ||||||
|
milanvdm marked this conversation as resolved.
Outdated
|
||||||
| final TraceContextHolder<?> context = promisesToContext.getIfPresent(thiz); | ||||||
| if (tracer != null && context != null) { | ||||||
| tracer.activate(context); | ||||||
|
milanvdm marked this conversation as resolved.
|
||||||
| } | ||||||
| } | ||||||
|
|
||||||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||||||
| public static void onExit(@Advice.This Promise<?> thiz) { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Catch thrown exceptions
milanvdm marked this conversation as resolved.
Outdated
|
||||||
| promisesToContext.remove(thiz); | ||||||
|
milanvdm marked this conversation as resolved.
Outdated
milanvdm marked this conversation as resolved.
Outdated
|
||||||
| } | ||||||
|
|
||||||
| } | ||||||
|
|
||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| co.elastic.apm.agent.scala.concurrent.FutureInstrumentation$ConstructorInstrumentation | ||
| co.elastic.apm.agent.scala.concurrent.FutureInstrumentation$RunInstrumentation |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| package co.elastic.apm.agent.scala.concurrent | ||
|
|
||
| import java.util.concurrent.Executors | ||
|
|
||
| import co.elastic.apm.agent.MockReporter | ||
| import co.elastic.apm.agent.bci.ElasticApmAgent | ||
| import co.elastic.apm.agent.configuration.{CoreConfiguration, SpyConfiguration} | ||
| import co.elastic.apm.agent.impl.transaction.Transaction | ||
| import co.elastic.apm.agent.impl.{ElasticApmTracer, ElasticApmTracerBuilder} | ||
| import munit.FunSuite | ||
| import net.bytebuddy.agent.ByteBuddyAgent | ||
| import org.stagemonitor.configuration.ConfigurationRegistry | ||
|
|
||
| import scala.concurrent.duration._ | ||
| import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} | ||
|
|
||
| class FutureInstrumentationSpec extends FunSuite { | ||
|
|
||
| private var reporter: MockReporter = _ | ||
| private var tracer: ElasticApmTracer = _ | ||
| private var coreConfiguration: CoreConfiguration = _ | ||
| private var transaction: Transaction = _ | ||
|
|
||
| override def beforeEach(context: BeforeEach): Unit = { | ||
| reporter = new MockReporter | ||
| val config: ConfigurationRegistry = SpyConfiguration.createSpyConfig | ||
| coreConfiguration = config.getConfig(classOf[CoreConfiguration]) | ||
| tracer = new ElasticApmTracerBuilder().configurationRegistry(config).reporter(reporter).build | ||
| ElasticApmAgent.initInstrumentation(tracer, ByteBuddyAgent.install) | ||
| transaction = tracer.startRootTransaction(null).withName("Transaction").activate() | ||
|
milanvdm marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| override def afterEach(context: AfterEach): Unit = ElasticApmAgent.reset() | ||
|
|
||
| test("Scala Future should propagate the tracing-context correctly across different threads") { | ||
| implicit val executionContext: ExecutionContextExecutor = | ||
| ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) | ||
|
|
||
| Future("Test") | ||
| .map(_.length) | ||
| .flatMap(l => Future(l * 2)) | ||
| .map(_.toString) | ||
| .flatMap(s => Future(s"$s-$s")) | ||
| .map(_ => tracer.currentTransaction().addCustomContext("future", true)) | ||
| .map { _ => | ||
| transaction.deactivate().end() | ||
| assertEquals( | ||
| reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], | ||
| true | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| test("Worker thread should correctly set context on the current transaction") { | ||
| implicit val executionContext: ExecutionContextExecutor = | ||
| ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) | ||
|
|
||
| new TestFutureTraceMethods().invokeAsync(tracer) | ||
| transaction.deactivate().end() | ||
| assertEquals(reporter.getTransactions.size(), 1) | ||
| assertEquals(reporter.getSpans.size(), 0) | ||
| assertEquals( | ||
| reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], | ||
| true | ||
| ) | ||
| } | ||
|
|
||
| test("Multiple async operations should be able to set context on the current transaction") { | ||
|
|
||
| implicit val multiPoolEc: ExecutionContextExecutor = | ||
| ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) | ||
|
|
||
| Future | ||
| .traverse(1 to 100) { _ => | ||
| Future.sequence(List( | ||
| Future { | ||
| Thread.sleep(25) | ||
| tracer.currentTransaction().addCustomContext("future1", true) | ||
| }, | ||
| Future { | ||
| Thread.sleep(50) | ||
| tracer.currentTransaction().addCustomContext("future2", true) | ||
| }, | ||
| Future { | ||
| Thread.sleep(10) | ||
| tracer.currentTransaction().addCustomContext("future3", true) | ||
| } | ||
| )) | ||
| } | ||
| .map { _ => | ||
| transaction.deactivate().end() | ||
| assertEquals( | ||
| reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], | ||
| true | ||
| ) | ||
| assertEquals( | ||
| reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], | ||
| true | ||
| ) | ||
| assertEquals( | ||
| reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], | ||
| true | ||
| ) | ||
| } | ||
|
|
||
| } | ||
|
|
||
| } | ||
|
|
||
| private class TestFutureTraceMethods { | ||
|
|
||
| /** | ||
| * Calling this method results in this method call tree: | ||
| * | ||
| * main thread | worker thread | ||
| * ------------------------------------------------------------------------------------------- | ||
| * invokeAsync | | ||
| * | | | ||
| * --- blockingMethodOnMainThread | | ||
| * | | | ||
| * --- nonBlockingMethodOnMainThread | | ||
| * | | | ||
| * --------------------------> methodOnWorkerThread | ||
| * | | | ||
| * | --- longMethod | ||
| * | | ||
| */ | ||
| def invokeAsync(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = blockingMethodOnMainThread(tracer) | ||
|
|
||
| private def blockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = { | ||
| try { | ||
| Await.result(nonBlockingMethodOnMainThread(tracer), 10.seconds) | ||
| } catch { | ||
| case e: Exception => e.printStackTrace() | ||
| } | ||
| } | ||
|
|
||
| private def nonBlockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Future[Unit] = | ||
| Future(methodOnWorkerThread(tracer)) | ||
|
|
||
| private def methodOnWorkerThread(tracer: ElasticApmTracer): Unit = longMethod(tracer) | ||
|
|
||
| private def longMethod(tracer: ElasticApmTracer): Unit = { | ||
| try { | ||
| Thread.sleep(100) | ||
| tracer.currentTransaction().addCustomContext("future", true) | ||
| } catch { | ||
| case e: InterruptedException => e.printStackTrace() | ||
| } | ||
| } | ||
|
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.