Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context propagation for ratpack Execution.fork() #3416

Merged
merged 1 commit into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.ratpack;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import ratpack.func.Action;

public class DefaultExecStarterInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return nameStartsWith("ratpack.exec.internal.DefaultExecController$")
.and(implementsInterface(named("ratpack.exec.ExecStarter")));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("onComplete")
.or(named("onError"))
.and(takesArgument(0, named("ratpack.func.Action"))),
DefaultExecStarterInstrumentation.class.getName() + "$WrapActionAdvice");
}

public static class WrapActionAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapAction(@Advice.Argument(value = 0, readOnly = false) Action<?> action) {
action = ActionWrapper.wrapIfNeeded(action);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new ContinuationInstrumentation(),
new DefaultExecutionInstrumentation(),
new DefaultExecStarterInstrumentation(),
new ServerErrorHandlerInstrumentation(),
new ServerRegistryInstrumentation());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@ import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEn
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicServerSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan

import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse
import io.opentelemetry.testing.internal.armeria.common.HttpMethod
import ratpack.error.ServerErrorHandler
import ratpack.exec.Execution
import ratpack.exec.Promise
import ratpack.exec.Result
import ratpack.exec.util.ParallelBatch
import ratpack.server.RatpackServer

class RatpackForkedHttpServerTest extends RatpackHttpServerTest {
Expand Down Expand Up @@ -109,11 +117,47 @@ class RatpackForkedHttpServerTest extends RatpackHttpServerTest {
}
}
}
it.prefix("fork_and_yieldAll") {
it.all {context ->
def promise = Promise.async { upstream ->
Execution.fork().start({
upstream.accept(Result.success(SUCCESS))
})
}
ParallelBatch.of(promise).yieldAll().flatMap { list ->
Promise.sync { list.get(0).value }
} then { endpoint ->
controller(endpoint) {
context.response.status(endpoint.status).send(endpoint.body)
}
}
}
}
}
}

assert ratpack.bindPort == bindPort
assert ratpack.bindHost == 'localhost'
return ratpack
}

def "test fork and yieldAll"() {
setup:
def url = address.resolve("fork_and_yieldAll").toString()
url = url.replace("http://", "h1c://")
def request = AggregatedHttpRequest.of(HttpMethod.GET, url)
AggregatedHttpResponse response = client.execute(request).aggregate().join()

expect:
response.status().code() == SUCCESS.status
response.contentUtf8() == SUCCESS.body

assertTraces(1) {
trace(0, 3) {
basicServerSpan(it, 0, "/fork_and_yieldAll")
basicSpan(it, 1, "/fork_and_yieldAll", span(0))
basicSpan(it, 2, "controller", span(1))
}
}
}
}