Skip to content

Commit b592daf

Browse files
Cosmos: Adds GlobalEndpointManager, ClientRetryPolicy and LocationCache To Support Cross-Regional Routing (#3348)
## Description: This PR is introducing `GlobalEndpointManager`, `ClientRetryPolicy`, `LocationCache` and `AccountProperties` to support cross-regional routing in the rust SDK. Some of the key design decisions are highlighted below: - We used [Moka ](https://docs.rs/moka/latest/moka/)as a caching library which supports thread safe access of the cached data and uses LRFU based cache eviction mechanism. - The cache refresh has been done on a TTL (time to live) model, where each incoming request has the opportunity to refresh `AccountProperties` and `LocationCache`, if the TTL expires or `force_refresh` is passed as `True`. - We haven't took the background refresh route yet, since we will need to be a little careful and optimized for the rust bg task management. However, our long term goal is to refresh the cache in the background, so that we do not penalize an incoming request to refresh the cache. The below flow diagram depicts the entire component interactions to better understand how the request will be made. ## Flow Diagram: ```mermaid sequenceDiagram participant J as ContainerClient participant A as CosmosPipeline participant B as RetryHandler participant C as ClientRetryPolicy participant D as Core Pipeline <br> send_raw <br> through delegate participant E as GlobalEndpointManager participant F as LocationCache J-->>A: 1. send(CosmosRequest) A->>B: 2. send<Sender, Fut>() loop Retry Iterations with force-refresh flag = false/ true B->>C: 3. before_send_request(request) C->>E: 4. refresh_location_async() <br> Refresh Cache only when TTL Expires OR <br> force_refresh is requested. <br><br> NO-OP when either of the above is false. E->>E: 5. get_database_account() <br> Fetch Account Properties <br> from Gateway. E->>F: 6. on_database_account_read() <br> Refresh Location Cache C->>E: 7. resolve_service_endpoint() <br> Fetch and set the next endpoint in <br> CosmosRequest to route request. B->>D: 8. Call sender delegate(request) B->>C: 9. Invoke should_retry(result) B->>B: 10. RetryAfter <br> (TimeSpan.Zero) end B->>J: 11. Return result if No Retry ``` ## Closing issues Fixes #3175
1 parent ba4ca41 commit b592daf

25 files changed

+3293
-328
lines changed

Cargo.lock

Lines changed: 43 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ wasm-bindgen-futures = "0.4"
153153
wasm-bindgen-test = "0.3"
154154
zerofrom = "0.1.5"
155155
zip = { version = "4.0.0", default-features = false, features = ["deflate"] }
156+
moka = { version = "0.12", features = ["future"] }
156157

157158
[workspace.lints.clippy]
158159
large_futures = "deny"

eng/dict/crates.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,4 @@ url
7272
uuid
7373
zerofrom
7474
zip
75+
moka

sdk/cosmos/.dict.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ udfs
88
backoff
99
pluggable
1010
cloneable
11+
moka
12+
substatus
13+
retryable
1114

1215
# Cosmos' docs all use "Autoscale" as a single word, rather than a compound "AutoScale" or "Auto Scale"
1316
autoscale

sdk/cosmos/azure_data_cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Features Added
66

7+
- Added GlobalEndpointManager, LocationCache to support Cross Regional Retry.
78
- Added `continuation_token` to `PagerOptions` for methods that return a `Pager`.
89

910
### Breaking Changes

sdk/cosmos/azure_data_cosmos/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ serde_json.workspace = true
2121
serde.workspace = true
2222
tracing.workspace = true
2323
url.workspace = true
24+
moka.workspace = true
2425

2526
[dev-dependencies]
2627
azure_identity.workspace = true

sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use crate::{
99
DeleteContainerOptions, FeedPager, ItemOptions, PartitionKey, Query, ReplaceContainerOptions,
1010
ThroughputOptions,
1111
};
12+
use std::sync::Arc;
1213

13-
use crate::cosmos_request::CosmosRequestBuilder;
14+
use crate::cosmos_request::CosmosRequest;
1415
use crate::operation_context::OperationType;
1516
use azure_core::http::response::Response;
1617
use serde::{de::DeserializeOwned, Serialize};
@@ -22,19 +23,19 @@ use serde::{de::DeserializeOwned, Serialize};
2223
pub struct ContainerClient {
2324
link: ResourceLink,
2425
items_link: ResourceLink,
25-
pipeline: CosmosPipeline,
26+
pipeline: Arc<CosmosPipeline>,
2627
}
2728

2829
impl ContainerClient {
2930
pub(crate) fn new(
30-
pipeline: CosmosPipeline,
31+
pipeline: Arc<CosmosPipeline>,
3132
database_link: &ResourceLink,
3233
container_id: &str,
3334
) -> Self {
3435
let link = database_link
3536
.feed(ResourceType::Containers)
3637
.item(container_id);
37-
let items_link = link.feed(ResourceType::Items);
38+
let items_link = link.feed(ResourceType::Documents);
3839

3940
Self {
4041
link,
@@ -65,13 +66,9 @@ impl ContainerClient {
6566
) -> azure_core::Result<Response<ContainerProperties>> {
6667
let options = options.unwrap_or_default();
6768
let cosmos_request =
68-
CosmosRequestBuilder::new(OperationType::Read, ResourceType::Containers).build()?;
69+
CosmosRequest::builder(OperationType::Read, self.link.clone()).build()?;
6970
self.pipeline
70-
.send(
71-
cosmos_request,
72-
self.link.clone(),
73-
options.method_options.context,
74-
)
71+
.send(cosmos_request, options.method_options.context)
7572
.await
7673
}
7774

@@ -111,14 +108,11 @@ impl ContainerClient {
111108
options: Option<ReplaceContainerOptions<'_>>,
112109
) -> azure_core::Result<Response<ContainerProperties>> {
113110
let options = options.unwrap_or_default();
114-
let builder = CosmosRequestBuilder::new(OperationType::Replace, ResourceType::Containers);
115-
let cosmos_request = builder.json(&properties).build()?;
111+
let cosmos_request = CosmosRequest::builder(OperationType::Replace, self.link.clone())
112+
.json(&properties)
113+
.build()?;
116114
self.pipeline
117-
.send(
118-
cosmos_request,
119-
self.link.clone(),
120-
options.method_options.context,
121-
)
115+
.send(cosmos_request, options.method_options.context)
122116
.await
123117
}
124118

@@ -181,14 +175,10 @@ impl ContainerClient {
181175
options: Option<DeleteContainerOptions<'_>>,
182176
) -> azure_core::Result<Response<()>> {
183177
let options = options.unwrap_or_default();
184-
let builder = CosmosRequestBuilder::new(OperationType::Delete, ResourceType::Containers);
185-
let cosmos_request = builder.build()?;
178+
let cosmos_request =
179+
CosmosRequest::builder(OperationType::Delete, self.link.clone()).build()?;
186180
self.pipeline
187-
.send(
188-
cosmos_request,
189-
self.link.clone(),
190-
options.method_options.context,
191-
)
181+
.send(cosmos_request, options.method_options.context)
192182
.await
193183
}
194184

@@ -264,19 +254,14 @@ impl ContainerClient {
264254
options: Option<ItemOptions<'_>>,
265255
) -> azure_core::Result<Response<()>> {
266256
let options = options.clone().unwrap_or_default();
267-
let builder = CosmosRequestBuilder::new(OperationType::Create, ResourceType::Items);
268-
let cosmos_request = builder
257+
let cosmos_request = CosmosRequest::builder(OperationType::Create, self.items_link.clone())
269258
.headers(&options)
270259
.json(&item)
271260
.partition_key(partition_key.into())
272261
.build()?;
273262

274263
self.pipeline
275-
.send(
276-
cosmos_request,
277-
self.items_link.clone(),
278-
options.method_options.context,
279-
)
264+
.send(cosmos_request, options.method_options.context)
280265
.await
281266
}
282267

@@ -354,15 +339,14 @@ impl ContainerClient {
354339
) -> azure_core::Result<Response<()>> {
355340
let link = self.items_link.item(item_id);
356341
let options = options.clone().unwrap_or_default();
357-
let builder = CosmosRequestBuilder::new(OperationType::Replace, ResourceType::Items);
358-
let cosmos_request = builder
342+
let cosmos_request = CosmosRequest::builder(OperationType::Replace, link)
359343
.headers(&options)
360344
.json(&item)
361345
.partition_key(partition_key.into())
362346
.build()?;
363347

364348
self.pipeline
365-
.send(cosmos_request, link, options.method_options.context)
349+
.send(cosmos_request, options.method_options.context)
366350
.await
367351
}
368352

@@ -441,19 +425,14 @@ impl ContainerClient {
441425
options: Option<ItemOptions<'_>>,
442426
) -> azure_core::Result<Response<()>> {
443427
let options = options.clone().unwrap_or_default();
444-
let builder = CosmosRequestBuilder::new(OperationType::Upsert, ResourceType::Items);
445-
let cosmos_request = builder
428+
let cosmos_request = CosmosRequest::builder(OperationType::Upsert, self.items_link.clone())
446429
.headers(&options)
447430
.json(&item)
448431
.partition_key(partition_key.into())
449432
.build()?;
450433

451434
self.pipeline
452-
.send(
453-
cosmos_request,
454-
self.items_link.clone(),
455-
options.method_options.context,
456-
)
435+
.send(cosmos_request, options.method_options.context)
457436
.await
458437
}
459438

@@ -499,14 +478,13 @@ impl ContainerClient {
499478
options.enable_content_response_on_write = true;
500479

501480
let link = self.items_link.item(item_id);
502-
let builder = CosmosRequestBuilder::new(OperationType::Read, ResourceType::Items);
503-
let cosmos_request = builder
481+
let cosmos_request = CosmosRequest::builder(OperationType::Read, link)
504482
.partition_key(partition_key.into())
505483
.headers(&options)
506484
.build()?;
507485

508486
self.pipeline
509-
.send(cosmos_request, link, options.method_options.context)
487+
.send(cosmos_request, options.method_options.context)
510488
.await
511489
}
512490

@@ -538,15 +516,13 @@ impl ContainerClient {
538516
) -> azure_core::Result<Response<()>> {
539517
let link = self.items_link.item(item_id);
540518
let options = options.clone().unwrap_or_default();
541-
542-
let builder = CosmosRequestBuilder::new(OperationType::Delete, ResourceType::Items);
543-
let cosmos_request = builder
519+
let cosmos_request = CosmosRequest::builder(OperationType::Delete, link)
544520
.partition_key(partition_key.into())
545521
.headers(&options)
546522
.build()?;
547523

548524
self.pipeline
549-
.send(cosmos_request, link, options.method_options.context)
525+
.send(cosmos_request, options.method_options.context)
550526
.await
551527
}
552528

@@ -614,15 +590,14 @@ impl ContainerClient {
614590
) -> azure_core::Result<Response<()>> {
615591
let options = options.clone().unwrap_or_default();
616592
let link = self.items_link.item(item_id);
617-
let builder = CosmosRequestBuilder::new(OperationType::Patch, ResourceType::Items);
618-
let cosmos_request = builder
593+
let cosmos_request = CosmosRequest::builder(OperationType::Patch, link)
619594
.partition_key(partition_key.into())
620595
.headers(&options)
621596
.json(&patch)
622597
.build()?;
623598

624599
self.pipeline
625-
.send(cosmos_request, link, options.method_options.context)
600+
.send(cosmos_request, options.method_options.context)
626601
.await
627602
}
628603

0 commit comments

Comments
 (0)