Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,17 @@ public Duration shutdownTimeout() {
return this.shutdownTimeout;
}

@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
public UserAgentContainer userAgent() {
return this.userAgent;
}

// endregion

// region Methods

public UserAgentContainer userAgent() {
return this.userAgent;
@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class RntbdClientChannelHandler extends ChannelInitializer<Channel> imple

private static final AttributeKey<RntbdRequestManager> REQUEST_MANAGER = AttributeKey.newInstance("requestManager");
private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHandler.class);
private final Config config;
private final ChannelHealthChecker healthChecker;
private final Config config;

RntbdClientChannelHandler(final Config config, final ChannelHealthChecker healthChecker) {
checkNotNull(healthChecker, "expected non-null healthChecker");
Expand Down Expand Up @@ -96,9 +96,9 @@ protected void initChannel(final Channel channel) {
checkNotNull(channel);

final RntbdRequestManager requestManager = new RntbdRequestManager(this.healthChecker, this.config.maxRequestsPerChannel());
final long readerIdleTime = this.config.receiveHangDetectionTime();
final long writerIdleTime = this.config.sendHangDetectionTime();
final long allIdleTime = this.config.idleConnectionTimeout();
final long readerIdleTime = this.config.receiveHangDetectionTimeInNanos();
final long writerIdleTime = this.config.sendHangDetectionTimeInNanos();
final long allIdleTime = this.config.idleConnectionTimeoutInNanos();
final ChannelPipeline pipeline = channel.pipeline();

pipeline.addFirst(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ public RntbdClientChannelHealthChecker(final Config config) {

checkNotNull(config, "config: null");

this.idleConnectionTimeout = config.idleConnectionTimeout();
this.idleConnectionTimeout = config.idleConnectionTimeoutInNanos();

this.readDelayLimit = config.receiveHangDetectionTime();
this.readDelayLimit = config.receiveHangDetectionTimeInNanos();
checkArgument(this.readDelayLimit > readHangGracePeriod, "config.receiveHangDetectionTime: %s", this.readDelayLimit);

this.writeDelayLimit = config.sendHangDetectionTime();
this.writeDelayLimit = config.sendHangDetectionTimeInNanos();
checkArgument(this.writeDelayLimit > writeHangGracePeriod, "config.sendHangDetectionTime: %s", this.writeDelayLimit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void onTimeout(AcquireTask task) {
}
}

final long idleEndpointTimeout = config.idleEndpointTimeout();
final long idleEndpointTimeout = config.idleEndpointTimeoutInNanos();

this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,19 @@ public int bufferPageSize() {
}

@JsonProperty
public int connectionTimeout() {
public int connectionTimeoutInMillis() {
final long value = this.options.connectionTimeout().toMillis();
assert value <= Integer.MAX_VALUE;
return (int)value;
}

@JsonProperty
public long idleConnectionTimeout() {
public long idleConnectionTimeoutInNanos() {
return this.options.idleChannelTimeout().toNanos();
}

@JsonProperty
public long idleEndpointTimeout() {
public long idleEndpointTimeoutInNanos() {
return this.options.idleEndpointTimeout().toNanos();
}

Expand All @@ -137,22 +137,22 @@ public int maxRequestsPerChannel() {
}

@JsonProperty
public long receiveHangDetectionTime() {
public long receiveHangDetectionTimeInNanos() {
return this.options.receiveHangDetectionTime().toNanos();
}

@JsonProperty
public long requestTimeout() {
public long requestTimeoutInNanos() {
return this.options.requestTimeout().toNanos();
}

@JsonProperty
public long sendHangDetectionTime() {
public long sendHangDetectionTimeInNanos() {
return this.options.sendHangDetectionTime().toNanos();
}

@JsonProperty
public long shutdownTimeout() {
public long shutdownTimeoutInNanos() {
return this.options.shutdownTimeout().toNanos();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,6 @@ private RntbdRequestArgs addPendingRequestRecord(final ChannelHandlerContext con
}
});

boolean done = record.isDone();

record.whenComplete((response, error) -> {
this.pendingRequests.remove(id);
pendingRequestTimeout.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public final class RntbdRequestRecord extends CompletableFuture<StoreResponse> {

private static final AtomicReferenceFieldUpdater<RntbdRequestRecord, Stage>
stateUpdater = AtomicReferenceFieldUpdater.newUpdater(RntbdRequestRecord.class, Stage.class, "stage");
stageUpdater = AtomicReferenceFieldUpdater.newUpdater(RntbdRequestRecord.class, Stage.class, "stage");

private final RntbdRequestArgs args;
private final RntbdRequestTimer timer;
Expand Down Expand Up @@ -74,13 +74,12 @@ public Timeout newTimeout(final TimerTask task) {
return this.timer.newTimeout(task);
}

@JsonProperty
public Stage stage() {
return stateUpdater.get(this);
return stageUpdater.get(this);
}

public RntbdRequestRecord stage(Stage value) {
stateUpdater.set(this, value);
stageUpdater.set(this, value);
return this;
}

Expand Down Expand Up @@ -120,8 +119,10 @@ static final class JsonSerializer extends StdSerializer<RntbdRequestRecord> {
}

@Override
public void serialize(final RntbdRequestRecord value, final JsonGenerator generator,
final SerializerProvider provider) throws IOException {
public void serialize(
final RntbdRequestRecord value,
final JsonGenerator generator,
final SerializerProvider provider) throws IOException {

generator.writeStartObject();
generator.writeObjectFieldStart("status");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Strings.lenientFormat;

public final class RntbdRequestTimer implements AutoCloseable {

private static final long FIVE_MILLISECONDS = 5000000L;
Expand All @@ -22,11 +24,9 @@ public final class RntbdRequestTimer implements AutoCloseable {
private final Timer timer;

public RntbdRequestTimer(final long requestTimeout) {

// Inspection of the HashWheelTimer code indicates that our choice of a 5 millisecond timer resolution ensures
// a request will expire within 10 milliseconds of the specified requestTimeout interval. This is because
// cancellation of a timeout takes two timer resolution units to complete.

this.timer = new HashedWheelTimer(FIVE_MILLISECONDS, TimeUnit.NANOSECONDS);
this.requestTimeout = requestTimeout;
}
Expand All @@ -37,13 +37,28 @@ public long getRequestTimeout(final TimeUnit unit) {

@Override
public void close() {

final Set<Timeout> timeouts = this.timer.stop();
if (logger.isDebugEnabled()) {
final int count = timeouts.size();
if (count > 0) {
logger.debug("request expiration tasks cancelled: {}", count);
final int count = timeouts.size();

if (count == 0) {
logger.debug("no outstanding request timeout tasks");
return;
}

logger.debug("stopping {} request timeout tasks", count);

for (final Timeout timeout : timeouts) {
if (!timeout.isExpired()) {
try {
timeout.task().run(timeout);
} catch (Throwable error) {
logger.warn(lenientFormat("request timeout task failed due to ", error));
}
}
}

logger.debug("{} request timeout tasks stopped", count);
}

public Timeout newTimeout(final TimerTask task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
// region Fields

private static final String TAG_NAME = RntbdServiceEndpoint.class.getSimpleName();
private static final long QUIET_PERIOD = 2L * 1_000_000_000L; // 2 seconds
private static final long QUIET_PERIOD = 2_000_000_000L; // 2 seconds

private static final AtomicLong instanceCount = new AtomicLong();
private static final Logger logger = LoggerFactory.getLogger(RntbdServiceEndpoint.class);
Expand Down Expand Up @@ -79,7 +79,7 @@ private RntbdServiceEndpoint(
.group(group)
.option(ChannelOption.ALLOCATOR, config.allocator())
.option(ChannelOption.AUTO_READ, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeout())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeoutInMillis())
.option(ChannelOption.RCVBUF_ALLOCATOR, receiveBufferAllocator)
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(physicalAddress.getHost(), physicalAddress.getPort());
Expand Down Expand Up @@ -330,7 +330,7 @@ public Provider(final RntbdTransportClient transportClient, final Options option

this.transportClient = transportClient;
this.config = new Config(options, sslContext, wireLogLevel);
this.requestTimer = new RntbdRequestTimer(config.requestTimeout());
this.requestTimer = new RntbdRequestTimer(config.requestTimeoutInNanos());
this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);

this.endpoints = new ConcurrentHashMap<>();
Expand All @@ -349,14 +349,15 @@ public void close() {
endpoint.close();
}

this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeout(), NANOSECONDS)
this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeoutInNanos(), NANOSECONDS)
.addListener(future -> {
if (future.isSuccess()) {
logger.debug("\n [{}]\n closed endpoints", this);
return;
}
logger.error("\n [{}]\n failed to close endpoints due to ", this, future.cause());
});

return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ final int computeLength() {

static <T extends RntbdTokenStream<?>> T decode(final T stream) {

ByteBuf in = stream.in;
final ByteBuf in = stream.in;

while (in.readableBytes() > 0) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ static class Provider implements RntbdEndpoint.Provider {

Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) {
this.config = new Config(options, sslContext, LogLevel.WARN);
this.timer = new RntbdRequestTimer(config.requestTimeout());
this.timer = new RntbdRequestTimer(config.requestTimeoutInNanos());
this.expected = expected;
}

Expand Down