Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bootstrap event streams #597

Merged
merged 3 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 87 additions & 19 deletions docs/design/event-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,19 @@ structure ThrottlingError {}

### Event Stream Type Representation

The members of an operation input or output that target a stream will be represented with an asynchronous [Flow](https://kotlinlang.org/docs/reference/coroutines/flow.html)
from the `kotlinx-coroutines-core` library. `Flow` is a natural fit for representing asynchronous streams.
The members of an operation input or output that target a stream will be represented with an asynchronous
[Flow](https://kotlinlang.org/docs/reference/coroutines/flow.html) from the `kotlinx-coroutines-core` library.
`Flow` is a natural fit for representing asynchronous streams.

`Flow` was chosen for pagination and already in use as part of our public API contract. Any alternative to this would require a custom but similar type that doesn't play well with
the rest of the coroutine ecosystem. There is also prior art for representing streaming requests and responses, see [gRPC Kotlin](https://github.com/grpc/grpc-kotlin).
`Flow` was chosen for pagination and already in use as part of our public API contract. Any alternative to this
would require a custom but similar type that doesn't play well with the rest of the coroutine ecosystem. There is
also prior art for representing streaming requests and responses, see [gRPC Kotlin](https://github.com/grpc/grpc-kotlin).

The following types and service would be generated.

NOTE: only the input and output types are shown, the other structures or unions in the model would be generated as described in [Kotlin Smithy Design](kotlin-smithy-sdk.md).
NOTE: only the input and output types are shown, the other structures or unions in the model would be generated as
described in [Kotlin Smithy Design](kotlin-smithy-sdk.md) with the exception of event stream members targeting errors
which is described below in more detail.

#### Input Event Streams

Expand All @@ -120,7 +124,8 @@ class PublishMessagesRequest private constructor(builder: Builder){

#### Output Event Streams

Output event streams would be modeled the same way as input streams. The response object would have a `Flow<T>` field that represents the response stream.
Output event streams would be modeled the same way as input streams. The response object would have a `Flow<T>`
field that represents the response stream.

```kt
class SubscribeToMovementsResponse private constructor(builder: Builder){
Expand All @@ -137,20 +142,81 @@ class SubscribeToMovementsResponse private constructor(builder: Builder){
```


Modeling the event stream as a field of the request or response allows for [initial messages](https://awslabs.github.io/smithy/1.0/spec/core/stream-traits.html#initial-messages)
to be implemented. If we directly returned or took a `Flow<T>` as the input or output type we would not be able to represent the initial request or response fields when present.
Modeling the event stream as a field of the request or response allows for
[initial messages](https://awslabs.github.io/smithy/1.0/spec/core/stream-traits.html#initial-messages) to be
implemented. If we directly returned or took a `Flow<T>` as the input or output type we would not be able to
represent the initial request or response fields when present.


#### Event Stream Error Representation

Event stream unions may model exceptions that can appear on the stream. These exceptions are terminal messages that
are intended to be surfaced to the client using idiomatic error handling mechanisms of the target language. Thus,
the modeled errors a consumer may see on the stream are part of the overall union that makes up the possible events.

NOTE: the set of errors on the operation MAY not be the same set of errors modeled on the event stream.


Using the example from above:

```
@streaming
union MovementEvents {
up: Movement,
down: Movement,
left: Movement,
right: Movement,
throttlingError: ThrottlingError
}

```

The default representation of a union (as documented in [Kotlin Smithy Design](kotlin-smithy-sdk.md)) is generated as:

```kotlin
sealed class MovementEvents {
data class Up(val value: Movement): MovementEvents()
data class Down(val value: Movement): MovementEvents()
data class Left(val value: Movement): MovementEvents()
data class Right(val value: Movement): MovementEvents()
data class ThrottlingError(val value: ThrottlingError): MovementEvents()
object SdkUnknown : MovementEvents()
}
```

This is undesirable though since event stream errors are terminal and end the stream. Keeping them in the set of
possible events also means it may be easier for consumers to ignore errors depending on what events they are looking
for (e.g. by having a catch all `else` branch they may inadvertently ignore an error and think the stream completed
successfully).


Event stream unions will be special-cased to filter out variants targeting error shapes. When these errors are
emitted by the service on the stream they will be converted to the appropriate modeled exception and thrown rather
than being emitted on the stream the consumer sees.

As an example, the generated event stream union will look like this (note the absence of `ThrottlingError`):

```kotlin
sealed class MovementEvents {
data class Up(val value: Movement): MovementEvents()
data class Down(val value: Movement): MovementEvents()
data class Left(val value: Movement): MovementEvents()
data class Right(val value: Movement): MovementEvents()
object SdkUnknown : MovementEvents()
}
```

### **Service and Usage**

NOTE: There are types and internal details here not important to the design of how customers will interact with
streaming requests/responses (e.g. serialization/deserialization).
Those details are subject to change and not part of this design document. The focus here is on the way
streaming is exposed to a customer.
streaming requests/responses (e.g. serialization/deserialization). Those details are subject to change and not part of
this design document. The focus here is on the way streaming is exposed to a customer.


The signatures generated match that of binary streaming requests and responses. Notably that output streams take a lambda instead of returning the response directly (see [binary-streaming design](binary-streaming.md) which discusses this pattern).
The response (and event stream) are only valid in that scope, after which the resources consumed by the stream are closed and no longer valid.
The signatures generated match that of binary streaming requests and responses. Notably that output streams take a
lambda instead of returning the response directly (see [binary-streaming design](binary-streaming.md) which
discusses this pattern). The response (and event stream) are only valid in that scope, after which the resources
consumed by the stream are closed and no longer valid.


```kt
Expand Down Expand Up @@ -206,7 +272,6 @@ fun main() = runBlocking{
is MovementEvents.Down,
is MovementEvents.Left,
is MovementEvents.Right -> handleMovement(event)
is MovementEvents.ThrottlingError -> throw event.throttlingError
else -> error("unknown event type: $event")
}
}
Expand All @@ -218,8 +283,9 @@ fun main() = runBlocking{
private fun handleMovement(event: MovementEvents) { ... }
```

Accepting a lambda matches what is generated for binary streams (see [binary-streaming design](binary-streaming.md)) and will provide a consistent API experience as well
as the same benefits to the SDK (properly scoped lifetime for resources).
Accepting a lambda matches what is generated for binary streams (see [binary-streaming design](binary-streaming.md))
and will provide a consistent API experience as well as the same benefits to the SDK (properly scoped lifetime
for resources).


# Appendix
Expand All @@ -228,9 +294,10 @@ as the same benefits to the SDK (properly scoped lifetime for resources).
## Java Interop

`Flow<T>` is not easily consumable directly from Java due to the `suspend` nature of it. JetBrains provides
[reactive adapters](https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive) that can be used to convert rxJava and JDK-9
reactive streams to or from an equivalent `Flow`. Users would be responsible for creating a shim layer using these primitives provided
by JetBrains which would allow them to expose the Kotlin functions however they see fit to their applications.
[reactive adapters](https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive) that can be used to convert
rxJava and JDK-9 reactive streams to or from an equivalent `Flow`. Users would be responsible for creating a shim
layer using these primitives provided by JetBrains which would allow them to expose the Kotlin functions however
they see fit to their applications.


## Additional References
Expand All @@ -243,4 +310,5 @@ by JetBrains which would allow them to expose the Kotlin functions however they

# Revision history

* 02/17/2022 - Remove errors from generated event stream
* 01/19/2022 - Created
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeByte(value: Byte) {
reserve(1)
memory.storeAt(writePosition.toLong(), value)
advance(1u)
}
Expand Down Expand Up @@ -223,6 +224,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeShort(value: Short) {
reserve(2)
memory.storeShortAt(writePosition.toLong(), value)
advance(2u)
}
Expand All @@ -232,6 +234,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeUShort(value: UShort) {
reserve(2)
memory.storeUShortAt(writePosition.toLong(), value)
advance(2u)
}
Expand Down Expand Up @@ -261,6 +264,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeInt(value: Int) {
reserve(4)
memory.storeIntAt(writePosition.toLong(), value)
advance(4u)
}
Expand All @@ -270,6 +274,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeUInt(value: UInt) {
reserve(4)
memory.storeUIntAt(writePosition.toLong(), value)
advance(4u)
}
Expand Down Expand Up @@ -299,6 +304,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeLong(value: Long) {
reserve(8)
memory.storeLongAt(writePosition.toLong(), value)
advance(8u)
}
Expand All @@ -308,6 +314,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeULong(value: ULong) {
reserve(8)
memory.storeULongAt(writePosition.toLong(), value)
advance(8u)
}
Expand All @@ -327,6 +334,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeFloat(value: Float) {
reserve(4)
memory.storeFloatAt(writePosition.toLong(), value)
advance(4u)
}
Expand All @@ -345,6 +353,7 @@ class SdkByteBuffer internal constructor(
*/
@OptIn(ExperimentalIoApi::class)
override fun writeDouble(value: Double) {
reserve(8)
memory.storeDoubleAt(writePosition.toLong(), value)
advance(8u)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package aws.smithy.kotlin.runtime.http

import aws.smithy.kotlin.runtime.content.ByteStream
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.util.InternalApi

/**
* HTTP payload to be sent to a peer
Expand Down Expand Up @@ -86,6 +87,20 @@ fun ByteStream.toHttpBody(): HttpBody = when (val byteStream = this) {
}
}

/**
* Convert a [SdkByteReadChannel] to an [HttpBody]
* @param contentLength the total content length of the channel if known
*/
@InternalApi
fun SdkByteReadChannel.toHttpBody(contentLength: Long? = null): HttpBody {
val ch = this
return object : HttpBody.Streaming() {
override val contentLength: Long? = contentLength
override val isReplayable: Boolean = false
override fun readFrom(): SdkByteReadChannel = ch
}
}

/**
* Consume the [HttpBody] and pull the entire contents into memory as a [ByteArray].
* Only do this if you are sure the contents fit in-memory as this will read the entire contents
Expand Down Expand Up @@ -120,3 +135,13 @@ fun HttpBody.toByteStream(): ByteStream? = when (val body = this) {
override fun readFrom(): SdkByteReadChannel = body.readFrom()
}
}

/**
* Convenience function to treat all [HttpBody] variants with a payload as an [SdkByteReadChannel]
*/
@InternalApi
fun HttpBody.toSdkByteReadChannel(): SdkByteReadChannel? = when (val body = this) {
is HttpBody.Empty -> null
is HttpBody.Bytes -> SdkByteReadChannel(body.bytes())
is HttpBody.Streaming -> body.readFrom()
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class KotlinDelegator(
unprocessedDependencies(writtenDependencies).forEach { generated ->
writtenDependencies.add(generated.fullName)
val writer = checkoutWriter(generated.definitionFile, generated.namespace)
writer.putContext("identifier.name", generated.name)
writer.apply(generated.renderer)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,27 @@ class KotlinSymbolProvider(private val model: Model, private val settings: Kotli
override fun memberShape(shape: MemberShape): Symbol {
val targetShape =
model.getShape(shape.target).orElseThrow { CodegenException("Shape not found: ${shape.target}") }
return toSymbol(targetShape)

val targetSymbol = toSymbol(targetShape)

// figure out if we are referencing an event stream or not.
// NOTE: unlike blob streams we actually re-use the target (union) shape which is why we can't do this
// when visiting a unionShape() like we can for blobShape()
val container = model.expectShape(shape.container) as? StructureShape
val isOperationInputOrOutput = container != null && (container.isOperationInput || container.isOperationOutput)
val isEventStream = targetShape.isStreaming && targetShape.isUnionShape

return if (isOperationInputOrOutput && isEventStream) {
// a top level operation input/output member referencing a streaming union is represented by a Flow<T>
buildSymbol {
name = "Flow<${targetSymbol.fullName}>"
nullable = true
reference(targetSymbol, SymbolReference.ContextOption.DECLARE)
reference(RuntimeTypes.KotlinxCoroutines.Flow.Flow, SymbolReference.ContextOption.DECLARE)
}
} else {
targetSymbol
}
}

override fun timestampShape(shape: TimestampShape?): Symbol {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.util.function.BiFunction

/**
* A function that renders a symbol to the given writer
* The writer will have `identifier.name` set in the context to whatever the symbol name is that is being generated.
*/
typealias SymbolRenderer = (KotlinWriter) -> Unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package software.amazon.smithy.kotlin.codegen.core

import software.amazon.smithy.kotlin.codegen.lang.isValidKotlinIdentifier
import software.amazon.smithy.kotlin.codegen.model.shape
import software.amazon.smithy.kotlin.codegen.utils.splitOnWordBoundaries
import software.amazon.smithy.kotlin.codegen.utils.toCamelCase
import software.amazon.smithy.kotlin.codegen.utils.toPascalCase
Expand All @@ -14,6 +15,8 @@ import software.amazon.smithy.model.shapes.OperationShape
import software.amazon.smithy.model.shapes.ServiceShape
import software.amazon.smithy.model.shapes.Shape
import software.amazon.smithy.model.traits.EnumDefinition
import java.security.MessageDigest
import java.util.*
import java.util.logging.Logger

// (somewhat) centralized naming rules
Expand Down Expand Up @@ -98,3 +101,22 @@ fun MemberShape.unionVariantName(): String = this.memberName.toPascalCase()
* e.g. `register{OperationName}Middleware
*/
fun OperationShape.registerMiddlewareName(): String = "register${this.capitalizedDefaultName()}Middleware"

/**
* Generate a mangled name based on the [shape] and the members contained in [members]
* If the set of [members] contains all members of [shape] then an empty suffix is returned.
*/
internal fun Shape.mangledSuffix(members: Collection<MemberShape> = members()): String {
check(members().containsAll(members)) { "One or more members given $members is not a member of $this" }
if (members().size == members.size) return ""

val md = MessageDigest.getInstance("SHA-256")
md.update(id.toString().encodeToByteArray())

members.forEach { md.update(it.id.toString().encodeToByteArray()) }

val b64Encoder = Base64.getUrlEncoder().withoutPadding()
val encoded = b64Encoder.encodeToString(md.digest())

return encoded.filter { it.isLetterOrDigit() }.substring(0, 8)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package software.amazon.smithy.kotlin.codegen.core
import software.amazon.smithy.codegen.core.Symbol
import software.amazon.smithy.kotlin.codegen.model.buildSymbol
import software.amazon.smithy.kotlin.codegen.model.namespace
import software.amazon.smithy.kotlin.codegen.model.toSymbol

/**
* Commonly used runtime types. Provides a single definition of a runtime symbol such that codegen isn't littered
Expand All @@ -31,6 +32,7 @@ object RuntimeTypes {
val toHttpBody = runtimeSymbol("toHttpBody", KotlinDependency.HTTP)
val isSuccess = runtimeSymbol("isSuccess", KotlinDependency.HTTP)
val StatusCode = runtimeSymbol("HttpStatusCode", KotlinDependency.HTTP)
val toSdkByteReadChannel = runtimeSymbol("toSdkByteReadChannel", KotlinDependency.HTTP)

object Util {
val encodeLabel = runtimeSymbol("encodeLabel", KotlinDependency.HTTP, "util")
Expand Down Expand Up @@ -180,6 +182,14 @@ object RuntimeTypes {
object IO {
val Closeable = runtimeSymbol("Closeable", KotlinDependency.IO)
}

object KotlinxCoroutines {
object Flow {
// NOTE: smithy-kotlin core has an API dependency on this already
val Flow = "kotlinx.coroutines.flow.Flow".toSymbol()
val map = "kotlinx.coroutines.flow.map".toSymbol()
}
}
}

private fun runtimeSymbol(name: String, dependency: KotlinDependency, subpackage: String = ""): Symbol = buildSymbol {
Expand Down
Loading