From bb3d84baa27c1ddd19874e48d91043211d82c0a8 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Mon, 16 Jan 2023 18:08:32 +0100 Subject: [PATCH] Add Stork support to the new Vert.x gRPC impl --- .../grpc/deployment/GrpcClientProcessor.java | 3 + .../grpc/runtime/GrpcServerRecorder.java | 3 +- .../config/GrpcClientConfiguration.java | 8 +- .../grpc/runtime/config/StorkConfig.java | 40 ++++ .../stork/AbstractStorkMeasuringCall.java | 29 +++ .../stork/GrpcLoadBalancerProvider.java | 4 +- .../stork/GrpcStorkServiceDiscovery.java | 2 +- .../grpc/runtime/stork/StorkGrpcChannel.java | 212 ++++++++++++++++++ .../runtime/stork/StorkMeasuringCall.java | 29 +++ .../stork/StorkMeasuringCallListener.java | 32 +++ .../stork/StorkMeasuringCollector.java | 15 ++ .../stork/StorkMeasuringGrpcInterceptor.java | 62 +---- .../VertxStorkMeasuringGrpcInterceptor.java | 68 ++++++ .../grpc/runtime/supports/Channels.java | 37 ++- .../src/main/resources/application.properties | 2 +- .../hello/HelloWorldMutualTlsServiceTest.java | 2 + .../HelloWorldMutualTlsServiceTestBase.java | 5 - .../VertxHelloWorldMutualTlsEndpointTest.java | 3 - .../VertxHelloWorldMutualTlsServiceIT.java | 43 ++++ .../VertxHelloWorldMutualTlsServiceTest.java | 36 +-- .../grpc-stork-response-time/pom.xml | 6 + .../src/main/resources/application.properties | 3 + .../GrpcStorkResponseTimeCollectionTest.java | 49 +--- ...pcStorkResponseTimeCollectionTestBase.java | 51 +++++ ...txGrpcStorkResponseTimeCollectionTest.java | 10 + integration-tests/grpc-stork-simple/pom.xml | 70 ++++++ .../examples/hello/HelloWorldNewService.java | 18 ++ .../src/main/proto/helloworld.proto | 53 +++++ .../src/main/resources/application.properties | 15 ++ .../hello/HelloWorldNewServiceTest.java | 8 + .../hello/HelloWorldNewServiceTestBase.java | 36 +++ .../hello/VertxHelloWorldNewServiceTest.java | 11 + integration-tests/pom.xml | 1 + .../grpc/test/utils/GRPCTestUtils.java | 41 ++++ 34 files changed, 856 insertions(+), 151 deletions(-) create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/StorkConfig.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/AbstractStorkMeasuringCall.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkGrpcChannel.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCall.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCallListener.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCollector.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/VertxStorkMeasuringGrpcInterceptor.java create mode 100644 integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsServiceIT.java create mode 100644 integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/GrpcStorkResponseTimeCollectionTestBase.java create mode 100644 integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/VertxGrpcStorkResponseTimeCollectionTest.java create mode 100644 integration-tests/grpc-stork-simple/pom.xml create mode 100644 integration-tests/grpc-stork-simple/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewService.java create mode 100644 integration-tests/grpc-stork-simple/src/main/proto/helloworld.proto create mode 100644 integration-tests/grpc-stork-simple/src/main/resources/application.properties create mode 100644 integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewServiceTest.java create mode 100644 integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewServiceTestBase.java create mode 100644 integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldNewServiceTest.java diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java index 8d63f6e75057f..46064866e0267 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java @@ -73,6 +73,7 @@ import io.quarkus.grpc.runtime.config.GrpcClientBuildTimeConfig; import io.quarkus.grpc.runtime.stork.GrpcStorkRecorder; import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor; +import io.quarkus.grpc.runtime.stork.VertxStorkMeasuringGrpcInterceptor; import io.quarkus.grpc.runtime.supports.Channels; import io.quarkus.grpc.runtime.supports.GrpcClientConfigProvider; import io.quarkus.grpc.runtime.supports.IOThreadClientInterceptor; @@ -97,6 +98,7 @@ void registerBeans(BuildProducer beans) { @BuildStep void registerStorkInterceptor(BuildProducer beans) { beans.produce(new AdditionalBeanBuildItem(StorkMeasuringGrpcInterceptor.class)); + beans.produce(new AdditionalBeanBuildItem(VertxStorkMeasuringGrpcInterceptor.class)); } @BuildStep @@ -407,6 +409,7 @@ SyntheticBeanBuildItem clientInterceptorStorage(GrpcClientRecorder recorder, Rec // it's okay if this one is not used: superfluousInterceptors.remove(StorkMeasuringGrpcInterceptor.class.getName()); + superfluousInterceptors.remove(VertxStorkMeasuringGrpcInterceptor.class.getName()); if (!superfluousInterceptors.isEmpty()) { LOGGER.warnf("At least one unused gRPC client interceptor found: %s. If there are meant to be used globally, " + "annotate them with @GlobalInterceptor.", String.join(", ", superfluousInterceptors)); diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index 8465e4387bae1..48f4d4b39f254 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -94,11 +94,12 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, if (grpcContainer == null) { throw new IllegalStateException("gRPC not initialized, GrpcContainer not found"); } - Vertx vertx = vertxSupplier.getValue(); if (hasNoServices(grpcContainer.getServices()) && LaunchMode.current() != LaunchMode.DEVELOPMENT) { LOGGER.error("Unable to find beans exposing the `BindableService` interface - not starting the gRPC server"); + return; // OK? } + Vertx vertx = vertxSupplier.getValue(); GrpcServerConfiguration configuration = cfg.server; GrpcBuilderProvider provider = GrpcBuilderProvider.findServerBuilderProvider(configuration); diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java index 754913c635780..bd1ccca8623e9 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java @@ -34,6 +34,12 @@ public class GrpcClientConfiguration { @ConfigItem public InProcess inProcess; + /** + * Configure Stork usage with new Vert.x gRPC, if enabled. + */ + @ConfigItem + public StorkConfig stork; + /** * The gRPC service port. */ @@ -168,7 +174,7 @@ public class GrpcClientConfiguration { /** * Use a custom load balancing policy. - * Accepted values are: {@code pick_value}, {@code round_robin}, {@code grpclb}. + * Accepted values are: {@code pick_first}, {@code round_robin}, {@code grpclb}. * This value is ignored if name-resolver is set to 'stork'. */ @ConfigItem(defaultValue = "pick_first") diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/StorkConfig.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/StorkConfig.java new file mode 100644 index 0000000000000..0faaad79e3ec7 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/StorkConfig.java @@ -0,0 +1,40 @@ +package io.quarkus.grpc.runtime.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +/** + * Stork config for new Vert.x gRPC + */ +@ConfigGroup +public class StorkConfig { + /** + * Number of threads on a delayed gRPC ClientCall + */ + @ConfigItem(defaultValue = "10") + public int threads; + + /** + * Deadline in milliseconds of delayed gRPC call + */ + @ConfigItem(defaultValue = "5000") + public long deadline; + + /** + * Number of retries on a gRPC ClientCall + */ + @ConfigItem(defaultValue = "3") + public int retries; + + /** + * Initial delay in seconds on refresh check + */ + @ConfigItem(defaultValue = "60") + public long delay; + + /** + * Refresh period in seconds + */ + @ConfigItem(defaultValue = "120") + public long period; +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/AbstractStorkMeasuringCall.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/AbstractStorkMeasuringCall.java new file mode 100644 index 0000000000000..00f7c671f8e21 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/AbstractStorkMeasuringCall.java @@ -0,0 +1,29 @@ +package io.quarkus.grpc.runtime.stork; + +import io.grpc.ClientCall; +import io.grpc.ForwardingClientCall; +import io.smallrye.stork.api.ServiceInstance; + +abstract class AbstractStorkMeasuringCall extends ForwardingClientCall.SimpleForwardingClientCall + implements StorkMeasuringCollector { + final boolean recordTime; + + protected AbstractStorkMeasuringCall(ClientCall delegate, boolean recordTime) { + super(delegate); + this.recordTime = recordTime; + } + + protected abstract ServiceInstance serviceInstance(); + + public void recordReply() { + if (serviceInstance() != null && recordTime) { + serviceInstance().recordReply(); + } + } + + public void recordEnd(Throwable error) { + if (serviceInstance() != null) { + serviceInstance().recordEnd(error); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcLoadBalancerProvider.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcLoadBalancerProvider.java index a71eea514e464..ef24163c1870a 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcLoadBalancerProvider.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcLoadBalancerProvider.java @@ -2,8 +2,8 @@ import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import static io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor.STORK_MEASURE_TIME; -import static io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor.STORK_SERVICE_INSTANCE; +import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_MEASURE_TIME; +import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_SERVICE_INSTANCE; import java.util.Collections; import java.util.Comparator; diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcStorkServiceDiscovery.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcStorkServiceDiscovery.java index 564992306212a..ccfc477a44fb0 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcStorkServiceDiscovery.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcStorkServiceDiscovery.java @@ -121,7 +121,7 @@ private void informListener(List instances) { socketAddresses.add(new InetSocketAddress(inetAddress, instance.getPort())); } } catch (UnknownHostException e) { - log.errorf(e, "Ignoring wrong host: '%s' for service name '%s'", instance.getHost(), + log.warnf(e, "Ignoring wrong host: '%s' for service name '%s'", instance.getHost(), serviceName); } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkGrpcChannel.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkGrpcChannel.java new file mode 100644 index 0000000000000..97791c2190fea --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkGrpcChannel.java @@ -0,0 +1,212 @@ +package io.quarkus.grpc.runtime.stork; + +import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_MEASURE_TIME; +import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_SERVICE_INSTANCE; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.Deadline; +import io.grpc.MethodDescriptor; +import io.grpc.internal.DelayedClientCall; +import io.quarkus.grpc.runtime.config.StorkConfig; +import io.smallrye.mutiny.Uni; +import io.smallrye.stork.Stork; +import io.smallrye.stork.api.Service; +import io.smallrye.stork.api.ServiceInstance; +import io.vertx.core.net.SocketAddress; +import io.vertx.grpc.client.GrpcClient; +import io.vertx.grpc.client.GrpcClientChannel; + +public class StorkGrpcChannel extends Channel implements AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(StorkGrpcChannel.class); + + private final Map services = new ConcurrentHashMap<>(); + private final Map channels = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler; + + private final GrpcClient client; + private final String serviceName; + private final StorkConfig stork; + private final Executor executor; + + private static class Context { + Service service; + boolean measureTime; + ServiceInstance instance; + InetSocketAddress address; + Channel channel; + AtomicReference ref; + } + + public StorkGrpcChannel(GrpcClient client, String serviceName, StorkConfig stork, Executor executor) { + this.client = client; + this.serviceName = serviceName; + this.stork = stork; + this.executor = executor; + this.scheduler = new ScheduledThreadPoolExecutor(stork.threads); + this.scheduler.scheduleAtFixedRate(this::refresh, stork.delay, stork.period, TimeUnit.SECONDS); + } + + @Override + public ClientCall newCall(MethodDescriptor methodDescriptor, + CallOptions callOptions) { + Service service = Stork.getInstance().getService(serviceName); + if (service == null) { + throw new IllegalStateException("No service definition for serviceName " + serviceName + " found."); + } + + Context context = new Context(); + context.service = service; + // handle this calls here + Boolean measureTime = STORK_MEASURE_TIME.get(); + context.measureTime = measureTime != null && measureTime; + context.ref = STORK_SERVICE_INSTANCE.get(); + + DelayedClientCall delayed = new StorkDelayedClientCall<>(executor, scheduler, + Deadline.after(stork.deadline, TimeUnit.MILLISECONDS)); + + asyncCall(methodDescriptor, callOptions, context) + .onFailure() + .retry() + .atMost(stork.retries) + .subscribe() + .asCompletionStage() + .thenApply(delayed::setCall) + .thenAccept(Runnable::run) + .exceptionally(t -> { + delayed.cancel("Failed to create new Stork ClientCall", t); + return null; + }); + + return delayed; + } + + private Uni> asyncCall( + MethodDescriptor methodDescriptor, CallOptions callOptions, Context context) { + Uni entry = pickServiceInstanceWithChannel(context); + return entry.map(c -> { + ServiceInstance instance = c.instance; + long serviceId = instance.getId(); + Channel channel = c.channel; + try { + services.put(serviceId, instance); + channels.put(serviceId, channel); + return channel.newCall(methodDescriptor, callOptions); + } catch (Exception ex) { + // remove, no good + services.remove(serviceId); + channels.remove(serviceId); + throw new IllegalStateException(ex); + } + }); + } + + @Override + public String authority() { + return null; + } + + @Override + public void close() { + scheduler.shutdown(); + } + + @Override + public String toString() { + return super.toString() + String.format(" [%s]", serviceName); + } + + private void refresh() { + // any better way to know which are OK / bad? + services.clear(); + channels.clear(); + } + + private Uni pickServiceInstanceWithChannel(Context context) { + Uni uni = pickServerInstance(context.service, context.measureTime); + return uni + .map(si -> { + context.instance = si; + if (si.gatherStatistics() && context.ref != null) { + context.ref.set(si); + } + return context; + }) + .invoke(this::checkSocketAddress) + .invoke(c -> { + ServiceInstance instance = context.instance; + InetSocketAddress isa = context.address; + context.channel = channels.computeIfAbsent(instance.getId(), id -> { + SocketAddress address = SocketAddress.inetSocketAddress(isa.getPort(), isa.getHostName()); + return new GrpcClientChannel(client, address); + }); + }); + } + + private Uni pickServerInstance(Service service, boolean measureTime) { + return Uni.createFrom() + .deferred(() -> { + if (services.isEmpty()) { + return service.getInstances() + .invoke(l -> l.forEach(s -> services.put(s.getId(), s))); + } else { + List list = new ArrayList<>(services.values()); + return Uni.createFrom().item(list); + } + }) + .invoke(list -> { + // list should not be empty + sort by id + list.sort(Comparator.comparing(ServiceInstance::getId)); + }) + .map(list -> service.selectInstanceAndRecordStart(list, measureTime)); + } + + private void checkSocketAddress(Context context) { + ServiceInstance instance = context.instance; + Set socketAddresses = new HashSet<>(); + try { + for (InetAddress inetAddress : InetAddress.getAllByName(instance.getHost())) { + socketAddresses.add(new InetSocketAddress(inetAddress, instance.getPort())); + } + } catch (UnknownHostException e) { + log.warn("Ignoring wrong host: '{}' for service name '{}'", instance.getHost(), serviceName, e); + } + + if (!socketAddresses.isEmpty()) { + context.address = socketAddresses.iterator().next(); // pick first + } else { + long serviceId = instance.getId(); + services.remove(serviceId); + channels.remove(serviceId); + throw new IllegalStateException("Failed to determine working socket addresses for service-name: " + serviceName); + } + } + + private static class StorkDelayedClientCall extends DelayedClientCall { + public StorkDelayedClientCall(Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) { + super(callExecutor, scheduler, deadline); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCall.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCall.java new file mode 100644 index 0000000000000..2b3dcce876932 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCall.java @@ -0,0 +1,29 @@ +package io.quarkus.grpc.runtime.stork; + +import io.grpc.ClientCall; +import io.grpc.ForwardingClientCall; +import io.smallrye.stork.api.ServiceInstance; + +abstract class StorkMeasuringCall extends ForwardingClientCall.SimpleForwardingClientCall + implements StorkMeasuringCollector { + final boolean recordTime; + + protected StorkMeasuringCall(ClientCall delegate, boolean recordTime) { + super(delegate); + this.recordTime = recordTime; + } + + protected abstract ServiceInstance serviceInstance(); + + public void recordReply() { + if (serviceInstance() != null && recordTime) { + serviceInstance().recordReply(); + } + } + + public void recordEnd(Throwable error) { + if (serviceInstance() != null) { + serviceInstance().recordEnd(error); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCallListener.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCallListener.java new file mode 100644 index 0000000000000..32dcb0cf6e154 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCallListener.java @@ -0,0 +1,32 @@ +package io.quarkus.grpc.runtime.stork; + +import io.grpc.ClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.Status; + +class StorkMeasuringCallListener + extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + final StorkMeasuringCollector collector; + + public StorkMeasuringCallListener(ClientCall.Listener responseListener, StorkMeasuringCollector collector) { + super(responseListener); + this.collector = collector; + } + + @Override + public void onMessage(RespT message) { + collector.recordReply(); + super.onMessage(message); + } + + @Override + public void onClose(Status status, Metadata trailers) { + Exception error = null; + if (!status.isOk()) { + error = status.asException(trailers); + } + collector.recordEnd(error); + super.onClose(status, trailers); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCollector.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCollector.java new file mode 100644 index 0000000000000..4bd14b44ea988 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringCollector.java @@ -0,0 +1,15 @@ +package io.quarkus.grpc.runtime.stork; + +import java.util.concurrent.atomic.AtomicReference; + +import io.grpc.Context; +import io.smallrye.stork.api.ServiceInstance; + +interface StorkMeasuringCollector { + Context.Key> STORK_SERVICE_INSTANCE = Context.key("stork.service-instance"); + Context.Key STORK_MEASURE_TIME = Context.key("stork.measure-time"); + + void recordReply(); + + void recordEnd(Throwable error); +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringGrpcInterceptor.java index 3dc8716b501cc..bd23be1764674 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringGrpcInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/StorkMeasuringGrpcInterceptor.java @@ -10,20 +10,13 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.Context; -import io.grpc.ForwardingClientCall; -import io.grpc.ForwardingClientCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; -import io.grpc.Status; import io.smallrye.stork.api.ServiceInstance; @ApplicationScoped public class StorkMeasuringGrpcInterceptor implements ClientInterceptor, Prioritized { - public static final Context.Key> STORK_SERVICE_INSTANCE = Context - .key("stork.service-instance"); - public static final Context.Key STORK_MEASURE_TIME = Context.key("stork.measure-time"); - @Override public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { @@ -35,19 +28,22 @@ public int getPriority() { return Integer.MAX_VALUE - 100; } - private static class StorkMeasuringCall extends ForwardingClientCall.SimpleForwardingClientCall { + private static class StorkMeasuringCall extends AbstractStorkMeasuringCall { ServiceInstance serviceInstance; - final boolean recordTime; - protected StorkMeasuringCall(ClientCall delegate, - MethodDescriptor.MethodType type) { - super(delegate); - this.recordTime = type == MethodDescriptor.MethodType.UNARY; + protected StorkMeasuringCall(ClientCall delegate, MethodDescriptor.MethodType type) { + super(delegate, type == MethodDescriptor.MethodType.UNARY); + } + + @Override + protected ServiceInstance serviceInstance() { + return serviceInstance; } @Override public void start(final ClientCall.Listener responseListener, final Metadata metadata) { - Context context = Context.current().withValues(STORK_SERVICE_INSTANCE, new AtomicReference<>(), + Context context = Context.current().withValues( + STORK_SERVICE_INSTANCE, new AtomicReference<>(), STORK_MEASURE_TIME, recordTime); Context oldContext = context.attach(); try { @@ -57,43 +53,5 @@ public void start(final ClientCall.Listener responseListener, final Metad context.detach(oldContext); } } - - void recordReply() { - if (serviceInstance != null && recordTime) { - serviceInstance.recordReply(); - } - } - - void recordEnd(Throwable error) { - if (serviceInstance != null) { - serviceInstance.recordEnd(error); - } - } - } - - private static class StorkMeasuringCallListener - extends ForwardingClientCallListener.SimpleForwardingClientCallListener { - final StorkMeasuringCall collector; - - public StorkMeasuringCallListener(ClientCall.Listener responseListener, StorkMeasuringCall collector) { - super(responseListener); - this.collector = collector; - } - - @Override - public void onMessage(RespT message) { - collector.recordReply(); - super.onMessage(message); - } - - @Override - public void onClose(Status status, Metadata trailers) { - Exception error = null; - if (!status.isOk()) { - error = status.asException(trailers); - } - collector.recordEnd(error); - super.onClose(status, trailers); - } } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/VertxStorkMeasuringGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/VertxStorkMeasuringGrpcInterceptor.java new file mode 100644 index 0000000000000..07f5d3a3d53d4 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/VertxStorkMeasuringGrpcInterceptor.java @@ -0,0 +1,68 @@ +package io.quarkus.grpc.runtime.stork; + +import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_MEASURE_TIME; +import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_SERVICE_INSTANCE; + +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.spi.Prioritized; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.smallrye.stork.api.ServiceInstance; + +/** + * Similar to {@link StorkMeasuringGrpcInterceptor}, but with different entry points, + * since we use delayed {@link StorkGrpcChannel}. + */ +@ApplicationScoped +public class VertxStorkMeasuringGrpcInterceptor implements ClientInterceptor, Prioritized { + + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, + Channel next) { + boolean recordTime = method.getType() == MethodDescriptor.MethodType.UNARY; + Context context = Context.current().withValues( + STORK_SERVICE_INSTANCE, new AtomicReference<>(), + STORK_MEASURE_TIME, recordTime); + Context oldContext = context.attach(); + try { + return new VertxStorkMeasuringCall<>(next.newCall(method, callOptions), recordTime); + } finally { + context.detach(oldContext); + } + } + + @Override + public int getPriority() { + return Integer.MAX_VALUE - 100; + } + + private static class VertxStorkMeasuringCall extends AbstractStorkMeasuringCall { + ServiceInstance serviceInstance; + + protected VertxStorkMeasuringCall(ClientCall delegate, boolean recordTime) { + super(delegate, recordTime); + } + + @Override + protected ServiceInstance serviceInstance() { + return serviceInstance; + } + + @Override + public void start(final Listener responseListener, final Metadata metadata) { + AtomicReference ref = STORK_SERVICE_INSTANCE.get(); + if (ref != null) { + serviceInstance = ref.get(); + } + super.start(new StorkMeasuringCallListener<>(responseListener, this), metadata); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java index 52e9b1ff0e888..e29e8320bc91e 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java @@ -26,6 +26,7 @@ import jakarta.enterprise.context.spi.CreationalContext; +import org.eclipse.microprofile.context.ManagedExecutor; import org.jboss.logging.Logger; import io.grpc.CallOptions; @@ -42,6 +43,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.quarkus.arc.Arc; +import io.quarkus.arc.ArcContainer; import io.quarkus.arc.BeanDestroyer; import io.quarkus.arc.InstanceHandle; import io.quarkus.grpc.GrpcClient; @@ -51,7 +53,9 @@ import io.quarkus.grpc.runtime.config.GrpcClientConfiguration; import io.quarkus.grpc.runtime.config.GrpcServerConfiguration; import io.quarkus.grpc.runtime.config.SslClientConfig; +import io.quarkus.grpc.runtime.stork.StorkGrpcChannel; import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor; +import io.quarkus.grpc.runtime.stork.VertxStorkMeasuringGrpcInterceptor; import io.quarkus.grpc.spi.GrpcBuilderProvider; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.util.ClassPathUtils; @@ -76,8 +80,9 @@ private Channels() { @SuppressWarnings("rawtypes") public static Channel createChannel(String name, Set perClientInterceptors) throws Exception { - InstanceHandle instance = Arc.container().instance(GrpcClientConfigProvider.class); + ArcContainer container = Arc.container(); + InstanceHandle instance = container.instance(GrpcClientConfigProvider.class); if (!instance.isAvailable()) { throw new IllegalStateException("Unable to find the GrpcClientConfigProvider"); } @@ -117,11 +122,15 @@ public static Channel createChannel(String name, Set perClientIntercepto } // Client-side interceptors - GrpcClientInterceptorContainer interceptorContainer = Arc.container() + GrpcClientInterceptorContainer interceptorContainer = container .instance(GrpcClientInterceptorContainer.class).get(); if (stork) { perClientInterceptors = new HashSet<>(perClientInterceptors); - perClientInterceptors.add(StorkMeasuringGrpcInterceptor.class.getName()); + if (vertxGrpc) { + perClientInterceptors.add(VertxStorkMeasuringGrpcInterceptor.class.getName()); + } else { + perClientInterceptors.add(StorkMeasuringGrpcInterceptor.class.getName()); + } } boolean plainText = config.ssl.trustStore.isEmpty(); @@ -278,9 +287,15 @@ public static Channel createChannel(String name, Set perClientIntercepto // io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractClientName options.setMetricsName("grpc|" + name); - Vertx vertx = Arc.container().instance(Vertx.class).get(); + Vertx vertx = container.instance(Vertx.class).get(); io.vertx.grpc.client.GrpcClient client = io.vertx.grpc.client.GrpcClient.client(vertx, options); - Channel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(port, host)); + Channel channel; + if (stork) { + ManagedExecutor executor = container.instance(ManagedExecutor.class).get(); + channel = new StorkGrpcChannel(client, config.host, config.stork, executor); // host = service-name + } else { + channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(port, host)); + } LOGGER.debugf("Target for client '%s': %s", name, host + ":" + port); List interceptors = new ArrayList<>(); @@ -289,7 +304,7 @@ public static Channel createChannel(String name, Set perClientIntercepto LOGGER.info("Creating Vert.x gRPC channel ..."); - return new InternalGrpcChannel(client, ClientInterceptors.intercept(channel, interceptors)); + return new InternalGrpcChannel(client, channel, ClientInterceptors.intercept(channel, interceptors)); } } @@ -380,8 +395,12 @@ public void destroy(Channel instance, CreationalContext creationalConte } } else if (instance instanceof InternalGrpcChannel) { InternalGrpcChannel channel = (InternalGrpcChannel) instance; - LOGGER.info("Shutting down Vert.x gRPC channel " + channel.delegate); + Channel original = channel.original; + LOGGER.info("Shutting down Vert.x gRPC channel " + original); try { + if (original instanceof StorkGrpcChannel) { + ((StorkGrpcChannel) original).close(); + } channel.client.close().toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); } catch (ExecutionException | TimeoutException e) { LOGGER.warn("Unable to shutdown channel after 10 seconds", e); @@ -395,10 +414,12 @@ public void destroy(Channel instance, CreationalContext creationalConte private static class InternalGrpcChannel extends Channel { private final io.vertx.grpc.client.GrpcClient client; + private final Channel original; private final Channel delegate; - public InternalGrpcChannel(io.vertx.grpc.client.GrpcClient client, Channel delegate) { + public InternalGrpcChannel(io.vertx.grpc.client.GrpcClient client, Channel original, Channel delegate) { this.client = client; + this.original = original; this.delegate = delegate; } diff --git a/integration-tests/grpc-mutual-auth/src/main/resources/application.properties b/integration-tests/grpc-mutual-auth/src/main/resources/application.properties index 45a14d8fe3fe8..86259c5cf33cb 100644 --- a/integration-tests/grpc-mutual-auth/src/main/resources/application.properties +++ b/integration-tests/grpc-mutual-auth/src/main/resources/application.properties @@ -7,7 +7,7 @@ quarkus.grpc.clients.hello.name-resolver=stork quarkus.stork."hello-service".service-discovery.type=static quarkus.stork."hello-service".service-discovery.address-list=${quarkus.http.host}:9001 #%test.quarkus.stork."hello-service".service-discovery.address-list=${quarkus.http.host}:9001 -#%vertx.quarkus.stork."hello-service".service-discovery.address-list=${quarkus.http.host}:8444 +%vertx.quarkus.stork."hello-service".service-discovery.address-list=${quarkus.http.host}:8444 quarkus.stork."hello-service".load-balancer.type=round-robin quarkus.grpc.clients.hello.ssl.certificate=tls/client.pem diff --git a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldMutualTlsServiceTest.java b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldMutualTlsServiceTest.java index 088f5e3a369fd..a0eae4cf521ae 100644 --- a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldMutualTlsServiceTest.java +++ b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldMutualTlsServiceTest.java @@ -1,5 +1,7 @@ package io.quarkus.grpc.examples.hello; +import static io.quarkus.grpc.test.utils.GRPCTestUtils.stream; + import java.io.InputStream; import org.junit.jupiter.api.AfterEach; diff --git a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldMutualTlsServiceTestBase.java b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldMutualTlsServiceTestBase.java index 76160043d3e87..39eb81b5be2f7 100644 --- a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldMutualTlsServiceTestBase.java +++ b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldMutualTlsServiceTestBase.java @@ -2,7 +2,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.io.InputStream; import java.time.Duration; import org.junit.jupiter.api.Test; @@ -17,10 +16,6 @@ class HelloWorldMutualTlsServiceTestBase { Channel channel; - protected InputStream stream(String resource) { - return getClass().getClassLoader().getResourceAsStream(resource); - } - @Test public void testHelloWorldServiceUsingBlockingStub() { GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel); diff --git a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsEndpointTest.java b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsEndpointTest.java index 2bdd006abdedc..6f152dd8291df 100644 --- a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsEndpointTest.java +++ b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsEndpointTest.java @@ -2,8 +2,6 @@ import jakarta.inject.Inject; -import org.junit.jupiter.api.Disabled; - import io.quarkus.grpc.test.utils.VertxGRPCTestProfile; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; @@ -11,7 +9,6 @@ @QuarkusTest @TestProfile(VertxGRPCTestProfile.class) -@Disabled class VertxHelloWorldMutualTlsEndpointTest extends VertxHelloWorldMutualTlsEndpointTestBase { @Inject diff --git a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsServiceIT.java b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsServiceIT.java new file mode 100644 index 0000000000000..77f4faec3c390 --- /dev/null +++ b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsServiceIT.java @@ -0,0 +1,43 @@ +package io.quarkus.grpc.examples.hello; + +import java.util.Map; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; + +import io.grpc.Channel; +import io.quarkus.grpc.test.utils.GRPCTestUtils; +import io.quarkus.grpc.test.utils.VertxGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import io.vertx.core.Vertx; +import io.vertx.grpc.client.GrpcClient; + +@QuarkusTest +@TestProfile(VertxGRPCTestProfile.class) +@Disabled("quarkus.http.ssl.client-auth is set to 'required' but it is build time fixed to 'NONE'. " + + "Did you change the property quarkus.http.ssl.client-auth after building the application?" + + "" + + "How to get around this? ... As this would be a good / needed requirement for Vert.x based gRPC native test?") +class VertxHelloWorldMutualTlsServiceIT extends HelloWorldMutualTlsServiceTestBase { + + Vertx vertx; + + GrpcClient client; + + @BeforeEach + public void init() throws Exception { + vertx = Vertx.vertx(); + Map.Entry pair = GRPCTestUtils.tls(vertx, "tls/ca.pem", "tls/client.pem", "tls/client.key"); + client = pair.getKey(); + channel = pair.getValue(); + } + + @AfterEach + public void cleanup() { + GRPCTestUtils.close(client); + GRPCTestUtils.close(vertx); + } + +} diff --git a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsServiceTest.java b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsServiceTest.java index 6ad308b2db07b..b434365255c9d 100644 --- a/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsServiceTest.java +++ b/integration-tests/grpc-mutual-auth/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldMutualTlsServiceTest.java @@ -1,28 +1,22 @@ package io.quarkus.grpc.examples.hello; -import java.io.InputStream; +import java.util.Map; import jakarta.inject.Inject; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; +import io.grpc.Channel; +import io.quarkus.grpc.test.utils.GRPCTestUtils; import io.quarkus.grpc.test.utils.VertxGRPCTestProfile; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.net.PemKeyCertOptions; -import io.vertx.core.net.PemTrustOptions; -import io.vertx.core.net.SocketAddress; import io.vertx.grpc.client.GrpcClient; -import io.vertx.grpc.client.GrpcClientChannel; @QuarkusTest @TestProfile(VertxGRPCTestProfile.class) -@Disabled class VertxHelloWorldMutualTlsServiceTest extends HelloWorldMutualTlsServiceTestBase { @Inject @@ -32,30 +26,14 @@ class VertxHelloWorldMutualTlsServiceTest extends HelloWorldMutualTlsServiceTest @BeforeEach public void init() throws Exception { - HttpClientOptions options = new HttpClientOptions(); - options.setUseAlpn(true); - options.setSsl(true); - Buffer buffer; - try (InputStream stream = stream("tls/ca.pem")) { - buffer = Buffer.buffer(stream.readAllBytes()); - } - Buffer cb; - try (InputStream stream = stream("tls/client.pem")) { - cb = Buffer.buffer(stream.readAllBytes()); - } - Buffer ck; - try (InputStream stream = stream("tls/client.key")) { - ck = Buffer.buffer(stream.readAllBytes()); - } - options.setTrustOptions(new PemTrustOptions().addCertValue(buffer)); - options.setKeyCertOptions(new PemKeyCertOptions().setCertValue(cb).setKeyValue(ck)); - client = GrpcClient.client(vertx, options); - channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(8444, "localhost")); + Map.Entry pair = GRPCTestUtils.tls(vertx, "tls/ca.pem", "tls/client.pem", "tls/client.key"); + client = pair.getKey(); + channel = pair.getValue(); } @AfterEach public void cleanup() { - client.close().toCompletionStage().toCompletableFuture().join(); + GRPCTestUtils.close(client); } } diff --git a/integration-tests/grpc-stork-response-time/pom.xml b/integration-tests/grpc-stork-response-time/pom.xml index 0877560a8482b..eeec149dd4927 100644 --- a/integration-tests/grpc-stork-response-time/pom.xml +++ b/integration-tests/grpc-stork-response-time/pom.xml @@ -30,6 +30,12 @@ io.smallrye.stork stork-load-balancer-least-response-time + + io.quarkus + quarkus-test-grpc + ${project.version} + test + io.quarkus quarkus-junit5 diff --git a/integration-tests/grpc-stork-response-time/src/main/resources/application.properties b/integration-tests/grpc-stork-response-time/src/main/resources/application.properties index c717e692c3571..42e17510cf78f 100644 --- a/integration-tests/grpc-stork-response-time/src/main/resources/application.properties +++ b/integration-tests/grpc-stork-response-time/src/main/resources/application.properties @@ -13,3 +13,6 @@ quarkus.stork.hello-service2.service-discovery.address-list=localhost:9013,local quarkus.stork.hello-service2.load-balancer.type=least-response-time #quarkus.log.category."io.quarkus.grpc.runtime.stork".level=DEBUG + +%vertx.quarkus.grpc.clients.hello1.use-quarkus-grpc-client=true +%vertx.quarkus.grpc.clients.hello2.use-quarkus-grpc-client=true diff --git a/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/GrpcStorkResponseTimeCollectionTest.java b/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/GrpcStorkResponseTimeCollectionTest.java index df542dc9be698..2cd13722866fa 100644 --- a/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/GrpcStorkResponseTimeCollectionTest.java +++ b/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/GrpcStorkResponseTimeCollectionTest.java @@ -1,54 +1,7 @@ package io.quarkus.grpc.examples.stork; -import static io.restassured.RestAssured.get; -import static io.restassured.RestAssured.given; -import static io.restassured.RestAssured.post; -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.jupiter.api.Test; - import io.quarkus.test.junit.QuarkusTest; -import io.restassured.response.Response; @QuarkusTest -class GrpcStorkResponseTimeCollectionTest { - - @Test - public void shouldCallConfigurableIfFaster() { - given().body("0") - .when().post("/test/delay") - .then().statusCode(200); - List responses = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - Response response = get("/test/unary/1"); - response.then().statusCode(200); - responses.add(response.asString()); - } - - assertThat(responses.stream().filter(r -> r.equals("moderately-slow"))) - .hasSizeLessThan(5); - assertThat(responses.stream().filter(r -> r.equals("configurable"))) - .hasSizeGreaterThan(5); - } - - @Test - public void shouldCallModerateIfFaster() { - given().body("1000") - .when().post("/test/delay") - .then().statusCode(200); - List responses = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - Response response = get("/test/unary/2"); - response.then().statusCode(200); - responses.add(response.asString()); - } - - assertThat(responses.stream().filter(r -> r.equals("moderately-slow"))) - .hasSizeGreaterThan(5); - assertThat(responses.stream().filter(r -> r.equals("configurable"))) - .hasSizeLessThan(5); - } +class GrpcStorkResponseTimeCollectionTest extends GrpcStorkResponseTimeCollectionTestBase { } diff --git a/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/GrpcStorkResponseTimeCollectionTestBase.java b/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/GrpcStorkResponseTimeCollectionTestBase.java new file mode 100644 index 0000000000000..96e340c5730bc --- /dev/null +++ b/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/GrpcStorkResponseTimeCollectionTestBase.java @@ -0,0 +1,51 @@ +package io.quarkus.grpc.examples.stork; + +import static io.restassured.RestAssured.get; +import static io.restassured.RestAssured.given; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import io.restassured.response.Response; + +class GrpcStorkResponseTimeCollectionTestBase { + + @Test + public void shouldCallConfigurableIfFaster() { + given().body("0") + .when().post("/test/delay") + .then().statusCode(200); + List responses = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Response response = get("/test/unary/1"); + response.then().statusCode(200); + responses.add(response.asString()); + } + + assertThat(responses.stream().filter(r -> r.equals("moderately-slow"))) + .hasSizeLessThan(5); + assertThat(responses.stream().filter(r -> r.equals("configurable"))) + .hasSizeGreaterThan(5); + } + + @Test + public void shouldCallModerateIfFaster() { + given().body("1000") + .when().post("/test/delay") + .then().statusCode(200); + List responses = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Response response = get("/test/unary/2"); + response.then().statusCode(200); + responses.add(response.asString()); + } + + assertThat(responses.stream().filter(r -> r.equals("moderately-slow"))) + .hasSizeGreaterThan(5); + assertThat(responses.stream().filter(r -> r.equals("configurable"))) + .hasSizeLessThan(5); + } +} diff --git a/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/VertxGrpcStorkResponseTimeCollectionTest.java b/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/VertxGrpcStorkResponseTimeCollectionTest.java new file mode 100644 index 0000000000000..05011872ec5e1 --- /dev/null +++ b/integration-tests/grpc-stork-response-time/src/test/java/io/quarkus/grpc/examples/stork/VertxGrpcStorkResponseTimeCollectionTest.java @@ -0,0 +1,10 @@ +package io.quarkus.grpc.examples.stork; + +import io.quarkus.grpc.test.utils.VertxGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(VertxGRPCTestProfile.class) +class VertxGrpcStorkResponseTimeCollectionTest extends GrpcStorkResponseTimeCollectionTestBase { +} diff --git a/integration-tests/grpc-stork-simple/pom.xml b/integration-tests/grpc-stork-simple/pom.xml new file mode 100644 index 0000000000000..7500fd12cc7ef --- /dev/null +++ b/integration-tests/grpc-stork-simple/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-grpc-stork-simple + Quarkus - Integration Tests - gRPC - Stork - Simple + + + + io.quarkus + quarkus-grpc + + + io.smallrye.stork + stork-service-discovery-static-list + + + + io.quarkus + quarkus-junit5 + test + + + io.quarkus + quarkus-test-grpc + ${project.version} + test + + + + + io.quarkus + quarkus-grpc-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + diff --git a/integration-tests/grpc-stork-simple/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewService.java b/integration-tests/grpc-stork-simple/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewService.java new file mode 100644 index 0000000000000..a0ac2a65df90a --- /dev/null +++ b/integration-tests/grpc-stork-simple/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewService.java @@ -0,0 +1,18 @@ +package io.quarkus.grpc.examples.hello; + +import examples.HelloReply; +import examples.HelloRequest; +import examples.MutinyGreeterGrpc; +import io.quarkus.grpc.GrpcService; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class HelloWorldNewService extends MutinyGreeterGrpc.GreeterImplBase { + + @Override + public Uni sayHello(HelloRequest request) { + String name = request.getName(); + return Uni.createFrom().item("Hello " + name) + .map(res -> HelloReply.newBuilder().setMessage(res).build()); + } +} diff --git a/integration-tests/grpc-stork-simple/src/main/proto/helloworld.proto b/integration-tests/grpc-stork-simple/src/main/proto/helloworld.proto new file mode 100644 index 0000000000000..5e400c9d4549c --- /dev/null +++ b/integration-tests/grpc-stork-simple/src/main/proto/helloworld.proto @@ -0,0 +1,53 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "examples"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/integration-tests/grpc-stork-simple/src/main/resources/application.properties b/integration-tests/grpc-stork-simple/src/main/resources/application.properties new file mode 100644 index 0000000000000..c0c4063977229 --- /dev/null +++ b/integration-tests/grpc-stork-simple/src/main/resources/application.properties @@ -0,0 +1,15 @@ +quarkus.grpc.clients.hello.host=hello-service +quarkus.grpc.clients.hello.name-resolver=stork + +quarkus.stork."hello-service".service-discovery.type=static +quarkus.stork."hello-service".service-discovery.address-list=badd-url:9000,${quarkus.http.host}:9001 +%vertx.quarkus.stork."hello-service".service-discovery.address-list=badd-url:8081,${quarkus.http.host}:8081 +quarkus.stork."hello-service".load-balancer.type=round-robin + +quarkus.grpc.server.port=9001 + +quarkus.grpc.clients.hello.port=9001 + +%vertx.quarkus.grpc.clients.hello.port=8081 +%vertx.quarkus.grpc.clients.hello.use-quarkus-grpc-client=true +%vertx.quarkus.grpc.server.use-separate-server=false \ No newline at end of file diff --git a/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewServiceTest.java b/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewServiceTest.java new file mode 100644 index 0000000000000..c679285d93c40 --- /dev/null +++ b/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewServiceTest.java @@ -0,0 +1,8 @@ +package io.quarkus.grpc.examples.hello; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +class HelloWorldNewServiceTest extends HelloWorldNewServiceTestBase { + +} diff --git a/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewServiceTestBase.java b/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewServiceTestBase.java new file mode 100644 index 0000000000000..4838978d74a74 --- /dev/null +++ b/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewServiceTestBase.java @@ -0,0 +1,36 @@ +package io.quarkus.grpc.examples.hello; + +import java.time.Duration; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import examples.GreeterGrpc; +import examples.HelloReply; +import examples.HelloRequest; +import examples.MutinyGreeterGrpc; +import io.quarkus.grpc.GrpcClient; + +class HelloWorldNewServiceTestBase { + + @GrpcClient("hello") + GreeterGrpc.GreeterBlockingStub stub; + + @GrpcClient("hello") + MutinyGreeterGrpc.MutinyGreeterStub mutiny; + + @Test + public void testHelloWorldServiceUsingBlockingStub() { + HelloReply reply = stub.sayHello(HelloRequest.newBuilder().setName("neo-blocking").build()); + Assertions.assertEquals(reply.getMessage(), "Hello neo-blocking"); + } + + @Test + public void testHelloWorldServiceUsingMutinyStub() { + HelloReply reply = mutiny + .sayHello(HelloRequest.newBuilder().setName("neo-blocking").build()) + .await().atMost(Duration.ofSeconds(5)); + Assertions.assertEquals(reply.getMessage(), "Hello neo-blocking"); + } + +} diff --git a/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldNewServiceTest.java b/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldNewServiceTest.java new file mode 100644 index 0000000000000..b44e4c24f6faa --- /dev/null +++ b/integration-tests/grpc-stork-simple/src/test/java/io/quarkus/grpc/examples/hello/VertxHelloWorldNewServiceTest.java @@ -0,0 +1,11 @@ +package io.quarkus.grpc.examples.hello; + +import io.quarkus.grpc.test.utils.VertxGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(VertxGRPCTestProfile.class) +class VertxHelloWorldNewServiceTest extends HelloWorldNewServiceTestBase { + +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 3995a75f2be86..42662af0b9f60 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -351,6 +351,7 @@ grpc-external-proto grpc-external-proto-test grpc-stork-response-time + grpc-stork-simple grpc-exceptions google-cloud-functions-http google-cloud-functions diff --git a/test-framework/grpc/src/main/java/io/quarkus/grpc/test/utils/GRPCTestUtils.java b/test-framework/grpc/src/main/java/io/quarkus/grpc/test/utils/GRPCTestUtils.java index f12de0ab25199..6ccc2841ce71a 100644 --- a/test-framework/grpc/src/main/java/io/quarkus/grpc/test/utils/GRPCTestUtils.java +++ b/test-framework/grpc/src/main/java/io/quarkus/grpc/test/utils/GRPCTestUtils.java @@ -1,5 +1,9 @@ package io.quarkus.grpc.test.utils; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,6 +14,10 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.core.net.PemTrustOptions; import io.vertx.core.net.SocketAddress; import io.vertx.grpc.client.GrpcClient; import io.vertx.grpc.client.GrpcClientChannel; @@ -53,6 +61,39 @@ public static void close(GrpcClient client) { client.close().toCompletionStage().toCompletableFuture().join(); } + public static InputStream stream(String resource) { + return GRPCTestUtils.class.getClassLoader().getResourceAsStream(resource); + } + + public static Map.Entry tls( + Vertx vertx, + String caPem, + String clientPem, + String clientKey) throws IOException { + HttpClientOptions options = new HttpClientOptions(); + options.setUseAlpn(true); + options.setSsl(true); + Buffer buffer; + try (InputStream stream = stream(caPem)) { + buffer = Buffer.buffer(stream.readAllBytes()); + } + Buffer cb; + try (InputStream stream = stream(clientPem)) { + cb = Buffer.buffer(stream.readAllBytes()); + } + Buffer ck; + try (InputStream stream = stream(clientKey)) { + ck = Buffer.buffer(stream.readAllBytes()); + } + options.setTrustOptions(new PemTrustOptions().addCertValue(buffer)); + options.setKeyCertOptions(new PemKeyCertOptions().setCertValue(cb).setKeyValue(ck)); + + GrpcClient client = GrpcClient.client(vertx, options); + Channel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(8444, "localhost")); + + return Map.entry(client, channel); + } + private static class InternalChannel extends Channel { private final Channel delegate; private final GrpcClient client;