Skip to content

Commit

Permalink
Fix Timestream in the orchestrator (#2846)
Browse files Browse the repository at this point in the history
This PR fixes Timestream in the orchestrator implementation.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
jdisanti authored Jul 14, 2023
1 parent 2999766 commit 7d1d35c
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 80 deletions.
12 changes: 6 additions & 6 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ author = "rcoh"
message = "Bump dependency on `lambda_http` by `aws-smithy-http-server` to 0.8.0. This version of `aws-smithy-http-server` is only guaranteed to be compatible with 0.8.0, or semver-compatible versions of 0.8.0 of the `lambda_http` crate. It will not work with versions prior to 0.8.0 _at runtime_, making requests to your smithy-rs service unroutable, so please make sure you're running your service in a compatible configuration"
author = "david-perez"
references = ["smithy-rs#2676", "smithy-rs#2685"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }

[[smithy-rs]]
message = """Remove `PollError` from an operations `Service::Error`.
Expand All @@ -141,9 +141,9 @@ meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "server" }
author = "hlbarber"

[[aws-sdk-rust]]
message = "The SDK has added support for timestreamwrite and timestreamquery. Support for these services is considered experimental at this time. In order to use these services, you MUST call `.enable_endpoint_discovery()` on the `Client` after construction."
message = "The SDK has added support for timestreamwrite and timestreamquery. Support for these services is considered experimental at this time. In order to use these services, you MUST call `.with_endpoint_discovery_enabled()` on the `Client` after construction."
meta = { "breaking" = false, "tada" = true, "bug" = false }
references = ["smithy-rs#2707", "aws-sdk-rust#114"]
references = ["smithy-rs#2707", "aws-sdk-rust#114", "smithy-rs#2846"]
author = "rcoh"

[[smithy-rs]]
Expand Down Expand Up @@ -197,7 +197,7 @@ filter_by_operation_id(plugin, |id| id.absolute() != "namespace#name");
"""
author = "82marbag"
references = ["smithy-rs#2678"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }

[[smithy-rs]]
message = "The occurrences of `Arc<dyn ResolveEndpoint>` have now been replaced with `SharedEndpointResolver` in public APIs."
Expand Down Expand Up @@ -532,7 +532,7 @@ let scoped_plugin = Scoped::new::<SomeScope>(plugin);
"""
references = ["smithy-rs#2740", "smithy-rs#2759", "smithy-rs#2779", "smithy-rs#2827"]
meta = { "breaking" = true, "tada" = true, "bug" = false }
meta = { "breaking" = true, "tada" = true, "bug" = false, target = "server" }
author = "hlbarber"

[[smithy-rs]]
Expand Down Expand Up @@ -608,7 +608,7 @@ let plugin = plugin_from_operation_fn(map);
```
"""
references = ["smithy-rs#2740", "smithy-rs#2759", "smithy-rs#2779"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }
author = "hlbarber"

[[smithy-rs]]
Expand Down
3 changes: 3 additions & 0 deletions aws/rust-runtime/aws-inlineable/src/endpoint_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl ReloadEndpoint {
pub async fn reload_once(&self) {
match (self.loader)().await {
Ok((endpoint, expiry)) => {
tracing::debug!("caching resolved endpoint: {:?}", (&endpoint, &expiry));
*self.endpoint.lock().unwrap() = Some(ExpiringEndpoint { endpoint, expiry })
}
Err(err) => *self.error.lock().unwrap() = Some(err),
Expand Down Expand Up @@ -128,6 +129,7 @@ where
sleep,
time,
};
tracing::debug!("populating initial endpoint discovery cache");
reloader.reload_once().await;
// if we didn't successfully get an endpoint, bail out so the client knows
// configuration failed to work
Expand All @@ -137,6 +139,7 @@ where

impl EndpointCache {
fn resolve_endpoint(&self) -> aws_smithy_http::endpoint::Result {
tracing::trace!("resolving endpoint from endpoint discovery cache");
self.endpoint
.lock()
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package software.amazon.smithy.rustsdk.customize.timestream

import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustModule
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
import software.amazon.smithy.rust.codegen.client.smithy.endpoint.Types
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
Expand All @@ -14,45 +15,54 @@ import software.amazon.smithy.rust.codegen.core.rustlang.Visibility
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.toType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope
import software.amazon.smithy.rust.codegen.core.smithy.RustCrate
import software.amazon.smithy.rust.codegen.core.smithy.customize.AdHocCustomization
import software.amazon.smithy.rust.codegen.core.smithy.customize.adhocCustomization
import software.amazon.smithy.rust.codegen.core.util.letIf
import software.amazon.smithy.rustsdk.AwsCargoDependency
import software.amazon.smithy.rustsdk.DocSection
import software.amazon.smithy.rustsdk.InlineAwsDependency

/**
* This decorator does two things:
* 1. Adds the `endpoint_discovery` inlineable
* 2. Adds a `enable_endpoint_discovery` method on client that returns a wrapped client with endpoint discovery enabled
* 2. Adds a `with_endpoint_discovery_enabled` method on client that returns a wrapped client with endpoint discovery enabled
*/
class TimestreamDecorator : ClientCodegenDecorator {
override val name: String = "Timestream"
override val order: Byte = -1

override fun extraSections(codegenContext: ClientCodegenContext): List<AdHocCustomization> {
return listOf(
adhocCustomization<DocSection.CreateClient> {
addDependency(AwsCargoDependency.awsConfig(codegenContext.runtimeConfig).toDevDependency())
rustTemplate(
"""
let config = aws_config::load_from_env().await;
// You MUST call `enable_endpoint_discovery` to produce a working client for this service.
let ${it.clientName} = ${it.crateName}::Client::new(&config).enable_endpoint_discovery().await;
""".replaceIndent(it.indent),
)
},
)
}
private fun applies(codegenContext: ClientCodegenContext): Boolean =
codegenContext.smithyRuntimeMode.defaultToOrchestrator

override fun extraSections(codegenContext: ClientCodegenContext): List<AdHocCustomization> =
emptyList<AdHocCustomization>().letIf(applies(codegenContext)) {
listOf(
adhocCustomization<DocSection.CreateClient> {
addDependency(AwsCargoDependency.awsConfig(codegenContext.runtimeConfig).toDevDependency())
rustTemplate(
"""
let config = aws_config::load_from_env().await;
// You MUST call `with_endpoint_discovery_enabled` to produce a working client for this service.
let ${it.clientName} = ${it.crateName}::Client::new(&config).with_endpoint_discovery_enabled().await;
""".replaceIndent(it.indent),
)
},
)
}

override fun extras(codegenContext: ClientCodegenContext, rustCrate: RustCrate) {
if (!applies(codegenContext)) {
return
}

val endpointDiscovery = InlineAwsDependency.forRustFile(
"endpoint_discovery",
Visibility.PUBLIC,
CargoDependency.Tokio.copy(scope = DependencyScope.Compile, features = setOf("sync")),
CargoDependency.smithyAsync(codegenContext.runtimeConfig).toDevDependency().withFeature("test-util"),
)
rustCrate.lib {
rustCrate.withModule(ClientRustModule.client) {
// helper function to resolve an endpoint given a base client
rustTemplate(
"""
Expand All @@ -76,33 +86,40 @@ class TimestreamDecorator : ClientCodegenDecorator {
/// Enable endpoint discovery for this client
///
/// This method MUST be called to construct a working client.
pub async fn enable_endpoint_discovery(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{ResolveEndpointError}> {
let mut new_conf = self.conf().clone();
let sleep = self.conf().sleep_impl().expect("sleep impl must be provided");
let time = self.conf().time_source().expect("time source must be provided");
pub async fn with_endpoint_discovery_enabled(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{ResolveEndpointError}> {
let handle = self.handle.clone();
// The original client without endpoint discover gets moved into the endpoint discovery
// resolver since calls to DescribeEndpoint without discovery need to be made.
let client_without_discovery = self;
let (resolver, reloader) = #{endpoint_discovery}::create_cache(
move || {
let client = self.clone();
let client = client_without_discovery.clone();
async move { resolve_endpoint(&client).await }
},
sleep,
time
)
.await?;
new_conf.endpoint_resolver = #{SharedEndpointResolver}::new(resolver);
Ok((Self::from_conf(new_conf), reloader))
handle.conf.sleep_impl()
.expect("endpoint discovery requires the client config to have a sleep impl"),
handle.conf.time_source()
.expect("endpoint discovery requires the client config to have a time source"),
).await?;
let client_with_discovery = crate::Client::from_conf(
handle.conf.to_builder()
.endpoint_resolver(#{SharedEndpointResolver}::new(resolver))
.build()
);
Ok((client_with_discovery, reloader))
}
}
""",
"endpoint_discovery" to endpointDiscovery.toType(),
"SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
*RuntimeType.preludeScope,
"Arc" to RuntimeType.Arc,
"Duration" to RuntimeType.std.resolve("time::Duration"),
"SharedEndpointResolver" to RuntimeType.smithyHttp(codegenContext.runtimeConfig)
.resolve("endpoint::SharedEndpointResolver"),
"SystemTimeSource" to RuntimeType.smithyAsync(codegenContext.runtimeConfig)
.resolve("time::SystemTimeSource"),
"SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
"endpoint_discovery" to endpointDiscovery.toType(),
*Types(codegenContext.runtimeConfig).toArray(),
*preludeScope,
)
}
}
Expand Down
18 changes: 18 additions & 0 deletions aws/sdk/integration-tests/s3/tests/config_to_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#[cfg(aws_sdk_orchestrator_mode)]
#[tokio::test]
async fn test_config_to_builder() {
use aws_sdk_s3::config::AppName;

let config = aws_config::load_from_env().await;
let config = aws_sdk_s3::Config::new(&config);
// should not panic
let _ = config
.to_builder()
.app_name(AppName::new("SomeAppName").unwrap())
.build();
}
5 changes: 3 additions & 2 deletions aws/sdk/integration-tests/timestreamquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ publish = false
[dev-dependencies]
aws-credential-types = { path = "../../build/aws-sdk/sdk/aws-credential-types", features = ["test-util"] }
aws-sdk-timestreamquery = { path = "../../build/aws-sdk/sdk/timestreamquery" }
tokio = { version = "1.23.1", features = ["full", "test-util"] }
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util"] }
aws-smithy-async = { path = "../../build/aws-sdk/sdk/aws-smithy-async", features = ["test-util"] }
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util"] }
aws-smithy-runtime = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime", features = ["test-util"] }
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
tokio = { version = "1.23.1", features = ["full", "test-util"] }
tracing-subscriber = "0.3.17"
28 changes: 15 additions & 13 deletions aws/sdk/integration-tests/timestreamquery/tests/endpoint_disco.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
* SPDX-License-Identifier: Apache-2.0
*/

use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_timestreamquery as query;
use aws_sdk_timestreamquery::config::Credentials;
use aws_smithy_async::rt::sleep::SharedAsyncSleep;
use aws_smithy_async::test_util::controlled_time_and_sleep;
use aws_smithy_async::time::{SharedTimeSource, TimeSource};
use aws_smithy_client::dvr::{MediaType, ReplayingConnection};
use aws_types::region::Region;
use aws_types::SdkConfig;
use std::time::{Duration, UNIX_EPOCH};

#[cfg(aws_sdk_orchestrator_mode)]
#[tokio::test]
async fn do_endpoint_discovery() {
tracing_subscriber::fmt::init();
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_timestreamquery as query;
use aws_sdk_timestreamquery::config::Credentials;
use aws_smithy_async::rt::sleep::SharedAsyncSleep;
use aws_smithy_async::test_util::controlled_time_and_sleep;
use aws_smithy_async::time::{SharedTimeSource, TimeSource};
use aws_smithy_client::dvr::{MediaType, ReplayingConnection};
use aws_types::region::Region;
use aws_types::SdkConfig;
use std::time::{Duration, UNIX_EPOCH};

let _logs = aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs();

let conn = ReplayingConnection::from_file("tests/traffic.json").unwrap();
//let conn = aws_smithy_client::dvr::RecordingConnection::new(conn);
let start = UNIX_EPOCH + Duration::from_secs(1234567890);
Expand All @@ -32,7 +34,7 @@ async fn do_endpoint_discovery() {
.idempotency_token_provider("0000-0000-0000")
.build();
let (client, reloader) = query::Client::from_conf(conf)
.enable_endpoint_discovery()
.with_endpoint_discovery_enabled()
.await
.expect("initial setup of endpoint discovery failed");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ class ConfigOverrideRuntimePluginGenerator(
initial_config: #{FrozenLayer},
initial_components: &#{RuntimeComponentsBuilder}
) -> Self {
let mut layer = #{Layer}::from(config_override.config)
.with_name("$moduleUseName::config::ConfigOverrideRuntimePlugin");
let mut layer = config_override.config;
let mut components = config_override.runtime_components;
let mut resolver = #{Resolver}::overrid(initial_config, initial_components, &mut layer, &mut components);
#{config}
let _ = resolver;
Self {
config: layer.freeze(),
config: #{Layer}::from(layer)
.with_name("$moduleUseName::config::ConfigOverrideRuntimePlugin").freeze(),
components,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,11 @@ class ServiceConfigGenerator(
if (runtimeMode.defaultToOrchestrator) {
rustTemplate(
"""
// Both `config` and `cloneable` are the same config, but the cloneable one
// is kept around so that it is possible to convert back into a builder. This can be
// optimized in the future.
pub(crate) config: #{FrozenLayer},
cloneable: #{CloneableLayer},
pub(crate) runtime_components: #{RuntimeComponentsBuilder},
pub(crate) runtime_plugins: #{Vec}<#{SharedRuntimePlugin}>,
""",
Expand Down Expand Up @@ -369,6 +373,20 @@ class ServiceConfigGenerator(
pub fn builder() -> Builder { Builder::default() }
""",
)
if (runtimeMode.defaultToOrchestrator) {
writer.rustTemplate(
"""
/// Converts this config back into a builder so that it can be tweaked.
pub fn to_builder(&self) -> Builder {
Builder {
config: self.cloneable.clone(),
runtime_components: self.runtime_components.clone(),
runtime_plugins: self.runtime_plugins.clone(),
}
}
""",
)
}
customizations.forEach {
it.section(ServiceConfig.ConfigImpl)(this)
}
Expand Down Expand Up @@ -478,12 +496,7 @@ class ServiceConfigGenerator(
rustBlock("pub fn build(mut self) -> Config") {
rustTemplate(
"""
// The builder is being turned into a service config. While doing so, we'd like to avoid
// requiring that items created and stored _during_ the build method be `Clone`, since they
// will soon be part of a `FrozenLayer` owned by the service config. So we will convert the
// current `CloneableLayer` into a `Layer` that does not impose the `Clone` requirement.
let mut layer = #{Layer}::from(self.config).with_name("$moduleUseName::config::Config");
##[allow(unused)]
let mut layer = self.config;
let mut resolver = #{Resolver}::initial(&mut layer, &mut self.runtime_components);
""",
*codegenScope,
Expand All @@ -495,12 +508,14 @@ class ServiceConfigGenerator(
customizations.forEach {
it.section(ServiceConfig.BuilderBuildExtras)(this)
}
rust(
rustTemplate(
"""
config: layer.freeze(),
config: #{Layer}::from(layer.clone()).with_name("$moduleUseName::config::Config").freeze(),
cloneable: layer,
runtime_components: self.runtime_components,
runtime_plugins: self.runtime_plugins,
""",
*codegenScope,
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-client/src/dvr/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl ReplayingConnection {
))?
.take()
.await;
aws_smithy_protocol_test::assert_uris_match(actual.uri(), expected.uri());
aws_smithy_protocol_test::assert_uris_match(expected.uri(), actual.uri());
body_comparer(expected.body().as_ref(), actual.body().as_ref())?;
let expected_headers = expected
.headers()
Expand Down
Loading

0 comments on commit 7d1d35c

Please sign in to comment.