Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete aws_smithy_http::ResolveEndpoint and point usages to service-specific trait #3078

Merged
merged 3 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,15 @@ message = "The `idempotency_provider` field has been removed from config as a pu
references = ["smithy-rs#3072"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "rcoh"

[[smithy-rs]]
message = "The `config::Builder::endpoint_resolver` method no longer accepts `&'static str`. Use `config::Builder::endpoint_url` instead."
references = ["smithy-rs#3078"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "rcoh"

[[smithy-rs]]
message = "**This change has [detailed upgrade guidance](https://github.com/awslabs/smithy-rs/discussions/3079).** <br><br>The endpoint interfaces from `aws-smithy-http` have been removed. Service-specific endpoint resolver traits have been added."
references = ["smithy-rs#3043", "smithy-rs#3078"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "rcoh"
59 changes: 34 additions & 25 deletions aws/rust-runtime/aws-inlineable/src/endpoint_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
use aws_smithy_async::future::BoxFuture;
use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
use aws_smithy_async::time::SharedTimeSource;
use aws_smithy_http::endpoint::{ResolveEndpoint, ResolveEndpointError};
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::endpoint::{
EndpointFuture, EndpointResolverParams, ResolveEndpoint,
};
use aws_smithy_types::endpoint::Endpoint;
use std::fmt::{Debug, Formatter};
use std::future::Future;
Expand All @@ -20,11 +23,9 @@ use tokio::sync::oneshot::{Receiver, Sender};
/// Endpoint reloader
#[must_use]
pub struct ReloadEndpoint {
loader: Box<
dyn Fn() -> BoxFuture<'static, (Endpoint, SystemTime), ResolveEndpointError> + Send + Sync,
>,
loader: Box<dyn Fn() -> BoxFuture<'static, (Endpoint, SystemTime), BoxError> + Send + Sync>,
endpoint: Arc<Mutex<Option<ExpiringEndpoint>>>,
error: Arc<Mutex<Option<ResolveEndpointError>>>,
error: Arc<Mutex<Option<BoxError>>>,
rx: Receiver<()>,
sleep: SharedAsyncSleep,
time: SharedTimeSource,
Expand Down Expand Up @@ -79,14 +80,14 @@ impl ReloadEndpoint {

#[derive(Debug, Clone)]
pub(crate) struct EndpointCache {
error: Arc<Mutex<Option<ResolveEndpointError>>>,
error: Arc<Mutex<Option<BoxError>>>,
endpoint: Arc<Mutex<Option<ExpiringEndpoint>>>,
// When the sender is dropped, this allows the reload loop to stop
_drop_guard: Arc<Sender<()>>,
}

impl<T> ResolveEndpoint<T> for EndpointCache {
fn resolve_endpoint(&self, _params: &T) -> aws_smithy_http::endpoint::Result {
impl ResolveEndpoint for EndpointCache {
fn resolve_endpoint<'a>(&'a self, _params: &'a EndpointResolverParams) -> EndpointFuture<'a> {
self.resolve_endpoint()
}
}
Expand All @@ -111,9 +112,9 @@ pub(crate) async fn create_cache<F>(
loader_fn: impl Fn() -> F + Send + Sync + 'static,
sleep: SharedAsyncSleep,
time: SharedTimeSource,
) -> Result<(EndpointCache, ReloadEndpoint), ResolveEndpointError>
) -> Result<(EndpointCache, ReloadEndpoint), BoxError>
where
F: Future<Output = Result<(Endpoint, SystemTime), ResolveEndpointError>> + Send + 'static,
F: Future<Output = Result<(Endpoint, SystemTime), BoxError>> + Send + 'static,
{
let error_holder = Arc::new(Mutex::new(None));
let endpoint_holder = Arc::new(Mutex::new(None));
Expand All @@ -135,25 +136,24 @@ where
reloader.reload_once().await;
// if we didn't successfully get an endpoint, bail out so the client knows
// configuration failed to work
cache.resolve_endpoint()?;
cache.resolve_endpoint().await?;
Ok((cache, reloader))
}

impl EndpointCache {
fn resolve_endpoint(&self) -> aws_smithy_http::endpoint::Result {
fn resolve_endpoint(&self) -> EndpointFuture<'_> {
tracing::trace!("resolving endpoint from endpoint discovery cache");
self.endpoint
let ep = self
.endpoint
.lock()
.unwrap()
.as_ref()
.map(|e| e.endpoint.clone())
.ok_or_else(|| {
self.error
.lock()
.unwrap()
.take()
.unwrap_or_else(|| ResolveEndpointError::message("no endpoint loaded"))
})
let error: Option<BoxError> = self.error.lock().unwrap().take();
error.unwrap_or_else(|| "Failed to resolve endpoint".into())
});
EndpointFuture::ready(ep)
}
}

Expand Down Expand Up @@ -215,21 +215,21 @@ mod test {
.await
.expect("returns an endpoint");
assert_eq!(
cache.resolve_endpoint().expect("ok").url(),
cache.resolve_endpoint().await.expect("ok").url(),
"http://foo.com/1"
);
// 120 second buffer
reloader
.reload_increment(expiry - Duration::from_secs(240))
.await;
assert_eq!(
cache.resolve_endpoint().expect("ok").url(),
cache.resolve_endpoint().await.expect("ok").url(),
"http://foo.com/1"
);

reloader.reload_increment(expiry).await;
assert_eq!(
cache.resolve_endpoint().expect("ok").url(),
cache.resolve_endpoint().await.expect("ok").url(),
"http://foo.com/2"
);
}
Expand Down Expand Up @@ -266,18 +266,27 @@ mod test {
gate.expect_sleep().await.duration(),
Duration::from_secs(60)
);
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/1");
assert_eq!(
cache.resolve_endpoint().await.unwrap().url(),
"http://foo.com/1"
);
// t = 60

let sleep = gate.expect_sleep().await;
// we're still holding the drop guard, so we haven't expired yet.
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/1");
assert_eq!(
cache.resolve_endpoint().await.unwrap().url(),
"http://foo.com/1"
);
assert_eq!(sleep.duration(), Duration::from_secs(60));
sleep.allow_progress();
// t = 120

let sleep = gate.expect_sleep().await;
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/2");
assert_eq!(
cache.resolve_endpoint().await.unwrap().url(),
"http://foo.com/2"
);
sleep.allow_progress();

let sleep = gate.expect_sleep().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ class TimestreamDecorator : ClientCodegenDecorator {
// helper function to resolve an endpoint given a base client
rustTemplate(
"""
async fn resolve_endpoint(client: &crate::Client) -> Result<(#{Endpoint}, #{SystemTime}), #{ResolveEndpointError}> {
async fn resolve_endpoint(client: &crate::Client) -> Result<(#{Endpoint}, #{SystemTime}), #{BoxError}> {
let describe_endpoints =
client.describe_endpoints().send().await.map_err(|e| {
#{ResolveEndpointError}::from_source("failed to call describe_endpoints", e)
})?;
client.describe_endpoints().send().await?;
let endpoint = describe_endpoints.endpoints().get(0).unwrap();
let expiry = client.config().time_source().expect("checked when ep discovery was enabled").now()
+ #{Duration}::from_secs(endpoint.cache_period_in_minutes() as u64 * 60);
Expand All @@ -75,7 +73,7 @@ class TimestreamDecorator : ClientCodegenDecorator {
/// Enable endpoint discovery for this client
///
/// This method MUST be called to construct a working client.
pub async fn with_endpoint_discovery_enabled(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{ResolveEndpointError}> {
pub async fn with_endpoint_discovery_enabled(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{BoxError}> {
let handle = self.handle.clone();
// The original client without endpoint discover gets moved into the endpoint discovery
Expand All @@ -92,22 +90,22 @@ class TimestreamDecorator : ClientCodegenDecorator {
.expect("endpoint discovery requires the client config to have a time source"),
).await?;
let client_with_discovery = crate::Client::from_conf(
handle.conf.to_builder()
.endpoint_resolver(#{SharedEndpointResolver}::new(resolver))
.build()
);
use #{IntoShared};
let mut conf = handle.conf.to_builder();
conf.set_endpoint_resolver(Some(resolver.into_shared()));
let client_with_discovery = crate::Client::from_conf(conf.build());
Ok((client_with_discovery, reloader))
}
}
""",
*RuntimeType.preludeScope,
"Arc" to RuntimeType.Arc,
"Duration" to RuntimeType.std.resolve("time::Duration"),
"SharedEndpointResolver" to RuntimeType.smithyHttp(codegenContext.runtimeConfig)
.resolve("endpoint::SharedEndpointResolver"),
"SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
"endpoint_discovery" to endpointDiscovery.toType(),
"BoxError" to RuntimeType.boxError(codegenContext.runtimeConfig),
"IntoShared" to RuntimeType.smithyRuntimeApi(codegenContext.runtimeConfig).resolve("shared::IntoShared"),
*Types(codegenContext.runtimeConfig).toArray(),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class ClientModuleDocProvider(
ClientRustModule.Config.endpoint -> strDoc("Types needed to configure endpoint resolution.")
ClientRustModule.Config.retry -> strDoc("Retry configuration.")
ClientRustModule.Config.timeout -> strDoc("Timeout configuration.")
ClientRustModule.Config.interceptors -> strDoc("Types needed to implement [`Interceptor`](crate::config::Interceptor).")
ClientRustModule.Config.interceptors -> strDoc("Types needed to implement [`Intercept`](crate::config::Intercept).")
jdisanti marked this conversation as resolved.
Show resolved Hide resolved
ClientRustModule.Error -> strDoc("Common errors and error handling utilities.")
ClientRustModule.Operation -> strDoc("All operations that this crate can perform.")
ClientRustModule.Meta -> strDoc("Information about this crate.")
Expand Down
Loading