Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid exposing aws_smithy_http::event_stream::receiver::Receiver in SDK's public API #3114

Merged
merged 15 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class EventStreamSymbolProviderTest {
listOf(someStream, someStreamError),
)
outputType shouldBe RustType.Application(
RuntimeType.eventStreamReceiver(TestRuntimeConfig).toSymbol().rustType(),
RuntimeType.eventStreamReceiverWrapper(TestRuntimeConfig).toSymbol().rustType(),
listOf(someStream, someStreamError),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ class InlineDependency(
private fun forInlineableRustFile(name: String, vararg additionalDependencies: RustDependency) =
forRustFile(RustModule.private(name), "/inlineable/src/$name.rs", *additionalDependencies)

fun eventStreamReceiver(runtimeConfig: RuntimeConfig) =
forInlineableRustFile(
"event_stream_receiver",
CargoDependency.smithyHttp(runtimeConfig),
CargoDependency.smithyRuntimeApi(runtimeConfig),
)

fun defaultAuthPlugin(runtimeConfig: RuntimeConfig) = forInlineableRustFile("auth_plugin", CargoDependency.smithyRuntimeApi(runtimeConfig))

fun jsonErrors(runtimeConfig: RuntimeConfig) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class EventStreamSymbolProvider(
(shape.isOutputEventStream(model) && target == CodegenTarget.SERVER)
val outer = when (isSender) {
true -> RuntimeType.eventStreamSender(runtimeConfig).toSymbol().rustType()
else -> RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType()
else -> RuntimeType.eventStreamReceiverWrapper(runtimeConfig).toSymbol().rustType()
}
val rustType = RustType.Application(outer, listOf(innerT, errorT))
return initial.toBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null)
fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::Receiver")

fun eventStreamReceiverWrapper(runtimeConfig: RuntimeConfig) =
RuntimeType.forInlineDependency(InlineDependency.eventStreamReceiver(runtimeConfig)).resolve("EventStreamReceiver")

fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter
import software.amazon.smithy.rust.codegen.core.rustlang.withBlock
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext
import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
Expand Down Expand Up @@ -254,15 +255,25 @@ class HttpBindingGenerator(
operationShape,
targetShape,
).render()
val receiver = outputT.rustType().qualifiedName()
rustTemplate(
"""
let unmarshaller = #{unmarshallerConstructorFn}();
let body = std::mem::replace(body, #{SdkBody}::taken());
Ok($receiver::new(unmarshaller, body))
Ok(#{receiver:W})
""",
"SdkBody" to RuntimeType.sdkBody(runtimeConfig),
"unmarshallerConstructorFn" to unmarshallerConstructorFn,
"receiver" to writable {
if (codegenTarget == CodegenTarget.SERVER) {
rust("${outputT.rustType().qualifiedName()}::new(unmarshaller, body)")
} else {
rustTemplate(
"#{Wrapper}::new(#{Receiver}::new(unmarshaller, body))",
"Wrapper" to RuntimeType.eventStreamReceiverWrapper(runtimeConfig),
"Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig),
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
)
}
},
)
}

Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/inlineable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ default = ["gated-tests"]

[dependencies]
async-trait = "0.1"
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-http = { path = "../aws-smithy-http", features = ["event-stream"] }
aws-smithy-http-server = { path = "../aws-smithy-http-server" }
aws-smithy-json = { path = "../aws-smithy-json" }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["client"] }
Expand Down
30 changes: 30 additions & 0 deletions rust-runtime/inlineable/src/event_stream_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use aws_smithy_http::event_stream::Receiver;
use aws_smithy_runtime_api::box_error::BoxError;

#[derive(Debug)]
#[non_exhaustive]
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
/// Receives messages out of an Event Stream.
pub struct EventStreamReceiver<T, E> {
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
inner: Receiver<T, E>,
}

impl<T, E> EventStreamReceiver<T, E> {
pub(crate) fn new(inner: Receiver<T, E>) -> Self {
Self { inner }
}

/// Asynchronously tries to receive a message from the stream. If the stream has ended,
/// it returns an `Ok(None)`. If there is an error, such as failing to unmarshall a message in
/// the stream, it returns an [`BoxError`].
pub async fn recv(&mut self) -> Result<Option<T>, BoxError>
where
E: std::error::Error + Send + Sync + 'static,
{
self.inner.recv().await.map_err(Into::into)
}
}
2 changes: 2 additions & 0 deletions rust-runtime/inlineable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ mod client_idempotency_token;
mod constrained;
#[allow(dead_code)]
mod ec2_query_errors;
#[allow(unused)]
mod event_stream_receiver;
#[allow(dead_code)]
mod idempotency_token;
#[allow(dead_code)]
Expand Down
Loading