Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-test/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue where `CONNECTION_DELAY` fault injection rule is not applied during `openConnectionsAndInitCaches` - See [PR 34096](https://github.com/Azure/azure-sdk-for-java/pull/34096)

#### Other Changes

Expand Down
2 changes: 0 additions & 2 deletions sdk/cosmos/azure-cosmos-test/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,3 @@ or contact [[email protected]][coc_contact] with any additional questions o
[troubleshooting]: https://docs.microsoft.com/azure/cosmos-db/troubleshoot-java-sdk-v4-sql
[perf_guide]: https://docs.microsoft.com/azure/cosmos-db/performance-tips-java-sdk-v4-sql?tabs=api-async
[quickstart]: https://docs.microsoft.com/azure/cosmos-db/create-sql-api-java?tabs=sync

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Fcosmos%2Fazure-cosmos-encryption%2FREADME.png)
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ public List<URI> getAddresses() {
return physicalAddresses;
}

public void setAddresses(List<URI> physicalAddresses) {
public void setAddresses(List<URI> physicalAddresses, boolean primaryOnly) {
this.physicalAddresses = physicalAddresses;
if (physicalAddresses != null && physicalAddresses.size() > 0) {
this.validators.add(new AddressValidator(physicalAddresses));
}

if (primaryOnly) {
this.validators.add(new PrimaryAddressValidator());
}
}

public boolean isApplicable(RxDocumentServiceRequest request) {
Expand Down Expand Up @@ -115,11 +119,18 @@ public boolean isApplicable(RxDocumentServiceRequest request) {
&& addresses.size() > 0) {
return this.addresses
.stream()
.anyMatch(address -> request.requestContext.storePhysicalAddress.toString().startsWith(address.toString()));
.anyMatch(address -> request.requestContext.storePhysicalAddressUri.getURIAsString().startsWith(address.toString()));
}

return true;
}
}

static class PrimaryAddressValidator implements IFaultInjectionConditionValidator {
@Override
public boolean isApplicable(RxDocumentServiceRequest request) {
return request.requestContext.storePhysicalAddressUri.isPrimary();
}
}
//endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,12 @@ private Mono<IFaultInjectionRuleInternal> getEffectiveServerErrorRule(
// TODO: add handling for gateway mode

// Direct connection mode, populate physical addresses
boolean primaryAddressesOnly = this.isWriteOnly(rule.getCondition());
return BackoffRetryUtility.executeRetry(
() -> this.resolvePhysicalAddresses(
regionEndpoints,
rule.getCondition().getEndpoints(),
this.isWriteOnlyEndpoint(rule.getCondition()),
primaryAddressesOnly,
documentCollection),
new FaultInjectionRuleProcessorRetryPolicy(this.retryOptions)
)
Expand All @@ -186,7 +187,7 @@ private Mono<IFaultInjectionRuleInternal> getEffectiveServerErrorRule(
.collect(Collectors.toList());
}

