From c712efe44fc9b1209344eeb9039709b91b978146 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 23 Nov 2022 15:55:46 -0800 Subject: [PATCH 1/2] H2PriorKnowledgeFeatureParityTest.serverGracefulClose ConnectionAcceptor disable offloading Motivation: H2PriorKnowledgeFeatureParityTest.serverGracefulClose installs a ConnectionAcceptor which intercepts messages through the netty pipeline. However the filter is offloaded by default and may miss events. Modifications: - Override `requiredOffloads` when creating the `ConnectionAcceptorFactory` to disable offloading. Fixes: https://github.com/apple/servicetalk/issues/2378 --- .../H2PriorKnowledgeFeatureParityTest.java | 21 ++++++++++--- .../netty/NettyHttpServerConnectionTest.java | 31 ++++++++++++++++--- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java index b82cf89211..863a7b29c5 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java @@ -58,6 +58,9 @@ import io.servicetalk.http.api.StreamingHttpServiceFilterFactory; import io.servicetalk.http.netty.NettyHttp2ExceptionUtils.H2StreamResetException; import io.servicetalk.logging.api.LogLevel; +import io.servicetalk.transport.api.ConnectExecutionStrategy; +import io.servicetalk.transport.api.ConnectionAcceptor; +import io.servicetalk.transport.api.ConnectionAcceptorFactory; import io.servicetalk.transport.api.ConnectionContext; import io.servicetalk.transport.api.DelegatingConnectionAcceptor; import io.servicetalk.transport.api.HostAndPort; @@ -1843,11 +1846,21 @@ private InetSocketAddress bindHttpEchoServer(@Nullable StreamingHttpServiceFilte } if (connectionOnClosingLatch != null) { - serverBuilder.appendConnectionAcceptorFilter(original -> new DelegatingConnectionAcceptor(original) { + serverBuilder.appendConnectionAcceptorFilter(new ConnectionAcceptorFactory() { @Override - public Completable accept(final ConnectionContext context) { - onGracefulClosureStarted(context, connectionOnClosingLatch); - return completed(); + public ConnectionAcceptor create(final ConnectionAcceptor original) { + return new DelegatingConnectionAcceptor(original) { + @Override + public Completable accept(final ConnectionContext context) { + onGracefulClosureStarted(context, connectionOnClosingLatch); + return completed(); + } + }; + } + + @Override + public ConnectExecutionStrategy requiredOffloads() { + return ConnectExecutionStrategy.offloadNone(); } }); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java index efdae10c5c..e232c8d52e 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java @@ -17,11 +17,17 @@ import io.servicetalk.buffer.api.Buffer; import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.concurrent.api.TestSubscription; import io.servicetalk.http.api.HttpExecutionStrategy; import io.servicetalk.http.api.StreamingHttpClient; import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.transport.api.ConnectExecutionStrategy; +import io.servicetalk.transport.api.ConnectionAcceptor; +import io.servicetalk.transport.api.ConnectionAcceptorFactory; +import io.servicetalk.transport.api.ConnectionContext; +import io.servicetalk.transport.api.DelegatingConnectionAcceptor; import io.servicetalk.transport.api.ServerContext; import io.servicetalk.transport.netty.internal.ExecutionContextExtension; import io.servicetalk.transport.netty.internal.FlushStrategy; @@ -89,11 +95,26 @@ void updateFlushStrategy(HttpExecutionStrategy serverExecutionStrategy, serverContext = HttpServers.forAddress(localAddress(0)) .ioExecutor(contextRule.ioExecutor()) - .appendConnectionAcceptorFilter(original -> original.append(ctx -> { - customCancellableRef.set( - ((NettyConnectionContext) ctx).updateFlushStrategy((__, ___) -> customStrategy)); - return completed(); - })) + .appendConnectionAcceptorFilter(new ConnectionAcceptorFactory() { + @Override + public ConnectionAcceptor create(ConnectionAcceptor original) { + return new DelegatingConnectionAcceptor(original) { + @Override + public Completable accept(final ConnectionContext context) { + return Completable.defer(() -> { + customCancellableRef.set(((NettyConnectionContext) context) + .updateFlushStrategy((__, ___) -> customStrategy)); + return completed(); + }); + } + }; + } + + @Override + public ConnectExecutionStrategy requiredOffloads() { + return ConnectExecutionStrategy.offloadNone(); + } + }) .executionStrategy(serverExecutionStrategy) .listenStreaming((ctx, request, responseFactory) -> { if (handledFirstRequest.compareAndSet(false, true)) { From bfeb54bbdfe8291fb345cac8fce47ca77ca35e95 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 23 Nov 2022 16:04:47 -0800 Subject: [PATCH 2/2] make operation lazy --- .../http/netty/H2PriorKnowledgeFeatureParityTest.java | 6 ++++-- .../http/netty/NettyHttpServerConnectionTest.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java index 863a7b29c5..471dda5680 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java @@ -1852,8 +1852,10 @@ public ConnectionAcceptor create(final ConnectionAcceptor original) { return new DelegatingConnectionAcceptor(original) { @Override public Completable accept(final ConnectionContext context) { - onGracefulClosureStarted(context, connectionOnClosingLatch); - return completed(); + return Completable.defer(() -> { + onGracefulClosureStarted(context, connectionOnClosingLatch); + return completed().shareContextOnSubscribe(); + }); } }; } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java index e232c8d52e..99da8190a6 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java @@ -104,7 +104,7 @@ public Completable accept(final ConnectionContext context) { return Completable.defer(() -> { customCancellableRef.set(((NettyConnectionContext) context) .updateFlushStrategy((__, ___) -> customStrategy)); - return completed(); + return completed().shareContextOnSubscribe(); }); } };