Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not propagate gRPC deadline when propagating OTel context via javaagent. #5543

Merged
merged 2 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public final class GrpcSingletons {

public static final ServerInterceptor SERVER_INTERCEPTOR;

public static final Context.Storage STORAGE = new ContextStorageBridge();
public static final Context.Storage STORAGE = new ContextStorageBridge(false);

static {
boolean experimentalSpanAttributes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
public final class ContextStorageOverride extends Context.Storage {

private static final Context.Storage delegate = new ContextStorageBridge();
private static final Context.Storage delegate = new ContextStorageBridge(true);

@Override
public Context doAttach(Context toAttach) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public final class ContextStorageBridge extends Context.Storage {
Context.key("otel-context");
private static final Context.Key<Scope> OTEL_SCOPE = Context.key("otel-scope");

private final boolean propagateGrpcDeadline;

public ContextStorageBridge(boolean propagateGrpcDeadline) {
this.propagateGrpcDeadline = propagateGrpcDeadline;
}

@Override
public Context doAttach(Context toAttach) {
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();
Expand Down Expand Up @@ -87,6 +93,20 @@ public Context current() {
// This context has already been previously attached and associated with an OTel context. Just
// create a new context referring to the current OTel context to reflect the current stack.
// The previous context is unaffected and will continue to live in its own stack.

if (!propagateGrpcDeadline) {
// Because we are propagating gRPC context via OpenTelemetry here, we may also propagate a
// deadline where it
// wasn't present before. Notably, this could happen with no user intention when using the
// javaagent which will
// add OpenTelemetry propagation automatically, and cause that code to fail with a deadline
// cancellation. While
// ideally we could propagate deadline as well as gRPC intended, we cannot have existing
// code fail because it
// added the javaagent and choose to fork here.
current = current.fork();
}

return current.withValue(OTEL_CONTEXT, otelContext);
}
return current;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
Expand Down Expand Up @@ -1370,6 +1372,87 @@ public void sayHello(
SemanticAttributes.MESSAGE_ID, 2L))))));
}

// Regression test for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4169
@Test
void clientCallAfterServerCompleted() throws Exception {
Server backend =
configureServer(
ServerBuilder.forPort(0)
.addService(
new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
Helloworld.Request request,
StreamObserver<Helloworld.Response> responseObserver) {
responseObserver.onNext(
Helloworld.Response.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
}))
.build()
.start();
ManagedChannel backendChannel = createChannel(backend);
closer.add(() -> backendChannel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
closer.add(() -> backend.shutdownNow().awaitTermination());
GreeterGrpc.GreeterBlockingStub backendStub = GreeterGrpc.newBlockingStub(backendChannel);

// This executor does not propagate context without the javaagent available.
ExecutorService executor = Executors.newFixedThreadPool(1);
closer.add(executor::shutdownNow);

CountDownLatch clientCallDone = new CountDownLatch(1);
AtomicReference<Throwable> error = new AtomicReference<>();

Server frontend =
configureServer(
ServerBuilder.forPort(0)
.addService(
new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
Helloworld.Request request,
StreamObserver<Helloworld.Response> responseObserver) {
responseObserver.onNext(
Helloworld.Response.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();

executor.execute(
() -> {
try {
backendStub.sayHello(request);
} catch (Throwable t) {
error.set(t);
}
clientCallDone.countDown();
});
}
}))
.build()
.start();
ManagedChannel frontendChannel = createChannel(frontend);
closer.add(() -> frontendChannel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
closer.add(() -> frontend.shutdownNow().awaitTermination());

GreeterGrpc.GreeterBlockingStub frontendStub = GreeterGrpc.newBlockingStub(frontendChannel);
frontendStub.sayHello(Helloworld.Request.newBuilder().setName("test").build());

// We don't assert on telemetry - the intention of this test is to verify that adding
// instrumentation, either as
// library or javaagent, does not cause exceptions in the business logic. The produced telemetry
// will be different
// for the two cases due to lack of context propagation in the library case, but that isn't what
// we're testing here.

clientCallDone.await(10, TimeUnit.SECONDS);

assertThat(error).hasValue(null);
}

private ManagedChannel createChannel(Server server) throws Exception {
ManagedChannelBuilder<?> channelBuilder =
configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort()));
Expand Down