Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions eng/test/mock_transport/src/player_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ impl MockTransportPlayerPolicy {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl Policy for MockTransportPlayerPolicy {
async fn send(
&self,
Expand Down
3 changes: 1 addition & 2 deletions eng/test/mock_transport/src/recorder_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ impl MockTransportRecorderPolicy {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl Policy for MockTransportRecorderPolicy {
async fn send(
&self,
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ impl TokenResponse {
}

/// Represents a credential capable of providing an OAuth token.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
pub trait TokenCredential: Send + Sync {
/// Gets a `TokenResponse` for the specified resource
async fn get_token(&self, resource: &str) -> crate::Result<TokenResponse>;
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/bytes_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ impl Stream for BytesStream {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl SeekableStream for BytesStream {
async fn reset(&mut self) -> crate::Result<()> {
self.bytes_read = 0;
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/http_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ use bytes::Bytes;
use serde::Serialize;

/// An HTTP client which can send requests.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[async_trait]
pub trait HttpClient: Send + Sync + std::fmt::Debug {
/// Send out a request using `azure_core`'s types.
///
Expand Down
6 changes: 1 addition & 5 deletions sdk/core/src/http_client/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ use async_trait::async_trait;
#[derive(Debug)]
pub struct NoopClient;

// TODO(rylev): we probably don't want to limit this to wasm32
// as there will be wasm environments with threads.
// This should instead be a feature flag
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[async_trait]
impl crate::HttpClient for NoopClient {
async fn execute_request(&self, request: &crate::Request) -> crate::Result<crate::Response> {
panic!(
Expand Down
24 changes: 2 additions & 22 deletions sdk/core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ macro_rules! operation {
@nosetter
$($nosetter: $nstype),*
}
$crate::future!($name);

azure_core::__private::paste! {
pub type $name = futures::future::BoxFuture<'static, azure_core::Result<[<$name Response>]>>;
#[cfg(feature = "into_future")]
impl <$($generic: $first_constraint $(+ $constraint)*)* $(+ $lt)*> std::future::IntoFuture for [<$name Builder>]<$($generic),*> {
type IntoFuture = $name;
Expand Down Expand Up @@ -278,27 +279,6 @@ macro_rules! operation {
}
}

/// Declare a `Future` with the given name
///
/// `Future::Output` will be set to `azure_core::Result<$NAMEResponse>.
/// The `Future` will be `Send` for all targets but `wasm32`.
#[macro_export]
macro_rules! future {
($name:ident) => {
$crate::future!($name<>);
};
($name:ident<$($generic:ident)?>) => {
azure_core::__private::paste! {
#[cfg(target_arch = "wasm32")]
pub type $name<$($generic)*> =
std::pin::Pin<std::boxed::Box<dyn std::future::Future<Output = azure_core::Result<[<$name Response>]<$($generic)*>>> + 'static>>;
#[cfg(not(target_arch = "wasm32"))]
pub type $name<$($generic)*> =
futures::future::BoxFuture<'static, azure_core::Result<[<$name Response>]<$($generic)*>>>;
}
};
}

/// The following macro invocation:
/// ```
/// # #[macro_use] extern crate azure_core;
Expand Down
118 changes: 49 additions & 69 deletions sdk/core/src/pageable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,82 +14,62 @@ macro_rules! r#try {
};
}

/// Helper macro for declaring the `Pageable` and `Continuable types which easily allows
/// for conditionally compiling with a `Send` constraint or not.
macro_rules! declare {
($($extra:tt)*) => {
// The use of a module here is a hack to get around the fact that `pin_project`
// generates a method `project_ref` which is never used and generates a warning.
// The module allows us to declare that `dead_code` is allowed but only for
// the `Pageable` type.
mod pageable {
#![allow(dead_code)]
use super::*;
/// A pageable stream that yields items of type `T`
///
/// Internally uses the Azure specific continuation header to
/// make repeated requests to Azure yielding a new page each time.
#[pin_project::pin_project]
// This is to surpress the unused `project_ref` warning
pub struct Pageable<T, E> {
#[pin]
pub(crate) stream: std::pin::Pin<Box<dyn Stream<Item = Result<T, E>> $($extra)*>>,
}
}
pub use pageable::Pageable;
/// A pageable stream that yields items of type `T`
///
/// Internally uses the Azure specific continuation header to
/// make repeated requests to Azure yielding a new page each time.
#[pin_project::pin_project]
// This is to surpress the unused `project_ref` warning
pub struct Pageable<T, E> {
#[pin]
pub(crate) stream: std::pin::Pin<Box<dyn Stream<Item = Result<T, E>> + Send>>,
}

impl<T, E> Pageable<T, E>
where
T: Continuable,
{
pub fn new<F>(
make_request: impl Fn(Option<T::Continuation>) -> F + Clone $($extra)* + 'static,
) -> Self
where
F: std::future::Future<Output = Result<T, E>> $($extra)* + 'static,
{
let stream = unfold(State::Init, move |state: State<T::Continuation>| {
let make_request = make_request.clone();
async move {
let response = match state {
State::Init => {
let request = make_request(None);
r#try!(request.await)
}
State::Continuation(token) => {
let request = make_request(Some(token));
r#try!(request.await)
}
State::Done => {
return None;
}
};
impl<T, E> Pageable<T, E>
where
T: Continuable,
{
pub fn new<F>(
make_request: impl Fn(Option<T::Continuation>) -> F + Clone + Send + 'static,
) -> Self
where
F: std::future::Future<Output = Result<T, E>> + Send + 'static,
{
let stream = unfold(State::Init, move |state: State<T::Continuation>| {
let make_request = make_request.clone();
async move {
let response = match state {
State::Init => {
let request = make_request(None);
r#try!(request.await)
}
State::Continuation(token) => {
let request = make_request(Some(token));
r#try!(request.await)
}
State::Done => {
return None;
}
};

let next_state = response
.continuation()
.map_or(State::Done, State::Continuation);
let next_state = response
.continuation()
.map_or(State::Done, State::Continuation);

Some((Ok(response), next_state))
}
});
Self {
stream: Box::pin(stream),
}
Some((Ok(response), next_state))
}
});
Self {
stream: Box::pin(stream),
}

/// A type that can yield an optional continuation token
pub trait Continuable {
type Continuation: 'static $($extra)*;
fn continuation(&self) -> Option<Self::Continuation>;
}
};
}
}

#[cfg(not(target_arch = "wasm32"))]
declare!(+ Send);
#[cfg(target_arch = "wasm32")]
declare!();
/// A type that can yield an optional continuation token
pub trait Continuable {
type Continuation: Send + 'static;
fn continuation(&self) -> Option<Self::Continuation>;
}

impl<T, E> Stream for Pageable<T, E> {
type Item = Result<T, E>;
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/policies/custom_headers_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ impl From<Headers> for CustomHeaders {
#[derive(Clone, Debug, Default)]
pub struct CustomHeadersPolicy {}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl Policy for CustomHeadersPolicy {
async fn send(
&self,
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/policies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ pub type PolicyResult = crate::error::Result<Response>;
/// The only runtime enforced check is that the last policy must be a Transport policy. It's up to
/// the implementer to call the following policy.
/// The `C` generic represents the *contents* of the AuthorizationPolicy specific of this pipeline.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[async_trait]
pub trait Policy: Send + Sync + std::fmt::Debug {
async fn send(
&self,
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/policies/retry_policies/no_retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ pub struct NoRetryPolicy {
_priv: std::marker::PhantomData<u32>,
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl Policy for NoRetryPolicy {
async fn send(
&self,
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/policies/retry_policies/retry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ const RETRY_STATUSES: &[StatusCode] = &[
StatusCode::GatewayTimeout,
];

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl<T> Policy for T
where
T: RetryPolicy + std::fmt::Debug + Send + Sync,
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/policies/telemetry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ impl<'a> TelemetryPolicy {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl Policy for TelemetryPolicy {
async fn send(
&self,
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/policies/timeout_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ impl TimeoutPolicy {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl Policy for TimeoutPolicy {
async fn send(
&self,
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/policies/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ impl TransportPolicy {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[async_trait]
impl Policy for TransportPolicy {
async fn send(
&self,
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/src/seekable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use futures::task::Poll;

/// Enable a type implementing `AsyncRead` to be consumed as if it were
/// a `Stream` of `Bytes`.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
pub trait SeekableStream:
AsyncRead + Unpin + std::fmt::Debug + Send + Sync + dyn_clone::DynClone
{
Expand Down
6 changes: 2 additions & 4 deletions sdk/data_cosmos/examples/query_document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,13 @@ async fn main() -> azure_core::Result<()> {
// First, we'll look at the results as JSON.
let mut stream = query.clone().into_stream::<serde_json::Value>();
while let Some(respo) = stream.next().await {
let respo = respo?;
println!("JSON: {:#?}", respo.results);
println!("JSON: {:#?}", respo?.results);
}

// Then, we'll look at the results as `Family` structs.
let mut stream = query.into_stream::<Family>();
while let Some(respo) = stream.next().await {
let respo = respo?;
println!("Structs: {:#?}", respo.results);
println!("Structs: {:#?}", respo?.results);
}

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions sdk/data_cosmos/src/authorization_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ impl AuthorizationPolicy {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl Policy for AuthorizationPolicy {
async fn send(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ impl<T: DeserializeOwned + Send> ExecuteStoredProcedureBuilder<T> {
}
}

azure_core::future!(ExecuteStoredProcedure<T>);
pub type ExecuteStoredProcedure<T> =
futures::future::BoxFuture<'static, azure_core::Result<ExecuteStoredProcedureResponse<T>>>;

#[cfg(feature = "into_future")]
impl<T: DeserializeOwned + Send> std::future::IntoFuture for ExecuteStoredProcedureBuilder<T> {
Expand Down
3 changes: 2 additions & 1 deletion sdk/data_cosmos/src/operations/get_document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ impl<T: DeserializeOwned + Send> GetDocumentBuilder<T> {
}
}

azure_core::future!(GetDocument<T>);
pub type GetDocument<T> =
futures::future::BoxFuture<'static, azure_core::Result<GetDocumentResponse<T>>>;

#[cfg(feature = "into_future")]
impl<T: DeserializeOwned + Send> std::future::IntoFuture for GetDocumentBuilder<T> {
Expand Down
3 changes: 2 additions & 1 deletion sdk/data_tables/src/operations/get_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl<T: DeserializeOwned + Send> GetEntityBuilder<T> {
}
}

azure_core::future!(GetEntity<T>);
pub type GetEntity<T> =
futures::future::BoxFuture<'static, azure_core::Result<GetEntityResponse<T>>>;

#[cfg(feature = "into_future")]
impl<T: DeserializeOwned + Send> std::future::IntoFuture for GetEntityBuilder<T> {
Expand Down
3 changes: 2 additions & 1 deletion sdk/data_tables/src/operations/insert_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ where
}
}

azure_core::future!(InsertEntity<T>);
pub type InsertEntity<T> =
futures::future::BoxFuture<'static, azure_core::Result<InsertEntityResponse<T>>>;

#[cfg(feature = "into_future")]
impl<T: DeserializeOwned + Send> std::future::IntoFuture for InsertEntityBuilder<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ impl AutoRefreshingTokenCredential {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[async_trait::async_trait]
impl TokenCredential for AutoRefreshingTokenCredential {
async fn get_token(&self, resource: &str) -> azure_core::Result<TokenResponse> {
if let Some(Ok(token)) = self.current_token.read().await.as_ref() {
Expand Down
Loading