Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always set default ServiceTalkSocketOptions#IDLE_TIMEOUT #2174

Merged
merged 4 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@

final class H2ClientParentConnectionContext extends H2ParentConnectionContext {
private H2ClientParentConnectionContext(Channel channel, HttpExecutionContext executionContext,
FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs,
FlushStrategy flushStrategy, long idleTimeoutMs,
@Nullable final SslConfig sslConfig,
final KeepAliveManager keepAliveManager) {
super(channel, executionContext, flushStrategy, idleTimeoutMs, sslConfig, keepAliveManager);
Expand All @@ -106,7 +106,7 @@ static Single<H2ClientParentConnection> initChannel(Channel channel, HttpExecuti
H2ProtocolConfig config,
StreamingHttpRequestResponseFactory reqRespFactory,
FlushStrategy parentFlushStrategy,
@Nullable Long idleTimeoutMs,
long idleTimeoutMs,
@Nullable SslConfig sslConfig,
ChannelInitializer initializer,
ConnectionObserver observer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,12 @@ class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable imp
private final KeepAliveManager keepAliveManager;
@Nullable
private final SslConfig sslConfig;
@Nullable
final Long idleTimeoutMs;
final long idleTimeoutMs;
@Nullable
private SSLSession sslSession;

H2ParentConnectionContext(final Channel channel, final HttpExecutionContext executionContext,
final FlushStrategy flushStrategy, @Nullable final Long idleTimeoutMs,
final FlushStrategy flushStrategy, final long idleTimeoutMs,
@Nullable final SslConfig sslConfig,
final KeepAliveManager keepAliveManager) {
super(channel, executionContext.executor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ final class H2ServerParentConnectionContext extends H2ParentConnectionContext im
private final SocketAddress listenAddress;
private H2ServerParentConnectionContext(final Channel channel, final HttpExecutionContext executionContext,
final FlushStrategy flushStrategy,
@Nullable final Long idleTimeoutMs,
final long idleTimeoutMs,
@Nullable final SslConfig sslConfig,
final SocketAddress listenAddress,
final KeepAliveManager keepAliveManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,18 @@ void stIdleTimeoutSocketOption(HttpProtocol protocol) throws Exception {

@ParameterizedTest(name = "protocol={0}")
@EnumSource(HttpProtocol.class)
void stIdleTimeoutSocketOptionIsNull(HttpProtocol protocol) throws Exception {
testSocketOption(ServiceTalkSocketOptions.IDLE_TIMEOUT, is(nullValue()), equalTo("null"), null, protocol);
void stIdleTimeoutSocketOptionIsZero(HttpProtocol protocol) throws Exception {
testSocketOption(ServiceTalkSocketOptions.IDLE_TIMEOUT, is(0L), equalTo("0"), 0L, protocol);
}

private <T> void testSocketOption(SocketOption<T> socketOption, Matcher<Object> clientMatcher,
Matcher<Object> serverMatcher, HttpProtocol protocol) throws Exception {
private static <T> void testSocketOption(SocketOption<T> socketOption, Matcher<Object> clientMatcher,
Matcher<Object> serverMatcher, HttpProtocol protocol) throws Exception {
testSocketOption(socketOption, clientMatcher, serverMatcher, null, protocol);
}

private <T> void testSocketOption(SocketOption<T> socketOption, Matcher<Object> clientMatcher,
Matcher<Object> serverMatcher, @Nullable Long idleTimeoutMs,
HttpProtocol protocol)
private static <T> void testSocketOption(SocketOption<T> socketOption, Matcher<Object> clientMatcher,
Matcher<Object> serverMatcher, @Nullable Long idleTimeoutMs,
HttpProtocol protocol)
throws Exception {
try (ServerContext serverContext = startServer(idleTimeoutMs, socketOption, protocol);
BlockingHttpClient client = newClient(serverContext, idleTimeoutMs, protocol);
Expand All @@ -104,8 +104,8 @@ private <T> void testSocketOption(SocketOption<T> socketOption, Matcher<Object>
}
}

private <T> ServerContext startServer(@Nullable Long idleTimeoutMs, SocketOption<T> socketOption,
HttpProtocol protocol) throws Exception {
private static <T> ServerContext startServer(@Nullable Long idleTimeoutMs, SocketOption<T> socketOption,
HttpProtocol protocol) throws Exception {
final HttpServerBuilder builder = HttpServers.forAddress(localAddress(0))
.protocols(protocol.config);
if (idleTimeoutMs != null) {
Expand All @@ -115,8 +115,8 @@ private <T> ServerContext startServer(@Nullable Long idleTimeoutMs, SocketOption
.payloadBody(valueOf(ctx.socketOption(socketOption)), textSerializerUtf8()));
}

private BlockingHttpClient newClient(ServerContext serverContext, @Nullable Long idleTimeoutMs,
HttpProtocol protocol) {
private static BlockingHttpClient newClient(ServerContext serverContext, @Nullable Long idleTimeoutMs,
HttpProtocol protocol) {
SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builder =
HttpClients.forSingleAddress(serverHostAndPort(serverContext))
.protocols(protocol.config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exception
SEC, null,
(channel, observer) -> DefaultNettyConnection.initChannel(channel, SEC.bufferAllocator(),
SEC.executor(), SEC.ioExecutor(),
forPipelinedRequestResponse(false, channel.config()), defaultFlushStrategy(), null,
forPipelinedRequestResponse(false, channel.config()), defaultFlushStrategy(), 0L,
null,
new TcpServerChannelInitializer(sConfig, observer).andThen(
channel2 -> {
Expand All @@ -440,7 +440,7 @@ void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exception
closeHandlerRef.compareAndSet(null, closeHandler);
return DefaultNettyConnection.initChannel(channel, CEC.bufferAllocator(),
CEC.executor(), CEC.ioExecutor(),
closeHandler, defaultFlushStrategy(), null,
closeHandler, defaultFlushStrategy(), 0L,
cConfig.tcpConfig().sslConfig(),
new TcpClientChannelInitializer(cConfig.tcpConfig(),
connectionObserver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void setUp() throws Exception {
CloseHandler closeHandler = UNSUPPORTED_PROTOCOL_CLOSE_HANDLER;
final DefaultNettyConnection<Integer, Integer> connection =
DefaultNettyConnection.<Integer, Integer>initChannel(channel, DEFAULT_ALLOCATOR,
immediate(), null, closeHandler, defaultFlushStrategy(), null, null, channel2 -> {
immediate(), null, closeHandler, defaultFlushStrategy(), 0L, null, channel2 -> {
channel2.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
abstract class AbstractReadOnlyTcpConfig<SecurityConfig> {
@SuppressWarnings("rawtypes")
private final Map<ChannelOption, Object> options;
@Nullable
private final Long idleTimeoutMs;
private final long idleTimeoutMs;
private final FlushStrategy flushStrategy;
@Nullable
private final UserDataLoggerConfig wireLoggerConfig;
Expand Down Expand Up @@ -77,8 +76,7 @@ public final Map<ChannelOption, Object> options() {
*
* @return idle timeout in milliseconds
*/
@Nullable
public final Long idleTimeoutMs() {
public final long idleTimeoutMs() {
return idleTimeoutMs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ abstract class AbstractTcpConfig<SslConfigType> {
@Nullable
@SuppressWarnings("rawtypes")
private Map<ChannelOption, Object> options;
@Nullable
private Long idleTimeoutMs;
private long idleTimeoutMs = 300_000L; // 5 min
private FlushStrategy flushStrategy = defaultFlushStrategy();
@Nullable
private UserDataLoggerConfig wireLoggerConfig;
Expand All @@ -70,8 +69,7 @@ final Map<ChannelOption, Object> options() {
return options;
}

@Nullable
final Long idleTimeoutMs() {
final long idleTimeoutMs() {
return idleTimeoutMs;
}

Expand Down Expand Up @@ -109,6 +107,9 @@ public final <T> void socketOption(final SocketOption<T> option, T value) {
requireNonNull(value);
if (option == ServiceTalkSocketOptions.IDLE_TIMEOUT) {
idleTimeoutMs = (Long) value;
if (idleTimeoutMs < 0) {
throw new IllegalArgumentException("IDLE_TIMEOUT: " + idleTimeoutMs + " (expected>=0)");
}
} else {
if (options == null) {
options = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config,
sslContext != null && !deferSslHandler, true));
}

if (config.idleTimeoutMs() != null) {
if (config.idleTimeoutMs() > 0L) {
delegate = delegate.andThen(new IdleTimeoutInitializer(config.idleTimeoutMs()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public TcpServerChannelInitializer(final ReadOnlyTcpServerConfig config,
new ConnectionObserverInitializer(observer, config.sslContext() != null, false));
}

if (config.idleTimeoutMs() != null) {
if (config.idleTimeoutMs() > 0L) {
delegate = delegate.andThen(new IdleTimeoutInitializer(config.idleTimeoutMs()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void testRegisteredAndActiveEventsFired() throws Exception {
serverContext.listenAddress(), new TcpClientConfig().asReadOnly(), false,
CLIENT_CTX, (channel, connectionObserver) -> DefaultNettyConnection.initChannel(channel,
CLIENT_CTX.bufferAllocator(), CLIENT_CTX.executor(), CLIENT_CTX.ioExecutor(), closeHandler,
defaultFlushStrategy(), null, null, channel2 ->
defaultFlushStrategy(), 0L, null, channel2 ->
channel2.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class ServiceTalkSocketOptions {
new ServiceTalkSocketOption<>("WRITE_BUFFER_THRESHOLD", Integer.class);

/**
* Connection idle timeout in milliseconds after which the connection is closed.
* Connection idle timeout in milliseconds after which the connection is closed. {@code 0} disables idle timeout.
*/
public static final SocketOption<Long> IDLE_TIMEOUT = new ServiceTalkSocketOption<>("IDLE_TIMEOUT", Long.class);

Expand Down
Loading