effectiveCondition.setAddresses(effectiveAddresses);
effectiveCondition.setAddresses(effectiveAddresses, primaryAddressesOnly);
return effectiveCondition;
});
})
Expand Down Expand Up @@ -226,7 +227,7 @@ private Mono<IFaultInjectionRuleInternal> getEffectiveConnectionErrorRule(
return this.resolvePhysicalAddresses(
regionEndpoints,
rule.getCondition().getEndpoints(),
this.isWriteOnlyEndpoint(rule.getCondition()),
this.isWriteOnly(rule.getCondition()),
documentCollection)
.map(physicalAddresses -> {
List<URI> effectiveAddresses =
Expand Down Expand Up @@ -258,7 +259,7 @@ private Mono<IFaultInjectionRuleInternal> getEffectiveConnectionErrorRule(
* @return the region service endpoints.
*/
private List<URI> getRegionEndpoints(FaultInjectionCondition condition) {
boolean isWriteOnlyEndpoints = this.isWriteOnlyEndpoint(condition);
boolean isWriteOnlyEndpoints = this.isWriteOnly(condition);

if (StringUtils.isNotEmpty(condition.getRegion())) {
return Arrays.asList(
Expand Down Expand Up @@ -362,7 +363,7 @@ private Mono<List<URI>> resolvePhysicalAddresses(
.collectList();
}

private boolean isWriteOnlyEndpoint(FaultInjectionCondition condition) {
private boolean isWriteOnly(FaultInjectionCondition condition) {
return condition.getOperationType() != null
&& this.getEffectiveOperationType(condition.getOperationType()).isWriteOperation();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.rntbd.IRequestRecord;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord;
import com.azure.cosmos.implementation.faultinjection.IRntbdServerErrorInjector;

Expand Down Expand Up @@ -67,7 +68,7 @@ public boolean injectRntbdServerResponseError(RntbdRequestRecord requestRecord)

@Override
public boolean injectRntbdServerConnectionDelay(
RntbdRequestRecord requestRecord,
IRequestRecord requestRecord,
Consumer<Duration> openConnectionWithDelayConsumer) {
if (requestRecord == null) {
return false;
Expand All @@ -79,11 +80,10 @@ public boolean injectRntbdServerConnectionDelay(
if (serverConnectionDelayRule != null) {
request.faultInjectionRequestContext
.applyFaultInjectionRule(
requestRecord.transportRequestId(),
requestRecord.getRequestId(),
serverConnectionDelayRule.getId());
openConnectionWithDelayConsumer.accept(serverConnectionDelayRule.getResult().getDelay());
return true;

}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ public static Object[][] operationTypeProvider() {
};
}

@DataProvider(name = "faultInjectionOperationTypeProvider")
public static Object[][] faultInjectionOperationTypeProvider() {
return new Object[][]{
// fault injection operation type, primaryAddressOnly
{ FaultInjectionOperationType.READ_ITEM, false },
{ FaultInjectionOperationType.REPLACE_ITEM, true },
{ FaultInjectionOperationType.CREATE_ITEM, true },
{ FaultInjectionOperationType.DELETE_ITEM, true},
{ FaultInjectionOperationType.QUERY_ITEM, false },
{ FaultInjectionOperationType.PATCH_ITEM, true }
};
}

@DataProvider(name = "faultInjectionServerErrorResponseProvider")
public static Object[][] faultInjectionServerErrorResponseProvider() {
return new Object[][]{
Expand Down Expand Up @@ -565,6 +578,64 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws Js
}
}

@Test(groups = {"multi-region"}, dataProvider = "faultInjectionOperationTypeProvider", timeOut = TIMEOUT)
public void faultInjectionServerErrorRuleTests_ServerConnectionDelay_warmup(
FaultInjectionOperationType operationType,
boolean primaryAddressesOnly) {

CosmosAsyncClient newClient = null; // creating new client to force creating new connections
// simulate high channel acquisition/connectionTimeout during openConnection flow
String ruleId = "serverErrorRule-serverConnectionDelay-warmup" + UUID.randomUUID();
FaultInjectionRule serverConnectionDelayRule =
new FaultInjectionRuleBuilder(ruleId)
.condition(
new FaultInjectionConditionBuilder()
.operationType(operationType)
.build()
)
.result(
FaultInjectionResultBuilders
.getResultBuilder(FaultInjectionServerErrorType.CONNECTION_DELAY)
.delay(Duration.ofSeconds(2))
.times(1)
.build()
)
.duration(Duration.ofMinutes(5))
.build();

try {
DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig();
directConnectionConfig.setConnectTimeout(Duration.ofSeconds(1));

newClient = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
.contentResponseOnWriteEnabled(true)
.consistencyLevel(BridgeInternal.getContextClient(this.client).getConsistencyLevel())
.directMode(directConnectionConfig)
.buildAsyncClient();

CosmosAsyncContainer container =
newClient
.getDatabase(cosmosAsyncContainer.getDatabase().getId())
.getContainer(cosmosAsyncContainer.getId());

CosmosFaultInjectionHelper.configureFaultInjectionRules(container, Arrays.asList(serverConnectionDelayRule)).block();

int partitionSize = container.getFeedRanges().block().size();
container.openConnectionsAndInitCaches().block();

if (primaryAddressesOnly) {
assertThat(serverConnectionDelayRule.getHitCount()).isEqualTo(partitionSize);
} else {
assertThat(serverConnectionDelayRule.getHitCount()).isBetween(partitionSize * 3L, partitionSize * 5L);
}
} finally {
serverConnectionDelayRule.disable();
safeClose(newClient);
}
}

@Test(groups = {"multi-region"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT)
public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
FaultInjectionServerErrorType serverErrorType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class DocumentServiceRequestContext implements Cloneable {
public volatile Boolean usePreferredLocations;
public volatile Integer locationIndexToRoute;
public volatile URI locationEndpointToRoute;
public volatile URI storePhysicalAddress; // DIRECT: rntbd physical address; GATEWAY: service endpoint
public volatile Uri storePhysicalAddressUri; // DIRECT: rntbd physical address; GATEWAY: service endpoint
public volatile boolean performedBackgroundAddressRefresh;
public volatile boolean performLocalRefreshOnGoneException;
public volatile List<String> storeResponses;
Expand Down Expand Up @@ -118,7 +118,7 @@ public DocumentServiceRequestContext clone() {
context.usePreferredLocations = this.usePreferredLocations;
context.locationIndexToRoute = this.locationIndexToRoute;
context.locationEndpointToRoute = this.locationEndpointToRoute;
context.storePhysicalAddress = this.storePhysicalAddress;
context.storePhysicalAddressUri = this.storePhysicalAddressUri;
context.performLocalRefreshOnGoneException = this.performLocalRefreshOnGoneException;
context.effectivePartitionKey = this.effectivePartitionKey;
context.performedBackgroundAddressRefresh = this.performedBackgroundAddressRefresh;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
import java.util.List;

public interface IOpenConnectionsHandler {
Flux<OpenConnectionResponse> openConnections(URI serviceEndpoint, List<Uri> addresses);
Flux<OpenConnectionResponse> openConnections(String collectionRid, URI serviceEndpoint, List<Uri> addresses);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public enum ResourceType {
ClientEncryptionKey("ClientEncryptionKey", 141),

//Adding client telemetry resource type, only meant for client side
ClientTelemetry("ClientTelemetry", 1001);
ClientTelemetry("ClientTelemetry", 1001),

//Only meant to use on client side during connection open
Connection("Connection", 1002);

private final int value;
private final String stringValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public AddressInformation(boolean isPublic, boolean isPrimary, String physicalUr
this.isPublic = isPublic;
this.isPrimary = isPrimary;
this.physicalUri = new Uri(normalizePhysicalUri(physicalUri));
this.physicalUri.setPrimary(this.isPrimary);
}

public AddressInformation(boolean isPublic, boolean isPrimary, String physicalUri, String protocolScheme) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public class GatewayAddressCache implements IAddressCache {
Expand Down Expand Up @@ -666,7 +667,7 @@ private Mono<AddressInformation[]> getAddressesForRangeId(
}

if (this.replicaAddressValidationEnabled) {
this.validateReplicaAddresses(mergedAddresses);
this.validateReplicaAddresses(collectionRid, mergedAddresses);
}

return Mono.just(mergedAddresses);
Expand Down Expand Up @@ -855,8 +856,9 @@ private AddressInformation[] mergeAddresses(AddressInformation[] newAddresses, A
return mergedAddresses.toArray(new AddressInformation[mergedAddresses.size()]);
}

private void validateReplicaAddresses(AddressInformation[] addresses) {
private void validateReplicaAddresses(String collectionRid, AddressInformation[] addresses) {
checkNotNull(addresses, "Argument 'addresses' can not be null");
checkArgument(StringUtils.isNotEmpty(collectionRid), "Argument 'collectionRid' can not be null");

// By theory, when we reach here, the status of the address should be in one of the three status: Unknown, Connected, UnhealthyPending
// using open connection to validate addresses in UnhealthyPending status
Expand All @@ -882,7 +884,7 @@ private void validateReplicaAddresses(AddressInformation[] addresses) {

if (addressesNeedToValidation.size() > 0) {
this.openConnectionsHandler
.openConnections(this.serviceEndpoint, addressesNeedToValidation)
.openConnections(collectionRid, this.serviceEndpoint, addressesNeedToValidation)
.subscribeOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC)
.subscribe();
}
Expand Down Expand Up @@ -972,6 +974,7 @@ public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(

if (this.openConnectionsHandler != null) {
return this.openConnectionsHandler.openConnections(
collection.getResourceId(),
this.serviceEndpoint,
Arrays
.stream(addressInfo.getRight())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public Mono<StoreResponse> invokeStoreAsync(
}

@Override
public Mono<OpenConnectionResponse> openConnection(URI serviceEndpoint, Uri addressUri) {
public Mono<OpenConnectionResponse> openConnection(Uri addressUri, RxDocumentServiceRequest openConnectionRequest) {
throw new NotImplementedException("openConnection is not supported in httpTransportClient");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,11 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
this.throwIfClosed();

final URI address = addressUri.getURI();
request.requestContext.storePhysicalAddress = address;
request.requestContext.storePhysicalAddressUri = addressUri;

final RntbdRequestArgs requestArgs = new RntbdRequestArgs(request, addressUri);

final RntbdEndpoint endpoint = this.endpointProvider.createIfAbsent(request.requestContext.locationEndpointToRoute, address);
final RntbdEndpoint endpoint = this.endpointProvider.createIfAbsent(request.requestContext.locationEndpointToRoute, addressUri.getURI());
final RntbdRequestRecord record = endpoint.request(requestArgs);

final Context reactorContext = Context.of(KEY_ON_ERROR_DROPPED, onErrorDropHookWithReduceLogLevel);
Expand Down Expand Up @@ -349,16 +349,19 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
}

@Override
public Mono<OpenConnectionResponse> openConnection(URI serviceEndpoint, Uri addressUri) {
public Mono<OpenConnectionResponse> openConnection(Uri addressUri, RxDocumentServiceRequest openConnectionRequest) {
checkNotNull(openConnectionRequest, "Argument 'openConnectionRequest' should not be null");
checkNotNull(addressUri, "Argument 'addressUri' should not be null");
checkNotNull(serviceEndpoint, "Argument 'serviceEndpoint' should not be null");

this.throwIfClosed();

final URI address = addressUri.getURI();
final RntbdRequestArgs requestArgs = new RntbdRequestArgs(openConnectionRequest, addressUri);
final RntbdEndpoint endpoint =
this.endpointProvider.createIfAbsent(
openConnectionRequest.requestContext.locationEndpointToRoute,
addressUri.getURI());

final RntbdEndpoint endpoint = this.endpointProvider.createIfAbsent(serviceEndpoint, address);
return Mono.fromFuture(endpoint.openConnection(addressUri));
return Mono.fromFuture(endpoint.openConnection(requestArgs));
}

public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ protected Mono<StoreResponse> invokeStoreAsync(Uri physicalAddress, RxDocumentSe
}

@Override
public Mono<OpenConnectionResponse> openConnection(URI serviceEndpoint, Uri addressUri) {
return this.transportClient.openConnection(serviceEndpoint, addressUri);
public Mono<OpenConnectionResponse> openConnection(Uri physicalAddress, RxDocumentServiceRequest openConnectionRequest) {
return this.transportClient.openConnection(physicalAddress, openConnectionRequest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ protected abstract Mono<StoreResponse> invokeStoreAsync(
/***
* Only open new connection if there is no existed established connection.
*
* @param addressUri the replica address.
*
* @param physicalAddress the store physical addresses.
* @param openConnectionRequest open connection request.
* @return the {@link OpenConnectionResponse}.
*/
public abstract Mono<OpenConnectionResponse> openConnection(URI serviceEndpoint, final Uri addressUri);
public abstract Mono<OpenConnectionResponse> openConnection(Uri physicalAddress, RxDocumentServiceRequest openConnectionRequest);

public abstract void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider);

Expand Down
Loading