diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java index 5debbaa660c2..d16f27dde069 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java @@ -256,17 +256,17 @@ public Duration shutdownTimeout() { return this.shutdownTimeout; } - @Override - public String toString() { - return RntbdObjectMapper.toJson(this); + public UserAgentContainer userAgent() { + return this.userAgent; } // endregion // region Methods - public UserAgentContainer userAgent() { - return this.userAgent; + @Override + public String toString() { + return RntbdObjectMapper.toJson(this); } // endregion diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java index e5b91632cb61..5518a6e4b3c4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java @@ -25,8 +25,8 @@ public class RntbdClientChannelHandler extends ChannelInitializer imple private static final AttributeKey REQUEST_MANAGER = AttributeKey.newInstance("requestManager"); private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHandler.class); - private final Config config; private final ChannelHealthChecker healthChecker; + private final Config config; RntbdClientChannelHandler(final Config config, final ChannelHealthChecker healthChecker) { checkNotNull(healthChecker, "expected non-null healthChecker"); @@ -96,9 +96,9 @@ protected void initChannel(final Channel channel) { checkNotNull(channel); final RntbdRequestManager requestManager = new RntbdRequestManager(this.healthChecker, this.config.maxRequestsPerChannel()); - final long readerIdleTime = this.config.receiveHangDetectionTime(); - final long writerIdleTime = this.config.sendHangDetectionTime(); - final long allIdleTime = this.config.idleConnectionTimeout(); + final long readerIdleTime = this.config.receiveHangDetectionTimeInNanos(); + final long writerIdleTime = this.config.sendHangDetectionTimeInNanos(); + final long allIdleTime = this.config.idleConnectionTimeoutInNanos(); final ChannelPipeline pipeline = channel.pipeline(); pipeline.addFirst( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index 7c0efb45b1aa..c844c3fb1f1a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -69,12 +69,12 @@ public RntbdClientChannelHealthChecker(final Config config) { checkNotNull(config, "config: null"); - this.idleConnectionTimeout = config.idleConnectionTimeout(); + this.idleConnectionTimeout = config.idleConnectionTimeoutInNanos(); - this.readDelayLimit = config.receiveHangDetectionTime(); + this.readDelayLimit = config.receiveHangDetectionTimeInNanos(); checkArgument(this.readDelayLimit > readHangGracePeriod, "config.receiveHangDetectionTime: %s", this.readDelayLimit); - this.writeDelayLimit = config.sendHangDetectionTime(); + this.writeDelayLimit = config.sendHangDetectionTimeInNanos(); checkArgument(this.writeDelayLimit > writeHangGracePeriod, "config.sendHangDetectionTime: %s", this.writeDelayLimit); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java index 9130ddcbc683..6814916f1b77 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -146,7 +146,7 @@ public void onTimeout(AcquireTask task) { } } - final long idleEndpointTimeout = config.idleEndpointTimeout(); + final long idleEndpointTimeout = config.idleEndpointTimeoutInNanos(); this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate( () -> { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java index c1e19967a6fe..25f8b3f947e6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java @@ -105,19 +105,19 @@ public int bufferPageSize() { } @JsonProperty - public int connectionTimeout() { + public int connectionTimeoutInMillis() { final long value = this.options.connectionTimeout().toMillis(); assert value <= Integer.MAX_VALUE; return (int)value; } @JsonProperty - public long idleConnectionTimeout() { + public long idleConnectionTimeoutInNanos() { return this.options.idleChannelTimeout().toNanos(); } @JsonProperty - public long idleEndpointTimeout() { + public long idleEndpointTimeoutInNanos() { return this.options.idleEndpointTimeout().toNanos(); } @@ -137,22 +137,22 @@ public int maxRequestsPerChannel() { } @JsonProperty - public long receiveHangDetectionTime() { + public long receiveHangDetectionTimeInNanos() { return this.options.receiveHangDetectionTime().toNanos(); } @JsonProperty - public long requestTimeout() { + public long requestTimeoutInNanos() { return this.options.requestTimeout().toNanos(); } @JsonProperty - public long sendHangDetectionTime() { + public long sendHangDetectionTimeInNanos() { return this.options.sendHangDetectionTime().toNanos(); } @JsonProperty - public long shutdownTimeout() { + public long shutdownTimeoutInNanos() { return this.options.shutdownTimeout().toNanos(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java index 04a4ce7d0d45..831402659854 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java @@ -576,8 +576,6 @@ private RntbdRequestArgs addPendingRequestRecord(final ChannelHandlerContext con } }); - boolean done = record.isDone(); - record.whenComplete((response, error) -> { this.pendingRequests.remove(id); pendingRequestTimeout.cancel(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java index e4f001cc8bed..ebf4db38a038 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java @@ -30,7 +30,7 @@ public final class RntbdRequestRecord extends CompletableFuture { private static final AtomicReferenceFieldUpdater - stateUpdater = AtomicReferenceFieldUpdater.newUpdater(RntbdRequestRecord.class, Stage.class, "stage"); + stageUpdater = AtomicReferenceFieldUpdater.newUpdater(RntbdRequestRecord.class, Stage.class, "stage"); private final RntbdRequestArgs args; private final RntbdRequestTimer timer; @@ -74,13 +74,12 @@ public Timeout newTimeout(final TimerTask task) { return this.timer.newTimeout(task); } - @JsonProperty public Stage stage() { - return stateUpdater.get(this); + return stageUpdater.get(this); } public RntbdRequestRecord stage(Stage value) { - stateUpdater.set(this, value); + stageUpdater.set(this, value); return this; } @@ -120,8 +119,10 @@ static final class JsonSerializer extends StdSerializer { } @Override - public void serialize(final RntbdRequestRecord value, final JsonGenerator generator, - final SerializerProvider provider) throws IOException { + public void serialize( + final RntbdRequestRecord value, + final JsonGenerator generator, + final SerializerProvider provider) throws IOException { generator.writeStartObject(); generator.writeObjectFieldStart("status"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestTimer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestTimer.java index afc36733d514..c0650c897e74 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestTimer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestTimer.java @@ -13,6 +13,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static com.google.common.base.Strings.lenientFormat; + public final class RntbdRequestTimer implements AutoCloseable { private static final long FIVE_MILLISECONDS = 5000000L; @@ -22,11 +24,9 @@ public final class RntbdRequestTimer implements AutoCloseable { private final Timer timer; public RntbdRequestTimer(final long requestTimeout) { - // Inspection of the HashWheelTimer code indicates that our choice of a 5 millisecond timer resolution ensures // a request will expire within 10 milliseconds of the specified requestTimeout interval. This is because // cancellation of a timeout takes two timer resolution units to complete. - this.timer = new HashedWheelTimer(FIVE_MILLISECONDS, TimeUnit.NANOSECONDS); this.requestTimeout = requestTimeout; } @@ -37,13 +37,28 @@ public long getRequestTimeout(final TimeUnit unit) { @Override public void close() { + final Set timeouts = this.timer.stop(); - if (logger.isDebugEnabled()) { - final int count = timeouts.size(); - if (count > 0) { - logger.debug("request expiration tasks cancelled: {}", count); + final int count = timeouts.size(); + + if (count == 0) { + logger.debug("no outstanding request timeout tasks"); + return; + } + + logger.debug("stopping {} request timeout tasks", count); + + for (final Timeout timeout : timeouts) { + if (!timeout.isExpired()) { + try { + timeout.task().run(timeout); + } catch (Throwable error) { + logger.warn(lenientFormat("request timeout task failed due to ", error)); + } } } + + logger.debug("{} request timeout tasks stopped", count); } public Timeout newTimeout(final TimerTask task) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java index e7c6a1e3a83e..ed0602e42d6d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -48,7 +48,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint { // region Fields private static final String TAG_NAME = RntbdServiceEndpoint.class.getSimpleName(); - private static final long QUIET_PERIOD = 2L * 1_000_000_000L; // 2 seconds + private static final long QUIET_PERIOD = 2_000_000_000L; // 2 seconds private static final AtomicLong instanceCount = new AtomicLong(); private static final Logger logger = LoggerFactory.getLogger(RntbdServiceEndpoint.class); @@ -79,7 +79,7 @@ private RntbdServiceEndpoint( .group(group) .option(ChannelOption.ALLOCATOR, config.allocator()) .option(ChannelOption.AUTO_READ, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeout()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeoutInMillis()) .option(ChannelOption.RCVBUF_ALLOCATOR, receiveBufferAllocator) .option(ChannelOption.SO_KEEPALIVE, true) .remoteAddress(physicalAddress.getHost(), physicalAddress.getPort()); @@ -330,7 +330,7 @@ public Provider(final RntbdTransportClient transportClient, final Options option this.transportClient = transportClient; this.config = new Config(options, sslContext, wireLogLevel); - this.requestTimer = new RntbdRequestTimer(config.requestTimeout()); + this.requestTimer = new RntbdRequestTimer(config.requestTimeoutInNanos()); this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory); this.endpoints = new ConcurrentHashMap<>(); @@ -349,7 +349,7 @@ public void close() { endpoint.close(); } - this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeout(), NANOSECONDS) + this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeoutInNanos(), NANOSECONDS) .addListener(future -> { if (future.isSuccess()) { logger.debug("\n [{}]\n closed endpoints", this); @@ -357,6 +357,7 @@ public void close() { } logger.error("\n [{}]\n failed to close endpoints due to ", this, future.cause()); }); + return; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdTokenStream.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdTokenStream.java index b0e73932f767..89ee834c9306 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdTokenStream.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdTokenStream.java @@ -60,7 +60,7 @@ final int computeLength() { static > T decode(final T stream) { - ByteBuf in = stream.in; + final ByteBuf in = stream.in; while (in.readableBytes() > 0) { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java index d737315ffc1b..576649a303e8 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java @@ -914,7 +914,7 @@ static class Provider implements RntbdEndpoint.Provider { Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) { this.config = new Config(options, sslContext, LogLevel.WARN); - this.timer = new RntbdRequestTimer(config.requestTimeout()); + this.timer = new RntbdRequestTimer(config.requestTimeoutInNanos()); this.expected = expected; }