-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add codegen wrappers for retries #490
Changes from 4 commits
dfd0a6b
cb8cfad
e07a541
86975a2
fcf4bfe
3f16d51
5cf9224
6c3f563
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,24 +5,33 @@ | |
package aws.smithy.kotlin.runtime.http | ||
|
||
import aws.smithy.kotlin.runtime.content.ByteStream | ||
import aws.smithy.kotlin.runtime.http.util.CanDeepCopy | ||
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel | ||
|
||
/** | ||
* HTTP payload to be sent to a peer | ||
*/ | ||
sealed class HttpBody { | ||
sealed class HttpBody : CanDeepCopy<HttpBody> { | ||
/** | ||
* Flag indicating the body can be consumed multiple times. | ||
*/ | ||
open val isReplayable: Boolean = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question/style why move this? The only type we expect to need to "replay" is streaming right? |
||
|
||
/** | ||
* Specifies the length of this [HttpBody] content | ||
* If null it is assumed to be a streaming source using e.g. `Transfer-Encoding: Chunked` | ||
*/ | ||
open val contentLength: Long? = null | ||
|
||
abstract fun reset() | ||
|
||
/** | ||
* Variant of a [HttpBody] without a payload | ||
*/ | ||
object Empty : HttpBody() { | ||
override val contentLength: Long = 0 | ||
override fun deepCopy(): HttpBody = this // Deep copies are unnecessary for empty bodies | ||
override fun reset() { } // Resets are unnecessary for empty bodies | ||
} | ||
|
||
/** | ||
|
@@ -35,6 +44,14 @@ sealed class HttpBody { | |
* Provides [ByteArray] to be sent to peer | ||
*/ | ||
abstract fun bytes(): ByteArray | ||
|
||
override fun deepCopy(): Bytes = object : Bytes() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment I don't love that we are going to deep copy a payload. I'm not convinced this type should implement |
||
private val copiedBytes = [email protected]().copyOf() | ||
override fun bytes(): ByteArray = copiedBytes | ||
override val contentLength: Long? = copiedBytes.size.toLong() | ||
} | ||
|
||
override fun reset() { } // Resets are unnecessary for byte bodies | ||
} | ||
|
||
/** | ||
|
@@ -52,14 +69,20 @@ sealed class HttpBody { | |
* Flag indicating the stream can be consumed multiple times. If `false` [reset] will throw an | ||
* [UnsupportedOperationException]. | ||
*/ | ||
open val isReplayable: Boolean = false | ||
override val isReplayable: Boolean = false | ||
|
||
/** | ||
* Reset the stream such that the next call to [readFrom] provides a fresh channel. | ||
* @throws UnsupportedOperationException if the stream can only be consumed once. Consumers can check | ||
* [isReplayable] before calling | ||
*/ | ||
open fun reset() { throw UnsupportedOperationException("${this::class.simpleName} can only be consumed once") } | ||
open override fun reset() { throwSingleConsumptionException() } | ||
|
||
/** | ||
* Throw a general exception upon attempting to consume the stream more than once. | ||
*/ | ||
protected fun throwSingleConsumptionException(): Nothing = | ||
throw UnsupportedOperationException("${this::class.simpleName} can only be consumed once") | ||
} | ||
} | ||
|
||
|
@@ -73,11 +96,13 @@ fun ByteStream.toHttpBody(): HttpBody = when (val byteStream = this) { | |
} | ||
is ByteStream.OneShotStream -> object : HttpBody.Streaming() { | ||
override val contentLength: Long? = byteStream.contentLength | ||
override fun deepCopy(): Streaming = throwSingleConsumptionException() | ||
override fun readFrom(): SdkByteReadChannel = byteStream.readFrom() | ||
} | ||
is ByteStream.ReplayableStream -> object : HttpBody.Streaming() { | ||
private var channel: SdkByteReadChannel? = null | ||
override val contentLength: Long? = byteStream.contentLength | ||
override fun deepCopy(): Streaming = this // Replayable streams copy themselves by default | ||
override fun readFrom(): SdkByteReadChannel = channel ?: byteStream.newReader().also { channel = it } | ||
override val isReplayable: Boolean = true | ||
override fun reset() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
*/ | ||
package aws.smithy.kotlin.runtime.http | ||
|
||
import aws.smithy.kotlin.runtime.http.util.CanDeepCopy | ||
import aws.smithy.kotlin.runtime.util.InternalApi | ||
import aws.smithy.kotlin.runtime.util.text.encodeUrlPath | ||
|
||
|
@@ -105,7 +106,7 @@ data class UserInfo(val username: String, val password: String) | |
/** | ||
* Construct a URL by it's individual components | ||
*/ | ||
class UrlBuilder { | ||
class UrlBuilder : CanDeepCopy<UrlBuilder> { | ||
var scheme = Protocol.HTTPS | ||
var host: String = "" | ||
var port: Int? = null | ||
|
@@ -130,6 +131,17 @@ class UrlBuilder { | |
forceQuery | ||
) | ||
|
||
override fun deepCopy(): UrlBuilder = UrlBuilder().apply { | ||
scheme = [email protected] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style another ambiguous override fun deepCopy(): UrlBuilder {
val builder = this
return UrlBuilder().apply {
scheme = builder.scheme
...
}
} |
||
host = [email protected] | ||
port = [email protected] | ||
path = [email protected] | ||
parameters = [email protected]() | ||
fragment = [email protected] | ||
userInfo = [email protected]?.copy() | ||
forceQuery = [email protected] | ||
} | ||
|
||
override fun toString(): String = | ||
"UrlBuilder(scheme=$scheme, host='$host', port=$port, path='$path', parameters=$parameters, fragment=$fragment, userInfo=$userInfo, forceQuery=$forceQuery)" | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0. | ||
*/ | ||
|
||
package aws.smithy.kotlin.runtime.http.middleware | ||
|
||
import aws.smithy.kotlin.runtime.http.Feature | ||
import aws.smithy.kotlin.runtime.http.FeatureKey | ||
import aws.smithy.kotlin.runtime.http.HttpClientFeatureFactory | ||
import aws.smithy.kotlin.runtime.http.operation.SdkHttpOperation | ||
import aws.smithy.kotlin.runtime.http.operation.deepCopy | ||
import aws.smithy.kotlin.runtime.http.request.HttpRequestBuilder | ||
import aws.smithy.kotlin.runtime.retries.RetryPolicy | ||
import aws.smithy.kotlin.runtime.retries.RetryStrategy | ||
|
||
class RetryFeature(private val strategy: RetryStrategy, private val policy: RetryPolicy<Any?>) : Feature { | ||
class Config { | ||
var strategy: RetryStrategy? = null | ||
var policy: RetryPolicy<Any?>? = null | ||
} | ||
|
||
companion object Feature : HttpClientFeatureFactory<Config, RetryFeature> { | ||
override val key: FeatureKey<RetryFeature> = FeatureKey("RetryFeature") | ||
|
||
override fun create(block: Config.() -> Unit): RetryFeature { | ||
val config = Config().apply(block) | ||
val strategy = requireNotNull(config.strategy) { "strategy is required" } | ||
val policy = requireNotNull(config.policy) { "policy is required" } | ||
return RetryFeature(strategy, policy) | ||
} | ||
} | ||
|
||
override fun <I, O> install(operation: SdkHttpOperation<I, O>) { | ||
operation.execution.finalize.intercept { req, next -> | ||
if (req.subject.isRetryable) { | ||
strategy.retry(policy) { | ||
// Deep copy the request because later middlewares (e.g., signing) mutate it | ||
val reqCopy = req.deepCopy() | ||
aajtodd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Reset bodies back to beginning (mainly for streaming bodies) | ||
reqCopy.subject.body.reset() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment I don't know that I love how much of retry has seeped into |
||
|
||
next.call(reqCopy) | ||
aajtodd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} else { | ||
next.call(req) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Indicates whether this HTTP request could be retried. Some requests with streaming bodies are unsuitable for | ||
* retries. | ||
*/ | ||
val HttpRequestBuilder.isRetryable: Boolean | ||
get() = body.isReplayable |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0. | ||
*/ | ||
|
||
package aws.smithy.kotlin.runtime.http.util | ||
|
||
interface CanDeepCopy<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style
|
||
fun deepCopy(): T | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question
Does this really need to support deep copy? Payload bytes aren't modified by signing (and they really shouldn't be modified by any middleware)