Skip to content

Commit 1df4e35

Browse files
Pass PagerOptions, not Context into pager callback (#3352)
Fixes #3299
1 parent 4c03252 commit 1df4e35

File tree

11 files changed

+71
-77
lines changed

11 files changed

+71
-77
lines changed

sdk/core/azure_core/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
### Breaking Changes
88

9+
The callback for `Pager::with_callback` has a `PagerOptions` second parameter rather than a `Context` parameter.
10+
911
### Bugs Fixed
1012

1113
### Other Changes

sdk/core/azure_core/src/http/pager.rs

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ impl<P: Page> ItemIterator<P> {
299299
/// To page results using a next link:
300300
///
301301
/// ```rust,no_run
302-
/// # use azure_core::{Result, http::{RawResponse, Context, ItemIterator, pager::{Page, PagerResult, PagerState}, Pipeline, Request, Response, Method, Url}, json};
302+
/// # use azure_core::{Result, http::{RawResponse, ItemIterator, pager::{Page, PagerOptions, PagerResult, PagerState}, Pipeline, Request, Response, Method, Url}, json};
303303
/// # let api_version = "2025-06-04".to_string();
304304
/// # let pipeline: Pipeline = panic!("Not a runnable example");
305305
/// #[derive(serde::Deserialize)]
@@ -318,7 +318,7 @@ impl<P: Page> ItemIterator<P> {
318318
/// }
319319
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
320320
/// let mut base_req = Request::new(url, Method::Get);
321-
/// let pager = ItemIterator::from_callback(move |next_link: PagerState<Url>, ctx: Context| {
321+
/// let pager = ItemIterator::from_callback(move |next_link: PagerState<Url>, options: PagerOptions| {
322322
/// // The callback must be 'static, so you have to clone and move any values you want to use.
323323
/// let pipeline = pipeline.clone();
324324
/// let api_version = api_version.clone();
@@ -337,7 +337,7 @@ impl<P: Page> ItemIterator<P> {
337337
/// .append_pair("api-version", &api_version);
338338
/// }
339339
/// let resp = pipeline
340-
/// .send(&ctx, &mut req, None)
340+
/// .send(&options.context, &mut req, None)
341341
/// .await?;
342342
/// let (status, headers, body) = resp.deconstruct();
343343
/// let result: ListItemsResult = json::from_json(&body)?;
@@ -373,7 +373,7 @@ impl<P: Page> ItemIterator<P> {
373373
/// }
374374
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
375375
/// let mut base_req = Request::new(url, Method::Get);
376-
/// let pager = ItemIterator::from_callback(move |continuation, ctx| {
376+
/// let pager = ItemIterator::from_callback(move |continuation, options| {
377377
/// // The callback must be 'static, so you have to clone and move any values you want to use.
378378
/// let pipeline = pipeline.clone();
379379
/// let mut req = base_req.clone();
@@ -382,7 +382,7 @@ impl<P: Page> ItemIterator<P> {
382382
/// req.insert_header("x-ms-continuation", continuation);
383383
/// }
384384
/// let resp: Response<ListItemsResult> = pipeline
385-
/// .send(&ctx, &mut req, None)
385+
/// .send(&options.context, &mut req, None)
386386
/// .await?
387387
/// .into();
388388
/// Ok(PagerResult::from_response_header(resp, &HeaderName::from_static("x-next-continuation")))
@@ -392,10 +392,10 @@ impl<P: Page> ItemIterator<P> {
392392
pub fn from_callback<
393393
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
394394
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + FromStr + Send + 'static,
395-
#[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, Context<'static>) -> Fut + Send + 'static,
395+
#[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, PagerOptions<'static>) -> Fut + Send + 'static,
396396
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
397397
#[cfg(target_arch = "wasm32")] C: AsRef<str> + FromStr + 'static,
398-
#[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, Context<'static>) -> Fut + 'static,
398+
#[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, PagerOptions<'static>) -> Fut + 'static,
399399
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
400400
>(
401401
make_request: F,
@@ -406,7 +406,7 @@ impl<P: Page> ItemIterator<P> {
406406
{
407407
let options = options.unwrap_or_default();
408408
let next_token = Arc::new(Mutex::new(None::<String>));
409-
let stream = iter_from_callback(make_request, options.context.clone(), next_token.clone());
409+
let stream = iter_from_callback(make_request, options, next_token.clone());
410410

411411
Self {
412412
stream: Box::pin(stream),
@@ -619,7 +619,7 @@ impl<P> PageIterator<P> {
619619
/// To page results using a next link:
620620
///
621621
/// ```rust,no_run
622-
/// # use azure_core::{Result, http::{RawResponse, Context, pager::{PageIterator, PagerResult, PagerState}, Pipeline, Request, Response, Method, Url}, json};
622+
/// # use azure_core::{Result, http::{RawResponse, pager::{PageIterator, PagerOptions, PagerResult, PagerState}, Pipeline, Request, Response, Method, Url}, json};
623623
/// # let api_version = "2025-06-04".to_string();
624624
/// # let pipeline: Pipeline = panic!("Not a runnable example");
625625
/// #[derive(serde::Deserialize)]
@@ -629,7 +629,7 @@ impl<P> PageIterator<P> {
629629
/// }
630630
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
631631
/// let mut base_req = Request::new(url, Method::Get);
632-
/// let pager = PageIterator::from_callback(move |next_link: PagerState<Url>, ctx| {
632+
/// let pager = PageIterator::from_callback(move |next_link: PagerState<Url>, options: PagerOptions<'static>| {
633633
/// // The callback must be 'static, so you have to clone and move any values you want to use.
634634
/// let pipeline = pipeline.clone();
635635
/// let api_version = api_version.clone();
@@ -648,7 +648,7 @@ impl<P> PageIterator<P> {
648648
/// .append_pair("api-version", &api_version);
649649
/// }
650650
/// let resp = pipeline
651-
/// .send(&ctx, &mut req, None)
651+
/// .send(&options.context, &mut req, None)
652652
/// .await?;
653653
/// let (status, headers, body) = resp.deconstruct();
654654
/// let result: ListItemsResult = json::from_json(&body)?;
@@ -675,7 +675,7 @@ impl<P> PageIterator<P> {
675675
/// }
676676
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
677677
/// let mut base_req = Request::new(url, Method::Get);
678-
/// let pager = PageIterator::from_callback(move |continuation, ctx| {
678+
/// let pager = PageIterator::from_callback(move |continuation, options| {
679679
/// // The callback must be 'static, so you have to clone and move any values you want to use.
680680
/// let pipeline = pipeline.clone();
681681
/// let mut req = base_req.clone();
@@ -684,7 +684,7 @@ impl<P> PageIterator<P> {
684684
/// req.insert_header("x-ms-continuation", continuation);
685685
/// }
686686
/// let resp: Response<ListItemsResult> = pipeline
687-
/// .send(&ctx, &mut req, None)
687+
/// .send(&options.context, &mut req, None)
688688
/// .await?
689689
/// .into();
690690
/// Ok(PagerResult::from_response_header(resp, &HeaderName::from_static("x-ms-continuation")))
@@ -694,10 +694,10 @@ impl<P> PageIterator<P> {
694694
pub fn from_callback<
695695
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
696696
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + FromStr + Send + 'static,
697-
#[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, Context<'static>) -> Fut + Send + 'static,
697+
#[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, PagerOptions<'static>) -> Fut + Send + 'static,
698698
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
699699
#[cfg(target_arch = "wasm32")] C: AsRef<str> + FromStr + 'static,
700-
#[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, Context<'static>) -> Fut + 'static,
700+
#[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, PagerOptions<'static>) -> Fut + 'static,
701701
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
702702
>(
703703
make_request: F,
@@ -708,11 +708,7 @@ impl<P> PageIterator<P> {
708708
{
709709
let options = options.unwrap_or_default();
710710
let continuation_token = Arc::new(Mutex::new(None::<String>));
711-
let stream = iter_from_callback(
712-
make_request,
713-
options.context.clone(),
714-
continuation_token.clone(),
715-
);
711+
let stream = iter_from_callback(make_request, options, continuation_token.clone());
716712

717713
Self {
718714
stream: Box::pin(stream),
@@ -855,22 +851,22 @@ where
855851
state: State<C>,
856852
make_request: F,
857853
continuation_token: Arc<Mutex<Option<String>>>,
858-
ctx: Context<'a>,
854+
options: PagerOptions<'a>,
859855
added_span: bool,
860856
}
861857

862858
fn iter_from_callback<
863859
P,
864860
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
865861
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + FromStr + Send + 'static,
866-
#[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, Context<'static>) -> Fut + Send + 'static,
862+
#[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, PagerOptions<'static>) -> Fut + Send + 'static,
867863
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
868864
#[cfg(target_arch = "wasm32")] C: AsRef<str> + FromStr + 'static,
869-
#[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, Context<'static>) -> Fut + 'static,
865+
#[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, PagerOptions<'static>) -> Fut + 'static,
870866
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
871867
>(
872868
make_request: F,
873-
ctx: Context<'static>,
869+
options: PagerOptions<'static>,
874870
continuation_token: Arc<Mutex<Option<String>>>,
875871
) -> impl Stream<Item = crate::Result<P>> + 'static
876872
where
@@ -881,7 +877,7 @@ where
881877
state: State::Init,
882878
make_request,
883879
continuation_token,
884-
ctx,
880+
options,
885881
added_span: false,
886882
},
887883
|mut stream_state| async move {
@@ -891,10 +887,11 @@ where
891887
tracing::debug!("establish a public API span for new pager.");
892888

893889
// At the very start of polling, create a span for the entire request, and attach it to the context
894-
let span = create_public_api_span(&stream_state.ctx, None, None);
890+
let span = create_public_api_span(&stream_state.options.context, None, None);
895891
if let Some(ref s) = span {
896892
stream_state.added_span = true;
897-
stream_state.ctx = stream_state.ctx.with_value(s.clone());
893+
stream_state.options.context =
894+
stream_state.options.context.with_value(s.clone());
898895
}
899896
}
900897

@@ -929,11 +926,13 @@ where
929926
let result = match stream_state.state {
930927
State::Init => {
931928
tracing::debug!("initial page request");
932-
(stream_state.make_request)(PagerState::Initial, stream_state.ctx.clone()).await
929+
(stream_state.make_request)(PagerState::Initial, stream_state.options.clone())
930+
.await
933931
}
934932
State::More(n) => {
935933
tracing::debug!("subsequent page request to {:?}", AsRef::<str>::as_ref(&n));
936-
(stream_state.make_request)(PagerState::More(n), stream_state.ctx.clone()).await
934+
(stream_state.make_request)(PagerState::More(n), stream_state.options.clone())
935+
.await
937936
}
938937
State::Done => {
939938
tracing::debug!("done");
@@ -947,7 +946,7 @@ where
947946
let (item, next_state) = match result {
948947
Err(e) => {
949948
if stream_state.added_span {
950-
if let Some(span) = stream_state.ctx.value::<Arc<dyn Span>>() {
949+
if let Some(span) = stream_state.options.context.value::<Arc<dyn Span>>() {
951950
// Mark the span as an error with an appropriate description.
952951
span.set_status(SpanStatus::Error {
953952
description: e.to_string(),
@@ -978,7 +977,7 @@ where
978977
// When the result is done, finalize the span. Note that we only do that if we created the span in the first place,
979978
// otherwise it is the responsibility of the caller to end their span.
980979
if stream_state.added_span {
981-
if let Some(span) = stream_state.ctx.value::<Arc<dyn Span>>() {
980+
if let Some(span) = stream_state.options.context.value::<Arc<dyn Span>>() {
982981
// P is unconstrained, so it's not possible to retrieve the status code for now.
983982

984983
span.end();
@@ -1000,8 +999,8 @@ mod tests {
1000999
error::ErrorKind,
10011000
http::{
10021001
headers::{HeaderName, HeaderValue},
1003-
pager::{PageIterator, Pager, PagerResult, PagerState},
1004-
Context, RawResponse, Response, StatusCode,
1002+
pager::{PageIterator, Pager, PagerOptions, PagerResult, PagerState},
1003+
RawResponse, Response, StatusCode,
10051004
},
10061005
};
10071006
use async_trait::async_trait;
@@ -1083,7 +1082,7 @@ mod tests {
10831082
#[tokio::test]
10841083
async fn callback_item_pagination_error() {
10851084
let pager: Pager<Page> = Pager::from_callback(
1086-
|continuation: PagerState<String>, _ctx| async move {
1085+
|continuation: PagerState<String>, _options| async move {
10871086
match continuation {
10881087
PagerState::Initial => Ok(PagerResult::More {
10891088
response: RawResponse::from_bytes(
@@ -1556,12 +1555,12 @@ mod tests {
15561555
#[allow(clippy::type_complexity)]
15571556
fn make_three_page_callback() -> impl Fn(
15581557
PagerState<String>,
1559-
Context<'_>,
1558+
PagerOptions<'_>,
15601559
) -> Pin<
15611560
Box<dyn Future<Output = crate::Result<PagerResult<Response<Page>, String>>> + Send>,
15621561
> + Send
15631562
+ 'static {
1564-
|continuation: PagerState<String>, _ctx| {
1563+
|continuation: PagerState<String>, _options| {
15651564
Box::pin(async move {
15661565
match continuation.as_deref() {
15671566
PagerState::Initial => Ok(PagerResult::More {

sdk/core/azure_core/src/http/poller.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,6 @@ impl<'a> PollerOptions<'a> {
178178
frequency: self.frequency,
179179
}
180180
}
181-
182-
/// Creates a new owned instance of `PollerOptions` so it can outlive the current scope.
183-
pub fn to_owned(&self) -> PollerOptions<'static> {
184-
PollerOptions {
185-
context: self.context.to_owned(),
186-
frequency: self.frequency,
187-
}
188-
}
189181
}
190182

191183
/// The result of fetching the status monitor from a [`Poller`], whether the long-running operation (LRO) is in progress or done.

sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,19 +119,20 @@ impl CosmosPipeline {
119119
context: ctx.with_value(resource_link).into_owned(),
120120
};
121121
Ok(FeedPager::from_callback(
122-
move |continuation, ctx| {
122+
move |continuation, pager_options| {
123123
// Then we have to clone it again to pass it in to the async block.
124124
// This is because Pageable can't borrow any data, it has to own it all.
125125
// That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
126126
let pipeline = pipeline.clone();
127127
let mut req = base_request.clone();
128-
let ctx = ctx.clone();
129128
async move {
130129
if let PagerState::More(continuation) = continuation {
131130
req.insert_header(constants::CONTINUATION, continuation);
132131
}
133132

134-
let resp = pipeline.send(&ctx, &mut req, None).await?;
133+
let resp = pipeline
134+
.send(&pager_options.context, &mut req, None)
135+
.await?;
135136
let page = FeedPage::<T>::from_response(resp).await?;
136137

137138
Ok(page.into())

sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl CertificateClient {
7878
parameters: RequestContent<CreateCertificateParameters>,
7979
options: Option<CertificateClientCreateCertificateOptions<'_>>,
8080
) -> Result<Poller<CertificateOperation>> {
81-
let options = options.unwrap_or_default();
81+
let options = options.unwrap_or_default().into_owned();
8282
let pipeline = self.pipeline.clone();
8383

8484
let mut url = self.endpoint.clone();
@@ -189,7 +189,7 @@ impl CertificateClient {
189189
})
190190
}
191191
},
192-
Some(options.method_options.into_owned()),
192+
Some(options.method_options),
193193
))
194194
}
195195
}

sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)