diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index c500943e45..540d5cac80 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -30,3 +30,19 @@ message = """ references = ["smithy-rs#3139"] meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all"} author = "ysaito1001" + +[[smithy-rs]] +message = """ +An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv). +""" +references = ["smithy-rs#3100", "smithy-rs#3114"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } +author = "ysaito1001" + +[[aws-sdk-rust]] +message = """ +An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv). +""" +references = ["smithy-rs#3100", "smithy-rs#3114"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "ysaito1001" diff --git a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt index e9e26724d9..6b8c28826c 100644 --- a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt +++ b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt @@ -69,7 +69,7 @@ class EventStreamSymbolProviderTest { listOf(someStream, someStreamError), ) outputType shouldBe RustType.Application( - RuntimeType.eventStreamReceiver(TestRuntimeConfig).toSymbol().rustType(), + RuntimeType.eventReceiver(TestRuntimeConfig).toSymbol().rustType(), listOf(someStream, someStreamError), ) } diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt index 1ff88da814..79c215009a 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt @@ -89,6 +89,14 @@ class InlineDependency( private fun forInlineableRustFile(name: String, vararg additionalDependencies: RustDependency) = forRustFile(RustModule.private(name), "/inlineable/src/$name.rs", *additionalDependencies) + fun eventReceiver(runtimeConfig: RuntimeConfig) = + forInlineableRustFile( + "event_receiver", + CargoDependency.smithyHttp(runtimeConfig), + CargoDependency.smithyRuntimeApi(runtimeConfig), + CargoDependency.smithyTypes(runtimeConfig), + ) + fun defaultAuthPlugin(runtimeConfig: RuntimeConfig) = forInlineableRustFile("auth_plugin", CargoDependency.smithyRuntimeApi(runtimeConfig)) fun jsonErrors(runtimeConfig: RuntimeConfig) = diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt index fdb6e35a83..97ef843fe7 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt @@ -53,7 +53,13 @@ class EventStreamSymbolProvider( (shape.isOutputEventStream(model) && target == CodegenTarget.SERVER) val outer = when (isSender) { true -> RuntimeType.eventStreamSender(runtimeConfig).toSymbol().rustType() - else -> RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType() + else -> { + if (target == CodegenTarget.SERVER) { + RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType() + } else { + RuntimeType.eventReceiver(runtimeConfig).toSymbol().rustType() + } + } } val rustType = RustType.Application(outer, listOf(innerT, errorT)) return initial.toBuilder() diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt index 8f33e70753..ba6bcb618c 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt @@ -405,6 +405,9 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null) fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::Receiver") + fun eventReceiver(runtimeConfig: RuntimeConfig) = + forInlineDependency(InlineDependency.eventReceiver(runtimeConfig)).resolve("EventReceiver") + fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender") diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt index 7268dfd12a..2491d07c0e 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt @@ -38,6 +38,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter import software.amazon.smithy.rust.codegen.core.rustlang.withBlock +import software.amazon.smithy.rust.codegen.core.rustlang.writable import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType @@ -254,15 +255,25 @@ class HttpBindingGenerator( operationShape, targetShape, ).render() - val receiver = outputT.rustType().qualifiedName() rustTemplate( """ let unmarshaller = #{unmarshallerConstructorFn}(); let body = std::mem::replace(body, #{SdkBody}::taken()); - Ok($receiver::new(unmarshaller, body)) + Ok(#{receiver:W}) """, "SdkBody" to RuntimeType.sdkBody(runtimeConfig), "unmarshallerConstructorFn" to unmarshallerConstructorFn, + "receiver" to writable { + if (codegenTarget == CodegenTarget.SERVER) { + rust("${outputT.rustType().qualifiedName()}::new(unmarshaller, body)") + } else { + rustTemplate( + "#{EventReceiver}::new(#{Receiver}::new(unmarshaller, body))", + "EventReceiver" to RuntimeType.eventReceiver(runtimeConfig), + "Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig), + ) + } + }, ) } diff --git a/rust-runtime/aws-smithy-http/src/event_stream.rs b/rust-runtime/aws-smithy-http/src/event_stream.rs index 0b95bb2d1e..3d4bab78ff 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream.rs @@ -17,4 +17,4 @@ pub type BoxError = Box; pub use sender::{EventStreamSender, MessageStreamAdapter, MessageStreamError}; #[doc(inline)] -pub use receiver::{RawMessage, Receiver, ReceiverError}; +pub use receiver::{Receiver, ReceiverError}; diff --git a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs index 69e1c4381b..6d94194511 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs @@ -8,7 +8,7 @@ use aws_smithy_eventstream::frame::{ }; use aws_smithy_runtime_api::client::result::{ConnectorError, SdkError}; use aws_smithy_types::body::SdkBody; -use aws_smithy_types::event_stream::Message; +use aws_smithy_types::event_stream::{Message, RawMessage}; use bytes::Buf; use bytes::Bytes; use bytes_utils::SegmentedBuf; @@ -87,23 +87,6 @@ impl RecvBuf { } } -/// Raw message from a [`Receiver`] when a [`SdkError::ResponseError`] is returned. -#[derive(Debug)] -#[non_exhaustive] -pub enum RawMessage { - /// Message was decoded into a valid frame, but failed to unmarshall into a modeled type. - Decoded(Message), - /// Message failed to be decoded into a valid frame. The raw bytes may not be available in the - /// case where decoding consumed the buffer. - Invalid(Option), -} - -impl RawMessage { - pub(crate) fn invalid(buf: &mut SegmentedBuf) -> Self { - Self::Invalid(Some(buf.copy_to_bytes(buf.remaining()))) - } -} - #[derive(Debug)] enum ReceiverErrorKind { /// The stream ended before a complete message frame was received. @@ -210,11 +193,12 @@ impl Receiver { } if self.buffer.has_data() { trace!(remaining_data = ?self.buffer, "data left over in the event stream response stream"); + let buf = self.buffer.buffered(); return Err(SdkError::response_error( ReceiverError { kind: ReceiverErrorKind::UnexpectedEndOfStream, }, - RawMessage::invalid(self.buffer.buffered()), + RawMessage::invalid(Some(buf.copy_to_bytes(buf.remaining()))), )); } Ok(None) diff --git a/rust-runtime/aws-smithy-types/src/event_stream.rs b/rust-runtime/aws-smithy-types/src/event_stream.rs index a63c44e8ed..0d98dabd99 100644 --- a/rust-runtime/aws-smithy-types/src/event_stream.rs +++ b/rust-runtime/aws-smithy-types/src/event_stream.rs @@ -183,3 +183,21 @@ impl Message { &self.payload } } + +/// Raw message from an event stream receiver when a response error is encountered. +#[derive(Debug)] +#[non_exhaustive] +pub enum RawMessage { + /// Message was decoded into a valid frame, but failed to unmarshall into a modeled type. + Decoded(Message), + /// Message failed to be decoded into a valid frame. The raw bytes may not be available in the + /// case where decoding consumed the buffer. + Invalid(Option), +} + +impl RawMessage { + /// Creates a `RawMessage` for failure to decode a message into a valid frame. + pub fn invalid(bytes: Option) -> Self { + Self::Invalid(bytes) + } +} diff --git a/rust-runtime/inlineable/Cargo.toml b/rust-runtime/inlineable/Cargo.toml index 6d0c099429..fd460dc08f 100644 --- a/rust-runtime/inlineable/Cargo.toml +++ b/rust-runtime/inlineable/Cargo.toml @@ -19,7 +19,7 @@ default = ["gated-tests"] [dependencies] async-trait = "0.1" -aws-smithy-http = { path = "../aws-smithy-http" } +aws-smithy-http = { path = "../aws-smithy-http", features = ["event-stream"] } aws-smithy-http-server = { path = "../aws-smithy-http-server" } aws-smithy-json = { path = "../aws-smithy-json" } aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["client"] } diff --git a/rust-runtime/inlineable/src/event_receiver.rs b/rust-runtime/inlineable/src/event_receiver.rs new file mode 100644 index 0000000000..50b78f7aa3 --- /dev/null +++ b/rust-runtime/inlineable/src/event_receiver.rs @@ -0,0 +1,28 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_http::event_stream::Receiver; +use aws_smithy_runtime_api::client::result::SdkError; +use aws_smithy_types::event_stream::RawMessage; + +#[derive(Debug)] +/// Receives unmarshalled events at a time out of an Event Stream. +pub struct EventReceiver { + inner: Receiver, +} + +impl EventReceiver { + pub(crate) fn new(inner: Receiver) -> Self { + Self { inner } + } + + /// Asynchronously tries to receive an event from the stream. If the stream has ended, it + /// returns an `Ok(None)`. If there is a transport layer error, it will return + /// `Err(SdkError::DispatchFailure)`. Service-modeled errors will be a part of the returned + /// messages. + pub async fn recv(&mut self) -> Result, SdkError> { + self.inner.recv().await + } +} diff --git a/rust-runtime/inlineable/src/lib.rs b/rust-runtime/inlineable/src/lib.rs index 01490bcee5..10f9ed7c2e 100644 --- a/rust-runtime/inlineable/src/lib.rs +++ b/rust-runtime/inlineable/src/lib.rs @@ -13,6 +13,8 @@ mod client_idempotency_token; mod constrained; #[allow(dead_code)] mod ec2_query_errors; +#[allow(unused)] +mod event_receiver; #[allow(dead_code)] mod idempotency_token; #[allow(dead_code)] diff --git a/tools/ci-cdk/canary-runner/src/build_bundle.rs b/tools/ci-cdk/canary-runner/src/build_bundle.rs index 00b628f968..4da1e0b4c9 100644 --- a/tools/ci-cdk/canary-runner/src/build_bundle.rs +++ b/tools/ci-cdk/canary-runner/src/build_bundle.rs @@ -66,7 +66,8 @@ const REQUIRED_SDK_CRATES: &[&str] = &[ // The elements in this `Vec` should be sorted in an ascending order by the release date. lazy_static! { static ref NOTABLE_SDK_RELEASE_TAGS: Vec = vec![ - ReleaseTag::from_str("release-2023-10-26").unwrap(), // last version before addition of Sigv4a MRAP test + // last version before addition of Sigv4a MRAP test + ReleaseTag::from_str("release-2023-10-26").unwrap(), ]; }