Skip to content

Commit fd5541f

Browse files
xinlian12 and annie-mac authored
FaultInjectionFix-WarmUpFlow (Azure#34096)
* apply connection delay rule on openConnection flow as well --------- Co-authored-by: annie-mac <[email protected]>
1 parent c602ac6 commit fd5541f

31 files changed

+268
-84
lines changed

sdk/cosmos/azure-cosmos-test/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where the `CONNECTION_DELAY` fault injection rule was not applied during `openConnectionsAndInitCaches` - See [PR 34096](https://github.com/Azure/azure-sdk-for-java/pull/34096)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos-test/README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,5 +170,3 @@ or contact [[email protected]][coc_contact] with any additional questions o
170170
[troubleshooting]: https://docs.microsoft.com/azure/cosmos-db/troubleshoot-java-sdk-v4-sql
171171
[perf_guide]: https://docs.microsoft.com/azure/cosmos-db/performance-tips-java-sdk-v4-sql?tabs=api-async
172172
[quickstart]: https://docs.microsoft.com/azure/cosmos-db/create-sql-api-java?tabs=sync
173-
174-
![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Fcosmos%2Fazure-cosmos-encryption%2FREADME.png)

sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionConditionInternal.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,15 @@ public List<URI> getAddresses() {
5151
return physicalAddresses;
5252
}
5353

54-
public void setAddresses(List<URI> physicalAddresses) {
54+
public void setAddresses(List<URI> physicalAddresses, boolean primaryOnly) {
5555
this.physicalAddresses = physicalAddresses;
5656
if (physicalAddresses != null && physicalAddresses.size() > 0) {
5757
this.validators.add(new AddressValidator(physicalAddresses));
5858
}
59+
60+
if (primaryOnly) {
61+
this.validators.add(new PrimaryAddressValidator());
62+
}
5963
}
6064

6165
public boolean isApplicable(RxDocumentServiceRequest request) {
@@ -115,11 +119,18 @@ public boolean isApplicable(RxDocumentServiceRequest request) {
115119
&& addresses.size() > 0) {
116120
return this.addresses
117121
.stream()
118-
.anyMatch(address -> request.requestContext.storePhysicalAddress.toString().startsWith(address.toString()));
122+
.anyMatch(address -> request.requestContext.storePhysicalAddressUri.getURIAsString().startsWith(address.toString()));
119123
}
120124

121125
return true;
122126
}
123127
}
128+
129+
static class PrimaryAddressValidator implements IFaultInjectionConditionValidator {
130+
@Override
131+
public boolean isApplicable(RxDocumentServiceRequest request) {
132+
return request.requestContext.storePhysicalAddressUri.isPrimary();
133+
}
134+
}
124135
//endregion
125136
}

sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleProcessor.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,12 @@ private Mono<IFaultInjectionRuleInternal> getEffectiveServerErrorRule(
168168
// TODO: add handling for gateway mode
169169

170170
// Direct connection mode, populate physical addresses
171+
boolean primaryAddressesOnly = this.isWriteOnly(rule.getCondition());
171172
return BackoffRetryUtility.executeRetry(
172173
() -> this.resolvePhysicalAddresses(
173174
regionEndpoints,
174175
rule.getCondition().getEndpoints(),
175-
this.isWriteOnlyEndpoint(rule.getCondition()),
176+
primaryAddressesOnly,
176177
documentCollection),
177178
new FaultInjectionRuleProcessorRetryPolicy(this.retryOptions)
178179
)
@@ -186,7 +187,7 @@ private Mono<IFaultInjectionRuleInternal> getEffectiveServerErrorRule(
186187
.collect(Collectors.toList());
187188
}
188189

189-
effectiveCondition.setAddresses(effectiveAddresses);
190+
effectiveCondition.setAddresses(effectiveAddresses, primaryAddressesOnly);
190191
return effectiveCondition;
191192
});
192193
})
@@ -226,7 +227,7 @@ private Mono<IFaultInjectionRuleInternal> getEffectiveConnectionErrorRule(
226227
return this.resolvePhysicalAddresses(
227228
regionEndpoints,
228229
rule.getCondition().getEndpoints(),
229-
this.isWriteOnlyEndpoint(rule.getCondition()),
230+
this.isWriteOnly(rule.getCondition()),
230231
documentCollection)
231232
.map(physicalAddresses -> {
232233
List<URI> effectiveAddresses =
@@ -258,7 +259,7 @@ private Mono<IFaultInjectionRuleInternal> getEffectiveConnectionErrorRule(
258259
* @return the region service endpoints.
259260
*/
260261
private List<URI> getRegionEndpoints(FaultInjectionCondition condition) {
261-
boolean isWriteOnlyEndpoints = this.isWriteOnlyEndpoint(condition);
262+
boolean isWriteOnlyEndpoints = this.isWriteOnly(condition);
262263

263264
if (StringUtils.isNotEmpty(condition.getRegion())) {
264265
return Arrays.asList(
@@ -362,7 +363,7 @@ private Mono<List<URI>> resolvePhysicalAddresses(
362363
.collectList();
363364
}
364365

365-
private boolean isWriteOnlyEndpoint(FaultInjectionCondition condition) {
366+
private boolean isWriteOnly(FaultInjectionCondition condition) {
366367
return condition.getOperationType() != null
367368
&& this.getEffectiveOperationType(condition.getOperationType()).isWriteOperation();
368369
}

sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/RntbdServerErrorInjector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.azure.cosmos.CosmosException;
77
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
8+
import com.azure.cosmos.implementation.directconnectivity.rntbd.IRequestRecord;
89
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord;
910
import com.azure.cosmos.implementation.faultinjection.IRntbdServerErrorInjector;
1011

@@ -67,7 +68,7 @@ public boolean injectRntbdServerResponseError(RntbdRequestRecord requestRecord)
6768

6869
@Override
6970
public boolean injectRntbdServerConnectionDelay(
70-
RntbdRequestRecord requestRecord,
71+
IRequestRecord requestRecord,
7172
Consumer<Duration> openConnectionWithDelayConsumer) {
7273
if (requestRecord == null) {
7374
return false;
@@ -79,11 +80,10 @@ public boolean injectRntbdServerConnectionDelay(
7980
if (serverConnectionDelayRule != null) {
8081
request.faultInjectionRequestContext
8182
.applyFaultInjectionRule(
82-
requestRecord.transportRequestId(),
83+
requestRecord.getRequestId(),
8384
serverConnectionDelayRule.getId());
8485
openConnectionWithDelayConsumer.accept(serverConnectionDelayRule.getResult().getDelay());
8586
return true;
86-
8787
}
8888

8989
return false;

sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,19 @@ public static Object[][] operationTypeProvider() {
9191
};
9292
}
9393

94+
@DataProvider(name = "faultInjectionOperationTypeProvider")
95+
public static Object[][] faultInjectionOperationTypeProvider() {
96+
return new Object[][]{
97+
// fault injection operation type, primaryAddressOnly
98+
{ FaultInjectionOperationType.READ_ITEM, false },
99+
{ FaultInjectionOperationType.REPLACE_ITEM, true },
100+
{ FaultInjectionOperationType.CREATE_ITEM, true },
101+
{ FaultInjectionOperationType.DELETE_ITEM, true},
102+
{ FaultInjectionOperationType.QUERY_ITEM, false },
103+
{ FaultInjectionOperationType.PATCH_ITEM, true }
104+
};
105+
}
106+
94107
@DataProvider(name = "faultInjectionServerErrorResponseProvider")
95108
public static Object[][] faultInjectionServerErrorResponseProvider() {
96109
return new Object[][]{
@@ -565,6 +578,64 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws Js
565578
}
566579
}
567580

581+
@Test(groups = {"multi-region"}, dataProvider = "faultInjectionOperationTypeProvider", timeOut = TIMEOUT)
582+
public void faultInjectionServerErrorRuleTests_ServerConnectionDelay_warmup(
583+
FaultInjectionOperationType operationType,
584+
boolean primaryAddressesOnly) {
585+
586+
CosmosAsyncClient newClient = null; // creating new client to force creating new connections
587+
// simulate high channel acquisition/connectionTimeout during openConnection flow
588+
String ruleId = "serverErrorRule-serverConnectionDelay-warmup" + UUID.randomUUID();
589+
FaultInjectionRule serverConnectionDelayRule =
590+
new FaultInjectionRuleBuilder(ruleId)
591+
.condition(
592+
new FaultInjectionConditionBuilder()
593+
.operationType(operationType)
594+
.build()
595+
)
596+
.result(
597+
FaultInjectionResultBuilders
598+
.getResultBuilder(FaultInjectionServerErrorType.CONNECTION_DELAY)
599+
.delay(Duration.ofSeconds(2))
600+
.times(1)
601+
.build()
602+
)
603+
.duration(Duration.ofMinutes(5))
604+
.build();
605+
606+
try {
607+
DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig();
608+
directConnectionConfig.setConnectTimeout(Duration.ofSeconds(1));
609+
610+
newClient = new CosmosClientBuilder()
611+
.endpoint(TestConfigurations.HOST)
612+
.key(TestConfigurations.MASTER_KEY)
613+
.contentResponseOnWriteEnabled(true)
614+
.consistencyLevel(BridgeInternal.getContextClient(this.client).getConsistencyLevel())
615+
.directMode(directConnectionConfig)
616+
.buildAsyncClient();
617+
618+
CosmosAsyncContainer container =
619+
newClient
620+
.getDatabase(cosmosAsyncContainer.getDatabase().getId())
621+
.getContainer(cosmosAsyncContainer.getId());
622+
623+
CosmosFaultInjectionHelper.configureFaultInjectionRules(container, Arrays.asList(serverConnectionDelayRule)).block();
624+
625+
int partitionSize = container.getFeedRanges().block().size();
626+
container.openConnectionsAndInitCaches().block();
627+
628+
if (primaryAddressesOnly) {
629+
assertThat(serverConnectionDelayRule.getHitCount()).isEqualTo(partitionSize);
630+
} else {
631+
assertThat(serverConnectionDelayRule.getHitCount()).isBetween(partitionSize * 3L, partitionSize * 5L);
632+
}
633+
} finally {
634+
serverConnectionDelayRule.disable();
635+
safeClose(newClient);
636+
}
637+
}
638+
568639
@Test(groups = {"multi-region"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT)
569640
public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
570641
FaultInjectionServerErrorType serverErrorType,

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class DocumentServiceRequestContext implements Cloneable {
3333
public volatile Boolean usePreferredLocations;
3434
public volatile Integer locationIndexToRoute;
3535
public volatile URI locationEndpointToRoute;
36-
public volatile URI storePhysicalAddress; // DIRECT: rntbd physical address; GATEWAY: service endpoint
36+
public volatile Uri storePhysicalAddressUri; // DIRECT: rntbd physical address; GATEWAY: service endpoint
3737
public volatile boolean performedBackgroundAddressRefresh;
3838
public volatile boolean performLocalRefreshOnGoneException;
3939
public volatile List<String> storeResponses;
@@ -118,7 +118,7 @@ public DocumentServiceRequestContext clone() {
118118
context.usePreferredLocations = this.usePreferredLocations;
119119
context.locationIndexToRoute = this.locationIndexToRoute;
120120
context.locationEndpointToRoute = this.locationEndpointToRoute;
121-
context.storePhysicalAddress = this.storePhysicalAddress;
121+
context.storePhysicalAddressUri = this.storePhysicalAddressUri;
122122
context.performLocalRefreshOnGoneException = this.performLocalRefreshOnGoneException;
123123
context.effectivePartitionKey = this.effectivePartitionKey;
124124
context.performedBackgroundAddressRefresh = this.performedBackgroundAddressRefresh;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IOpenConnectionsHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@
1010
import java.util.List;
1111

1212
public interface IOpenConnectionsHandler {
13-
Flux<OpenConnectionResponse> openConnections(URI serviceEndpoint, List<Uri> addresses);
13+
Flux<OpenConnectionResponse> openConnections(String collectionRid, URI serviceEndpoint, List<Uri> addresses);
1414
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResourceType.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ public enum ResourceType {
6464
ClientEncryptionKey("ClientEncryptionKey", 141),
6565

6666
//Adding client telemetry resource type, only meant for client side
67-
ClientTelemetry("ClientTelemetry", 1001);
67+
ClientTelemetry("ClientTelemetry", 1001),
68+
69+
//Only meant to be used on the client side while opening connections
70+
Connection("Connection", 1002);
6871

6972
private final int value;
7073
private final String stringValue;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public AddressInformation(boolean isPublic, boolean isPrimary, String physicalUr
2525
this.isPublic = isPublic;
2626
this.isPrimary = isPrimary;
2727
this.physicalUri = new Uri(normalizePhysicalUri(physicalUri));
28+
this.physicalUri.setPrimary(this.isPrimary);
2829
}
2930

3031
public AddressInformation(boolean isPublic, boolean isPrimary, String physicalUri, String protocolScheme) {

0 commit comments

Comments
 (0)