diff --git a/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWriteWorkflow.java b/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWriteWorkflow.java
index bc7a72289..a915f9a19 100644
--- a/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWriteWorkflow.java
+++ b/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWriteWorkflow.java
@@ -33,7 +33,6 @@
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.internal.Utils;
import com.microsoft.azure.cosmosdb.rx.internal.NotFoundException;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import rx.Observable;
import rx.Subscriber;
diff --git a/benchmark/src/test/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWritesConsistencyTest.java b/benchmark/src/test/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWritesConsistencyTest.java
index ae4c8824e..71cc1345c 100644
--- a/benchmark/src/test/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWritesConsistencyTest.java
+++ b/benchmark/src/test/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWritesConsistencyTest.java
@@ -69,11 +69,6 @@ public class ReadMyWritesConsistencyTest {
StringUtils.defaultString(Strings.emptyToNull(
System.getenv().get("DESIRED_CONSISTENCY")), "Session"));
- private final String directModeProtocol =
- System.getProperty("azure.cosmos.directModeProtocol",
- StringUtils.defaultString(Strings.emptyToNull(
- System.getenv().get("DIRECT_MODE_PROTOCOL")), Protocol.Tcp.name()));
-
private final int initialCollectionThroughput = 10_000;
private final String maxRunningTime =
@@ -122,7 +117,6 @@ public void readMyWrites(boolean useNameLink) throws Exception {
Configuration cfg = new Configuration();
new JCommander(cfg, StringUtils.split(cmd));
- logger.info("azure.cosmos.directModeProtocol={}, {}", directModeProtocol, cfg);
AtomicInteger success = new AtomicInteger();
AtomicInteger error = new AtomicInteger();
diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/Configs.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/Configs.java
index ac6db6b2f..5d58b6696 100644
--- a/commons/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/Configs.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/Configs.java
@@ -88,7 +88,13 @@ public SslContext getSslContext() {
}
public Protocol getProtocol() {
- String protocol = getJVMConfigAsString(PROTOCOL, DEFAULT_PROTOCOL.name());
+
+ String protocol = getJVMConfigAsString(PROTOCOL, StringUtils.defaultString(
+ StringUtils.defaultString(
+ System.getProperty("azure.cosmos.directModeProtocol"),
+ System.getenv("DIRECT_MODE_PROTOCOL")),
+ DEFAULT_PROTOCOL.name()));
+
try {
return Protocol.valueOf(WordUtils.capitalize(protocol.toLowerCase()));
} catch (Exception e) {
diff --git a/commons/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ConfigsTests.java b/commons/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ConfigsTests.java
index 545cb241c..9995db217 100644
--- a/commons/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ConfigsTests.java
+++ b/commons/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ConfigsTests.java
@@ -26,6 +26,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol;
+import org.apache.commons.lang3.StringUtils;
import org.testng.annotations.Test;
public class ConfigsTests {
@@ -45,7 +46,12 @@ public void maxHttpBodyLength() {
@Test(groups = { "unit" })
public void getProtocol() {
Configs config = new Configs();
- assertThat(config.getProtocol()).isEqualTo(Protocol.valueOf(System.getProperty("cosmos.directModeProtocol", "Tcp")));
+ Protocol expected = Protocol.valueOf(System.getProperty("cosmos.directModeProtocol",
+ System.getProperty("azure.cosmos.directModeProtocol",
+ StringUtils.defaultString(
+ System.getenv("DIRECT_MODE_PROTOCOL"),
+ "Tcp"))));
+ assertThat(config.getProtocol()).isEqualTo(expected);
}
@Test(groups = { "unit" })
diff --git a/direct-impl/pom.xml b/direct-impl/pom.xml
index 0e91fd03f..5b62b2b73 100644
--- a/direct-impl/pom.xml
+++ b/direct-impl/pom.xml
@@ -36,6 +36,7 @@ SOFTWARE.
UTF-8
unit
27.0.1-jre
+ 2.9.9
4.1.0
1.2.0
@@ -295,6 +296,11 @@ SOFTWARE.
${project.version}
test
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ ${jackson-datatype-jsr310.version}
+
com.google.guava
guava
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClient.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClient.java
index 251893cbc..cef5ef125 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClient.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClient.java
@@ -24,6 +24,8 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -45,7 +47,9 @@
import rx.Single;
import rx.SingleEmitter;
+import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Iterator;
@@ -190,42 +194,88 @@ public static final class Options {
// region Fields
+ @JsonProperty()
private final int bufferPageSize;
- private final String certificateHostNameOverride;
+
+ @JsonProperty()
private final Duration connectionTimeout;
+
+ @JsonProperty()
private final Duration idleChannelTimeout;
+
+ @JsonProperty()
private final Duration idleEndpointTimeout;
+
+ @JsonProperty()
private final int maxBufferCapacity;
+
+ @JsonProperty()
private final int maxChannelsPerEndpoint;
+
+ @JsonProperty()
private final int maxRequestsPerChannel;
- private final int partitionCount;
+
+ @JsonProperty()
private final Duration receiveHangDetectionTime;
+
+ @JsonProperty()
private final Duration requestExpiryInterval;
+
+ @JsonProperty()
private final Duration requestTimeout;
+
+ @JsonProperty()
+ private final Duration requestTimerResolution;
+
+ @JsonProperty()
private final Duration sendHangDetectionTime;
+
+ @JsonProperty()
private final Duration shutdownTimeout;
+
+ @JsonIgnore()
private final UserAgentContainer userAgent;
// endregion
// region Constructors
+ private Options() {
+ this.bufferPageSize = 8192;
+ this.connectionTimeout = null;
+ this.idleChannelTimeout = Duration.ZERO;
+ this.idleEndpointTimeout = Duration.ofSeconds(70L);
+ this.maxBufferCapacity = 8192 << 10;
+ this.maxChannelsPerEndpoint = 10;
+ this.maxRequestsPerChannel = 30;
+ this.receiveHangDetectionTime = Duration.ofSeconds(65L);
+ this.requestExpiryInterval = Duration.ofSeconds(5L);
+ this.requestTimeout = null;
+ this.requestTimerResolution = Duration.ofMillis(5L);
+ this.sendHangDetectionTime = Duration.ofSeconds(10L);
+ this.shutdownTimeout = Duration.ofSeconds(15L);
+ this.userAgent = new UserAgentContainer();
+ }
+
private Options(Builder builder) {
+
this.bufferPageSize = builder.bufferPageSize;
- this.certificateHostNameOverride = builder.certificateHostNameOverride;
- this.connectionTimeout = builder.connectionTimeout == null ? builder.requestTimeout : builder.connectionTimeout;
this.idleChannelTimeout = builder.idleChannelTimeout;
this.idleEndpointTimeout = builder.idleEndpointTimeout;
this.maxBufferCapacity = builder.maxBufferCapacity;
this.maxChannelsPerEndpoint = builder.maxChannelsPerEndpoint;
this.maxRequestsPerChannel = builder.maxRequestsPerChannel;
- this.partitionCount = builder.partitionCount;
this.receiveHangDetectionTime = builder.receiveHangDetectionTime;
this.requestExpiryInterval = builder.requestExpiryInterval;
this.requestTimeout = builder.requestTimeout;
+ this.requestTimerResolution = builder.requestTimerResolution;
this.sendHangDetectionTime = builder.sendHangDetectionTime;
this.shutdownTimeout = builder.shutdownTimeout;
this.userAgent = builder.userAgent;
+
+ this.connectionTimeout = builder.connectionTimeout == null
+ ? builder.requestTimeout
+ : builder.connectionTimeout;
}
// endregion
@@ -236,10 +286,6 @@ public int bufferPageSize() {
return this.bufferPageSize;
}
- public String certificateHostNameOverride() {
- return this.certificateHostNameOverride;
- }
-
public Duration connectionTimeout() {
return this.connectionTimeout;
}
@@ -264,10 +310,6 @@ public int maxRequestsPerChannel() {
return this.maxRequestsPerChannel;
}
- public int partitionCount() {
- return this.partitionCount;
- }
-
public Duration receiveHangDetectionTime() {
return this.receiveHangDetectionTime;
}
@@ -280,6 +322,10 @@ public Duration requestTimeout() {
return this.requestTimeout;
}
+ public Duration requestTimerResolution() {
+ return this.requestTimerResolution;
+ }
+
public Duration sendHangDetectionTime() {
return this.sendHangDetectionTime;
}
@@ -288,17 +334,17 @@ public Duration shutdownTimeout() {
return this.shutdownTimeout;
}
- @Override
- public String toString() {
- return RntbdObjectMapper.toJson(this);
+ public UserAgentContainer userAgent() {
+ return this.userAgent;
}
// endregion
// region Methods
- public UserAgentContainer userAgent() {
- return this.userAgent;
+ @Override
+ public String toString() {
+ return RntbdObjectMapper.toJson(this);
}
// endregion
@@ -310,35 +356,105 @@ public static class Builder {
// region Fields
- private static final UserAgentContainer DEFAULT_USER_AGENT_CONTAINER = new UserAgentContainer();
- private static final Duration FIFTEEN_SECONDS = Duration.ofSeconds(15L);
- private static final Duration FIVE_SECONDS =Duration.ofSeconds(5L);
- private static final Duration SEVENTY_SECONDS = Duration.ofSeconds(70L);
- private static final Duration SIXTY_FIVE_SECONDS = Duration.ofSeconds(65L);
- private static final Duration TEN_SECONDS = Duration.ofSeconds(10L);
-
- private int bufferPageSize = 8192;
- private String certificateHostNameOverride = null;
- private Duration connectionTimeout = null;
- private Duration idleChannelTimeout = Duration.ZERO;
- private Duration idleEndpointTimeout = SEVENTY_SECONDS;
- private int maxBufferCapacity = 8192 << 10;
- private int maxChannelsPerEndpoint = 10;
- private int maxRequestsPerChannel = 30;
- private int partitionCount = 1;
- private Duration receiveHangDetectionTime = SIXTY_FIVE_SECONDS;
- private Duration requestExpiryInterval = FIVE_SECONDS;
+ private static final String DEFAULT_OPTIONS_PROPERTY_NAME = "azure.cosmos.directTcp.defaultOptions";
+ private static final Options DEFAULT_OPTIONS;
+
+ static {
+
+ // In priority order we take default Direct TCP options from:
+ //
+ // 1. the string value of system property "azure.cosmos.directTcp.options", or
+ // 2. the contents of the file located by the system property "azure.cosmos.directTcp.optionsFile", or
+ // 3. the contents of the resource file named "azure.cosmos.directTcp.options.json"
+ //
+ // Otherwise, if none of these values are set or an error occurs we create default options based on a
+ // set of hard-wired values defined in the default private parameterless constructor for
+ // RntbdTransportClient.Options.
+
+ Options options = null;
+
+ try {
+ final String string = System.getProperty(DEFAULT_OPTIONS_PROPERTY_NAME);
+
+ if (string != null) {
+ // Attempt to set default options based on the JSON string value of "{propertyName}"
+ try {
+ options = RntbdObjectMapper.readValue(string, Options.class);
+ } catch (IOException error) {
+ logger.error("failed to parse default Direct TCP options {} due to ", string, error);
+ }
+ }
+
+ if (options == null) {
+
+ final String path = System.getProperty(DEFAULT_OPTIONS_PROPERTY_NAME + "File");
+
+ if (path != null) {
+ // Attempt to load default options from the JSON file on the path specified by
+ // "{propertyName}File"
+ try {
+ options = RntbdObjectMapper.readValue(new File(path), Options.class);
+ } catch (IOException error) {
+ logger.error("failed to load default Direct TCP options from {} due to ", path, error);
+ }
+ }
+ }
+
+ if (options == null) {
+
+ final ClassLoader loader = RntbdTransportClient.class.getClassLoader();
+ final String name = DEFAULT_OPTIONS_PROPERTY_NAME + ".json";
+
+ try (final InputStream stream = loader.getResourceAsStream(name)) {
+ if (stream != null) {
+ // Attempt to load default options from the JSON resource file "{propertyName}.json"
+ options = RntbdObjectMapper.readValue(stream, Options.class);
+ }
+ } catch (IOException error) {
+ logger.error("failed to load Direct TCP options from resource {} due to ", name, error);
+ }
+ }
+ } finally {
+ DEFAULT_OPTIONS = options != null ? options : new Options();
+ }
+ }
+
+ private int bufferPageSize;
+ private Duration connectionTimeout;
+ private Duration idleChannelTimeout;
+ private Duration idleEndpointTimeout;
+ private int maxBufferCapacity;
+ private int maxChannelsPerEndpoint;
+ private int maxRequestsPerChannel;
+ private Duration receiveHangDetectionTime;
+ private Duration requestExpiryInterval;
private Duration requestTimeout;
- private Duration sendHangDetectionTime = TEN_SECONDS;
- private Duration shutdownTimeout = FIFTEEN_SECONDS;
- private UserAgentContainer userAgent = DEFAULT_USER_AGENT_CONTAINER;
+ private Duration requestTimerResolution;
+ private Duration sendHangDetectionTime;
+ private Duration shutdownTimeout;
+ private UserAgentContainer userAgent;
// endregion
// region Constructors
public Builder(Duration requestTimeout) {
+
this.requestTimeout(requestTimeout);
+
+ this.bufferPageSize = DEFAULT_OPTIONS.bufferPageSize;
+ this.connectionTimeout = DEFAULT_OPTIONS.connectionTimeout;
+ this.idleChannelTimeout = DEFAULT_OPTIONS.idleChannelTimeout;
+ this.idleEndpointTimeout = DEFAULT_OPTIONS.idleEndpointTimeout;
+ this.maxBufferCapacity = DEFAULT_OPTIONS.maxBufferCapacity;
+ this.maxChannelsPerEndpoint = DEFAULT_OPTIONS.maxChannelsPerEndpoint;
+ this.maxRequestsPerChannel = DEFAULT_OPTIONS.maxRequestsPerChannel;
+ this.receiveHangDetectionTime = DEFAULT_OPTIONS.receiveHangDetectionTime;
+ this.requestExpiryInterval = DEFAULT_OPTIONS.requestExpiryInterval;
+ this.requestTimerResolution = DEFAULT_OPTIONS.requestTimerResolution;
+ this.sendHangDetectionTime = DEFAULT_OPTIONS.sendHangDetectionTime;
+ this.shutdownTimeout = DEFAULT_OPTIONS.shutdownTimeout;
+ this.userAgent = DEFAULT_OPTIONS.userAgent;
}
public Builder(int requestTimeoutInSeconds) {
@@ -365,11 +481,6 @@ public Options build() {
return new Options(this);
}
- public Builder certificateHostNameOverride(final String value) {
- this.certificateHostNameOverride = value;
- return this;
- }
-
public Builder connectionTimeout(final Duration value) {
checkArgument(value == null || value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
@@ -412,12 +523,6 @@ public Builder maxRequestsPerChannel(final int value) {
return this;
}
- public Builder partitionCount(final int value) {
- checkArgument(value > 0, "expected positive value, not %s", value);
- this.partitionCount = value;
- return this;
- }
-
public Builder receiveHangDetectionTime(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
@@ -442,6 +547,14 @@ public Builder requestTimeout(final Duration value) {
return this;
}
+ public Builder requestTimerResolution(final Duration value) {
+ checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
+ "expected positive value, not %s",
+ value);
+ this.requestTimerResolution = value;
+ return this;
+ }
+
public Builder sendHangDetectionTime(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java
index a4d502582..ba5d2eb3c 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java
@@ -117,9 +117,9 @@ protected void initChannel(final Channel channel) {
checkNotNull(channel);
final RntbdRequestManager requestManager = new RntbdRequestManager(this.healthChecker, this.config.maxRequestsPerChannel());
- final long readerIdleTime = this.config.receiveHangDetectionTime();
- final long writerIdleTime = this.config.sendHangDetectionTime();
- final long allIdleTime = this.config.idleConnectionTimeout();
+ final long readerIdleTime = this.config.receiveHangDetectionTimeInNanos();
+ final long writerIdleTime = this.config.sendHangDetectionTimeInNanos();
+ final long allIdleTime = this.config.idleConnectionTimeoutInNanos();
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
index d599b9ce2..d0b7c4fd2 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
@@ -90,12 +90,12 @@ public RntbdClientChannelHealthChecker(final Config config) {
checkNotNull(config, "config: null");
- this.idleConnectionTimeout = config.idleConnectionTimeout();
+ this.idleConnectionTimeout = config.idleConnectionTimeoutInNanos();
- this.readDelayLimit = config.receiveHangDetectionTime();
+ this.readDelayLimit = config.receiveHangDetectionTimeInNanos();
checkArgument(this.readDelayLimit > readHangGracePeriod, "config.receiveHangDetectionTime: %s", this.readDelayLimit);
- this.writeDelayLimit = config.sendHangDetectionTime();
+ this.writeDelayLimit = config.sendHangDetectionTimeInNanos();
checkArgument(this.writeDelayLimit > writeHangGracePeriod, "config.sendHangDetectionTime: %s", this.writeDelayLimit);
}
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelPool.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelPool.java
index 6a0421e86..74ebe1343 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelPool.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdClientChannelPool.java
@@ -167,7 +167,7 @@ public void onTimeout(AcquireTask task) {
}
}
- final long idleEndpointTimeout = config.idleEndpointTimeout();
+ final long idleEndpointTimeout = config.idleEndpointTimeoutInNanos();
this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate(
() -> {
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdEndpoint.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdEndpoint.java
index 2e7b58e83..42fc4436e 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdEndpoint.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdEndpoint.java
@@ -126,19 +126,19 @@ public int bufferPageSize() {
}
@JsonProperty
- public int connectionTimeout() {
+ public int connectionTimeoutInMillis() {
final long value = this.options.connectionTimeout().toMillis();
assert value <= Integer.MAX_VALUE;
return (int)value;
}
@JsonProperty
- public long idleConnectionTimeout() {
+ public long idleConnectionTimeoutInNanos() {
return this.options.idleChannelTimeout().toNanos();
}
@JsonProperty
- public long idleEndpointTimeout() {
+ public long idleEndpointTimeoutInNanos() {
return this.options.idleEndpointTimeout().toNanos();
}
@@ -158,22 +158,27 @@ public int maxRequestsPerChannel() {
}
@JsonProperty
- public long receiveHangDetectionTime() {
+ public long receiveHangDetectionTimeInNanos() {
return this.options.receiveHangDetectionTime().toNanos();
}
@JsonProperty
- public long requestTimeout() {
+ public long requestTimeoutInNanos() {
return this.options.requestTimeout().toNanos();
}
@JsonProperty
- public long sendHangDetectionTime() {
+ public long requestTimerResolutionInNanos() {
+ return this.options.requestTimerResolution().toNanos();
+ }
+
+ @JsonProperty
+ public long sendHangDetectionTimeInNanos() {
return this.options.sendHangDetectionTime().toNanos();
}
@JsonProperty
- public long shutdownTimeout() {
+ public long shutdownTimeoutInNanos() {
return this.options.shutdownTimeout().toNanos();
}
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdObjectMapper.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdObjectMapper.java
index c44ee62ab..ec378090d 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdObjectMapper.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdObjectMapper.java
@@ -28,18 +28,23 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.ser.PropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
+import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
+import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.handler.codec.CorruptedFrameException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -49,13 +54,38 @@ public final class RntbdObjectMapper {
private static final Logger logger = LoggerFactory.getLogger(RntbdObjectMapper.class);
private static final SimpleFilterProvider filterProvider = new SimpleFilterProvider();
- private static final ObjectMapper objectMapper = new ObjectMapper().setFilterProvider(filterProvider);
+
+ private static final ObjectMapper objectMapper = new ObjectMapper()
+ .registerModule(new SimpleModule()
+ .addSerializer(Duration.class, ToStringSerializer.instance)
+ .addDeserializer(Duration.class, DurationDeserializer.INSTANCE))
+ .setFilterProvider(filterProvider);
+
private static final ObjectWriter objectWriter = objectMapper.writer();
+
private static final ConcurrentHashMap, String> simpleClassNames = new ConcurrentHashMap<>();
private RntbdObjectMapper() {
}
+ public static T readValue(File file, Class type) throws IOException {
+ checkNotNull(file, "expected non-null file");
+ checkNotNull(type, "expected non-null type");
+ return objectMapper.readValue(file, type);
+ }
+
+ public static T readValue(InputStream stream, Class type) throws IOException {
+ checkNotNull(stream, "expected non-null stream");
+ checkNotNull(type, "expected non-null type");
+ return objectMapper.readValue(stream, type);
+ }
+
+ public static T readValue(String string, Class type) throws IOException {
+ checkNotNull(string, "expected non-null string");
+ checkNotNull(type, "expected non-null type");
+ return objectMapper.readValue(string, type);
+ }
+
public static String toJson(final Object value) {
try {
return objectWriter.writeValueAsString(value);
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdRequestTimer.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdRequestTimer.java
index 698690222..96196a924 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdRequestTimer.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdRequestTimer.java
@@ -36,24 +36,18 @@
public final class RntbdRequestTimer implements AutoCloseable {
- private static final long FIVE_MILLISECONDS = 5000000L;
-
private static final Logger logger = LoggerFactory.getLogger(RntbdRequestTimer.class);
- private final long requestTimeout;
+ private final long requestTimeoutInNanos;
private final Timer timer;
- public RntbdRequestTimer(final long requestTimeout) {
-
- // Inspection of the HashWheelTimer code indicates that our choice of a 5 millisecond timer resolution ensures
- // a request will expire within 10 milliseconds of the specified requestTimeout interval. This is because
- // cancellation of a timeout takes two timer resolution units to complete.
-
- this.timer = new HashedWheelTimer(FIVE_MILLISECONDS, TimeUnit.NANOSECONDS);
- this.requestTimeout = requestTimeout;
+ public RntbdRequestTimer(final long requestTimeoutInNanos, final long requestTimerResolutionInNanos) {
+ // The HashWheelTimer code shows that cancellation of a timeout takes two timer resolution units to complete.
+ this.timer = new HashedWheelTimer(requestTimerResolutionInNanos, TimeUnit.NANOSECONDS);
+ this.requestTimeoutInNanos = requestTimeoutInNanos;
}
public long getRequestTimeout(final TimeUnit unit) {
- return unit.convert(requestTimeout, TimeUnit.NANOSECONDS);
+ return unit.convert(requestTimeoutInNanos, TimeUnit.NANOSECONDS);
}
@Override
@@ -68,6 +62,6 @@ public void close() {
}
public Timeout newTimeout(final TimerTask task) {
- return this.timer.newTimeout(task, this.requestTimeout, TimeUnit.NANOSECONDS);
+ return this.timer.newTimeout(task, this.requestTimeoutInNanos, TimeUnit.NANOSECONDS);
}
}
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java
index aca862fc2..2bc726664 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java
@@ -100,7 +100,7 @@ private RntbdServiceEndpoint(
.group(group)
.option(ChannelOption.ALLOCATOR, config.allocator())
.option(ChannelOption.AUTO_READ, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeout())
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeoutInMillis())
.option(ChannelOption.RCVBUF_ALLOCATOR, receiveBufferAllocator)
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(physicalAddress.getHost(), physicalAddress.getPort());
@@ -351,9 +351,12 @@ public Provider(final RntbdTransportClient transportClient, final Options option
this.transportClient = transportClient;
this.config = new Config(options, sslContext, wireLogLevel);
- this.requestTimer = new RntbdRequestTimer(config.requestTimeout());
- this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);
+ this.requestTimer = new RntbdRequestTimer(
+ config.requestTimeoutInNanos(),
+ config.requestTimerResolutionInNanos());
+
+ this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);
this.endpoints = new ConcurrentHashMap<>();
this.evictions = new AtomicInteger();
this.closed = new AtomicBoolean();
@@ -370,7 +373,7 @@ public void close() {
endpoint.close();
}
- this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeout(), NANOSECONDS)
+ this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeoutInNanos(), NANOSECONDS)
.addListener(future -> {
if (future.isSuccess()) {
logger.debug("\n [{}]\n closed endpoints", this);
diff --git a/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClientTest.java b/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClientTest.java
index cbb8e45a0..6005b461f 100644
--- a/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClientTest.java
+++ b/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClientTest.java
@@ -934,7 +934,9 @@ static class Provider implements RntbdEndpoint.Provider {
Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) {
this.config = new Config(options, sslContext, LogLevel.WARN);
- this.timer = new RntbdRequestTimer(config.requestTimeout());
+ this.timer = new RntbdRequestTimer(
+ config.requestTimeoutInNanos(),
+ config.requestTimerResolutionInNanos());
this.expected = expected;
}