diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ClientTelemetryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ClientTelemetryTest.java index 9a7e7cdf32b2..dacc9a31c6da 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ClientTelemetryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ClientTelemetryTest.java @@ -22,6 +22,7 @@ import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.rx.TestSuiteBase; import com.azure.cosmos.rx.proxy.HttpProxyServer; +import io.netty.channel.ChannelOption; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.testng.annotations.AfterClass; @@ -34,6 +35,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -126,7 +128,7 @@ public void operationsList(CosmosClient cosmosClient) throws Exception { ClientTelemetry clientTelemetry = cosmosClient.asyncClient().getContextClient().getClientTelemetry(); setClientTelemetrySchedulingInSec(clientTelemetry, 5); - clientTelemetry.init(); + clientTelemetry.init().subscribe(); InternalObjectNode internalObjectNode = getInternalObjectNode(); cosmosContainer.createItem(internalObjectNode); //create operation @@ -190,7 +192,7 @@ public void operationsListWithNoTelemetry() throws Exception { "clientTelemetrySchedulingSec"); backgroundRefreshLocationTimeIntervalInMSField.setAccessible(true); backgroundRefreshLocationTimeIntervalInMSField.setInt(clientTelemetry, 5); - clientTelemetry.init(); + clientTelemetry.init().subscribe(); InternalObjectNode internalObjectNode = getInternalObjectNode(); cosmosContainer.createItem(internalObjectNode); // create operation @@ -240,8 +242,8 @@ public void systemInfo(CosmosClient cosmosClient) throws Exception { public void httpClientTests(CosmosClient cosmosClient) throws Exception { // Test using different http client for client telemetry requests and metaRequests ClientTelemetry clientTelemetry = cosmosClient.asyncClient().getContextClient().getClientTelemetry(); - HttpClient clientTelemetryHttpClient = ReflectionUtils.getClientTelemetryMetadataHttpClient(clientTelemetry); - HttpClient clientTelemetryMetadataHttpClient = ReflectionUtils.getClientTelemetryHttpClint(clientTelemetry); + HttpClient clientTelemetryMetadataHttpClient = ReflectionUtils.getClientTelemetryMetadataHttpClient(clientTelemetry); + HttpClient clientTelemetryHttpClient = ReflectionUtils.getClientTelemetryHttpClint(clientTelemetry); assertThat(clientTelemetryHttpClient).isNotSameAs(clientTelemetryMetadataHttpClient); @@ -251,10 +253,58 @@ public void httpClientTests(CosmosClient cosmosClient) throws Exception { AtomicReference vmMetadata = ReflectionUtils.getAzureVMMetadata(clientTelemetry); vmMetadata.set(null); - clientTelemetry.init(); + clientTelemetry.init().subscribe(); assertThat(clientTelemetryMetadataHttpClientWrapper.capturedRequests.size()).isEqualTo(1); } + + @Test(groups = {"emulator"}, dataProvider = "clients", timeOut = TIMEOUT) + public void shouldDisableIMDSAccess(CosmosClient cosmosClient) throws Exception { + // Test using different http client for client telemetry requests and metaRequests + + System.setProperty("COSMOS.DISABLE_IMDS_ACCESS", "true"); + + ClientTelemetry clientTelemetry = cosmosClient.asyncClient().getContextClient().getClientTelemetry(); + HttpClient clientTelemetryMetadataHttpClient = ReflectionUtils.getClientTelemetryMetadataHttpClient(clientTelemetry); + HttpClient clientTelemetryHttpClient = ReflectionUtils.getClientTelemetryHttpClint(clientTelemetry); + + assertThat(clientTelemetryHttpClient).isNotSameAs(clientTelemetryMetadataHttpClient); + + // Test metadataHttpClient is used for IMDS requests + HttpClientUnderTestWrapper clientTelemetryMetadataHttpClientWrapper = new HttpClientUnderTestWrapper(clientTelemetryHttpClient); + ReflectionUtils.setClientTelemetryMetadataHttpClient(clientTelemetry, clientTelemetryMetadataHttpClientWrapper.getSpyHttpClient()); + AtomicReference vmMetadata = ReflectionUtils.getAzureVMMetadata(clientTelemetry); + vmMetadata.set(null); + + clientTelemetry.init().subscribe(); + // Call should not go through loading azure VM metadata + assertThat(clientTelemetryMetadataHttpClientWrapper.capturedRequests.size()).isEqualTo(0); + + System.setProperty("COSMOS.DISABLE_IMDS_ACCESS", "false");// setting it back for other tests + } + + + @Test(groups = {"emulator"}, dataProvider = "clients", timeOut = TIMEOUT) + public void httpClientsConfigurationTests(CosmosClient cosmosClient) throws Exception { + // Test using different http client for client telemetry requests and metaRequests + ClientTelemetry clientTelemetry = cosmosClient.asyncClient().getContextClient().getClientTelemetry(); + HttpClient clientTelemetryMetadataHttpClient = ReflectionUtils.getClientTelemetryMetadataHttpClient(clientTelemetry); + HttpClient clientTelemetryHttpClient = ReflectionUtils.getClientTelemetryHttpClint(clientTelemetry); + + assertThat(clientTelemetryHttpClient).isNotSameAs(clientTelemetryMetadataHttpClient); + + reactor.netty.http.client.HttpClient reactorHttpClient = + ReflectionUtils.get(reactor.netty.http.client.HttpClient.class, clientTelemetryMetadataHttpClient, + "httpClient"); + + int maxConnections = reactorHttpClient.configuration().connectionProvider().maxConnections(); + Integer connectionAcquireTimeout = (Integer) reactorHttpClient.configuration().options().get(ChannelOption.CONNECT_TIMEOUT_MILLIS); + + assertThat(maxConnections).isEqualTo(ClientTelemetry.IMDS_DEFAULT_MAX_CONNECTION_POOL_SIZE); + assertThat(connectionAcquireTimeout).isEqualTo((int) ClientTelemetry.IMDS_DEFAULT_CONNECTION_ACQUIRE_TIMEOUT.toMillis()); + } + + @Test(groups = {"unit"}) public void clientTelemetryScheduling() { assertThat(Configs.getClientTelemetrySchedulingInSec()).isEqualTo(600); @@ -304,7 +354,7 @@ public void clientTelemetryWithStageJunoEndpoint(boolean useProxy) throws Interr cosmosClient.getDatabase(databaseId).getContainer(containerId); ClientTelemetry clientTelemetry = cosmosClient.asyncClient().getContextClient().getClientTelemetry(); setClientTelemetrySchedulingInSec(clientTelemetry, 5); - clientTelemetry.init(); + clientTelemetry.init().subscribe(); // If this test need to run on local machine please add below env property, // in test env we add the env property with cosmos-client-telemetry-endpoint variable in tests.yml, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java index c3e720a90d95..fe5fab57ecc6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java @@ -202,7 +202,7 @@ public void rntbd() throws Exception { assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2); assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, mm: false, prgns: [null])"); assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("(cto:PT5S, nrto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:true)"); - assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:null, nrto:null, icto:null, p:false)"); + assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:1000, nrto:PT1M, icto:PT1M, cto:PT45S, p:false)"); assertThat(objectNode.get("connCfg").get("other").asText()).isEqualTo("(ed: false, cs: false, rv: true)"); } @@ -237,7 +237,7 @@ public void gw() throws Exception { assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2); assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, mm: false, prgns: [null])"); assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null"); - assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, p:false)"); + assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, cto:PT45S, p:false)"); assertThat(objectNode.get("connCfg").get("other").asText()).isEqualTo("(ed: false, cs: false, rv: true)"); } @@ -309,7 +309,7 @@ public void full( assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2); assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, mm: false, prgns: [westus1,westus2])"); assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null"); - assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, p:false)"); + assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, cto:PT45S, p:false)"); assertThat(objectNode.get("connCfg").get("other").asText()).isEqualTo("(ed: true, cs: true, rv: false)"); assertThat(objectNode.get("excrgns").asText()).isEqualTo("[westus2]"); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SpyClientUnderTestFactory.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SpyClientUnderTestFactory.java index 2798d707dc8c..2ee8e5ab717a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SpyClientUnderTestFactory.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SpyClientUnderTestFactory.java @@ -5,7 +5,6 @@ import com.azure.core.credential.AzureKeyCredential; import com.azure.cosmos.ConnectionMode; import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker; import com.azure.cosmos.implementation.directconnectivity.Protocol; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; import com.azure.cosmos.implementation.http.HttpClient; @@ -199,15 +198,11 @@ public static class ClientUnderTest extends SpyBaseClass { private Mono captureHttpRequest(InvocationOnMock invocationOnMock) { HttpRequest httpRequest = invocationOnMock.getArgument(0, HttpRequest.class); - Duration responseTimeout = Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds()); - if (invocationOnMock.getArguments().length == 2) { - responseTimeout = invocationOnMock.getArgument(1, Duration.class); - } CompletableFuture f = new CompletableFuture<>(); this.requestsResponsePairs.add(Pair.of(httpRequest, f)); return origHttpClient - .send(httpRequest, responseTimeout) + .send(httpRequest) .doOnNext(httpResponse -> f.complete(httpResponse.buffer())) .doOnError(f::completeExceptionally); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpClientMockWrapper.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpClientMockWrapper.java index 0b6290b95d00..eda00591f295 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpClientMockWrapper.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpClientMockWrapper.java @@ -140,6 +140,16 @@ private HttpClientMockWrapper(long responseAfterMillis, final HttpResponse httpR return Mono.delay(Duration.ofMillis(responseAfterMillis)).flatMap(t -> httpResponseOrException(httpResponse, e)); } }).when(httpClient).send(Mockito.any(HttpRequest.class), Mockito.any(Duration.class)); + + Mockito.doAnswer(invocationOnMock -> { + HttpRequest httpRequest = invocationOnMock.getArgument(0, HttpRequest.class); + requests.add(httpRequest); + if (responseAfterMillis <= 0) { + return httpResponseOrException(httpResponse, e); + } else { + return Mono.delay(Duration.ofMillis(responseAfterMillis)).flatMap(t -> httpResponseOrException(httpResponse, e)); + } + }).when(httpClient).send(Mockito.any(HttpRequest.class)); } public HttpClientMockWrapper(HttpClientBehaviourBuilder builder) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClientTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClientTest.java index cc1a9a11fedc..af649c2f505b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClientTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClientTest.java @@ -113,7 +113,7 @@ public HttpTransportClientUnderTest(ConnectionPolicy connectionPolicy, UserAgent } @Override - HttpClient createHttpClient(ConnectionPolicy connectionPolicy) { + HttpClient createHttpClient(Configs configs, ConnectionPolicy connectionPolicy) { return httpClient; } } @@ -141,6 +141,7 @@ public void validateDefaultHeaders() { RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(), OperationType.Create, "dbs/db/colls/col", ResourceType.Document); request.setContentBytes(new byte[0]); + request.setResponseTimeout(connectionPolicy.getHttpNetworkRequestTimeout()); transportClient.invokeResourceOperationAsync(Uri.create(physicalAddress), request).block(); @@ -460,6 +461,7 @@ public void failuresWithHttpStatusCodes(HttpClientMockWrapper.HttpClientBehaviou httpClientMockWrapper.getClient()); RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(), OperationType.Create, "dbs/db/colls/col", ResourceType.Document); + request.setResponseTimeout(connectionPolicy.getHttpNetworkRequestTimeout()); request.setContentBytes(new byte[0]); request.requestContext.resourcePhysicalAddress = "dbs/db/colls/col"; @@ -568,6 +570,7 @@ public void networkFailures(RxDocumentServiceRequest request, UserAgentContainer userAgentContainer = new UserAgentContainer(); ConnectionPolicy connectionPolicy = ConnectionPolicy.getDefaultPolicy(); connectionPolicy.setHttpNetworkRequestTimeout(Duration.ofSeconds(100)); + request.setResponseTimeout(connectionPolicy.getHttpNetworkRequestTimeout()); HttpTransportClient transportClient = getHttpTransportClientUnderTest( connectionPolicy, userAgentContainer, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/http/ReactorNettyHttpClientTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/http/ReactorNettyHttpClientTest.java index fcbf9b292bc1..74a00185967a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/http/ReactorNettyHttpClientTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/http/ReactorNettyHttpClientTest.java @@ -14,21 +14,22 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.time.Duration; + import static org.assertj.core.api.Assertions.assertThat; /** - * Tests that partition manager correctly resolves addresses for requests and does appropriate number of cache refreshes. + * Tests that partition manager correctly resolves addresses for requests and does appropriate number of cache + * refreshes. */ public class ReactorNettyHttpClientTest { private static final Logger logger = LoggerFactory.getLogger(ReactorNettyHttpClientTest.class); - private Configs configs; private HttpClient reactorNettyHttpClient; @BeforeClass(groups = "unit") public void before_ReactorNettyHttpClientTest() { - this.configs = new Configs(); - this.reactorNettyHttpClient = HttpClient.createFixed(new HttpClientConfig(this.configs)); + this.reactorNettyHttpClient = HttpClient.createFixed(new HttpClientConfig(new Configs())); } @AfterClass(groups = "unit") @@ -40,21 +41,21 @@ public void after_ReactorNettyHttpClientTest() { public void httpClientWithMaxHeaderSize() { reactor.netty.http.client.HttpClient httpClient = ReflectionUtils.get(reactor.netty.http.client.HttpClient.class, this.reactorNettyHttpClient, "httpClient"); - assertThat(httpClient.configuration().decoder().maxHeaderSize()).isEqualTo(this.configs.getMaxHttpHeaderSize()); + assertThat(httpClient.configuration().decoder().maxHeaderSize()).isEqualTo(Configs.getMaxHttpHeaderSize()); } @Test(groups = "unit") public void httpClientWithMaxChunkSize() { reactor.netty.http.client.HttpClient httpClient = ReflectionUtils.get(reactor.netty.http.client.HttpClient.class, this.reactorNettyHttpClient, "httpClient"); - assertThat(httpClient.configuration().decoder().maxChunkSize()).isEqualTo(this.configs.getMaxHttpChunkSize()); + assertThat(httpClient.configuration().decoder().maxChunkSize()).isEqualTo(Configs.getMaxHttpChunkSize()); } @Test(groups = "unit") public void httpClientWithMaxInitialLineLength() { reactor.netty.http.client.HttpClient httpClient = ReflectionUtils.get(reactor.netty.http.client.HttpClient.class, this.reactorNettyHttpClient, "httpClient"); - assertThat(httpClient.configuration().decoder().maxInitialLineLength()).isEqualTo(this.configs.getMaxHttpInitialLineLength()); + assertThat(httpClient.configuration().decoder().maxInitialLineLength()).isEqualTo(Configs.getMaxHttpInitialLineLength()); } @Test(groups = "unit") @@ -65,10 +66,36 @@ public void httpClientWithValidateHeaders() { } @Test(groups = "unit") - public void httpClientWithOptions() { + public void httpClientWithConnectionAcquireTimeout() { + reactor.netty.http.client.HttpClient httpClient = + ReflectionUtils.get(reactor.netty.http.client.HttpClient.class, this.reactorNettyHttpClient, "httpClient"); + Integer connectionTimeoutInMillis = + (Integer) httpClient.configuration().options().get(ChannelOption.CONNECT_TIMEOUT_MILLIS); + assertThat(connectionTimeoutInMillis).isEqualTo((int) Configs.getConnectionAcquireTimeout().toMillis()); + } + + @Test(groups = "unit") + public void httpClientWithMaxPoolSize() { + reactor.netty.http.client.HttpClient httpClient = + ReflectionUtils.get(reactor.netty.http.client.HttpClient.class, this.reactorNettyHttpClient, "httpClient"); + int maxConnectionPoolSize = httpClient.configuration().connectionProvider().maxConnections(); + assertThat(maxConnectionPoolSize).isEqualTo(Configs.getDefaultHttpPoolSize()); + } + + @Test(groups = "unit") + // We don't set any default response timeout to http client + public void httpClientWithResponseTimeout() { + reactor.netty.http.client.HttpClient httpClient = + ReflectionUtils.get(reactor.netty.http.client.HttpClient.class, this.reactorNettyHttpClient, "httpClient"); + Duration responseTimeout = httpClient.configuration().responseTimeout(); + assertThat(responseTimeout).isNull(); + } + + @Test(groups = "unit") + public void httpClientWithConnectionProviderName() { reactor.netty.http.client.HttpClient httpClient = ReflectionUtils.get(reactor.netty.http.client.HttpClient.class, this.reactorNettyHttpClient, "httpClient"); - Integer connectionTimeoutInMillis = (Integer) httpClient.configuration().options().get(ChannelOption.CONNECT_TIMEOUT_MILLIS); - assertThat(connectionTimeoutInMillis).isEqualTo((int) this.configs.getConnectionAcquireTimeout().toMillis()); + String name = httpClient.configuration().connectionProvider().name(); + assertThat(name).isEqualTo(Configs.getReactorNettyConnectionPoolName()); } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelperTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelperTest.java index dab03d2c2cbf..c35e80ee4494 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelperTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelperTest.java @@ -6,6 +6,7 @@ import com.azure.cosmos.implementation.IRoutingMapProvider; import com.azure.cosmos.implementation.MetadataDiagnosticsContext; import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.Resource; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair; import org.apache.commons.lang3.StringUtils; @@ -90,6 +91,18 @@ public Mono> tryGetPartitionKeyRangeByIdAsy } } + private class MockIRoutingMapProviderWithNullRoutingMap extends MockIRoutingMapProvider { + + public MockIRoutingMapProviderWithNullRoutingMap(List ranges) { + super(ranges); + } + + @Override + public Mono>> tryGetOverlappingRangesAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionResourceId, Range range, boolean forceRefresh, Map properties) { + return Mono.just(new Utils.ValueHolder<>(null)); + } + } + @Test(groups = { "unit" }, expectedExceptions = IllegalArgumentException.class) public void nonSortedRanges() { @@ -213,4 +226,21 @@ public String apply(PartitionKeyRange range) { assertThat(2).isEqualTo(overLappingRangeList.size()); assertThat("0,1").isEqualTo(overLappingRangeList.stream().map(func).collect(Collectors.joining(","))); } + + @Test(groups = {"unit"}) + // This test is to verify that the NPE has been fixed in RoutingMapProviderHelper.getOverlappingRanges + public void getOverlappingRangesWithoutOverlapping() { + + Function func = Resource::getId; + + List rangeList = Arrays.asList(new PartitionKeyRange("0", "", "FF")); + + IRoutingMapProvider routingMapProviderMock = new MockIRoutingMapProviderWithNullRoutingMap(rangeList); + + Mono> overlappingRanges; + overlappingRanges = RoutingMapProviderHelper.getOverlappingRanges(routingMapProviderMock, + "coll1", + Arrays.asList(new Range("", "FF", true, false))); + assertThat("").isEqualTo(overlappingRanges.block().stream().map(func).collect(Collectors.joining(","))); + } } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 0f4a2638411b..bbaf7a4edcd1 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -5,11 +5,13 @@ #### Features Added * Added `CosmosFullTextPolicy` in `CosmosContainerProperties` and `CosmosFullTextIndexes` in `IndexingPolicy` to support Full Text Search in Cosmos DB - See [PR 42278](https://github.com/Azure/azure-sdk-for-java/pull/42278) * Added two new properties `quantizationSizeInBytes` and `indexingSearchListSize` to the `CosmosVectorIndexSpec` to support Partitioned DiskANN for vector search in Cosmos DB - See [PR 42333](https://github.com/Azure/azure-sdk-for-java/pull/42333) +* Added system property `COSMOS.LOAD_AZURE_VM_META_DATA` to allow customers to disable/enable loading Azure VM metadata for diagnostics - See [PR 42874](https://github.com/Azure/azure-sdk-for-java/pull/42874) #### Breaking Changes #### Bugs Fixed * Fixed a Null Pointer Exception in `ContainerThroughputConrolGroupProperties` if defaultGroup is not set. - See [PR 42835](https://github.com/Azure/azure-sdk-for-java/pull/42835) +* Fixed a Null Pointer Exception in `RoutingMapProviderHelpers#getOverlappingRanges()` in case of Routing map being null - See [PR 42874](https://github.com/Azure/azure-sdk-for-java/pull/42874) #### Other Changes * Enable `JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS` by default for objectMapper. - See [PR 42520](https://github.com/Azure/azure-sdk-for-java/pull/42520) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index 1df73d31a519..18486470e6ee 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -255,6 +255,9 @@ public class Configs { private static final boolean DEFAULT_PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN = false; private static final String PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN = "COSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN"; + private static final String COSMOS_DISABLE_IMDS_ACCESS = "COSMOS.DISABLE_IMDS_ACCESS"; + private static final String COSMOS_DISABLE_IMDS_ACCESS_VARIABLE = "COSMOS_DISABLE_IMDS_ACCESS"; + private static final boolean COSMOS_DISABLE_IMDS_ACCESS_DEFAULT = false; // Config to indicate whether allow insecure connections, for example allow http connection or disable cert verification // Please note that this config should only during development or test, please do not use in prod env private static final boolean DEFAULT_INSECURE_EMULATOR_CONNECTION_ALLOWED = false; @@ -338,43 +341,43 @@ public int getDirectHttpsMaxConnectionLimit() { return getJVMConfigAsInt(MAX_DIRECT_HTTPS_POOL_SIZE, DEFAULT_DIRECT_HTTPS_POOL_SIZE); } - public int getMaxHttpHeaderSize() { + public int getGlobalEndpointManagerMaxInitializationTimeInSeconds() { + return getJVMConfigAsInt(GLOBAL_ENDPOINT_MANAGER_INITIALIZATION_TIME_IN_SECONDS, DEFAULT_GLOBAL_ENDPOINT_MANAGER_INITIALIZATION_TIME_IN_SECONDS); + } + + public int getUnavailableLocationsExpirationTimeInSeconds() { + return getJVMConfigAsInt(UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS, DEFAULT_UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS); + } + + public static int getMaxHttpHeaderSize() { return getJVMConfigAsInt(MAX_HTTP_HEADER_SIZE_IN_BYTES, DEFAULT_MAX_HTTP_REQUEST_HEADER_SIZE); } - public int getMaxHttpInitialLineLength() { + public static int getMaxHttpInitialLineLength() { return getJVMConfigAsInt(MAX_HTTP_INITIAL_LINE_LENGTH_IN_BYTES, DEFAULT_MAX_HTTP_INITIAL_LINE_LENGTH); } - public int getMaxHttpChunkSize() { + public static int getMaxHttpChunkSize() { return getJVMConfigAsInt(MAX_HTTP_CHUNK_SIZE_IN_BYTES, DEFAULT_MAX_HTTP_CHUNK_SIZE_IN_BYTES); } - public int getMaxHttpBodyLength() { + public static int getMaxHttpBodyLength() { return getJVMConfigAsInt(MAX_HTTP_BODY_LENGTH_IN_BYTES, DEFAULT_MAX_HTTP_BODY_LENGTH_IN_BYTES); } - public int getUnavailableLocationsExpirationTimeInSeconds() { - return getJVMConfigAsInt(UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS, DEFAULT_UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS); - } - public static int getClientTelemetrySchedulingInSec() { return getJVMConfigAsInt(CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS, DEFAULT_CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS); } - public int getGlobalEndpointManagerMaxInitializationTimeInSeconds() { - return getJVMConfigAsInt(GLOBAL_ENDPOINT_MANAGER_INITIALIZATION_TIME_IN_SECONDS, DEFAULT_GLOBAL_ENDPOINT_MANAGER_INITIALIZATION_TIME_IN_SECONDS); - } - - public String getReactorNettyConnectionPoolName() { + public static String getReactorNettyConnectionPoolName() { return REACTOR_NETTY_CONNECTION_POOL_NAME; } - public Duration getMaxIdleConnectionTimeout() { + public static Duration getMaxIdleConnectionTimeout() { return MAX_IDLE_CONNECTION_TIMEOUT; } - public Duration getConnectionAcquireTimeout() { + public static Duration getConnectionAcquireTimeout() { return CONNECTION_ACQUIRE_TIMEOUT; } @@ -826,6 +829,17 @@ public static String getCharsetDecoderErrorActionOnUnmappedCharacter() { DEFAULT_CHARSET_DECODER_ERROR_ACTION_ON_UNMAPPED_CHARACTER)); } + public static boolean shouldDisableIMDSAccess() { + String shouldDisableIMDSAccess = + System.getProperty( + COSMOS_DISABLE_IMDS_ACCESS, + firstNonNull( + emptyToNull(System.getenv().get(COSMOS_DISABLE_IMDS_ACCESS_VARIABLE)), + String.valueOf(COSMOS_DISABLE_IMDS_ACCESS_DEFAULT))); + + return Boolean.parseBoolean(shouldDisableIMDSAccess); + } + public static boolean isInsecureEmulatorConnectionAllowed() { String httpForEmulatorAllowed = System.getProperty( INSECURE_EMULATOR_CONNECTION_ALLOWED, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index 24edec682d22..1b46cd560ee4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -85,8 +85,6 @@ public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPol } public void init() { - // TODO: add support for openAsync - // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 startRefreshLocationTimerAsync(true).block(maxInitializationTime); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 6b16efe23af7..619b356aca6f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -552,7 +552,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, this.httpClientInterceptor = null; this.reactorHttpClient = httpClient(); - this.globalEndpointManager = new GlobalEndpointManager(asDatabaseAccountManagerInternal(), this.connectionPolicy, /**/configs); + this.globalEndpointManager = new GlobalEndpointManager(asDatabaseAccountManagerInternal(), this.connectionPolicy, configs); this.isRegionScopedSessionCapturingEnabledOnClientOrSystemConfig = isRegionScopedSessionCapturingEnabled; this.sessionContainer = new SessionContainer(this.serviceEndpoint.getHost(), disableSessionCapturing); @@ -623,8 +623,6 @@ private DatabaseAccount initializeGatewayConfigurationReader() { this.useMultipleWriteLocations = this.connectionPolicy.isMultipleWriteRegionsEnabled() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount); return databaseAccount; - // TODO: add support for openAsync - // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 } private void resetSessionContainerIfNeeded(DatabaseAccount databaseAccount) { @@ -672,8 +670,6 @@ private void updateGatewayProxy() { public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Function httpClientInterceptor) { try { - // TODO: add support for openAsync - // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 this.httpClientInterceptor = httpClientInterceptor; if (httpClientInterceptor != null) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java index ca7321aab679..bb5e937c9a4a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java @@ -130,7 +130,7 @@ public Mono>> tryGetOverlappingRangesA // maybe we should consider changing to ArrayList to avoid conversion return new Utils.ValueHolder<>(new ArrayList<>(routingMapValueHolder.v.getOverlappingRanges(range))); } else { - logger.debug("Routing Map Null for collection: {} for range: {}, forceRefresh:{}", collectionRid, range, forceRefresh); + logger.warn("Routing Map Null for collection: {} for range: {}, forceRefresh:{}", collectionRid, range, forceRefresh); return new Utils.ValueHolder<>(null); } }); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/clienttelemetry/ClientTelemetry.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/clienttelemetry/ClientTelemetry.java index 4027dee776ac..8065f6307be7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/clienttelemetry/ClientTelemetry.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/clienttelemetry/ClientTelemetry.java @@ -105,6 +105,13 @@ public class ClientTelemetry { private static final String USER_AGENT = Utils.getUserAgent(); private final int clientTelemetrySchedulingSec; + //IMDS Constants + public static final String IMDS_AZURE_VM_METADATA = "http://169.254.169.254:80/metadata/instance?api-version=2020-06-01"; + public static final Duration IMDS_DEFAULT_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(5); + public static final Duration IMDS_DEFAULT_IDLE_CONNECTION_TIMEOUT = Duration.ofSeconds(60); + public static final Duration IMDS_DEFAULT_CONNECTION_ACQUIRE_TIMEOUT = Duration.ofSeconds(5); + public static final int IMDS_DEFAULT_MAX_CONNECTION_POOL_SIZE = 5; + private final IAuthorizationTokenProvider tokenProvider; private final String globalDatabaseAccountName; @@ -237,9 +244,10 @@ private HttpClient getHttpClientForClientTelemetry() { private HttpClient getHttpClientForIMDS() { // Proxy is not supported for azure instance metadata service HttpClientConfig httpClientConfig = new HttpClientConfig(this.configs) - .withMaxIdleConnectionTimeout(IMDSConfig.DEFAULT_IDLE_CONNECTION_TIMEOUT) - .withPoolSize(IMDSConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE) - .withNetworkRequestTimeout(IMDSConfig.DEFAULT_NETWORK_REQUEST_TIMEOUT); + .withMaxIdleConnectionTimeout(IMDS_DEFAULT_IDLE_CONNECTION_TIMEOUT) + .withPoolSize(IMDS_DEFAULT_MAX_CONNECTION_POOL_SIZE) + .withNetworkRequestTimeout(IMDS_DEFAULT_NETWORK_REQUEST_TIMEOUT) + .withConnectionAcquireTimeout(IMDS_DEFAULT_CONNECTION_ACQUIRE_TIMEOUT); return HttpClient.createFixed(httpClientConfig); } @@ -305,8 +313,7 @@ private Mono sendClientTelemetry() { HttpRequest httpRequest = new HttpRequest(HttpMethod.POST, targetEndpoint, targetEndpoint.getPort(), httpHeaders) .withBody(tempBuffer); - Mono httpResponseMono = this.httpClient.send(httpRequest, - Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds())); + Mono httpResponseMono = this.httpClient.send(httpRequest); return httpResponseMono.flatMap(response -> { if (response.statusCode() != HttpConstants.StatusCodes.NO_CONTENT) { logger.error("Client telemetry request did not succeeded, status code {}, request body {}", @@ -350,6 +357,10 @@ private void populateAzureVmMetaData(AzureVMMetadata azureVMMetadata) { } private Mono loadAzureVmMetaData() { + if (Configs.shouldDisableIMDSAccess()) { + logger.info("Access to IMDS to get Azure VM metadata is disabled"); + return Mono.empty(); + } AzureVMMetadata metadataSnapshot = azureVmMetaDataSingleton.get(); if (metadataSnapshot != null) { @@ -359,7 +370,7 @@ private Mono loadAzureVmMetaData() { URI targetEndpoint = null; try { - targetEndpoint = new URI(IMDSConfig.AZURE_VM_METADATA); + targetEndpoint = new URI(IMDS_AZURE_VM_METADATA); } catch (URISyntaxException ex) { logger.info("Unable to parse azure vm metadata url"); return Mono.empty(); @@ -370,9 +381,12 @@ private Mono loadAzureVmMetaData() { HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, targetEndpoint, targetEndpoint.getPort(), httpHeaders); Mono httpResponseMono = this.metadataHttpClient.send(httpRequest); - Mono mono = httpResponseMono - .flatMap(response -> response.bodyAsString()).map(metadataJson -> parse(metadataJson, - AzureVMMetadata.class)).doOnSuccess(metadata -> { + + return httpResponseMono + .flatMap(HttpResponse::bodyAsString) + .map(metadataJson -> parse(metadataJson, + AzureVMMetadata.class)) + .doOnSuccess(metadata -> { azureVmMetaDataSingleton.compareAndSet(null, metadata); this.populateAzureVmMetaData(metadata); }).onErrorResume(throwable -> { @@ -380,8 +394,6 @@ private Mono loadAzureVmMetaData() { logger.debug("Unable to get azure vm metadata", throwable); return Mono.empty(); }); - - return mono; } private static T parse(String itemResponseBodyAsString, Class itemClassType) { @@ -452,11 +464,4 @@ private void fillMetricsInfo(ReportPayload payload, ConcurrentDoubleHistogram hi percentile.put(PERCENTILE_999, copyHistogram.getValueAtPercentile(PERCENTILE_999)); payload.getMetricInfo().setPercentiles(percentile); } - - static class IMDSConfig { - private static String AZURE_VM_METADATA = "http://169.254.169.254:80/metadata/instance?api-version=2020-06-01"; - private static final Duration DEFAULT_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(60); - private static final Duration DEFAULT_IDLE_CONNECTION_TIMEOUT = Duration.ofSeconds(60); - private static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 1000; - } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java index 8196be4d2bef..dc8b28c1c40f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java @@ -72,22 +72,18 @@ public class HttpTransportClient extends TransportClient { private final Logger logger = LoggerFactory.getLogger(HttpTransportClient.class); private final HttpClient httpClient; private final Map defaultHeaders; - private final Configs configs; private final GlobalEndpointManager globalEndpointManager; - HttpClient createHttpClient(ConnectionPolicy connectionPolicy) { - // TODO: use one instance of SSL context everywhere - HttpClientConfig httpClientConfig = new HttpClientConfig(this.configs); + HttpClient createHttpClient(Configs configs, ConnectionPolicy connectionPolicy) { + HttpClientConfig httpClientConfig = new HttpClientConfig(configs); httpClientConfig.withNetworkRequestTimeout(connectionPolicy.getHttpNetworkRequestTimeout()); httpClientConfig.withPoolSize(configs.getDirectHttpsMaxConnectionLimit()); - return HttpClient.createFixed(httpClientConfig); } public HttpTransportClient(Configs configs, ConnectionPolicy connectionPolicy, UserAgentContainer userAgent, GlobalEndpointManager globalEndpointManager) { - this.configs = configs; - this.httpClient = createHttpClient(connectionPolicy); + this.httpClient = createHttpClient(configs, connectionPolicy); this.defaultHeaders = new HashMap<>(); @@ -141,15 +137,19 @@ public Mono invokeStoreAsync( MutableVolatile sendTimeUtc = new MutableVolatile<>(); - Duration responseTimeout = Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds()); + Duration responseTimeout = null; if (OperationType.QueryPlan.equals(request.getOperationType())) { responseTimeout = Duration.ofSeconds(Configs.getQueryPlanResponseTimeoutInSeconds()); } else if (request.isAddressRefresh()) { responseTimeout = Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds()); } - Mono httpResponseMono = this.httpClient - .send(httpRequest, responseTimeout) + Mono sendRequest = this.httpClient.send(httpRequest); + if (responseTimeout != null) { + sendRequest = this.httpClient.send(httpRequest, responseTimeout); + } + + Mono httpResponseMono = sendRequest .doOnSubscribe(subscription -> { sendTimeUtc.v = Instant.now(); this.beforeRequest( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java index 7c93bf31d04c..90b9c96bda78 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java @@ -151,7 +151,7 @@ public Mono injectGatewayErrors( } if (this.injectGatewayServerConnectionDelay(faultInjectionRequestArgs, delayToBeInjected)) { - Duration connectionAcquireTimeout = this.configs.getConnectionAcquireTimeout(); + Duration connectionAcquireTimeout = Configs.getConnectionAcquireTimeout(); if (delayToBeInjected.v.toMillis() >= connectionAcquireTimeout.toMillis()) { return Mono.delay(connectionAcquireTimeout) .then(Mono.error(new ConnectTimeoutException())); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java index c371a5c47d05..2f8747c2df0e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.http; -import com.azure.cosmos.implementation.Configs; import reactor.core.publisher.Mono; import reactor.netty.resources.ConnectionProvider; @@ -40,24 +39,11 @@ static HttpClient createFixed(HttpClientConfig httpClientConfig) { throw new IllegalArgumentException("HttpClientConfig is null"); } - Duration maxIdleConnectionTimeoutInMillis = httpClientConfig.getConfigs().getMaxIdleConnectionTimeout(); - if (httpClientConfig.getMaxIdleConnectionTimeout() != null) { - maxIdleConnectionTimeoutInMillis = httpClientConfig.getMaxIdleConnectionTimeout(); - } - - // Default pool size - Integer maxPoolSize = Configs.getDefaultHttpPoolSize(); - if (httpClientConfig.getMaxPoolSize() != null) { - maxPoolSize = httpClientConfig.getMaxPoolSize(); - } - - Duration connectionAcquireTimeout = httpClientConfig.getConfigs().getConnectionAcquireTimeout(); - ConnectionProvider.Builder fixedConnectionProviderBuilder = ConnectionProvider - .builder(httpClientConfig.getConfigs().getReactorNettyConnectionPoolName()); - fixedConnectionProviderBuilder.maxConnections(maxPoolSize); - fixedConnectionProviderBuilder.pendingAcquireTimeout(connectionAcquireTimeout); - fixedConnectionProviderBuilder.maxIdleTime(maxIdleConnectionTimeoutInMillis); + .builder(httpClientConfig.getConnectionPoolName()); + fixedConnectionProviderBuilder.maxConnections(httpClientConfig.getMaxPoolSize()); + fixedConnectionProviderBuilder.pendingAcquireTimeout(httpClientConfig.getConnectionAcquireTimeout()); + fixedConnectionProviderBuilder.maxIdleTime(httpClientConfig.getMaxIdleConnectionTimeout()); return ReactorNettyClient.createWithConnectionProvider(fixedConnectionProviderBuilder.build(), httpClientConfig); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClientConfig.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClientConfig.java index dbffc168c93a..d3831f2e1378 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClientConfig.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClientConfig.java @@ -5,7 +5,6 @@ import com.azure.core.http.ProxyOptions; import com.azure.cosmos.implementation.Configs; -import com.azure.cosmos.implementation.guava27.Strings; import java.time.Duration; @@ -13,12 +12,18 @@ * Helper class internally used for instantiating reactor netty http client. */ public class HttpClientConfig { - public final static String REACTOR_NETWORK_LOG_CATEGORY = "com.azure.cosmos.netty-network"; private final Configs configs; - private Integer maxPoolSize; - private Duration maxIdleConnectionTimeout; - private Duration networkRequestTimeout; + private Duration connectionAcquireTimeout = Configs.getConnectionAcquireTimeout(); + private int maxPoolSize = Configs.getDefaultHttpPoolSize(); + private Duration maxIdleConnectionTimeout = Configs.getMaxIdleConnectionTimeout(); + private Duration networkRequestTimeout = Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds()); + private String connectionPoolName = Configs.getReactorNettyConnectionPoolName(); + private int maxHeaderSize = Configs.getMaxHttpHeaderSize(); + private int maxInitialLineLength = Configs.getMaxHttpInitialLineLength(); + private int maxChunkSize = Configs.getMaxHttpChunkSize(); + private int maxBodyLength = Configs.getMaxHttpBodyLength(); + public String reactorNetworkLogCategory = "com.azure.cosmos.netty-network"; private ProxyOptions proxy; private boolean connectionKeepAlive = true; @@ -26,6 +31,41 @@ public HttpClientConfig(Configs configs) { this.configs = configs; } + public HttpClientConfig withMaxHeaderSize(int maxHeaderSize) { + this.maxHeaderSize = maxHeaderSize; + return this; + } + + public HttpClientConfig withMaxInitialLineLength(int maxInitialLineLength) { + this.maxInitialLineLength = maxInitialLineLength; + return this; + } + + public HttpClientConfig withMaxChunkSize(int maxChunkSize) { + this.maxChunkSize = maxChunkSize; + return this; + } + + public HttpClientConfig withMaxBodyLength(int maxBodyLength) { + this.maxBodyLength = maxBodyLength; + return this; + } + + public HttpClientConfig withReactorNetworkLogCategory(String reactorNetworkLogCategory) { + this.reactorNetworkLogCategory = reactorNetworkLogCategory; + return this; + } + + public HttpClientConfig withConnectionPoolName(String connectionPoolName) { + this.connectionPoolName = connectionPoolName; + return this; + } + + public HttpClientConfig withConnectionAcquireTimeout(Duration connectionAcquireTimeout) { + this.connectionAcquireTimeout = connectionAcquireTimeout; + return this; + } + public HttpClientConfig withPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; return this; @@ -55,7 +95,7 @@ public Configs getConfigs() { return configs; } - public Integer getMaxPoolSize() { + public int getMaxPoolSize() { return maxPoolSize; } @@ -75,13 +115,40 @@ public boolean isConnectionKeepAlive() { return connectionKeepAlive; } - // TODO(kuthapar): Do we really need to use Strings.lenientFormat() here? - // Even the documentation of this API suggests to use String.format or just string appends if possible. + public Duration getConnectionAcquireTimeout() { + return connectionAcquireTimeout; + } + + public String getConnectionPoolName() { + return this.connectionPoolName; + } + + public String getReactorNetworkLogCategory() { + return reactorNetworkLogCategory; + } + + public int getMaxHeaderSize() { + return maxHeaderSize; + } + + public int getMaxInitialLineLength() { + return maxInitialLineLength; + } + + public int getMaxChunkSize() { + return maxChunkSize; + } + + public int getMaxBodyLength() { + return maxBodyLength; + } + public String toDiagnosticsString() { - return Strings.lenientFormat("(cps:%s, nrto:%s, icto:%s, p:%s)", + return String.format("(cps:%s, nrto:%s, icto:%s, cto:%s, p:%s)", maxPoolSize, networkRequestTimeout, maxIdleConnectionTimeout, + connectionAcquireTimeout, proxy != null); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java index 5edcaf843898..c0a70a09b4fe 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java @@ -34,8 +34,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import static com.azure.cosmos.implementation.http.HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY; - /** * HttpClient that is implemented using reactor-netty. */ @@ -63,6 +61,7 @@ public class ReactorNettyClient implements HttpClient { private HttpClientConfig httpClientConfig; private reactor.netty.http.client.HttpClient httpClient; private ConnectionProvider connectionProvider; + private String reactorNetworkLogCategory; private ReactorNettyClient() {} @@ -72,6 +71,7 @@ private ReactorNettyClient() {} public static ReactorNettyClient create(HttpClientConfig httpClientConfig) { ReactorNettyClient reactorNettyClient = new ReactorNettyClient(); reactorNettyClient.httpClientConfig = httpClientConfig; + reactorNettyClient.reactorNetworkLogCategory = httpClientConfig.getReactorNetworkLogCategory(); reactorNettyClient.httpClient = reactor.netty.http.client.HttpClient .newConnection() .observe(getConnectionObserver()) @@ -88,6 +88,7 @@ public static ReactorNettyClient createWithConnectionProvider(ConnectionProvider ReactorNettyClient reactorNettyClient = new ReactorNettyClient(); reactorNettyClient.connectionProvider = connectionProvider; reactorNettyClient.httpClientConfig = httpClientConfig; + reactorNettyClient.reactorNetworkLogCategory = httpClientConfig.getReactorNetworkLogCategory(); reactorNettyClient.httpClient = reactor.netty.http.client.HttpClient .create(connectionProvider) .observe(getConnectionObserver()) @@ -128,23 +129,22 @@ private void configureChannelPipelineHandlers() { ); } - if (LoggerFactory.getLogger(REACTOR_NETWORK_LOG_CATEGORY).isTraceEnabled()) { - this.httpClient = this.httpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.INFO); + if (LoggerFactory.getLogger(reactorNetworkLogCategory).isTraceEnabled()) { + this.httpClient = this.httpClient.wiretap(this.httpClientConfig.getReactorNetworkLogCategory(), LogLevel.INFO); } this.httpClient = this.httpClient.secure(sslContextSpec -> sslContextSpec.sslContext(configs.getSslContext())) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) configs.getConnectionAcquireTimeout().toMillis()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) this.httpClientConfig.getConnectionAcquireTimeout().toMillis()) .httpResponseDecoder(httpResponseDecoderSpec -> - httpResponseDecoderSpec.maxInitialLineLength(configs.getMaxHttpInitialLineLength()) - .maxHeaderSize(configs.getMaxHttpHeaderSize()) - .maxChunkSize(configs.getMaxHttpChunkSize()) + httpResponseDecoderSpec.maxInitialLineLength(this.httpClientConfig.getMaxInitialLineLength()) + .maxHeaderSize(this.httpClientConfig.getMaxHeaderSize()) + .maxChunkSize(this.httpClientConfig.getMaxChunkSize()) .validateHeaders(true)); } @Override public Mono send(HttpRequest request) { - // By default, Configs.getHttpsResponseTimeoutInSeconds default value is used as response timeout - return send(request, Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds())); + return send(request, this.httpClientConfig.getNetworkRequestTimeout()); } @Override @@ -375,17 +375,17 @@ private enum ReactorNettyResponseState { * This changes the logging level of Reactor Netty Http Client. */ public void enableNetworkLogging() { - Logger logger = LoggerFactory.getLogger(REACTOR_NETWORK_LOG_CATEGORY); + Logger logger = LoggerFactory.getLogger(this.reactorNetworkLogCategory); if (logger.isTraceEnabled()) { - this.httpClient = this.httpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.TRACE); + this.httpClient = this.httpClient.wiretap(this.reactorNetworkLogCategory, LogLevel.TRACE); } else if (logger.isDebugEnabled()) { - this.httpClient = this.httpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.DEBUG); + this.httpClient = this.httpClient.wiretap(this.reactorNetworkLogCategory, LogLevel.DEBUG); } else if (logger.isInfoEnabled()) { - this.httpClient = this.httpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.INFO); + this.httpClient = this.httpClient.wiretap(this.reactorNetworkLogCategory, LogLevel.INFO); } else if (logger.isWarnEnabled()) { - this.httpClient = this.httpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.WARN); + this.httpClient = this.httpClient.wiretap(this.reactorNetworkLogCategory, LogLevel.WARN); } else if (logger.isErrorEnabled()) { - this.httpClient = this.httpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.ERROR); + this.httpClient = this.httpClient.wiretap(this.reactorNetworkLogCategory, LogLevel.ERROR); } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelper.java index e49dc632c79d..a3863187732e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelper.java @@ -123,7 +123,7 @@ public static Mono> getOverlappingRanges( } return routingMapProvider.tryGetOverlappingRangesAsync(null, resourceId, queryRange, false, null) - .map(ranges -> ranges.v) + .map(ranges -> ranges.v != null ? ranges.v : new ArrayList()) .map(targetRanges::addAll) .flatMap(aBoolean -> { if (!targetRanges.isEmpty()) {