Skip to content

Commit

Permalink
Support server event streams (#1479)
Browse files Browse the repository at this point in the history
* Support server event streams
* Rename EventStreamInput to EventStreamSender
* Custom event stream errors
* EventStreamSender and Receiver are parametrized also on event stream error types
* Pokemon service model updated
* Pokemon server event handler
* Pokemon client to test event streams
* EventStreamDecorator to make optional using SigV4 signing
* Use forInlineFun for all errors

Closes: #1157

Signed-off-by: Daniele Ahmed <[email protected]>

Co-authored-by: John DiSanti <[email protected]>
  • Loading branch information
82marbag and jdisanti authored Jul 25, 2022
1 parent 5edd9d2 commit 3610085
Show file tree
Hide file tree
Showing 59 changed files with 2,070 additions and 397 deletions.
134 changes: 133 additions & 1 deletion CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,136 @@
# message = "Fix typos in module documentation for generated crates"
# references = ["smithy-rs#920"]
# meta = { "breaking" = false, "tada" = false, "bug" = false, "sdk" = "client | server | all"}
# author = "rcoh"
# author = "rcoh"

[[smithy-rs]]
message = "Rename EventStreamInput to EventStreamSender"
references = ["smithy-rs#1157"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "82marbag"

[[aws-sdk-rust]]
message = "Rename EventStreamInput to EventStreamSender"
references = ["smithy-rs#1157"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "82marbag"

[[aws-sdk-rust]]
message = """
The type of streaming unions that contain errors is generated without those errors.
Errors in a streaming union `Union` are generated as members of the type `UnionError`.
Taking Transcribe as an example, the `AudioStream` streaming union generates, in the client, both the `AudioStream` type:
```rust
pub enum AudioStream {
AudioEvent(crate::model::AudioEvent),
Unknown,
}
```
and its error type,
```rust
pub struct AudioStreamError {
/// Kind of error that occurred.
pub kind: AudioStreamErrorKind,
/// Additional metadata about the error, including error code, message, and request ID.
pub(crate) meta: aws_smithy_types::Error,
}
```
`AudioStreamErrorKind` contains all error variants for the union.
Before, the generated code looked as:
```rust
pub enum AudioStream {
AudioEvent(crate::model::AudioEvent),
... all error variants,
Unknown,
}
```
"""
references = ["smithy-rs#1157"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "82marbag"

[[aws-sdk-rust]]
message = """
`aws_smithy_http::event_stream::EventStreamSender` and `aws_smithy_http::event_stream::Receiver` are now generic over `<T, E>`,
where `T` is a streaming union and `E` the union's errors.
This means that event stream errors are now sent as `Err` of the union's error type.
With this example model:
```smithy
@streaming union Event {
throttlingError: ThrottlingError
}
@error("client") structure ThrottlingError {}
```
Before:
```rust
stream! { yield Ok(Event::ThrottlingError ...) }
```
After:
```rust
stream! { yield Err(EventError::ThrottlingError ...) }
```
An example from the SDK is in [transcribe streaming](https://github.com/awslabs/smithy-rs/blob/4f51dd450ea3234a7faf481c6025597f22f03805/aws/sdk/integration-tests/transcribestreaming/tests/test.rs#L80).
"""
references = ["smithy-rs#1157"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "82marbag"

[[smithy-rs]]
message = """
The type of streaming unions that contain errors is generated without those errors.
Errors in a streaming union `Union` are generated as members of the type `UnionError`.
Taking Transcribe as an example, the `AudioStream` streaming union generates, in the client, both the `AudioStream` type:
```rust
pub enum AudioStream {
AudioEvent(crate::model::AudioEvent),
Unknown,
}
```
and its error type,
```rust
pub struct AudioStreamError {
/// Kind of error that occurred.
pub kind: AudioStreamErrorKind,
/// Additional metadata about the error, including error code, message, and request ID.
pub(crate) meta: aws_smithy_types::Error,
}
```
`AudioStreamErrorKind` contains all error variants for the union.
Before, the generated code looked as:
```rust
pub enum AudioStream {
AudioEvent(crate::model::AudioEvent),
... all error variants,
Unknown,
}
```
"""
references = ["smithy-rs#1157"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "82marbag"

[[smithy-rs]]
message = """
`aws_smithy_http::event_stream::EventStreamSender` and `aws_smithy_http::event_stream::Receiver` are now generic over `<T, E>`,
where `T` is a streaming union and `E` the union's errors.
This means that event stream errors are now sent as `Err` of the union's error type.
With this example model:
```smithy
@streaming union Event {
throttlingError: ThrottlingError
}
@error("client") structure ThrottlingError {}
```
Before:
```rust
stream! { yield Ok(Event::ThrottlingError ...) }
```
After:
```rust
stream! { yield Err(EventError::ThrottlingError ...) }
```
An example from the SDK is in [transcribe streaming](https://github.com/awslabs/smithy-rs/blob/4f51dd450ea3234a7faf481c6025597f22f03805/aws/sdk/integration-tests/transcribestreaming/tests/test.rs#L80).
"""
references = ["smithy-rs#1157"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "82marbag"
4 changes: 2 additions & 2 deletions aws/rust-runtime/aws-sig-auth/src/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl SignMessage for SigV4Signer {
Ok(signed_message)
}

fn sign_empty(&mut self) -> Result<Message, SignMessageError> {
fn sign_empty(&mut self) -> Option<Result<Message, SignMessageError>> {
let properties = self.properties.acquire();
if self.last_signature.is_none() {
// The Signature property should exist in the property bag for all Event Stream requests.
Expand All @@ -83,7 +83,7 @@ impl SignMessage for SigV4Signer {
sign_empty_message(self.last_signature.as_ref().unwrap(), &params).into_parts()
};
self.last_signature = Some(signature);
Ok(signed_message)
Some(Ok(signed_message))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class AwsInputPresignedMethod(
}

private fun RustWriter.writeInputPresignedMethod(section: OperationSection.InputImpl) {
val operationError = operationShape.errorSymbol(symbolProvider)
val operationError = operationShape.errorSymbol(coreCodegenContext.model, symbolProvider, coreCodegenContext.target)
val presignableOp = PRESIGNABLE_OPERATIONS.getValue(operationShape.id)

val makeOperationOp = if (presignableOp.hasModelTransforms()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import software.amazon.smithy.model.shapes.OperationShape
import software.amazon.smithy.model.shapes.ServiceShape
import software.amazon.smithy.model.shapes.ShapeId
import software.amazon.smithy.model.traits.OptionalAuthTrait
import software.amazon.smithy.rust.codegen.rustlang.CargoDependency
import software.amazon.smithy.rust.codegen.rustlang.Writable
import software.amazon.smithy.rust.codegen.rustlang.asType
import software.amazon.smithy.rust.codegen.rustlang.rust
Expand All @@ -27,7 +26,7 @@ import software.amazon.smithy.rust.codegen.smithy.customize.OperationCustomizati
import software.amazon.smithy.rust.codegen.smithy.customize.OperationSection
import software.amazon.smithy.rust.codegen.smithy.customize.RustCodegenDecorator
import software.amazon.smithy.rust.codegen.smithy.generators.config.ConfigCustomization
import software.amazon.smithy.rust.codegen.smithy.generators.config.ServiceConfig
import software.amazon.smithy.rust.codegen.smithy.generators.config.EventStreamSigningConfig
import software.amazon.smithy.rust.codegen.smithy.letIf
import software.amazon.smithy.rust.codegen.util.dq
import software.amazon.smithy.rust.codegen.util.expectTrait
Expand Down Expand Up @@ -82,51 +81,45 @@ class SigV4SigningConfig(
runtimeConfig: RuntimeConfig,
private val serviceHasEventStream: Boolean,
private val sigV4Trait: SigV4Trait
) : ConfigCustomization() {
) : EventStreamSigningConfig(runtimeConfig) {
private val codegenScope = arrayOf(
"SigV4Signer" to RuntimeType(
"SigV4Signer",
runtimeConfig.awsRuntimeDependency("aws-sig-auth", setOf("sign-eventstream")),
"aws_sig_auth::event_stream"
),
"SharedPropertyBag" to RuntimeType(
"SharedPropertyBag",
CargoDependency.SmithyHttp(runtimeConfig),
"aws_smithy_http::property_bag"
)
)

override fun section(section: ServiceConfig): Writable {
return when (section) {
is ServiceConfig.ConfigImpl -> writable {
override fun configImplSection(): Writable {
return writable {
rustTemplate(
"""
/// The signature version 4 service signing name to use in the credential scope when signing requests.
///
/// The signing service may be overridden by the `Endpoint`, or by specifying a custom
/// [`SigningService`](aws_types::SigningService) during operation construction
pub fn signing_service(&self) -> &'static str {
${sigV4Trait.name.dq()}
}
""",
*codegenScope
)
if (serviceHasEventStream) {
rustTemplate(
"""
/// The signature version 4 service signing name to use in the credential scope when signing requests.
///
/// The signing service may be overridden by the `Endpoint`, or by specifying a custom
/// [`SigningService`](aws_types::SigningService) during operation construction
pub fn signing_service(&self) -> &'static str {
${sigV4Trait.name.dq()}
}
""",
*codegenScope
)
if (serviceHasEventStream) {
rustTemplate(
"""
/// Creates a new Event Stream `SignMessage` implementor.
pub fn new_event_stream_signer(
&self,
properties: #{SharedPropertyBag}
) -> #{SigV4Signer} {
#{SigV4Signer}::new(properties)
"#{signerFn:W}",
"signerFn" to
renderEventStreamSignerFn { propertiesName ->
writable {
rustTemplate(
"""
#{SigV4Signer}::new($propertiesName)
""",
*codegenScope
)
}
}
""",
*codegenScope
)
}
)
}
else -> emptySection
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"test"
],
"content-type": [
"application/json"
"application/vnd.amazon.eventstream"
]
},
"method": "POST"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"AWS4-HMAC-SHA256 Credential=test/test/us-west-2/transcribe/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-user-agent;x-amzn-transcribe-language-code;x-amzn-transcribe-media-encoding;x-amzn-transcribe-sample-rate, Signature=test"
],
"content-type": [
"application/json"
"application/vnd.amazon.eventstream"
],
"x-amzn-transcribe-language-code": [
"en-GB"
Expand Down
9 changes: 4 additions & 5 deletions aws/sdk/integration-tests/transcribestreaming/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use async_stream::stream;
use aws_sdk_transcribestreaming::error::{
StartStreamTranscriptionError, StartStreamTranscriptionErrorKind,
AudioStreamError, TranscriptResultStreamError, TranscriptResultStreamErrorKind,
};
use aws_sdk_transcribestreaming::model::{
AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream,
Expand All @@ -15,7 +15,6 @@ use aws_sdk_transcribestreaming::types::{Blob, SdkError};
use aws_sdk_transcribestreaming::{Client, Config, Credentials, Region};
use aws_smithy_client::dvr::{Event, ReplayingConnection};
use aws_smithy_eventstream::frame::{DecodedFrame, HeaderValue, Message, MessageFrameDecoder};
use aws_smithy_http::event_stream::BoxError;
use bytes::BufMut;
use futures_core::Stream;
use std::collections::{BTreeMap, BTreeSet};
Expand Down Expand Up @@ -78,8 +77,8 @@ async fn test_error() {
match output.transcript_result_stream.recv().await {
Err(SdkError::ServiceError {
err:
StartStreamTranscriptionError {
kind: StartStreamTranscriptionErrorKind::BadRequestException(err),
TranscriptResultStreamError {
kind: TranscriptResultStreamErrorKind::BadRequestException(err),
..
},
..
Expand All @@ -102,7 +101,7 @@ async fn test_error() {
async fn start_request(
region: &'static str,
events_json: &str,
input_stream: impl Stream<Item = Result<AudioStream, BoxError>> + Send + Sync + 'static,
input_stream: impl Stream<Item = Result<AudioStream, AudioStreamError>> + Send + Sync + 'static,
) -> (ReplayingConnection, StartStreamTranscriptionOutput) {
let events: Vec<Event> = serde_json::from_str(events_json).unwrap();
let replayer = ReplayingConnection::new(events);
Expand Down
Loading

0 comments on commit 3610085

Please sign in to comment.