From 5861128a08621087a942f70eaf2e1bbaa0ff18af Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Wed, 11 Jan 2023 21:54:52 -0800 Subject: [PATCH 01/19] Added initial prototype for the crt http client integration. --- aws-cpp-sdk-core/CMakeLists.txt | 9 + .../include/aws/core/VersionConfig.h | 20 +- .../include/aws/core/http/crt/CRTHttpClient.h | 58 ++++ .../source/http/crt/CRTHttpClient.cpp | 288 ++++++++++++++++++ 4 files changed, 365 insertions(+), 10 deletions(-) create mode 100644 aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h create mode 100644 aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp diff --git a/aws-cpp-sdk-core/CMakeLists.txt b/aws-cpp-sdk-core/CMakeLists.txt index ab20cec84836..f70280d45619 100644 --- a/aws-cpp-sdk-core/CMakeLists.txt +++ b/aws-cpp-sdk-core/CMakeLists.txt @@ -198,6 +198,8 @@ elseif(ENABLE_WINDOWS_CLIENT) endif() endif() +file(GLOB CRT_HTTP_HEADERS "include/aws/core/http/crt/*.h") +file(GLOB CRT_HTTP_SOURCE "${CMAKE_CURRENT_SOURCE_DIR}/source/http/crt/*.cpp") if (PLATFORM_WINDOWS) file(GLOB NET_SOURCE "${CMAKE_CURRENT_SOURCE_DIR}/source/net/windows/*.cpp") @@ -229,6 +231,7 @@ file(GLOB AWS_NATIVE_SDK_COMMON_SRC ${AWS_CLIENT_SOURCE} ${HTTP_STANDARD_SOURCE} ${HTTP_CLIENT_SOURCE} + ${CRT_HTTP_SOURCE} ${CONFIG_SOURCE} ${CONFIG_DEFAULTS_SOURCE} ${ENDPOINT_SOURCE} @@ -401,6 +404,8 @@ if(MSVC) elseif(ENABLE_WINDOWS_CLIENT) source_group("Header Files\\aws\\core\\http\\windows" FILES ${HTTP_WINDOWS_CLIENT_HEADERS}) endif() + source_group("Header Files\\aws\\core\\http\\crt" FILES ${CRT_HTTP_HEADERS}) + # encryption conditional headers if(ENABLE_BCRYPT_ENCRYPTION) @@ -449,6 +454,8 @@ if(MSVC) elseif(ENABLE_WINDOWS_CLIENT) source_group("Source Files\\http\\windows" FILES ${HTTP_WINDOWS_CLIENT_SOURCE}) endif() + source_group("Source Files\\http\\crt" FILES ${CRT_HTTP_SOURCE}) + # encryption conditional source if(ENABLE_BCRYPT_ENCRYPTION) @@ -628,6 +635,8 @@ if(ENABLE_CURL_CLIENT) elseif(ENABLE_WINDOWS_CLIENT) install (FILES ${HTTP_WINDOWS_CLIENT_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/core/http/windows) endif() + install (FILES ${CRT_HTTP_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/core/http/crt) + # encryption headers if(ENABLE_BCRYPT_ENCRYPTION) diff --git a/aws-cpp-sdk-core/include/aws/core/VersionConfig.h b/aws-cpp-sdk-core/include/aws/core/VersionConfig.h index 33400f013b72..d208396c9014 100644 --- a/aws-cpp-sdk-core/include/aws/core/VersionConfig.h +++ b/aws-cpp-sdk-core/include/aws/core/VersionConfig.h @@ -1,10 +1,10 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ -#pragma once - -#define AWS_SDK_VERSION_STRING "1.10.48" -#define AWS_SDK_VERSION_MAJOR 1 -#define AWS_SDK_VERSION_MINOR 10 -#define AWS_SDK_VERSION_PATCH 48 +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#pragma once + +#define AWS_SDK_VERSION_STRING "1.10.48" +#define AWS_SDK_VERSION_MAJOR 1 +#define AWS_SDK_VERSION_MINOR 10 +#define AWS_SDK_VERSION_PATCH 48 diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h new file mode 100644 index 000000000000..95c87756bfa7 --- /dev/null +++ b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -0,0 +1,58 @@ +#include + +#include +#include + +#include + +namespace Aws +{ + namespace Crt + { + namespace Http + { + class HttpClientConnectionManager; + class HttpClientConnectionOptions; + } + } + + namespace Client + { + struct ClientConfiguration; + } // namespace Client + + namespace Http + { + class CRTHttpClient : public HttpClient { + public: + using Base = HttpClient; + + /** + * Initializes the client with relevant parameters from clientConfig. + */ + CRTHttpClient(const Aws::Client::ClientConfiguration& clientConfig, Crt::Io::ClientBootstrap& bootstrap); + ~CRTHttpClient(); + + std::shared_ptr MakeRequest(const std::shared_ptr& request, + Aws::Utils::RateLimits::RateLimiterInterface* readLimiter = nullptr, + Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter = nullptr) const override; + + private: + mutable std::unordered_map> m_connectionPools; + mutable std::mutex m_connectionPoolLock; + + Crt::Optional m_context; + Crt::Optional< Crt::Http::HttpClientConnectionProxyOptions> m_proxyOptions; + + Crt::Io::ClientBootstrap& m_bootstrap; + + Client::ClientConfiguration m_configuration; + + const std::shared_ptr GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& connectionOptions) const; + static Aws::String ResolveConnectionPoolHash(const URI& uri); + + Crt::Http::HttpClientConnectionOptions CreateConnectionOptionsForRequest(const std::shared_ptr& request) const; + void CheckAndInitializeProxySettings(const Aws::Client::ClientConfiguration& clientConfig); + }; + } +} \ No newline at end of file diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp new file mode 100644 index 000000000000..0e018c6ed053 --- /dev/null +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -0,0 +1,288 @@ + +#include +#include +#include + +namespace Aws +{ + namespace Http + { + + CRTHttpClient::CRTHttpClient(const Aws::Client::ClientConfiguration& clientConfig, Crt::Io::ClientBootstrap& bootstrap) : + HttpClient(), m_bootstrap(bootstrap), m_configuration(clientConfig) + { + //first need to figure TLS out... + Crt::Io::TlsContextOptions tlsContextOptions = Crt::Io::TlsContextOptions::InitDefaultClient(); + + CheckAndInitializeProxySettings(clientConfig); + + // if ca is overridden and a proxy is configured, it's intended for the proxy, not this context. + if (!m_proxyOptions.has_value()) + { + if (!m_configuration.caPath.empty() || !m_configuration.caFile.empty()) + { + const char* caPath = m_configuration.caPath.empty() ? nullptr : m_configuration.caPath.c_str(); + const char* caFile = m_configuration.caFile.empty() ? nullptr : m_configuration.caFile.c_str(); + tlsContextOptions.OverrideDefaultTrustStore(caPath, caFile); + } + } + + tlsContextOptions.SetVerifyPeer(m_configuration.verifySSL); + + if (tlsContextOptions.IsAlpnSupported()) + { + // this may need to be pulled from the client configuration.... + tlsContextOptions.SetAlpnList("h2;http/1.1"); + } + + m_context = Crt::Io::TlsContext::TlsContext(tlsContextOptions, Crt::Io::TlsMode::CLIENT); + } + + CRTHttpClient::~CRTHttpClient() + { + Aws::Vector> shutdownFutures; + + for (auto& managerPair : m_connectionPools) + { + shutdownFutures.push_back(managerPair.second->InitiateShutdown()); + } + + for (auto& shutdownFuture : shutdownFutures) + { + shutdownFuture.get(); + } + + shutdownFutures.clear(); + m_connectionPools.clear(); + } + + std::shared_ptr CRTHttpClient::MakeRequest(const std::shared_ptr& request, + Aws::Utils::RateLimits::RateLimiterInterface*, + Aws::Utils::RateLimits::RateLimiterInterface*) const + { + auto requestConnOptions = CreateConnectionOptionsForRequest(request); + auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions); + + auto crtRequest = Crt::MakeShared(Crt::g_allocator); + for (const auto& header : request->GetHeaders()) + { + Crt::Http::HttpHeader crtHeader; + + auto nameCursor = Crt::ByteCursorFromCString(header.first.c_str()); + auto valueCursor = Crt::ByteCursorFromCString(header.second.c_str()); + crtHeader.name = nameCursor; + crtHeader.value = valueCursor; + + crtRequest->AddHeader(crtHeader); + } + + auto methodCursor = Crt::ByteCursorFromCString(Aws::Http::HttpMethodMapper::GetNameForHttpMethod(request->GetMethod())); + crtRequest->SetMethod(methodCursor); + + auto pathStrCpy = request->GetUri().GetURLEncodedPathRFC3986(); + auto queryStrCpy = request->GetUri().GetQueryString(); + Aws::StringStream ss; + ss << pathStrCpy << queryStrCpy; + auto fullPathAndQueryCpy = ss.str(); + auto pathCursor = Crt::ByteCursorFromCString(fullPathAndQueryCpy.c_str()); + crtRequest->SetPath(pathCursor); + + if (request->GetContentBody()) + { + crtRequest->SetBody(request->GetContentBody()); + } + + auto response = Aws::MakeShared("CRTHttpClient", request); + + Crt::Http::HttpRequestOptions requestOptions; + requestOptions.onIncomingBody = + [response](Crt::Http::HttpStream& stream, const Crt::ByteCursor& body) + { + response->GetResponseBody().write((const char*)body.ptr, body.len); + }; + + requestOptions.onIncomingHeaders = + [response](Crt::Http::HttpStream&, enum aws_http_header_block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) + { + for (auto i = 0; i < headersCount; ++i) + { + const Crt::Http::HttpHeader* header = &headersArray[i]; + Aws::String headerNameStr((const char* const)header->name.ptr, header->name.len); + Aws::String headerValueStr((const char* const)header->value.ptr, header->value.len); + response->AddHeader(std::move(headerNameStr), std::move(headerValueStr)); + } + }; + + requestOptions.onIncomingHeadersBlockDone = + [response](Crt::Http::HttpStream& stream, enum aws_http_header_block block) + { + if (block == AWS_HTTP_HEADER_BLOCK_MAIN) + { + // gotta set it somewhere, and this seems to get called at a good enough time to do it, and it only gets called once. + response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); + } + }; + + std::mutex waiterLock; + std::condition_variable waiterCVar; + bool waitCompletedIntentionally = false; + + requestOptions.onStreamComplete = + [&waiterCVar, &waiterLock, &waitCompletedIntentionally, &response](Crt::Http::HttpStream& stream, int errorCode) + { + if (errorCode) + { + /* come back to this one and get the right error parsed out. */ + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + } + { + std::lock_guard locker(waiterLock); + waitCompletedIntentionally = true; + } + waiterCVar.notify_all(); + }; + + connectionManager->AcquireConnection([requestOptions, response](std::shared_ptr connection, int errorCode) { + if (connection) + { + auto clientStream = connection->NewClientStream(requestOptions); + clientStream->Activate(); + } + else + { + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + } + }); + + std::unique_lock cvarUniqueLock(waiterLock); + waiterCVar.wait(cvarUniqueLock, [&waitCompletedIntentionally]() { return waitCompletedIntentionally; }); + } + + Aws::String CRTHttpClient::ResolveConnectionPoolHash(const URI& uri) + { + Aws::StringStream ss; + ss << SchemeMapper::ToString(uri.GetScheme()) << "://" << uri.GetAuthority() << uri.GetPort(); + + return ss.str(); + } + + const std::shared_ptr CRTHttpClient::GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& options) const + { + const auto connManagerRequestHash = ResolveConnectionPoolHash(request->GetUri()); + + std::lock_guard locker(m_connectionPoolLock); + + const auto& foundManager = m_connectionPools.find(connManagerRequestHash); + + if (foundManager != m_connectionPools.cend()) { + return foundManager->second; + } + + Crt::Http::HttpClientConnectionManagerOptions connectionManagerOptions; + connectionManagerOptions.ConnectionOptions = options; + connectionManagerOptions.MaxConnections = m_configuration.maxConnections; + connectionManagerOptions.EnableBlockingShutdown = true; + + auto connectionManager = Crt::Http::HttpClientConnectionManager::NewClientConnectionManager(connectionManagerOptions); + m_connectionPools.emplace(std::move(connManagerRequestHash), connectionManager); + + return connectionManager; + } + + Crt::Http::HttpClientConnectionOptions CRTHttpClient::CreateConnectionOptionsForRequest(const std::shared_ptr& request) const + { + Crt::Http::HttpClientConnectionOptions connectionOptions; + connectionOptions.HostName = request->GetUri().GetAuthority().c_str(); + // probably want to come back and update this when we hook up the rate limiters. + connectionOptions.ManualWindowManagement = false; + connectionOptions.Port = request->GetUri().GetPort(); + + if (m_context.has_value() && request->GetUri().GetScheme() == Scheme::HTTPS) + { + connectionOptions.TlsOptions = m_context.value().NewConnectionOptions(); + auto serverName = request->GetUri().GetAuthority(); + auto serverNameCursor = Crt::ByteCursorFromCString(serverName.c_str()); + connectionOptions.TlsOptions->SetServerName(serverNameCursor); + } + + connectionOptions.Bootstrap = &m_bootstrap; + + if (m_proxyOptions.has_value()) + { + connectionOptions.ProxyOptions = m_proxyOptions.value(); + } + + connectionOptions.SocketOptions.SetConnectTimeoutMs(m_configuration.connectTimeoutMs); + connectionOptions.SocketOptions.SetKeepAlive(m_configuration.enableTcpKeepAlive); + connectionOptions.SocketOptions.SetKeepAliveIntervalSec(m_configuration.tcpKeepAliveIntervalMs * 1000); + connectionOptions.SocketOptions.SetSocketType(Crt::Io::SocketType::Stream); + + return connectionOptions; + } + + void CRTHttpClient::CheckAndInitializeProxySettings(const Aws::Client::ClientConfiguration& clientConfig) + { + if (!m_configuration.proxyHost.empty()) + { + Crt::Http::HttpClientConnectionProxyOptions proxyOptions; + + if (!m_configuration.proxyUserName.empty()) + { + proxyOptions.AuthType = Crt::Http::AwsHttpProxyAuthenticationType::Basic; + proxyOptions.BasicAuthUsername = m_configuration.proxyUserName.c_str(); + proxyOptions.BasicAuthPassword = m_configuration.proxyPassword.c_str(); + } + + proxyOptions.HostName = m_configuration.proxyHost.c_str(); + + if (m_configuration.proxyPort != 0) + { + proxyOptions.Port = m_configuration.proxyPort; + } + else + { + proxyOptions.Port = m_configuration.proxyScheme == Scheme::HTTPS ? 443 : 80; + } + + if (m_configuration.proxyScheme == Scheme::HTTPS) + { + + Crt::Io::TlsContextOptions contextOptions; + contextOptions.SetVerifyPeer(m_configuration.verifySSL); + + if (m_configuration.proxySSLKeyPath.empty()) + { + contextOptions = Crt::Io::TlsContextOptions::InitDefaultClient(); + } + else if (m_configuration.proxySSLKeyPassword.empty()) + { + const char* certPath = m_configuration.proxySSLCertPath.empty() ? nullptr : m_configuration.proxySSLCertPath.c_str(); + const char* certFile = m_configuration.proxySSLKeyPath.empty() ? nullptr : m_configuration.proxySSLKeyPath.c_str(); + contextOptions = Crt::Io::TlsContextOptions::InitClientWithMtls(certPath, certFile); + } + else + { + const char* pkcs12CertFile = m_configuration.proxySSLKeyPath.empty() ? nullptr : m_configuration.proxySSLKeyPath.c_str(); + const char* pkcs12Pwd = m_configuration.proxySSLKeyPassword.c_str(); + contextOptions = Crt::Io::TlsContextOptions::InitClientWithMtlsPkcs12(pkcs12CertFile, pkcs12Pwd); + } + + if (!m_configuration.caFile.empty() || !m_configuration.caPath.empty()) + { + const char* caPath = m_configuration.caPath.empty() ? nullptr : m_configuration.caPath.c_str(); + const char* caFile = m_configuration.caFile.empty() ? nullptr : m_configuration.caFile.c_str(); + contextOptions.OverrideDefaultTrustStore(caPath, caFile); + } + + Crt::Io::TlsContext context = Crt::Io::TlsContext(contextOptions, Crt::Io::TlsMode::CLIENT); + proxyOptions.TlsOptions = context.NewConnectionOptions(); + } + + m_proxyOptions = std::move(proxyOptions); + } + } + + } +} From 39b21bea72e417c80469ead45df933557fc6868a Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Thu, 12 Jan 2023 12:53:52 -0800 Subject: [PATCH 02/19] Added crt http client to the http client factory. Can not run tests on windows for some reason so, gonna push it up so i can run them from my mac. --- aws-cpp-sdk-core/CMakeLists.txt | 2 + .../include/aws/core/http/crt/CRTHttpClient.h | 11 +++- aws-cpp-sdk-core/source/Aws.cpp | 2 + .../source/http/HttpClientFactory.cpp | 9 +++- .../source/http/crt/CRTHttpClient.cpp | 51 ++++++++++--------- 5 files changed, 48 insertions(+), 27 deletions(-) diff --git a/aws-cpp-sdk-core/CMakeLists.txt b/aws-cpp-sdk-core/CMakeLists.txt index f70280d45619..e5d31fc971c8 100644 --- a/aws-cpp-sdk-core/CMakeLists.txt +++ b/aws-cpp-sdk-core/CMakeLists.txt @@ -476,6 +476,8 @@ target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_SDK_VERSION_MAJOR=${AWSSD target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_SDK_VERSION_MINOR=${AWSSDK_VERSION_MINOR}") target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_SDK_VERSION_PATCH=${AWSSDK_VERSION_PATCH}") +target_compile_definitions(${PROJECT_NAME} PRIVATE "AWS_SDK_USE_CRT_HTTP") + if (WININET_HAS_H2) target_compile_definitions(${PROJECT_NAME} PRIVATE "WININET_HAS_H2") endif() diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h index 95c87756bfa7..842d1af6920a 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h +++ b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -1,7 +1,9 @@ #include #include +#include #include +#include #include @@ -14,6 +16,11 @@ namespace Aws class HttpClientConnectionManager; class HttpClientConnectionOptions; } + + namespace Io + { + class ClientBootstrap; + }; } namespace Client @@ -23,7 +30,7 @@ namespace Aws namespace Http { - class CRTHttpClient : public HttpClient { + class AWS_CORE_API CRTHttpClient : public HttpClient { public: using Base = HttpClient; @@ -42,7 +49,7 @@ namespace Aws mutable std::mutex m_connectionPoolLock; Crt::Optional m_context; - Crt::Optional< Crt::Http::HttpClientConnectionProxyOptions> m_proxyOptions; + Crt::Optional m_proxyOptions; Crt::Io::ClientBootstrap& m_bootstrap; diff --git a/aws-cpp-sdk-core/source/Aws.cpp b/aws-cpp-sdk-core/source/Aws.cpp index 4fd97618f308..45364b24ab67 100644 --- a/aws-cpp-sdk-core/source/Aws.cpp +++ b/aws-cpp-sdk-core/source/Aws.cpp @@ -16,6 +16,8 @@ #include #include +#include + namespace Aws { static const char* ALLOCATION_TAG = "Aws_Init_Cleanup"; diff --git a/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp b/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp index da1759ef9776..9c75683018b1 100644 --- a/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp +++ b/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp @@ -5,7 +5,10 @@ #include -#if ENABLE_CURL_CLIENT +#if AWS_SDK_USE_CRT_HTTP +#include +#include +#elif ENABLE_CURL_CLIENT #include #include @@ -62,10 +65,12 @@ namespace Aws { std::shared_ptr CreateHttpClient(const ClientConfiguration& clientConfiguration) const override { +#if AWS_SDK_USE_CRT_HTTP + return Aws::MakeShared(HTTP_CLIENT_FACTORY_ALLOCATION_TAG, clientConfiguration, *GetDefaultClientBootstrap()); // Figure out whether the selected option is available but fail gracefully and return a default of some type if not // Windows clients: Http and Inet are always options, Curl MIGHT be an option if USE_CURL_CLIENT is on, and http is "default" // Other clients: Curl is your default -#if ENABLE_WINDOWS_CLIENT +#elif ENABLE_WINDOWS_CLIENT #if ENABLE_WINDOWS_IXML_HTTP_REQUEST_2_CLIENT #if BYPASS_DEFAULT_PROXY switch (clientConfiguration.httpLibOverride) diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 0e018c6ed053..0858fc9ef512 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -2,6 +2,9 @@ #include #include #include +#include +#include + namespace Aws { @@ -9,7 +12,7 @@ namespace Aws { CRTHttpClient::CRTHttpClient(const Aws::Client::ClientConfiguration& clientConfig, Crt::Io::ClientBootstrap& bootstrap) : - HttpClient(), m_bootstrap(bootstrap), m_configuration(clientConfig) + HttpClient(), m_context(), m_proxyOptions(), m_bootstrap(bootstrap), m_configuration(clientConfig) { //first need to figure TLS out... Crt::Io::TlsContextOptions tlsContextOptions = Crt::Io::TlsContextOptions::InitDefaultClient(); @@ -92,11 +95,11 @@ namespace Aws crtRequest->SetBody(request->GetContentBody()); } - auto response = Aws::MakeShared("CRTHttpClient", request); + auto response = Aws::MakeShared("CRTHttpClient", request); Crt::Http::HttpRequestOptions requestOptions; requestOptions.onIncomingBody = - [response](Crt::Http::HttpStream& stream, const Crt::ByteCursor& body) + [response](Crt::Http::HttpStream&, const Crt::ByteCursor& body) { response->GetResponseBody().write((const char*)body.ptr, body.len); }; @@ -128,7 +131,7 @@ namespace Aws bool waitCompletedIntentionally = false; requestOptions.onStreamComplete = - [&waiterCVar, &waiterLock, &waitCompletedIntentionally, &response](Crt::Http::HttpStream& stream, int errorCode) + [&waiterCVar, &waiterLock, &waitCompletedIntentionally, &response](Crt::Http::HttpStream&, int errorCode) { if (errorCode) { @@ -158,6 +161,8 @@ namespace Aws std::unique_lock cvarUniqueLock(waiterLock); waiterCVar.wait(cvarUniqueLock, [&waitCompletedIntentionally]() { return waitCompletedIntentionally; }); + + return response; } Aws::String CRTHttpClient::ResolveConnectionPoolHash(const URI& uri) @@ -186,7 +191,7 @@ namespace Aws connectionManagerOptions.EnableBlockingShutdown = true; auto connectionManager = Crt::Http::HttpClientConnectionManager::NewClientConnectionManager(connectionManagerOptions); - m_connectionPools.emplace(std::move(connManagerRequestHash), connectionManager); + m_connectionPools.emplace(connManagerRequestHash, connectionManager); return connectionManager; } @@ -216,7 +221,7 @@ namespace Aws connectionOptions.SocketOptions.SetConnectTimeoutMs(m_configuration.connectTimeoutMs); connectionOptions.SocketOptions.SetKeepAlive(m_configuration.enableTcpKeepAlive); - connectionOptions.SocketOptions.SetKeepAliveIntervalSec(m_configuration.tcpKeepAliveIntervalMs * 1000); + connectionOptions.SocketOptions.SetKeepAliveIntervalSec((uint16_t)(m_configuration.tcpKeepAliveIntervalMs / 1000)); connectionOptions.SocketOptions.SetSocketType(Crt::Io::SocketType::Stream); return connectionOptions; @@ -224,55 +229,55 @@ namespace Aws void CRTHttpClient::CheckAndInitializeProxySettings(const Aws::Client::ClientConfiguration& clientConfig) { - if (!m_configuration.proxyHost.empty()) + if (!clientConfig.proxyHost.empty()) { Crt::Http::HttpClientConnectionProxyOptions proxyOptions; - if (!m_configuration.proxyUserName.empty()) + if (!clientConfig.proxyUserName.empty()) { proxyOptions.AuthType = Crt::Http::AwsHttpProxyAuthenticationType::Basic; - proxyOptions.BasicAuthUsername = m_configuration.proxyUserName.c_str(); - proxyOptions.BasicAuthPassword = m_configuration.proxyPassword.c_str(); + proxyOptions.BasicAuthUsername = clientConfig.proxyUserName.c_str(); + proxyOptions.BasicAuthPassword = clientConfig.proxyPassword.c_str(); } proxyOptions.HostName = m_configuration.proxyHost.c_str(); - if (m_configuration.proxyPort != 0) + if (clientConfig.proxyPort != 0) { - proxyOptions.Port = m_configuration.proxyPort; + proxyOptions.Port = static_cast(clientConfig.proxyPort); } else { - proxyOptions.Port = m_configuration.proxyScheme == Scheme::HTTPS ? 443 : 80; + proxyOptions.Port = clientConfig.proxyScheme == Scheme::HTTPS ? 443 : 80; } - if (m_configuration.proxyScheme == Scheme::HTTPS) + if (clientConfig.proxyScheme == Scheme::HTTPS) { Crt::Io::TlsContextOptions contextOptions; - contextOptions.SetVerifyPeer(m_configuration.verifySSL); + contextOptions.SetVerifyPeer(clientConfig.verifySSL); - if (m_configuration.proxySSLKeyPath.empty()) + if (clientConfig.proxySSLKeyPath.empty()) { contextOptions = Crt::Io::TlsContextOptions::InitDefaultClient(); } - else if (m_configuration.proxySSLKeyPassword.empty()) + else if (clientConfig.proxySSLKeyPassword.empty()) { - const char* certPath = m_configuration.proxySSLCertPath.empty() ? nullptr : m_configuration.proxySSLCertPath.c_str(); - const char* certFile = m_configuration.proxySSLKeyPath.empty() ? nullptr : m_configuration.proxySSLKeyPath.c_str(); + const char* certPath = clientConfig.proxySSLCertPath.empty() ? nullptr : clientConfig.proxySSLCertPath.c_str(); + const char* certFile = clientConfig.proxySSLKeyPath.empty() ? nullptr : clientConfig.proxySSLKeyPath.c_str(); contextOptions = Crt::Io::TlsContextOptions::InitClientWithMtls(certPath, certFile); } else { - const char* pkcs12CertFile = m_configuration.proxySSLKeyPath.empty() ? nullptr : m_configuration.proxySSLKeyPath.c_str(); - const char* pkcs12Pwd = m_configuration.proxySSLKeyPassword.c_str(); + const char* pkcs12CertFile = clientConfig.proxySSLKeyPath.empty() ? nullptr : clientConfig.proxySSLKeyPath.c_str(); + const char* pkcs12Pwd = clientConfig.proxySSLKeyPassword.c_str(); contextOptions = Crt::Io::TlsContextOptions::InitClientWithMtlsPkcs12(pkcs12CertFile, pkcs12Pwd); } if (!m_configuration.caFile.empty() || !m_configuration.caPath.empty()) { - const char* caPath = m_configuration.caPath.empty() ? nullptr : m_configuration.caPath.c_str(); - const char* caFile = m_configuration.caFile.empty() ? nullptr : m_configuration.caFile.c_str(); + const char* caPath = clientConfig.caPath.empty() ? nullptr : clientConfig.caPath.c_str(); + const char* caFile = clientConfig.caFile.empty() ? nullptr : clientConfig.caFile.c_str(); contextOptions.OverrideDefaultTrustStore(caPath, caFile); } From 46fb1cc2e99128b7dfe2af37d45dab7133b5265b Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Thu, 12 Jan 2023 14:27:40 -0800 Subject: [PATCH 03/19] Working client plumbing, but need to run the integration tests to make sure the client actually works. --- CMakeLists.txt | 2 +- .../include/aws/core/VersionConfig.h | 20 ++++---- .../include/aws/core/http/HttpResponse.h | 5 ++ .../include/aws/core/http/crt/CRTHttpClient.h | 2 +- .../core/http/standard/StandardHttpResponse.h | 17 ++++--- .../source/http/HttpClientFactory.cpp | 3 +- .../source/http/crt/CRTHttpClient.cpp | 47 ++++++++++--------- .../http/standard/StandardHttpResponse.cpp | 5 ++ 8 files changed, 61 insertions(+), 40 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3ea40bd16dcf..d38777052d6d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,7 +40,7 @@ option(NO_ENCRYPTION "If enabled, no platform-default encryption will be include option(USE_IXML_HTTP_REQUEST_2 "If enabled on windows, the com object IXmlHttpRequest2 will be used for the http stack" OFF) option(ENABLE_RTTI "Flag to enable/disable rtti within the library" ON) option(ENABLE_TESTING "Flag to enable/disable building unit and integration tests" ON) -option(AUTORUN_UNIT_TESTS "Flag to enable/disable automatically run unit tests after building" ON) +option(AUTORUN_UNIT_TESTS "Flag to enable/disable automatically run unit tests after building" OFF) option(ANDROID_BUILD_CURL "When building for Android, should curl be built as well" ON) option(ANDROID_BUILD_OPENSSL "When building for Android, should Openssl be built as well" ON) option(ANDROID_BUILD_ZLIB "When building for Android, should Zlib be built as well" ON) diff --git a/aws-cpp-sdk-core/include/aws/core/VersionConfig.h b/aws-cpp-sdk-core/include/aws/core/VersionConfig.h index d208396c9014..33400f013b72 100644 --- a/aws-cpp-sdk-core/include/aws/core/VersionConfig.h +++ b/aws-cpp-sdk-core/include/aws/core/VersionConfig.h @@ -1,10 +1,10 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ -#pragma once - -#define AWS_SDK_VERSION_STRING "1.10.48" -#define AWS_SDK_VERSION_MAJOR 1 -#define AWS_SDK_VERSION_MINOR 10 -#define AWS_SDK_VERSION_PATCH 48 +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#pragma once + +#define AWS_SDK_VERSION_STRING "1.10.48" +#define AWS_SDK_VERSION_MAJOR 1 +#define AWS_SDK_VERSION_MINOR 10 +#define AWS_SDK_VERSION_PATCH 48 diff --git a/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h b/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h index 8082547a7175..db9351a4a186 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h +++ b/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h @@ -195,6 +195,11 @@ namespace Aws * Adds a header to the http response object. */ virtual void AddHeader(const Aws::String&, const Aws::String&) = 0; + /** + * Add a header to the http response object, and move the value. + * The name can't be moved as it is converted to lower-case. + */ + virtual void AddHeader(const Aws::String& headerName, Aws::String&& headerValue) = 0; /** * Sets the content type header on the http response object. */ diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h index 842d1af6920a..a6ffa48bb24f 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h +++ b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -55,7 +55,7 @@ namespace Aws Client::ClientConfiguration m_configuration; - const std::shared_ptr GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& connectionOptions) const; + std::shared_ptr GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& connectionOptions) const; static Aws::String ResolveConnectionPoolHash(const URI& uri); Crt::Http::HttpClientConnectionOptions CreateConnectionOptionsForRequest(const std::shared_ptr& request) const; diff --git a/aws-cpp-sdk-core/include/aws/core/http/standard/StandardHttpResponse.h b/aws-cpp-sdk-core/include/aws/core/http/standard/StandardHttpResponse.h index 309206f1f345..a3c5a20bdc73 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/standard/StandardHttpResponse.h +++ b/aws-cpp-sdk-core/include/aws/core/http/standard/StandardHttpResponse.h @@ -37,28 +37,33 @@ namespace Aws /** * Get the headers from this response */ - HeaderValueCollection GetHeaders() const; + HeaderValueCollection GetHeaders() const override; /** * Returns true if the response contains a header by headerName */ - bool HasHeader(const char* headerName) const; + bool HasHeader(const char* headerName) const override; /** * Returns the value for a header at headerName if it exists. */ - const Aws::String& GetHeader(const Aws::String&) const; + const Aws::String& GetHeader(const Aws::String&) const override; /** * Gets the response body of the response. */ - inline Aws::IOStream& GetResponseBody() const { return bodyStream.GetUnderlyingStream(); } + inline Aws::IOStream& GetResponseBody() const override { return bodyStream.GetUnderlyingStream(); } /** * Gives full control of the memory of the ResponseBody over to the caller. At this point, it is the caller's * responsibility to clean up this object. */ - inline Utils::Stream::ResponseStream&& SwapResponseStreamOwnership() { return std::move(bodyStream); } + inline Utils::Stream::ResponseStream&& SwapResponseStreamOwnership() override { return std::move(bodyStream); } /** * Adds a header to the http response object. */ - void AddHeader(const Aws::String&, const Aws::String&); + void AddHeader(const Aws::String&, const Aws::String&) override; + /** + * Add a header to the http response object, and move the value. + * The name can't be moved as it is converted to lower-case. + */ + void AddHeader(const Aws::String& headerName, Aws::String&& headerValue) override; private: StandardHttpResponse(const StandardHttpResponse&); diff --git a/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp b/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp index 9c75683018b1..ab1d5fe53812 100644 --- a/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp +++ b/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp @@ -8,7 +8,8 @@ #if AWS_SDK_USE_CRT_HTTP #include #include -#elif ENABLE_CURL_CLIENT +#endif +#if ENABLE_CURL_CLIENT #include #include diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 0858fc9ef512..81dbc04279a6 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -32,13 +32,14 @@ namespace Aws tlsContextOptions.SetVerifyPeer(m_configuration.verifySSL); - if (tlsContextOptions.IsAlpnSupported()) + if (Crt::Io::TlsContextOptions::IsAlpnSupported()) { // this may need to be pulled from the client configuration.... tlsContextOptions.SetAlpnList("h2;http/1.1"); } - m_context = Crt::Io::TlsContext::TlsContext(tlsContextOptions, Crt::Io::TlsMode::CLIENT); + Crt::Io::TlsContext newContext(tlsContextOptions, Crt::Io::TlsMode::CLIENT); + m_context = std::move(newContext); } CRTHttpClient::~CRTHttpClient() @@ -71,11 +72,8 @@ namespace Aws { Crt::Http::HttpHeader crtHeader; - auto nameCursor = Crt::ByteCursorFromCString(header.first.c_str()); - auto valueCursor = Crt::ByteCursorFromCString(header.second.c_str()); - crtHeader.name = nameCursor; - crtHeader.value = valueCursor; - + crtHeader.name = Crt::ByteCursorFromArray((const uint8_t *)header.first.data(), header.first.length());; + crtHeader.value = Crt::ByteCursorFromArray((const uint8_t *)header.second.data(), header.second.length()); crtRequest->AddHeader(crtHeader); } @@ -107,12 +105,12 @@ namespace Aws requestOptions.onIncomingHeaders = [response](Crt::Http::HttpStream&, enum aws_http_header_block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) { - for (auto i = 0; i < headersCount; ++i) + for (size_t i = 0; i < headersCount; ++i) { const Crt::Http::HttpHeader* header = &headersArray[i]; Aws::String headerNameStr((const char* const)header->name.ptr, header->name.len); Aws::String headerValueStr((const char* const)header->value.ptr, header->value.len); - response->AddHeader(std::move(headerNameStr), std::move(headerValueStr)); + response->AddHeader(headerNameStr, std::move(headerValueStr)); } }; @@ -146,7 +144,9 @@ namespace Aws waiterCVar.notify_all(); }; - connectionManager->AcquireConnection([requestOptions, response](std::shared_ptr connection, int errorCode) { + connectionManager->AcquireConnection( + [requestOptions, response, &waitCompletedIntentionally, &waiterCVar, &waiterLock] + (std::shared_ptr connection, int errorCode) { if (connection) { auto clientStream = connection->NewClientStream(requestOptions); @@ -156,6 +156,11 @@ namespace Aws { response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + { + std::lock_guard locker(waiterLock); + waitCompletedIntentionally = true; + } + waiterCVar.notify_all(); } }); @@ -173,7 +178,7 @@ namespace Aws return ss.str(); } - const std::shared_ptr CRTHttpClient::GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& options) const + std::shared_ptr CRTHttpClient::GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& options) const { const auto connManagerRequestHash = ResolveConnectionPoolHash(request->GetUri()); @@ -221,7 +226,12 @@ namespace Aws connectionOptions.SocketOptions.SetConnectTimeoutMs(m_configuration.connectTimeoutMs); connectionOptions.SocketOptions.SetKeepAlive(m_configuration.enableTcpKeepAlive); - connectionOptions.SocketOptions.SetKeepAliveIntervalSec((uint16_t)(m_configuration.tcpKeepAliveIntervalMs / 1000)); + + if (m_configuration.enableTcpKeepAlive) + { + connectionOptions.SocketOptions.SetKeepAliveIntervalSec( + (uint16_t) (m_configuration.tcpKeepAliveIntervalMs / 1000)); + } connectionOptions.SocketOptions.SetSocketType(Crt::Io::SocketType::Stream); return connectionOptions; @@ -253,21 +263,15 @@ namespace Aws if (clientConfig.proxyScheme == Scheme::HTTPS) { + Crt::Io::TlsContextOptions contextOptions = Crt::Io::TlsContextOptions::InitDefaultClient(); - Crt::Io::TlsContextOptions contextOptions; - contextOptions.SetVerifyPeer(clientConfig.verifySSL); - - if (clientConfig.proxySSLKeyPath.empty()) - { - contextOptions = Crt::Io::TlsContextOptions::InitDefaultClient(); - } - else if (clientConfig.proxySSLKeyPassword.empty()) + if (clientConfig.proxySSLKeyPassword.empty() && !clientConfig.proxySSLCertPath.empty()) { const char* certPath = clientConfig.proxySSLCertPath.empty() ? nullptr : clientConfig.proxySSLCertPath.c_str(); const char* certFile = clientConfig.proxySSLKeyPath.empty() ? nullptr : clientConfig.proxySSLKeyPath.c_str(); contextOptions = Crt::Io::TlsContextOptions::InitClientWithMtls(certPath, certFile); } - else + else if (!clientConfig.proxySSLKeyPassword.empty()) { const char* pkcs12CertFile = clientConfig.proxySSLKeyPath.empty() ? nullptr : clientConfig.proxySSLKeyPath.c_str(); const char* pkcs12Pwd = clientConfig.proxySSLKeyPassword.c_str(); @@ -281,6 +285,7 @@ namespace Aws contextOptions.OverrideDefaultTrustStore(caPath, caFile); } + contextOptions.SetVerifyPeer(clientConfig.verifySSL); Crt::Io::TlsContext context = Crt::Io::TlsContext(contextOptions, Crt::Io::TlsMode::CLIENT); proxyOptions.TlsOptions = context.NewConnectionOptions(); } diff --git a/aws-cpp-sdk-core/source/http/standard/StandardHttpResponse.cpp b/aws-cpp-sdk-core/source/http/standard/StandardHttpResponse.cpp index 8b62ae5e634d..5a72c662bdd3 100644 --- a/aws-cpp-sdk-core/source/http/standard/StandardHttpResponse.cpp +++ b/aws-cpp-sdk-core/source/http/standard/StandardHttpResponse.cpp @@ -51,4 +51,9 @@ void StandardHttpResponse::AddHeader(const Aws::String& headerName, const Aws::S headerMap[StringUtils::ToLower(headerName.c_str())] = headerValue; } +void StandardHttpResponse::AddHeader(const Aws::String& headerName, Aws::String&& headerValue) +{ + headerMap.emplace(StringUtils::ToLower(headerName.c_str()), std::move(headerValue)); +} + From 2b5b9785deba1282ec5f65fc53ea2068adef9c83 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Thu, 12 Jan 2023 15:34:45 -0800 Subject: [PATCH 04/19] Working http implementation with passing integration testsgit statusgit status --- aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp | 9 +++++---- crt/aws-crt-cpp | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 81dbc04279a6..f5e6eebd88e1 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -71,7 +71,7 @@ namespace Aws for (const auto& header : request->GetHeaders()) { Crt::Http::HttpHeader crtHeader; - + crtHeader.name = Crt::ByteCursorFromArray((const uint8_t *)header.first.data(), header.first.length());; crtHeader.value = Crt::ByteCursorFromArray((const uint8_t *)header.second.data(), header.second.length()); crtRequest->AddHeader(crtHeader); @@ -92,10 +92,11 @@ namespace Aws { crtRequest->SetBody(request->GetContentBody()); } - + auto response = Aws::MakeShared("CRTHttpClient", request); - + Crt::Http::HttpRequestOptions requestOptions; + requestOptions.request = crtRequest.get(); requestOptions.onIncomingBody = [response](Crt::Http::HttpStream&, const Crt::ByteCursor& body) { @@ -127,7 +128,7 @@ namespace Aws std::mutex waiterLock; std::condition_variable waiterCVar; bool waitCompletedIntentionally = false; - + requestOptions.onStreamComplete = [&waiterCVar, &waiterLock, &waitCompletedIntentionally, &response](Crt::Http::HttpStream&, int errorCode) { diff --git a/crt/aws-crt-cpp b/crt/aws-crt-cpp index 0a9e0ad7ab07..203ad6d373a4 160000 --- a/crt/aws-crt-cpp +++ b/crt/aws-crt-cpp @@ -1 +1 @@ -Subproject commit 0a9e0ad7ab07113c65b4846ece3a386407c9c0d3 +Subproject commit 203ad6d373a49600dad2dda2a6723773f84a28d4 From 02506ec039bd313cf4319028f8b330f8ef37276b Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Fri, 13 Jan 2023 21:41:18 -0800 Subject: [PATCH 05/19] Well it was working, and then i wrote a memory bug, but to find it i need valgrind. --- .../include/aws/core/http/crt/CRTHttpClient.h | 23 ++- .../source/http/crt/CRTHttpClient.cpp | 186 +++++++++++++++--- 2 files changed, 173 insertions(+), 36 deletions(-) diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h index a6ffa48bb24f..33d5b4d8aa5e 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h +++ b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -1,11 +1,15 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ #include #include -#include #include #include #include +#include namespace Aws { @@ -30,6 +34,9 @@ namespace Aws namespace Http { + /** + * Common Runtime implementation of AWS SDK for C++ HttpClient interface. + */ class AWS_CORE_API CRTHttpClient : public HttpClient { public: using Base = HttpClient; @@ -38,13 +45,16 @@ namespace Aws * Initializes the client with relevant parameters from clientConfig. */ CRTHttpClient(const Aws::Client::ClientConfiguration& clientConfig, Crt::Io::ClientBootstrap& bootstrap); - ~CRTHttpClient(); + ~CRTHttpClient() override; std::shared_ptr MakeRequest(const std::shared_ptr& request, - Aws::Utils::RateLimits::RateLimiterInterface* readLimiter = nullptr, - Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter = nullptr) const override; + Aws::Utils::RateLimits::RateLimiterInterface* readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) const override; private: + // Yeah, I know, but someone made MakeRequest() const and didn't think about the fact that + // making an HTTP request most certainly mutates state. It was me. I'm the person that did that, and + // now we're stuck with it. Thanks me. mutable std::unordered_map> m_connectionPools; mutable std::mutex m_connectionPoolLock; @@ -52,14 +62,13 @@ namespace Aws Crt::Optional m_proxyOptions; Crt::Io::ClientBootstrap& m_bootstrap; - Client::ClientConfiguration m_configuration; std::shared_ptr GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& connectionOptions) const; - static Aws::String ResolveConnectionPoolHash(const URI& uri); - Crt::Http::HttpClientConnectionOptions CreateConnectionOptionsForRequest(const std::shared_ptr& request) const; void CheckAndInitializeProxySettings(const Aws::Client::ClientConfiguration& clientConfig); + + static Aws::String ResolveConnectionPoolHash(const URI& uri); }; } } \ No newline at end of file diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index f5e6eebd88e1..59baf2b1c68d 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -1,16 +1,51 @@ - +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ #include -#include -#include #include #include +#include + +#include +#include + +// Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model. +class ThrottleAwareInputStream : public Crt::Io::StdIOStreamInputStream { +public: + ThrottleAwareInputStream(Utils::RateLimits::RateLimiterInterface& rateLimiter, std::shared_ptr stream, + Aws::Crt::Allocator *allocator = Crt::ApiAllocator()) noexcept : Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), m_rateLimiter(rateLimiter){ + + } +protected: + bool ReadImpl(Crt::ByteBuf &buffer) noexcept override + { + // initial check to see if we should avoid reading for the moment. + if (m_rateLimiter.ApplyCost(0) == std::chrono::milliseconds(0)) { + size_t currentPos = buffer.len; + + // now do the read. We may over read by an IO buffer size, but it's fine. The throttle will still + // kick-in in plenty of time. + bool retValue = Crt::Io::StdIOStreamInputStream::ReadImpl(buffer); + + size_t newPos = buffer.len; + AWS_ASSERT(newPos >= currentPos && !"the buffer length should not have decreased in value."); + // now actually reduce the window. + m_rateLimiter.ApplyCost(static_cast(newPos - currentPos)); + return retValue; + } + + return true; + } +private: + Utils::RateLimits::RateLimiterInterface& m_rateLimiter; +}; namespace Aws { namespace Http { - CRTHttpClient::CRTHttpClient(const Aws::Client::ClientConfiguration& clientConfig, Crt::Io::ClientBootstrap& bootstrap) : HttpClient(), m_context(), m_proxyOptions(), m_bootstrap(bootstrap), m_configuration(clientConfig) { @@ -42,6 +77,8 @@ namespace Aws m_context = std::move(newContext); } + // this isn't entirely necessary, but if you want to be nice to debuggers and memory checkers, let's go ahead + // and shut everything down cleanly. CRTHttpClient::~CRTHttpClient() { Aws::Vector> shutdownFutures; @@ -68,6 +105,8 @@ namespace Aws auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions); auto crtRequest = Crt::MakeShared(Crt::g_allocator); + + //Add http headers to the request. for (const auto& header : request->GetHeaders()) { Crt::Http::HttpHeader crtHeader; @@ -77,32 +116,48 @@ namespace Aws crtRequest->AddHeader(crtHeader); } + // HTTP method, GET, PUT, DELETE, etc... auto methodCursor = Crt::ByteCursorFromCString(Aws::Http::HttpMethodMapper::GetNameForHttpMethod(request->GetMethod())); crtRequest->SetMethod(methodCursor); + // Path portion of the request auto pathStrCpy = request->GetUri().GetURLEncodedPathRFC3986(); auto queryStrCpy = request->GetUri().GetQueryString(); Aws::StringStream ss; + + //CRT client has you pass the query string as part of the path. concatenate that here. ss << pathStrCpy << queryStrCpy; auto fullPathAndQueryCpy = ss.str(); - auto pathCursor = Crt::ByteCursorFromCString(fullPathAndQueryCpy.c_str()); + auto pathCursor = Crt::ByteCursorFromArray((uint8_t *)fullPathAndQueryCpy.c_str(), fullPathAndQueryCpy.length()); crtRequest->SetPath(pathCursor); + // Set the request body stream on the crt request. Setup the write rate limiter if present if (request->GetContentBody()) { - crtRequest->SetBody(request->GetContentBody()); + // need to hook up back pressure to plug the read limiter in. But the write direction is fairly simple. + if (m_configuration.writeRateLimiter) + { + crtRequest->SetBody(Aws::MakeShared("CRTHttpClient", *m_configuration.writeRateLimiter, request->GetContentBody())); + } + else + { + crtRequest->SetBody(request->GetContentBody()); + } } auto response = Aws::MakeShared("CRTHttpClient", request); Crt::Http::HttpRequestOptions requestOptions; requestOptions.request = crtRequest.get(); + + // When data is received from the content body of the incoming response, just copy it to the output stream. requestOptions.onIncomingBody = [response](Crt::Http::HttpStream&, const Crt::ByteCursor& body) { - response->GetResponseBody().write((const char*)body.ptr, body.len); + response->GetResponseBody().write((const char*)body.ptr, static_cast(body.len)); }; + // on response headers arriving, write them to the response. requestOptions.onIncomingHeaders = [response](Crt::Http::HttpStream&, enum aws_http_header_block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) { @@ -115,20 +170,20 @@ namespace Aws } }; + // This will arrive at or around the same time as the headers. Use it to set the response code on the response requestOptions.onIncomingHeadersBlockDone = - [response](Crt::Http::HttpStream& stream, enum aws_http_header_block block) + [response](Crt::Http::HttpStream& stream, enum aws_http_header_block) { - if (block == AWS_HTTP_HEADER_BLOCK_MAIN) - { - // gotta set it somewhere, and this seems to get called at a good enough time to do it, and it only gets called once. response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); - } }; + // CRT client is async only so we'll need to do the synchronous part ourselves. + // We'll use a condition variable and wait on it until the request completes or errors out. std::mutex waiterLock; std::condition_variable waiterCVar; bool waitCompletedIntentionally = false; + // Request is done. If there was an error set it, otherwise just wake up the cvar. requestOptions.onStreamComplete = [&waiterCVar, &waiterLock, &waitCompletedIntentionally, &response](Crt::Http::HttpStream&, int errorCode) { @@ -145,28 +200,87 @@ namespace Aws waiterCVar.notify_all(); }; + std::shared_ptr connectionRef = nullptr; + + // now we finally have the request, get a connection and make the request. + // if the connection acquisition failed, go ahead and fail the request and wakeup the cvar. connectionManager->AcquireConnection( - [requestOptions, response, &waitCompletedIntentionally, &waiterCVar, &waiterLock] + [&connectionRef, requestOptions, response, &waitCompletedIntentionally, &waiterCVar, &waiterLock] (std::shared_ptr connection, int errorCode) { - if (connection) - { - auto clientStream = connection->NewClientStream(requestOptions); - clientStream->Activate(); - } - else - { - response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); - response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + if (connection) + { + { + std::lock_guard locker(waiterLock); + connectionRef = connection; + } + + auto clientStream = connection->NewClientStream(requestOptions); + + // if client stream is nullptr, something went wrong. This SHOULDNT happen + // because it's usually something not using the API correctly, but + // there's probably a complex set of interactions that can result in in-proper use of the API over-time, + // and we need to just surface the error rather than bringing down the whole process with an assertion. + // If we have a valid stream go ahead and return allowing the request to continue. + if (clientStream) { + clientStream->Activate(); + return; + } + + response->SetClientErrorType(Aws::Client::CoreErrors::INVALID_PARAMETER_COMBINATION); + response->SetClientErrorMessage(aws_error_debug_str(aws_last_error())); + } + else + { + + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + } + { std::lock_guard locker(waiterLock); waitCompletedIntentionally = true; } waiterCVar.notify_all(); - } - }); + }); std::unique_lock cvarUniqueLock(waiterLock); - waiterCVar.wait(cvarUniqueLock, [&waitCompletedIntentionally]() { return waitCompletedIntentionally; }); + + // Naive http request timeout implementation. This doesn't factor in how long it took to get the connection from the pool, and + // I'm undecided on the queueing theory implications of this decision so if this turns out to be the wrong granularity + // this is the section of code you should be changing. You can probably get "close" by having an additional + // atomic (not necessarily full on atomics implementation, but it needs to be the size of a WORD if it's not) + // counter that gets incremented in the acquireConnection callback as long as your connection timeout + // is shorter than your request timeout. Even if it's not, that would handle like.... 4-5 nines of getting this right. + // since in the worst case scenario, your connect timeout got preempted by the request timeout, and is it really worth + // all that effort if that's the worst thing that can happen? + if (m_configuration.requestTimeoutMs > 0) + { + auto requestExpiryTime = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(m_configuration.requestTimeoutMs); + waiterCVar.wait_until(cvarUniqueLock, requestExpiryTime, [&connectionRef, &waitCompletedIntentionally, requestExpiryTime, response] () + { + // If the request is done, we don't care about the timeout. + if (waitCompletedIntentionally) return true; + + // if this predicate was triggered because the cvar timed out, this branch will be taken. + // if it was triggered spuriously, this branch will be missed. + if (std::chrono::high_resolution_clock::now() >= requestExpiryTime) + { + response->SetClientErrorType(Aws::Client::CoreErrors::REQUEST_TIMEOUT); + response->SetClientErrorMessage("Request Timeout Has Expired"); + + if (connectionRef) + { + connectionRef->Close(); + } + } + + // go back to sleep to try again later. + return false; + }); + } else + { + waiterCVar.wait(cvarUniqueLock, [&waitCompletedIntentionally] () { return &waitCompletedIntentionally; }); + } return response; } @@ -179,6 +293,10 @@ namespace Aws return ss.str(); } + // The main purpose of this is to ensure there's exactly one connection manager per unique endpoint. + // To do so, we simply keep a hash table of the hashed endpoint (see ResolveConnectionPoolHash()), and + // put a connection manager for that endpoint as the value. + // This runs in multiple threads potentially so there's a lock around it. std::shared_ptr CRTHttpClient::GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& options) const { const auto connManagerRequestHash = ResolveConnectionPoolHash(request->GetUri()); @@ -187,16 +305,21 @@ namespace Aws const auto& foundManager = m_connectionPools.find(connManagerRequestHash); + // We've already got one, return it. if (foundManager != m_connectionPools.cend()) { return foundManager->second; } + // don't have a manager for this endpoint, so create one for it. Crt::Http::HttpClientConnectionManagerOptions connectionManagerOptions; connectionManagerOptions.ConnectionOptions = options; connectionManagerOptions.MaxConnections = m_configuration.maxConnections; connectionManagerOptions.EnableBlockingShutdown = true; + // need to bind out Monitoring options to handle the read timeout config value. + // once done, come back and use it to setup read timeouts. auto connectionManager = Crt::Http::HttpClientConnectionManager::NewClientConnectionManager(connectionManagerOptions); + // put it in the hash table and return it. m_connectionPools.emplace(connManagerRequestHash, connectionManager); return connectionManager; @@ -204,8 +327,10 @@ namespace Aws Crt::Http::HttpClientConnectionOptions CRTHttpClient::CreateConnectionOptionsForRequest(const std::shared_ptr& request) const { + // connection options are unique per request, this is mostly just connection-level configuration mapping. + Crt::Http::HttpClientConnectionOptions connectionOptions; - connectionOptions.HostName = request->GetUri().GetAuthority().c_str(); + connectionOptions.HostName = request->GetUri().GetAuthority(); // probably want to come back and update this when we hook up the rate limiters. connectionOptions.ManualWindowManagement = false; connectionOptions.Port = request->GetUri().GetPort(); @@ -238,6 +363,9 @@ namespace Aws return connectionOptions; } + // The proxy config is pretty hefty, so we don't want to create one for each request when we don't have to. + // This converts whatever proxy settings are in clientConfig to CRT specific proxy settings. + // It then sets it on the member variable for re-use elsewhere. void CRTHttpClient::CheckAndInitializeProxySettings(const Aws::Client::ClientConfiguration& clientConfig) { if (!clientConfig.proxyHost.empty()) @@ -247,11 +375,11 @@ namespace Aws if (!clientConfig.proxyUserName.empty()) { proxyOptions.AuthType = Crt::Http::AwsHttpProxyAuthenticationType::Basic; - proxyOptions.BasicAuthUsername = clientConfig.proxyUserName.c_str(); - proxyOptions.BasicAuthPassword = clientConfig.proxyPassword.c_str(); + proxyOptions.BasicAuthUsername = clientConfig.proxyUserName; + proxyOptions.BasicAuthPassword = clientConfig.proxyPassword; } - proxyOptions.HostName = m_configuration.proxyHost.c_str(); + proxyOptions.HostName = m_configuration.proxyHost; if (clientConfig.proxyPort != 0) { From 9d8ff8b83b78bb4283d6ed6a6a41e5139dd93628 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Tue, 17 Jan 2023 11:32:23 -0800 Subject: [PATCH 06/19] Tweaks so i can run valgrind. --- .../source/http/crt/CRTHttpClient.cpp | 123 ++++++++++-------- 1 file changed, 66 insertions(+), 57 deletions(-) diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 59baf2b1c68d..a4a6dfc63a17 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -150,57 +150,58 @@ namespace Aws Crt::Http::HttpRequestOptions requestOptions; requestOptions.request = crtRequest.get(); + // CRT client is async only so we'll need to do the synchronous part ourselves. + // We'll use a condition variable and wait on it until the request completes or errors out. + std::mutex waiterLock; + std::condition_variable waiterCVar; + bool waitCompletedIntentionally = false; + // When data is received from the content body of the incoming response, just copy it to the output stream. requestOptions.onIncomingBody = - [response](Crt::Http::HttpStream&, const Crt::ByteCursor& body) + [response, &waiterLock](Crt::Http::HttpStream&, const Crt::ByteCursor& body) { + std::lock_guard locker(waiterLock); response->GetResponseBody().write((const char*)body.ptr, static_cast(body.len)); }; // on response headers arriving, write them to the response. requestOptions.onIncomingHeaders = - [response](Crt::Http::HttpStream&, enum aws_http_header_block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) + [response, &waiterLock](Crt::Http::HttpStream&, enum aws_http_header_block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) { for (size_t i = 0; i < headersCount; ++i) { const Crt::Http::HttpHeader* header = &headersArray[i]; Aws::String headerNameStr((const char* const)header->name.ptr, header->name.len); Aws::String headerValueStr((const char* const)header->value.ptr, header->value.len); + std::lock_guard locker(waiterLock); response->AddHeader(headerNameStr, std::move(headerValueStr)); } }; // This will arrive at or around the same time as the headers. Use it to set the response code on the response requestOptions.onIncomingHeadersBlockDone = - [response](Crt::Http::HttpStream& stream, enum aws_http_header_block) + [response, &waiterLock](Crt::Http::HttpStream& stream, enum aws_http_header_block) { - response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); + std::lock_guard locker(waiterLock); + response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); }; - // CRT client is async only so we'll need to do the synchronous part ourselves. - // We'll use a condition variable and wait on it until the request completes or errors out. - std::mutex waiterLock; - std::condition_variable waiterCVar; - bool waitCompletedIntentionally = false; - // Request is done. If there was an error set it, otherwise just wake up the cvar. requestOptions.onStreamComplete = [&waiterCVar, &waiterLock, &waitCompletedIntentionally, &response](Crt::Http::HttpStream&, int errorCode) { if (errorCode) { + std::lock_guard locker(waiterLock); /* come back to this one and get the right error parsed out. */ response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); response->SetClientErrorMessage(aws_error_debug_str(errorCode)); - } - { - std::lock_guard locker(waiterLock); waitCompletedIntentionally = true; } waiterCVar.notify_all(); }; - std::shared_ptr connectionRef = nullptr; + std::shared_ptr connectionRef(nullptr); // now we finally have the request, get a connection and make the request. // if the connection acquisition failed, go ahead and fail the request and wakeup the cvar. @@ -209,13 +210,13 @@ namespace Aws (std::shared_ptr connection, int errorCode) { if (connection) { + auto clientStream = connection->NewClientStream(requestOptions); + { std::lock_guard locker(waiterLock); connectionRef = connection; } - auto clientStream = connection->NewClientStream(requestOptions); - // if client stream is nullptr, something went wrong. This SHOULDNT happen // because it's usually something not using the API correctly, but // there's probably a complex set of interactions that can result in in-proper use of the API over-time, @@ -226,60 +227,68 @@ namespace Aws return; } - response->SetClientErrorType(Aws::Client::CoreErrors::INVALID_PARAMETER_COMBINATION); - response->SetClientErrorMessage(aws_error_debug_str(aws_last_error())); + { + std::lock_guard locker(waiterLock); + response->SetClientErrorType(Aws::Client::CoreErrors::INVALID_PARAMETER_COMBINATION); + response->SetClientErrorMessage(aws_error_debug_str(aws_last_error())); + waitCompletedIntentionally = true; + } } else { + std::lock_guard locker(waiterLock); response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); response->SetClientErrorMessage(aws_error_debug_str(errorCode)); - } - - { - std::lock_guard locker(waiterLock); waitCompletedIntentionally = true; } + waiterCVar.notify_all(); }); - std::unique_lock cvarUniqueLock(waiterLock); - - // Naive http request timeout implementation. This doesn't factor in how long it took to get the connection from the pool, and - // I'm undecided on the queueing theory implications of this decision so if this turns out to be the wrong granularity - // this is the section of code you should be changing. You can probably get "close" by having an additional - // atomic (not necessarily full on atomics implementation, but it needs to be the size of a WORD if it's not) - // counter that gets incremented in the acquireConnection callback as long as your connection timeout - // is shorter than your request timeout. Even if it's not, that would handle like.... 4-5 nines of getting this right. - // since in the worst case scenario, your connect timeout got preempted by the request timeout, and is it really worth - // all that effort if that's the worst thing that can happen? - if (m_configuration.requestTimeoutMs > 0) { - auto requestExpiryTime = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(m_configuration.requestTimeoutMs); - waiterCVar.wait_until(cvarUniqueLock, requestExpiryTime, [&connectionRef, &waitCompletedIntentionally, requestExpiryTime, response] () + std::unique_lock cvarUniqueLock(waiterLock); + + // Naive http request timeout implementation. This doesn't factor in how long it took to get the connection from the pool, and + // I'm undecided on the queueing theory implications of this decision so if this turns out to be the wrong granularity + // this is the section of code you should be changing. You can probably get "close" by having an additional + // atomic (not necessarily full on atomics implementation, but it needs to be the size of a WORD if it's not) + // counter that gets incremented in the acquireConnection callback as long as your connection timeout + // is shorter than your request timeout. Even if it's not, that would handle like.... 4-5 nines of getting this right. + // since in the worst case scenario, your connect timeout got preempted by the request timeout, and is it really worth + // all that effort if that's the worst thing that can happen? + if (m_configuration.requestTimeoutMs > 0) { - // If the request is done, we don't care about the timeout. - if (waitCompletedIntentionally) return true; - - // if this predicate was triggered because the cvar timed out, this branch will be taken. - // if it was triggered spuriously, this branch will be missed. - if (std::chrono::high_resolution_clock::now() >= requestExpiryTime) - { - response->SetClientErrorType(Aws::Client::CoreErrors::REQUEST_TIMEOUT); - response->SetClientErrorMessage("Request Timeout Has Expired"); - - if (connectionRef) - { - connectionRef->Close(); - } - } - - // go back to sleep to try again later. - return false; - }); - } else - { - waiterCVar.wait(cvarUniqueLock, [&waitCompletedIntentionally] () { return &waitCompletedIntentionally; }); + auto requestExpiryTime = std::chrono::high_resolution_clock::now() + + std::chrono::milliseconds(m_configuration.requestTimeoutMs); + waiterCVar.wait_until(cvarUniqueLock, requestExpiryTime, + [&connectionRef, &waitCompletedIntentionally, requestExpiryTime, response]() + { + std::cerr << "boo" << std::endl; + // If the request is done, we don't care about the timeout. + if (waitCompletedIntentionally) return true; + + // if this predicate was triggered because the cvar timed out, this branch will be taken. + // if it was triggered spuriously, this branch will be missed. + if (std::chrono::high_resolution_clock::now() >= requestExpiryTime) + { + response->SetClientErrorType( + Aws::Client::CoreErrors::REQUEST_TIMEOUT); + response->SetClientErrorMessage("Request Timeout Has Expired"); + + if (connectionRef) + { + connectionRef->Close(); + } + } + + // go back to sleep to try again later. + return false; + }); + } else { + waiterCVar.wait(cvarUniqueLock, + [&waitCompletedIntentionally]() { return waitCompletedIntentionally; }); + } } return response; From afe3eb2e113f8b9e9b4121dc1af277b499042663 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Tue, 17 Jan 2023 12:00:42 -0800 Subject: [PATCH 07/19] Fix gcc build issue. --- aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index a4a6dfc63a17..096ba82123de 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -339,7 +339,7 @@ namespace Aws // connection options are unique per request, this is mostly just connection-level configuration mapping. Crt::Http::HttpClientConnectionOptions connectionOptions; - connectionOptions.HostName = request->GetUri().GetAuthority(); + connectionOptions.HostName = request->GetUri().GetAuthority().c_str(); // probably want to come back and update this when we hook up the rate limiters. connectionOptions.ManualWindowManagement = false; connectionOptions.Port = request->GetUri().GetPort(); @@ -384,11 +384,11 @@ namespace Aws if (!clientConfig.proxyUserName.empty()) { proxyOptions.AuthType = Crt::Http::AwsHttpProxyAuthenticationType::Basic; - proxyOptions.BasicAuthUsername = clientConfig.proxyUserName; - proxyOptions.BasicAuthPassword = clientConfig.proxyPassword; + proxyOptions.BasicAuthUsername = clientConfig.proxyUserName.c_str(); + proxyOptions.BasicAuthPassword = clientConfig.proxyPassword.c_str(); } - proxyOptions.HostName = m_configuration.proxyHost; + proxyOptions.HostName = m_configuration.proxyHost.c_str(); if (clientConfig.proxyPort != 0) { From 2feebf9ea43691f48552bd08eebc7cd20e6379cd Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Tue, 17 Jan 2023 12:02:39 -0800 Subject: [PATCH 08/19] Remove extra ; --- aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h index 33d5b4d8aa5e..a70fba986fb7 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h +++ b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -24,7 +24,7 @@ namespace Aws namespace Io { class ClientBootstrap; - }; + } } namespace Client From c2abd6d4f78dc12fc314e157f5770fe8c4bc0beb Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Tue, 17 Jan 2023 12:09:26 -0800 Subject: [PATCH 09/19] Extend timeout for SQS tests. --- aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp | 1 - aws-cpp-sdk-sqs-integration-tests/QueueOperationTest.cpp | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 096ba82123de..e5813946c71f 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -264,7 +264,6 @@ namespace Aws waiterCVar.wait_until(cvarUniqueLock, requestExpiryTime, [&connectionRef, &waitCompletedIntentionally, requestExpiryTime, response]() { - std::cerr << "boo" << std::endl; // If the request is done, we don't care about the timeout. if (waitCompletedIntentionally) return true; diff --git a/aws-cpp-sdk-sqs-integration-tests/QueueOperationTest.cpp b/aws-cpp-sdk-sqs-integration-tests/QueueOperationTest.cpp index 76ea42bf947d..287f460a9037 100644 --- a/aws-cpp-sdk-sqs-integration-tests/QueueOperationTest.cpp +++ b/aws-cpp-sdk-sqs-integration-tests/QueueOperationTest.cpp @@ -100,6 +100,7 @@ class QueueOperationTest : public ::testing::Test config.proxyHost = PROXY_HOST; config.proxyPort = PROXY_PORT; #endif + config.requestTimeoutMs = 20000; return config; } From 299d561ef7488bac4233f405937277b32d30164a Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Tue, 17 Jan 2023 14:45:00 -0800 Subject: [PATCH 10/19] review ready of using the CRT for HTTP. --- CMakeLists.txt | 1 + aws-cpp-sdk-core/CMakeLists.txt | 14 +-- .../include/aws/core/http/crt/CRTHttpClient.h | 2 +- .../source/http/crt/CRTHttpClient.cpp | 94 ++++++++++--------- cmake/external_dependencies.cmake | 4 +- 5 files changed, 63 insertions(+), 52 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d38777052d6d..162a3bcb4fac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,7 @@ option(BUILD_SHARED_LIBS "If enabled, all aws sdk libraries will be build as sha option(FORCE_SHARED_CRT "If enabled, will unconditionally link the standard libraries in dynamically, otherwise the standard library will be linked in based on the BUILD_SHARED_LIBS setting" ON) option(SIMPLE_INSTALL "If enabled, removes all the additional indirection (platform/cpu/config) in the bin and lib directories on the install step" ON) option(NO_HTTP_CLIENT "If enabled, no platform-default http client will be included in the library. For the library to be used you will need to provide your own platform-specific implementation" OFF) +option(USE_CRT_HTTP_CLIENT "If enabled, The common runtime HTTP client will be used, and the legacy systems such as WinHttp and libcurl will not be built or included" OFF) option(NO_ENCRYPTION "If enabled, no platform-default encryption will be included in the library. For the library to be used you will need to provide your own platform-specific implementations" OFF) option(USE_IXML_HTTP_REQUEST_2 "If enabled on windows, the com object IXmlHttpRequest2 will be used for the http stack" OFF) option(ENABLE_RTTI "Flag to enable/disable rtti within the library" ON) diff --git a/aws-cpp-sdk-core/CMakeLists.txt b/aws-cpp-sdk-core/CMakeLists.txt index e5d31fc971c8..c2a4466c830d 100644 --- a/aws-cpp-sdk-core/CMakeLists.txt +++ b/aws-cpp-sdk-core/CMakeLists.txt @@ -197,9 +197,11 @@ elseif(ENABLE_WINDOWS_CLIENT) unset(CMAKE_REQUIRED_LIBRARIES) endif() +elseif(USE_CRT_HTTP_CLIENT) + file(GLOB CRT_HTTP_HEADERS "include/aws/core/http/crt/*.h") + file(GLOB CRT_HTTP_SOURCE "${CMAKE_CURRENT_SOURCE_DIR}/source/http/crt/*.cpp") endif() -file(GLOB CRT_HTTP_HEADERS "include/aws/core/http/crt/*.h") -file(GLOB CRT_HTTP_SOURCE "${CMAKE_CURRENT_SOURCE_DIR}/source/http/crt/*.cpp") + if (PLATFORM_WINDOWS) file(GLOB NET_SOURCE "${CMAKE_CURRENT_SOURCE_DIR}/source/net/windows/*.cpp") @@ -453,8 +455,9 @@ if(MSVC) source_group("Source Files\\http\\curl" FILES ${HTTP_CURL_CLIENT_SOURCE}) elseif(ENABLE_WINDOWS_CLIENT) source_group("Source Files\\http\\windows" FILES ${HTTP_WINDOWS_CLIENT_SOURCE}) + elseif(USE_CRT_HTTP_CLIENT) + source_group("Source Files\\http\\crt" FILES ${CRT_HTTP_SOURCE}) endif() - source_group("Source Files\\http\\crt" FILES ${CRT_HTTP_SOURCE}) # encryption conditional source @@ -476,8 +479,6 @@ target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_SDK_VERSION_MAJOR=${AWSSD target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_SDK_VERSION_MINOR=${AWSSDK_VERSION_MINOR}") target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_SDK_VERSION_PATCH=${AWSSDK_VERSION_PATCH}") -target_compile_definitions(${PROJECT_NAME} PRIVATE "AWS_SDK_USE_CRT_HTTP") - if (WININET_HAS_H2) target_compile_definitions(${PROJECT_NAME} PRIVATE "WININET_HAS_H2") endif() @@ -636,8 +637,9 @@ if(ENABLE_CURL_CLIENT) install (FILES ${HTTP_CURL_CLIENT_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/core/http/curl) elseif(ENABLE_WINDOWS_CLIENT) install (FILES ${HTTP_WINDOWS_CLIENT_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/core/http/windows) -endif() +elseif(USE_CRT_HTTP_CLIENT) install (FILES ${CRT_HTTP_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/core/http/crt) +endif() # encryption headers diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h index a70fba986fb7..24f202e33e3c 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h +++ b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -68,7 +68,7 @@ namespace Aws Crt::Http::HttpClientConnectionOptions CreateConnectionOptionsForRequest(const std::shared_ptr& request) const; void CheckAndInitializeProxySettings(const Aws::Client::ClientConfiguration& clientConfig); - static Aws::String ResolveConnectionPoolHash(const URI& uri); + static Aws::String ResolveConnectionPoolKey(const URI& uri); }; } } \ No newline at end of file diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index e5813946c71f..0f8839a92d0d 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -10,6 +10,8 @@ #include #include +static const char *const CRT_HTTP_CLIENT_TAG = "CRTHttpClient"; + // Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model. class ThrottleAwareInputStream : public Crt::Io::StdIOStreamInputStream { public: @@ -98,8 +100,8 @@ namespace Aws } std::shared_ptr CRTHttpClient::MakeRequest(const std::shared_ptr& request, - Aws::Utils::RateLimits::RateLimiterInterface*, - Aws::Utils::RateLimits::RateLimiterInterface*) const + Aws::Utils::RateLimits::RateLimiterInterface*, + Aws::Utils::RateLimits::RateLimiterInterface*) const { auto requestConnOptions = CreateConnectionOptionsForRequest(request); auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions); @@ -137,7 +139,7 @@ namespace Aws // need to hook up back pressure to plug the read limiter in. But the write direction is fairly simple. if (m_configuration.writeRateLimiter) { - crtRequest->SetBody(Aws::MakeShared("CRTHttpClient", *m_configuration.writeRateLimiter, request->GetContentBody())); + crtRequest->SetBody(Aws::MakeShared(CRT_HTTP_CLIENT_TAG, *m_configuration.writeRateLimiter, request->GetContentBody())); } else { @@ -145,7 +147,7 @@ namespace Aws } } - auto response = Aws::MakeShared("CRTHttpClient", request); + auto response = Aws::MakeShared(CRT_HTTP_CLIENT_TAG, request); Crt::Http::HttpRequestOptions requestOptions; requestOptions.request = crtRequest.get(); @@ -158,31 +160,28 @@ namespace Aws // When data is received from the content body of the incoming response, just copy it to the output stream. requestOptions.onIncomingBody = - [response, &waiterLock](Crt::Http::HttpStream&, const Crt::ByteCursor& body) + [response](Crt::Http::HttpStream&, const Crt::ByteCursor& body) { - std::lock_guard locker(waiterLock); response->GetResponseBody().write((const char*)body.ptr, static_cast(body.len)); }; // on response headers arriving, write them to the response. requestOptions.onIncomingHeaders = - [response, &waiterLock](Crt::Http::HttpStream&, enum aws_http_header_block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) + [response](Crt::Http::HttpStream&, enum aws_http_header_block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) { for (size_t i = 0; i < headersCount; ++i) { const Crt::Http::HttpHeader* header = &headersArray[i]; Aws::String headerNameStr((const char* const)header->name.ptr, header->name.len); Aws::String headerValueStr((const char* const)header->value.ptr, header->value.len); - std::lock_guard locker(waiterLock); response->AddHeader(headerNameStr, std::move(headerValueStr)); } }; // This will arrive at or around the same time as the headers. Use it to set the response code on the response requestOptions.onIncomingHeadersBlockDone = - [response, &waiterLock](Crt::Http::HttpStream& stream, enum aws_http_header_block) + [response](Crt::Http::HttpStream& stream, enum aws_http_header_block) { - std::lock_guard locker(waiterLock); response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); }; @@ -192,13 +191,17 @@ namespace Aws { if (errorCode) { - std::lock_guard locker(waiterLock); /* come back to this one and get the right error parsed out. */ response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + } + + { + std::lock_guard locker(waiterLock); waitCompletedIntentionally = true; } - waiterCVar.notify_all(); + + waiterCVar.notify_one(); }; std::shared_ptr connectionRef(nullptr); @@ -228,22 +231,23 @@ namespace Aws } { - std::lock_guard locker(waiterLock); response->SetClientErrorType(Aws::Client::CoreErrors::INVALID_PARAMETER_COMBINATION); response->SetClientErrorMessage(aws_error_debug_str(aws_last_error())); + std::lock_guard locker(waiterLock); waitCompletedIntentionally = true; } } else { - std::lock_guard locker(waiterLock); + const char *error_msg = aws_error_debug_str(errorCode); response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); - response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + response->SetClientErrorMessage(error_msg); + std::lock_guard locker(waiterLock); waitCompletedIntentionally = true; } - waiterCVar.notify_all(); + waiterCVar.notify_one(); }); { @@ -257,57 +261,59 @@ namespace Aws // is shorter than your request timeout. Even if it's not, that would handle like.... 4-5 nines of getting this right. // since in the worst case scenario, your connect timeout got preempted by the request timeout, and is it really worth // all that effort if that's the worst thing that can happen? - if (m_configuration.requestTimeoutMs > 0) + if (m_configuration.requestTimeoutMs > 0 ) { auto requestExpiryTime = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(m_configuration.requestTimeoutMs); - waiterCVar.wait_until(cvarUniqueLock, requestExpiryTime, - [&connectionRef, &waitCompletedIntentionally, requestExpiryTime, response]() + auto waiterResult = waiterCVar.wait_until(cvarUniqueLock, requestExpiryTime, + [&waitCompletedIntentionally, response]() { // If the request is done, we don't care about the timeout. - if (waitCompletedIntentionally) return true; - - // if this predicate was triggered because the cvar timed out, this branch will be taken. - // if it was triggered spuriously, this branch will be missed. - if (std::chrono::high_resolution_clock::now() >= requestExpiryTime) - { - response->SetClientErrorType( - Aws::Client::CoreErrors::REQUEST_TIMEOUT); - response->SetClientErrorMessage("Request Timeout Has Expired"); - - if (connectionRef) - { - connectionRef->Close(); - } - } - - // go back to sleep to try again later. - return false; + return waitCompletedIntentionally; + }); - } else { - waiterCVar.wait(cvarUniqueLock, - [&waitCompletedIntentionally]() { return waitCompletedIntentionally; }); + // if this is false, the cvar timed out without a terminal condition being reached. + if (!waiterResult) + { + response->SetClientErrorType( + Aws::Client::CoreErrors::REQUEST_TIMEOUT); + response->SetClientErrorMessage("Request Timeout Has Expired"); + + // close the connection if it's still there so we can expedite anything we're waiting on. + if (connectionRef) + { + connectionRef->Close(); + } + } } + + // always wait, even if the above section timed out, because waitCompletedIntentionally isn't set yet + // and this means we're still waiting on some queued up callbacks to fire. + // going past this point before that occurs will cause a segfault when the callback DOES finally fire + // since the mutex and the condition-variable are both on the stack. + waiterCVar.wait(cvarUniqueLock, + [&waitCompletedIntentionally]() { return waitCompletedIntentionally; }); } return response; } - Aws::String CRTHttpClient::ResolveConnectionPoolHash(const URI& uri) + Aws::String CRTHttpClient::ResolveConnectionPoolKey(const URI& uri) { + // create a unique key for this endpoint. Aws::StringStream ss; - ss << SchemeMapper::ToString(uri.GetScheme()) << "://" << uri.GetAuthority() << uri.GetPort(); + ss << SchemeMapper::ToString(uri.GetScheme()) << "://" << uri.GetAuthority() << ":" << uri.GetPort(); return ss.str(); } // The main purpose of this is to ensure there's exactly one connection manager per unique endpoint. - // To do so, we simply keep a hash table of the hashed endpoint (see ResolveConnectionPoolHash()), and + // To do so, we simply keep a hash table of the hashed endpoint (see ResolveConnectionPoolKey()), and // put a connection manager for that endpoint as the value. // This runs in multiple threads potentially so there's a lock around it. std::shared_ptr CRTHttpClient::GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& options) const { - const auto connManagerRequestHash = ResolveConnectionPoolHash(request->GetUri()); + const auto connManagerRequestHash = ResolveConnectionPoolKey(request->GetUri()); std::lock_guard locker(m_connectionPoolLock); diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake index 20feb6de51bf..99b6befff487 100644 --- a/cmake/external_dependencies.cmake +++ b/cmake/external_dependencies.cmake @@ -55,7 +55,7 @@ elseif(ENABLE_INJECTED_ENCRYPTION) endif() # Http client control -if(NOT NO_HTTP_CLIENT) +if(NOT NO_HTTP_CLIENT AND NOT USE_CRT_HTTP_CLIENT) if(PLATFORM_WINDOWS) if(FORCE_CURL) set(ENABLE_CURL_CLIENT 1) @@ -114,6 +114,8 @@ if(NOT NO_HTTP_CLIENT) else() message(FATAL_ERROR "No http client available for target platform and client injection not enabled (-DNO_HTTP_CLIENT=ON)") endif() +elseif(USE_CRT_HTTP_CLIENT) + add_definitions("-DAWS_SDK_USE_CRT_HTTP") else() message(STATUS "You will need to inject an http client implementation before making any http requests!") endif() From 884044c655d121cfb293f6f806ddb1cfea8128c8 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Tue, 17 Jan 2023 15:19:30 -0800 Subject: [PATCH 11/19] Stuff to make linters pass. --- CMakeLists.txt | 2 +- aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h | 1 + aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 162a3bcb4fac..d8889fb518b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,7 +41,7 @@ option(NO_ENCRYPTION "If enabled, no platform-default encryption will be include option(USE_IXML_HTTP_REQUEST_2 "If enabled on windows, the com object IXmlHttpRequest2 will be used for the http stack" OFF) option(ENABLE_RTTI "Flag to enable/disable rtti within the library" ON) option(ENABLE_TESTING "Flag to enable/disable building unit and integration tests" ON) -option(AUTORUN_UNIT_TESTS "Flag to enable/disable automatically run unit tests after building" OFF) +option(AUTORUN_UNIT_TESTS "Flag to enable/disable automatically run unit tests after building" ON) option(ANDROID_BUILD_CURL "When building for Android, should curl be built as well" ON) option(ANDROID_BUILD_OPENSSL "When building for Android, should Openssl be built as well" ON) option(ANDROID_BUILD_ZLIB "When building for Android, should Zlib be built as well" ON) diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h index 24f202e33e3c..65de975bbd42 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h +++ b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -2,6 +2,7 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ + #include #include diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 0f8839a92d0d..06c8e391ad2b 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -2,6 +2,7 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ + #include #include #include From 21845c8524ad894c1e48c75e94888feb70403e4a Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Fri, 27 Jan 2023 16:06:19 -0800 Subject: [PATCH 12/19] Addressed most of the CR feedback, but still need to go back and add in all the extra edge cases I found in the curl client implementation. --- .../include/aws/core/http/HttpResponse.h | 5 +- .../include/aws/core/http/crt/CRTHttpClient.h | 2 +- .../source/http/crt/CRTHttpClient.cpp | 306 ++++++++++-------- .../http/standard/StandardHttpResponse.cpp | 8 +- 4 files changed, 183 insertions(+), 138 deletions(-) diff --git a/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h b/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h index db9351a4a186..5979660f742b 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h +++ b/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h @@ -198,8 +198,11 @@ namespace Aws /** * Add a header to the http response object, and move the value. * The name can't be moved as it is converted to lower-case. + * + * It isn't pure virtual for backwards compatiblity reasons, but the StandardHttpResponse used by default in the SDK + * implements the move. */ - virtual void AddHeader(const Aws::String& headerName, Aws::String&& headerValue) = 0; + virtual void AddHeader(const Aws::String& headerName, Aws::String&& headerValue) { AddHeader(headerName, headerValue); }; /** * Sets the content type header on the http response object. */ diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h index 65de975bbd42..e5cb2533387d 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h +++ b/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -72,4 +72,4 @@ namespace Aws static Aws::String ResolveConnectionPoolKey(const URI& uri); }; } -} \ No newline at end of file +} diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 06c8e391ad2b..88e03e9922d0 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -21,6 +21,7 @@ class ThrottleAwareInputStream : public Crt::Io::StdIOStreamInputStream { } protected: + // this whole thing needs to be redone to handle various stream use-cases. bool ReadImpl(Crt::ByteBuf &buffer) noexcept override { // initial check to see if we should avoid reading for the moment. @@ -45,6 +46,42 @@ class ThrottleAwareInputStream : public Crt::Io::StdIOStreamInputStream { Utils::RateLimits::RateLimiterInterface& m_rateLimiter; }; +// Just a wrapper around a Condition Variable and a mutex, which handles wait and timed waits while protecting +// from spurious wakeups. +class AsyncWaiter +{ +public: + AsyncWaiter() = default; + AsyncWaiter(const AsyncWaiter&) = delete; + AsyncWaiter& operator=(const AsyncWaiter&) = delete; + + void Wakeup() + { + { + std::lock_guard locker(m_lock); + m_wakeupIntentional = true; + } + m_cvar.notify_one(); + } + + void WaitOnCompletion() + { + std::unique_lock uniqueLocker(m_lock); + m_cvar.wait(uniqueLocker, [this](){return m_wakeupIntentional;}); + } + + bool WaitOnCompletionUntil(std::chrono::time_point until) + { + std::unique_lock uniqueLocker(m_lock); + return m_cvar.wait_until(uniqueLocker, until, [this](){return m_wakeupIntentional;}); + } + +private: + std::mutex m_lock; + std::condition_variable m_cvar; + bool m_wakeupIntentional{false}; +}; + namespace Aws { namespace Http @@ -57,7 +94,8 @@ namespace Aws CheckAndInitializeProxySettings(clientConfig); - // if ca is overridden and a proxy is configured, it's intended for the proxy, not this context. + // Given current SDK configuration assumptions, if the ca is overridden and a proxy is configured, + // it's intended for the proxy, not this context. if (!m_proxyOptions.has_value()) { if (!m_configuration.caPath.empty() || !m_configuration.caFile.empty()) @@ -100,21 +138,14 @@ namespace Aws m_connectionPools.clear(); } - std::shared_ptr CRTHttpClient::MakeRequest(const std::shared_ptr& request, - Aws::Utils::RateLimits::RateLimiterInterface*, - Aws::Utils::RateLimits::RateLimiterInterface*) const + static void AddRequestMetadataToCrtRequest(const std::shared_ptr& request, const std::shared_ptr& crtRequest) { - auto requestConnOptions = CreateConnectionOptionsForRequest(request); - auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions); - - auto crtRequest = Crt::MakeShared(Crt::g_allocator); - //Add http headers to the request. for (const auto& header : request->GetHeaders()) { Crt::Http::HttpHeader crtHeader; - crtHeader.name = Crt::ByteCursorFromArray((const uint8_t *)header.first.data(), header.first.length());; + crtHeader.name = Crt::ByteCursorFromArray((const uint8_t *)header.first.data(), header.first.length()); crtHeader.value = Crt::ByteCursorFromArray((const uint8_t *)header.second.data(), header.second.length()); crtRequest->AddHeader(crtHeader); } @@ -134,6 +165,84 @@ namespace Aws auto pathCursor = Crt::ByteCursorFromArray((uint8_t *)fullPathAndQueryCpy.c_str(), fullPathAndQueryCpy.length()); crtRequest->SetPath(pathCursor); + + } + + static void OnResponseBodyReceived(Crt::Http::HttpStream&, const Crt::ByteCursor& body, const std::shared_ptr& response) + { + // When data is received from the content body of the incoming response, just copy it to the output stream. + response->GetResponseBody().write((const char*)body.ptr, static_cast(body.len)); + } + + // on response headers arriving, write them to the response. + static void OnIncomingHeaders(Crt::Http::HttpStream&, enum aws_http_header_block block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount, const std::shared_ptr& response) + { + if (block == AWS_HTTP_HEADER_BLOCK_INFORMATIONAL) return; + + for (size_t i = 0; i < headersCount; ++i) + { + const Crt::Http::HttpHeader* header = &headersArray[i]; + Aws::String headerNameStr((const char* const)header->name.ptr, header->name.len); + Aws::String headerValueStr((const char* const)header->value.ptr, header->value.len); + response->AddHeader(headerNameStr, std::move(headerValueStr)); + } + } + + static void OnIncomingHeadersBlockDone(Crt::Http::HttpStream& stream, enum aws_http_header_block, const std::shared_ptr& response) + { + response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); + } + + // Request is done. If there was an error set it, otherwise just wake up the cvar. + static void OnStreamComplete(Crt::Http::HttpStream&, int errorCode, AsyncWaiter& waiter, const std::shared_ptr& response) + { + if (errorCode) + { + //TODO: get the right error parsed out. + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + } + + waiter.Wakeup(); + } + + // if the connection acquisition failed, go ahead and fail the request and wakeup the cvar. + // If it succeeded go ahead and make the request. + static void OnClientConnectionAvailable(std::shared_ptr connection, int errorCode, std::shared_ptr& connectionReference, + Crt::Http::HttpRequestOptions& requestOptions, AsyncWaiter& waiter, const std::shared_ptr& response) + { + int finalErrorCode = errorCode; + if (connection) + { + auto clientStream = connection->NewClientStream(requestOptions); + connectionReference = connection; + + if (clientStream && clientStream->Activate()) { + return; + } + + finalErrorCode = aws_last_error(); + } + + const char *error_msg = aws_error_debug_str(finalErrorCode); + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(error_msg); + + waiter.Wakeup(); + } + + std::shared_ptr CRTHttpClient::MakeRequest(const std::shared_ptr& request, + Aws::Utils::RateLimits::RateLimiterInterface*, + Aws::Utils::RateLimits::RateLimiterInterface*) const + { + auto requestConnOptions = CreateConnectionOptionsForRequest(request); + auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions); + + auto crtRequest = Crt::MakeShared(Crt::g_allocator); + auto response = Aws::MakeShared(CRT_HTTP_CLIENT_TAG, request); + + AddRequestMetadataToCrtRequest(request, crtRequest); + // Set the request body stream on the crt request. Setup the write rate limiter if present if (request->GetContentBody()) { @@ -148,152 +257,86 @@ namespace Aws } } - auto response = Aws::MakeShared(CRT_HTTP_CLIENT_TAG, request); - Crt::Http::HttpRequestOptions requestOptions; requestOptions.request = crtRequest.get(); - // CRT client is async only so we'll need to do the synchronous part ourselves. - // We'll use a condition variable and wait on it until the request completes or errors out. - std::mutex waiterLock; - std::condition_variable waiterCVar; - bool waitCompletedIntentionally = false; - - // When data is received from the content body of the incoming response, just copy it to the output stream. requestOptions.onIncomingBody = - [response](Crt::Http::HttpStream&, const Crt::ByteCursor& body) + [response](Crt::Http::HttpStream& stream, const Crt::ByteCursor& body) { - response->GetResponseBody().write((const char*)body.ptr, static_cast(body.len)); + OnResponseBodyReceived(stream, body, response); }; - // on response headers arriving, write them to the response. requestOptions.onIncomingHeaders = - [response](Crt::Http::HttpStream&, enum aws_http_header_block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) + [response](Crt::Http::HttpStream& stream, enum aws_http_header_block block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount) { - for (size_t i = 0; i < headersCount; ++i) - { - const Crt::Http::HttpHeader* header = &headersArray[i]; - Aws::String headerNameStr((const char* const)header->name.ptr, header->name.len); - Aws::String headerValueStr((const char* const)header->value.ptr, header->value.len); - response->AddHeader(headerNameStr, std::move(headerValueStr)); - } + OnIncomingHeaders(stream, block, headersArray, headersCount, response); }; // This will arrive at or around the same time as the headers. Use it to set the response code on the response requestOptions.onIncomingHeadersBlockDone = - [response](Crt::Http::HttpStream& stream, enum aws_http_header_block) + [response](Crt::Http::HttpStream& stream, enum aws_http_header_block block) { - response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); + OnIncomingHeadersBlockDone(stream, block, response); }; - // Request is done. If there was an error set it, otherwise just wake up the cvar. + // CRT client is async only so we'll need to do the synchronous part ourselves. + // We'll use a condition variable and wait on it until the request completes or errors out. + AsyncWaiter waiter; + requestOptions.onStreamComplete = - [&waiterCVar, &waiterLock, &waitCompletedIntentionally, &response](Crt::Http::HttpStream&, int errorCode) + [&waiter, &response](Crt::Http::HttpStream& stream, int errorCode) { - if (errorCode) - { - /* come back to this one and get the right error parsed out. */ - response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); - response->SetClientErrorMessage(aws_error_debug_str(errorCode)); - } - - { - std::lock_guard locker(waiterLock); - waitCompletedIntentionally = true; - } - - waiterCVar.notify_one(); + OnStreamComplete(stream, errorCode, waiter, response); }; std::shared_ptr connectionRef(nullptr); // now we finally have the request, get a connection and make the request. - // if the connection acquisition failed, go ahead and fail the request and wakeup the cvar. connectionManager->AcquireConnection( - [&connectionRef, requestOptions, response, &waitCompletedIntentionally, &waiterCVar, &waiterLock] - (std::shared_ptr connection, int errorCode) { - if (connection) - { - auto clientStream = connection->NewClientStream(requestOptions); - - { - std::lock_guard locker(waiterLock); - connectionRef = connection; - } - - // if client stream is nullptr, something went wrong. This SHOULDNT happen - // because it's usually something not using the API correctly, but - // there's probably a complex set of interactions that can result in in-proper use of the API over-time, - // and we need to just surface the error rather than bringing down the whole process with an assertion. - // If we have a valid stream go ahead and return allowing the request to continue. - if (clientStream) { - clientStream->Activate(); - return; - } - - { - response->SetClientErrorType(Aws::Client::CoreErrors::INVALID_PARAMETER_COMBINATION); - response->SetClientErrorMessage(aws_error_debug_str(aws_last_error())); - std::lock_guard locker(waiterLock); - waitCompletedIntentionally = true; - } - } - else + [&connectionRef, &requestOptions, response, &waiter] + (std::shared_ptr connection, int errorCode) { - - const char *error_msg = aws_error_debug_str(errorCode); - response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); - response->SetClientErrorMessage(error_msg); - std::lock_guard locker(waiterLock); - waitCompletedIntentionally = true; - } - - waiterCVar.notify_one(); - }); - + OnClientConnectionAvailable(connection, errorCode, connectionRef, requestOptions, waiter, response); + }); + + bool waiterTimedOut = false; + // Naive http request timeout implementation. This doesn't factor in how long it took to get the connection from the pool, and + // I'm undecided on the queueing theory implications of this decision so if this turns out to be the wrong granularity + // this is the section of code you should be changing. You can probably get "close" by having an additional + // atomic (not necessarily full on atomics implementation, but it needs to be the size of a WORD if it's not) + // counter that gets incremented in the acquireConnection callback as long as your connection timeout + // is shorter than your request timeout. Even if it's not, that would handle like.... 4-5 nines of getting this right. + // since in the worst case scenario, your connect timeout got preempted by the request timeout, and is it really worth + // all that effort if that's the worst thing that can happen? + if (m_configuration.requestTimeoutMs > 0 ) { - std::unique_lock cvarUniqueLock(waiterLock); - - // Naive http request timeout implementation. This doesn't factor in how long it took to get the connection from the pool, and - // I'm undecided on the queueing theory implications of this decision so if this turns out to be the wrong granularity - // this is the section of code you should be changing. You can probably get "close" by having an additional - // atomic (not necessarily full on atomics implementation, but it needs to be the size of a WORD if it's not) - // counter that gets incremented in the acquireConnection callback as long as your connection timeout - // is shorter than your request timeout. Even if it's not, that would handle like.... 4-5 nines of getting this right. - // since in the worst case scenario, your connect timeout got preempted by the request timeout, and is it really worth - // all that effort if that's the worst thing that can happen? - if (m_configuration.requestTimeoutMs > 0 ) + auto requestExpiryTime = std::chrono::high_resolution_clock::now() + + std::chrono::milliseconds(m_configuration.requestTimeoutMs); + waiterTimedOut = !waiter.WaitOnCompletionUntil(requestExpiryTime); + + // if this is true, the waiter timed out without a terminal condition being woken up. + if (waiterTimedOut) { - auto requestExpiryTime = std::chrono::high_resolution_clock::now() + - std::chrono::milliseconds(m_configuration.requestTimeoutMs); - auto waiterResult = waiterCVar.wait_until(cvarUniqueLock, requestExpiryTime, - [&waitCompletedIntentionally, response]() - { - // If the request is done, we don't care about the timeout. - return waitCompletedIntentionally; - - }); - // if this is false, the cvar timed out without a terminal condition being reached. - if (!waiterResult) + // close the connection if it's still there so we can expedite anything we're waiting on. + if (connectionRef) { - response->SetClientErrorType( - Aws::Client::CoreErrors::REQUEST_TIMEOUT); - response->SetClientErrorMessage("Request Timeout Has Expired"); - - // close the connection if it's still there so we can expedite anything we're waiting on. - if (connectionRef) - { - connectionRef->Close(); - } + connectionRef->Close(); } } + } - // always wait, even if the above section timed out, because waitCompletedIntentionally isn't set yet - // and this means we're still waiting on some queued up callbacks to fire. - // going past this point before that occurs will cause a segfault when the callback DOES finally fire - // since the mutex and the condition-variable are both on the stack. - waiterCVar.wait(cvarUniqueLock, - [&waitCompletedIntentionally]() { return waitCompletedIntentionally; }); + // always wait, even if the above section timed out, because Wakeup() hasn't yet been called, + // and this means we're still waiting on some queued up callbacks to fire. + // going past this point before that occurs will cause a segfault when the callback DOES finally fire + // since the waiter is on the stack. + waiter.WaitOnCompletion(); + + // now handle if we timed out or not. + if (waiterTimedOut) + { + response->SetClientErrorType( + Aws::Client::CoreErrors::REQUEST_TIMEOUT); + response->SetClientErrorMessage("Request Timeout Has Expired"); } return response; @@ -309,16 +352,16 @@ namespace Aws } // The main purpose of this is to ensure there's exactly one connection manager per unique endpoint. - // To do so, we simply keep a hash table of the hashed endpoint (see ResolveConnectionPoolKey()), and + // To do so, we simply keep a hash table of the endpoint key (see ResolveConnectionPoolKey()), and // put a connection manager for that endpoint as the value. // This runs in multiple threads potentially so there's a lock around it. std::shared_ptr CRTHttpClient::GetWithCreateConnectionManagerForRequest(const std::shared_ptr& request, const Crt::Http::HttpClientConnectionOptions& options) const { - const auto connManagerRequestHash = ResolveConnectionPoolKey(request->GetUri()); + const auto connManagerRequestKey = ResolveConnectionPoolKey(request->GetUri()); std::lock_guard locker(m_connectionPoolLock); - const auto& foundManager = m_connectionPools.find(connManagerRequestHash); + const auto& foundManager = m_connectionPools.find(connManagerRequestKey); // We've already got one, return it. if (foundManager != m_connectionPools.cend()) { @@ -330,12 +373,12 @@ namespace Aws connectionManagerOptions.ConnectionOptions = options; connectionManagerOptions.MaxConnections = m_configuration.maxConnections; connectionManagerOptions.EnableBlockingShutdown = true; - // need to bind out Monitoring options to handle the read timeout config value. + //TODO: need to bind out Monitoring options to handle the read timeout config value. // once done, come back and use it to setup read timeouts. auto connectionManager = Crt::Http::HttpClientConnectionManager::NewClientConnectionManager(connectionManagerOptions); // put it in the hash table and return it. - m_connectionPools.emplace(connManagerRequestHash, connectionManager); + m_connectionPools.emplace(connManagerRequestKey, connectionManager); return connectionManager; } @@ -343,10 +386,9 @@ namespace Aws Crt::Http::HttpClientConnectionOptions CRTHttpClient::CreateConnectionOptionsForRequest(const std::shared_ptr& request) const { // connection options are unique per request, this is mostly just connection-level configuration mapping. - Crt::Http::HttpClientConnectionOptions connectionOptions; connectionOptions.HostName = request->GetUri().GetAuthority().c_str(); - // probably want to come back and update this when we hook up the rate limiters. + // TODO: come back and update this when we hook up the rate limiters. connectionOptions.ManualWindowManagement = false; connectionOptions.Port = request->GetUri().GetPort(); diff --git a/aws-cpp-sdk-core/source/http/standard/StandardHttpResponse.cpp b/aws-cpp-sdk-core/source/http/standard/StandardHttpResponse.cpp index 5a72c662bdd3..bf3a9eb5aeea 100644 --- a/aws-cpp-sdk-core/source/http/standard/StandardHttpResponse.cpp +++ b/aws-cpp-sdk-core/source/http/standard/StandardHttpResponse.cpp @@ -21,9 +21,9 @@ HeaderValueCollection StandardHttpResponse::GetHeaders() const { HeaderValueCollection headerValueCollection; - for (Aws::Map::const_iterator iter = headerMap.begin(); iter != headerMap.end(); ++iter) + for (const auto & iter : headerMap) { - headerValueCollection.emplace(HeaderValuePair(iter->first, iter->second)); + headerValueCollection.emplace(HeaderValuePair(iter.first, iter.second)); } return headerValueCollection; @@ -36,11 +36,11 @@ bool StandardHttpResponse::HasHeader(const char* headerName) const const Aws::String& StandardHttpResponse::GetHeader(const Aws::String& headerName) const { - Aws::Map::const_iterator foundValue = headerMap.find(StringUtils::ToLower(headerName.c_str())); + auto foundValue = headerMap.find(StringUtils::ToLower(headerName.c_str())); assert(foundValue != headerMap.end()); if (foundValue == headerMap.end()) { AWS_LOGSTREAM_ERROR(STANDARD_HTTP_RESPONSE_LOG_TAG, "Requested a header value for a missing header key: " << headerName); - static const Aws::String EMPTY_STRING = ""; + static const Aws::String EMPTY_STRING; return EMPTY_STRING; } return foundValue->second; From 766ee4941ae39e7f0408468fb90c948b79e65ed7 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Fri, 27 Jan 2023 17:21:13 -0800 Subject: [PATCH 13/19] integtation tests are back to passing now. handled initialization errors. --- .../include/aws/core/http/HttpClient.h | 9 ++++- .../source/http/HttpClientFactory.cpp | 11 +++++- .../source/http/crt/CRTHttpClient.cpp | 38 ++++++++++++++++--- 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/aws-cpp-sdk-core/include/aws/core/http/HttpClient.h b/aws-cpp-sdk-core/include/aws/core/http/HttpClient.h index 4c292064d73d..cb6e928e768e 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/HttpClient.h +++ b/aws-cpp-sdk-core/include/aws/core/http/HttpClient.h @@ -67,10 +67,17 @@ namespace Aws bool ContinueRequest(const Aws::Http::HttpRequest&) const; + explicit operator bool() const + { + return !m_bad; + } + + protected: + bool m_bad; + private: std::atomic< bool > m_disableRequestProcessing; - std::mutex m_requestProcessingSignalLock; std::condition_variable m_requestProcessingSignal; }; diff --git a/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp b/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp index ab1d5fe53812..51453adf1037 100644 --- a/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp +++ b/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp @@ -195,7 +195,16 @@ namespace Aws std::shared_ptr CreateHttpClient(const Aws::Client::ClientConfiguration& clientConfiguration) { assert(GetHttpClientFactory()); - return GetHttpClientFactory()->CreateHttpClient(clientConfiguration); + auto client = GetHttpClientFactory()->CreateHttpClient(clientConfiguration); + + if (!client) + { + AWS_LOGSTREAM_FATAL(HTTP_CLIENT_FACTORY_ALLOCATION_TAG, "Initializing Http Client failed!"); + // assert just in case this is a misconfiguration at development time to make the dev's job easier. + assert(false && "Http client initialization failed. Some client configuration parameters are probably invalid"); + } + + return client; } std::shared_ptr CreateHttpRequest(const Aws::String& uri, HttpMethod method, const Aws::IOStreamFactory& streamFactory) diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 88e03e9922d0..ad0dff22fb06 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -91,7 +91,6 @@ namespace Aws { //first need to figure TLS out... Crt::Io::TlsContextOptions tlsContextOptions = Crt::Io::TlsContextOptions::InitDefaultClient(); - CheckAndInitializeProxySettings(clientConfig); // Given current SDK configuration assumptions, if the ca is overridden and a proxy is configured, @@ -102,7 +101,11 @@ namespace Aws { const char* caPath = m_configuration.caPath.empty() ? nullptr : m_configuration.caPath.c_str(); const char* caFile = m_configuration.caFile.empty() ? nullptr : m_configuration.caFile.c_str(); - tlsContextOptions.OverrideDefaultTrustStore(caPath, caFile); + if (!tlsContextOptions.OverrideDefaultTrustStore(caPath, caFile)) + { + m_bad = true; + return; + } } } @@ -111,10 +114,21 @@ namespace Aws if (Crt::Io::TlsContextOptions::IsAlpnSupported()) { // this may need to be pulled from the client configuration.... - tlsContextOptions.SetAlpnList("h2;http/1.1"); + if (!tlsContextOptions.SetAlpnList("h2;http/1.1")) + { + m_bad = true; + return; + } } Crt::Io::TlsContext newContext(tlsContextOptions, Crt::Io::TlsMode::CLIENT); + + if (!newContext) + { + m_bad = true; + return; + } + m_context = std::move(newContext); } @@ -235,12 +249,18 @@ namespace Aws Aws::Utils::RateLimits::RateLimiterInterface*, Aws::Utils::RateLimits::RateLimiterInterface*) const { - auto requestConnOptions = CreateConnectionOptionsForRequest(request); - auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions); - auto crtRequest = Crt::MakeShared(Crt::g_allocator); auto response = Aws::MakeShared(CRT_HTTP_CLIENT_TAG, request); + auto requestConnOptions = CreateConnectionOptionsForRequest(request); + auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions); + + if (!connectionManager) + { + response->SetClientErrorMessage(aws_error_debug_str(aws_last_error())); + response->SetClientErrorType(CoreErrors::INVALID_PARAMETER_COMBINATION); + return response; + } AddRequestMetadataToCrtRequest(request, crtRequest); // Set the request body stream on the crt request. Setup the write rate limiter if present @@ -377,6 +397,12 @@ namespace Aws // once done, come back and use it to setup read timeouts. auto connectionManager = Crt::Http::HttpClientConnectionManager::NewClientConnectionManager(connectionManagerOptions); + + if (!connectionManager) + { + return nullptr; + } + // put it in the hash table and return it. m_connectionPools.emplace(connManagerRequestKey, connectionManager); From 63e6795d4e48a3e7dd3c186bf24e1995221878cc Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Wed, 1 Feb 2023 15:02:09 -0800 Subject: [PATCH 14/19] Added logging and most of the edge cases included in the existing clients. --- .../include/aws/core/http/HttpRequest.h | 2 +- aws-cpp-sdk-core/source/http/HttpClient.cpp | 3 +- .../source/http/crt/CRTHttpClient.cpp | 219 +++++++++++++++--- crt/aws-crt-cpp | 2 +- 4 files changed, 194 insertions(+), 32 deletions(-) diff --git a/aws-cpp-sdk-core/include/aws/core/http/HttpRequest.h b/aws-cpp-sdk-core/include/aws/core/http/HttpRequest.h index 129bd3bd3643..33e5d072bdce 100644 --- a/aws-cpp-sdk-core/include/aws/core/http/HttpRequest.h +++ b/aws-cpp-sdk-core/include/aws/core/http/HttpRequest.h @@ -552,7 +552,7 @@ namespace Aws { m_requestHash = std::make_pair(algorithmName, hash); } - const std::pair>& GetRequestHash() { return m_requestHash; } + const std::pair>& GetRequestHash() const { return m_requestHash; } void AddResponseValidationHash(const Aws::String& algorithmName, const std::shared_ptr& hash) { diff --git a/aws-cpp-sdk-core/source/http/HttpClient.cpp b/aws-cpp-sdk-core/source/http/HttpClient.cpp index 854202339362..66d9b3c87eeb 100644 --- a/aws-cpp-sdk-core/source/http/HttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/HttpClient.cpp @@ -12,7 +12,8 @@ using namespace Aws::Http; HttpClient::HttpClient() : m_disableRequestProcessing( false ), m_requestProcessingSignalLock(), - m_requestProcessingSignal() + m_requestProcessingSignal(), + m_bad(false) { } diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index ad0dff22fb06..d3978e8ad55f 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -14,16 +14,38 @@ static const char *const CRT_HTTP_CLIENT_TAG = "CRTHttpClient"; // Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model. -class ThrottleAwareInputStream : public Crt::Io::StdIOStreamInputStream { +class SDKAdaptingInputStream : public Crt::Io::StdIOStreamInputStream { public: - ThrottleAwareInputStream(Utils::RateLimits::RateLimiterInterface& rateLimiter, std::shared_ptr stream, - Aws::Crt::Allocator *allocator = Crt::ApiAllocator()) noexcept : Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), m_rateLimiter(rateLimiter){ - + SDKAdaptingInputStream(Utils::RateLimits::RateLimiterInterface& rateLimiter, std::shared_ptr stream, + const Http::HttpClient& client, const Http::HttpRequest& request, bool isStreaming, + Aws::Crt::Allocator *allocator = Crt::ApiAllocator()) noexcept : + Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), m_rateLimiter(rateLimiter), + m_client(client), m_currentRequest(request), m_isStreaming(isStreaming), m_chunkEnd(false) + { } protected: - // this whole thing needs to be redone to handle various stream use-cases. + bool ReadImpl(Crt::ByteBuf &buffer) noexcept override { + if (!m_client.ContinueRequest(m_currentRequest) || !m_client.IsRequestProcessingEnabled()) + { + return false; + } + + bool isAwsChunked = m_currentRequest.HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) && + m_currentRequest.GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER) == Aws::Http::AWS_CHUNKED_VALUE; + + size_t amountToRead = buffer.capacity - buffer.len; + uint8_t* originalBufferPos = buffer.buffer; + + // aws-chunk = hex(chunk-size) + CRLF + chunk-data + CRLF + // Needs to reserve bytes of sizeof(hex(chunk-size)) + sizeof(CRLF) + sizeof(CRLF) + if (isAwsChunked) + { + Aws::String amountToReadHexString = Aws::Utils::StringUtils::ToHexString(amountToRead); + amountToRead -= (amountToReadHexString.size() + 4); + } + // initial check to see if we should avoid reading for the moment. if (m_rateLimiter.ApplyCost(0) == std::chrono::milliseconds(0)) { size_t currentPos = buffer.len; @@ -31,9 +53,67 @@ class ThrottleAwareInputStream : public Crt::Io::StdIOStreamInputStream { // now do the read. We may over read by an IO buffer size, but it's fine. The throttle will still // kick-in in plenty of time. bool retValue = Crt::Io::StdIOStreamInputStream::ReadImpl(buffer); - size_t newPos = buffer.len; AWS_ASSERT(newPos >= currentPos && !"the buffer length should not have decreased in value."); + + if (retValue && m_isStreaming) + { + Crt::Io::StreamStatus streamStatus; + GetStatus(streamStatus); + + if (newPos == currentPos && !streamStatus.is_end_of_stream && streamStatus.is_valid) + { + return true; + } + } + + size_t amountRead = newPos - currentPos; + + if (isAwsChunked) + { + // if we have a chunk to wrap, wrap it, be sure to update the running checksum. + if (amountRead > 0) + { + if (m_currentRequest.GetRequestHash().second != nullptr) + { + m_currentRequest.GetRequestHash().second->Update(reinterpret_cast(originalBufferPos), amountRead); + } + + Aws::String hex = Aws::Utils::StringUtils::ToHexString(amountRead); + // this is safe because of the isAwsChunked branch above. + // I don't see a aws_byte_buf equivalent of memmove. This is lifted from the curl implementation. + memmove(originalBufferPos + hex.size() + 2, originalBufferPos, amountRead); + memmove(originalBufferPos + hex.size() + 2 + amountRead, "\r\n", 2); + memmove(originalBufferPos, hex.c_str(), hex.size()); + memmove(originalBufferPos + hex.size(), "\r\n", 2); + amountRead += hex.size() + 4; + } + else if (!m_chunkEnd) + { + // if we didn't read anything, then lets finish up the chunk and send it. + // the reference implementation seems to assume only one chunk is allowed, because the chunkEnd bit is never updated. + // keep that same behavior here. + Aws::StringStream chunkedTrailer; + chunkedTrailer << "0\r\n"; + if (m_currentRequest.GetRequestHash().second != nullptr) + { + chunkedTrailer << "x-amz-checksum-" << m_currentRequest.GetRequestHash().first << ":" + << HashingUtils::Base64Encode(m_currentRequest.GetRequestHash().second->GetHash().GetResult()) << "\r\n"; + } + chunkedTrailer << "\r\n"; + amountRead = chunkedTrailer.str().size(); + memcpy(originalBufferPos, chunkedTrailer.str().c_str(), amountRead); + m_chunkEnd = true; + } + buffer.len += amountRead; + } + + auto& sentHandler = m_currentRequest.GetDataSentEventHandler(); + if (sentHandler) + { + sentHandler(&m_currentRequest, static_cast(amountRead)); + } + // now actually reduce the window. m_rateLimiter.ApplyCost(static_cast(newPos - currentPos)); return retValue; @@ -44,6 +124,10 @@ class ThrottleAwareInputStream : public Crt::Io::StdIOStreamInputStream { private: Utils::RateLimits::RateLimiterInterface& m_rateLimiter; + const Http::HttpClient& m_client; + const Http::HttpRequest& m_currentRequest; + bool m_isStreaming; + bool m_chunkEnd; }; // Just a wrapper around a Condition Variable and a mutex, which handles wait and timed waits while protecting @@ -152,20 +236,57 @@ namespace Aws m_connectionPools.clear(); } - static void AddRequestMetadataToCrtRequest(const std::shared_ptr& request, const std::shared_ptr& crtRequest) + static void AddRequestMetadataToCrtRequest(const std::shared_ptr& request, const std::shared_ptr& crtRequest, const ClientConfiguration& clientConfig) { + const char* methodStr = Aws::Http::HttpMethodMapper::GetNameForHttpMethod(request->GetMethod()); + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Making " << methodStr << " request to " << request->GetURIString()); + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Including headers:"); //Add http headers to the request. for (const auto& header : request->GetHeaders()) { Crt::Http::HttpHeader crtHeader; - + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, header.first << ": " << header.second); crtHeader.name = Crt::ByteCursorFromArray((const uint8_t *)header.first.data(), header.first.length()); crtHeader.value = Crt::ByteCursorFromArray((const uint8_t *)header.second.data(), header.second.length()); crtRequest->AddHeader(crtHeader); } + // Explicitly send empty headers for some of the more quirky HTTP interop behaviors if they aren't set. + if (!request->HasHeader(Http::TRANSFER_ENCODING_HEADER)) + { + Crt::Http::HttpHeader transferEncodingHeader; + transferEncodingHeader.name = Crt::ByteCursorFromCString("transfer-encoding"); + transferEncodingHeader.value = Crt::ByteCursorFromCString(""); + crtRequest->AddHeader(transferEncodingHeader); + } + + if (!request->HasHeader(Http::CONTENT_LENGTH_HEADER)) + { + Crt::Http::HttpHeader contentLengthHeader; + contentLengthHeader.name = Crt::ByteCursorFromCString("content-length"); + contentLengthHeader.value = Crt::ByteCursorFromCString(""); + crtRequest->AddHeader(contentLengthHeader); + } + + if (!request->HasHeader(Http::CONTENT_TYPE_HEADER)) + { + Crt::Http::HttpHeader contentTypeHeader; + contentTypeHeader.name = Crt::ByteCursorFromCString("content-type"); + contentTypeHeader.value = Crt::ByteCursorFromCString(""); + crtRequest->AddHeader(contentTypeHeader); + } + + // Discard Expect header so as to avoid using multiple payloads to send a http request (header + body) + if (clientConfig.disableExpectHeader) + { + Crt::Http::HttpHeader expectHeader; + expectHeader.name = Crt::ByteCursorFromCString("expect"); + expectHeader.value = Crt::ByteCursorFromCString(""); + crtRequest->AddHeader(expectHeader); + } + // HTTP method, GET, PUT, DELETE, etc... - auto methodCursor = Crt::ByteCursorFromCString(Aws::Http::HttpMethodMapper::GetNameForHttpMethod(request->GetMethod())); + auto methodCursor = Crt::ByteCursorFromCString(methodStr); crtRequest->SetMethod(methodCursor); // Path portion of the request @@ -178,14 +299,40 @@ namespace Aws auto fullPathAndQueryCpy = ss.str(); auto pathCursor = Crt::ByteCursorFromArray((uint8_t *)fullPathAndQueryCpy.c_str(), fullPathAndQueryCpy.length()); crtRequest->SetPath(pathCursor); - - } - static void OnResponseBodyReceived(Crt::Http::HttpStream&, const Crt::ByteCursor& body, const std::shared_ptr& response) + static void OnResponseBodyReceived(Crt::Http::HttpStream& stream, const Crt::ByteCursor& body, const std::shared_ptr& response, const std::shared_ptr& request, const Http::HttpClient& client) { + if (!client.ContinueRequest(*request) || !client.IsRequestProcessingEnabled()) + { + AWS_LOGSTREAM_INFO(CRT_HTTP_CLIENT_TAG, "Request canceled. Canceling request by closing the connection."); + stream.GetConnection().Close(); + return; + } + + //TODO: handle the read rate limiter here, once backpressure is setup. + + for (const auto& hashIterator : request->GetResponseValidationHashes()) + { + hashIterator.second->Update(reinterpret_cast(body.ptr), body.len); + } + // When data is received from the content body of the incoming response, just copy it to the output stream. response->GetResponseBody().write((const char*)body.ptr, static_cast(body.len)); + + if (request->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) + { + response->GetResponseBody().flush(); + } + + auto& receivedHandler = request->GetDataReceivedEventHandler(); + if (receivedHandler) + { + receivedHandler(request.get(), response.get(), static_cast(body.len)); + } + + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, body.len << " bytes written to response."); + } // on response headers arriving, write them to the response. @@ -193,17 +340,21 @@ namespace Aws { if (block == AWS_HTTP_HEADER_BLOCK_INFORMATIONAL) return; + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Received Headers: "); + for (size_t i = 0; i < headersCount; ++i) { const Crt::Http::HttpHeader* header = &headersArray[i]; Aws::String headerNameStr((const char* const)header->name.ptr, header->name.len); Aws::String headerValueStr((const char* const)header->value.ptr, header->value.len); + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, headerNameStr << ": " << headerValueStr); response->AddHeader(headerNameStr, std::move(headerValueStr)); } } static void OnIncomingHeadersBlockDone(Crt::Http::HttpStream& stream, enum aws_http_header_block, const std::shared_ptr& response) { + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Received response code: " << stream.GetResponseStatusCode()); response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); } @@ -223,24 +374,40 @@ namespace Aws // if the connection acquisition failed, go ahead and fail the request and wakeup the cvar. // If it succeeded go ahead and make the request. static void OnClientConnectionAvailable(std::shared_ptr connection, int errorCode, std::shared_ptr& connectionReference, - Crt::Http::HttpRequestOptions& requestOptions, AsyncWaiter& waiter, const std::shared_ptr& response) + Crt::Http::HttpRequestOptions& requestOptions, AsyncWaiter& waiter, const std::shared_ptr& request, + const std::shared_ptr& response, const HttpClient& client) { + bool shouldContinueRequest = client.ContinueRequest(*request); + + if (!shouldContinueRequest) + { + response->SetClientErrorType(CoreErrors::USER_CANCELLED); + response->SetClientErrorMessage("Request cancelled by user's continuation handler"); + waiter.Wakeup(); + return; + } + int finalErrorCode = errorCode; if (connection) { + AWS_LOGSTREAM_DEBUG(CRT_HTTP_CLIENT_TAG, "Obtained connection handle " << (void*)connection.get()); + auto clientStream = connection->NewClientStream(requestOptions); connectionReference = connection; if (clientStream && clientStream->Activate()) { + AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Initiation of request failed because " << aws_error_debug_str(aws_last_error())); + waiter.Wakeup(); return; } finalErrorCode = aws_last_error(); } - const char *error_msg = aws_error_debug_str(finalErrorCode); + const char *errorMsg = aws_error_debug_str(finalErrorCode); + AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Obtaining connection failed because " << errorMsg); response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); - response->SetClientErrorMessage(error_msg); + response->SetClientErrorMessage(errorMsg); waiter.Wakeup(); } @@ -261,29 +428,22 @@ namespace Aws response->SetClientErrorType(CoreErrors::INVALID_PARAMETER_COMBINATION); return response; } - AddRequestMetadataToCrtRequest(request, crtRequest); + AddRequestMetadataToCrtRequest(request, crtRequest, m_configuration); // Set the request body stream on the crt request. Setup the write rate limiter if present if (request->GetContentBody()) { - // need to hook up back pressure to plug the read limiter in. But the write direction is fairly simple. - if (m_configuration.writeRateLimiter) - { - crtRequest->SetBody(Aws::MakeShared(CRT_HTTP_CLIENT_TAG, *m_configuration.writeRateLimiter, request->GetContentBody())); - } - else - { - crtRequest->SetBody(request->GetContentBody()); - } + bool isStreaming = request->IsEventStreamRequest(); + crtRequest->SetBody(Aws::MakeShared(CRT_HTTP_CLIENT_TAG, *m_configuration.writeRateLimiter, request->GetContentBody(), *this, *request, isStreaming)); } Crt::Http::HttpRequestOptions requestOptions; requestOptions.request = crtRequest.get(); requestOptions.onIncomingBody = - [response](Crt::Http::HttpStream& stream, const Crt::ByteCursor& body) + [this, request, response](Crt::Http::HttpStream& stream, const Crt::ByteCursor& body) { - OnResponseBodyReceived(stream, body, response); + OnResponseBodyReceived(stream, body, response, request, *this); }; requestOptions.onIncomingHeaders = @@ -313,10 +473,10 @@ namespace Aws // now we finally have the request, get a connection and make the request. connectionManager->AcquireConnection( - [&connectionRef, &requestOptions, response, &waiter] + [&connectionRef, &requestOptions, response, &waiter, request, this] (std::shared_ptr connection, int errorCode) { - OnClientConnectionAvailable(connection, errorCode, connectionRef, requestOptions, waiter, response); + OnClientConnectionAvailable(connection, errorCode, connectionRef, requestOptions, waiter, request, response, *this); }); bool waiterTimedOut = false; @@ -359,6 +519,7 @@ namespace Aws response->SetClientErrorMessage("Request Timeout Has Expired"); } + // TODO: is VOX support still a thing? If so we need to add the metrics for it. return response; } diff --git a/crt/aws-crt-cpp b/crt/aws-crt-cpp index 203ad6d373a4..0a9e0ad7ab07 160000 --- a/crt/aws-crt-cpp +++ b/crt/aws-crt-cpp @@ -1 +1 @@ -Subproject commit 203ad6d373a49600dad2dda2a6723773f84a28d4 +Subproject commit 0a9e0ad7ab07113c65b4846ece3a386407c9c0d3 From 5d5d3df9591b2af6cadc1c33d651b4bdc0c9bce9 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Wed, 1 Feb 2023 15:16:01 -0800 Subject: [PATCH 15/19] It is a real bummer that valgrind only works on linux. --- aws-cpp-sdk-core/source/http/HttpClient.cpp | 4 +- .../source/http/crt/CRTHttpClient.cpp | 56 ++++--------------- crt/aws-crt-cpp | 2 +- 3 files changed, 15 insertions(+), 47 deletions(-) diff --git a/aws-cpp-sdk-core/source/http/HttpClient.cpp b/aws-cpp-sdk-core/source/http/HttpClient.cpp index 66d9b3c87eeb..a90c5e94e4a1 100644 --- a/aws-cpp-sdk-core/source/http/HttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/HttpClient.cpp @@ -10,10 +10,10 @@ using namespace Aws; using namespace Aws::Http; HttpClient::HttpClient() : + m_bad(false), m_disableRequestProcessing( false ), m_requestProcessingSignalLock(), - m_requestProcessingSignal(), - m_bad(false) + m_requestProcessingSignal() { } diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index d3978e8ad55f..87f2462368a6 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -16,7 +16,7 @@ static const char *const CRT_HTTP_CLIENT_TAG = "CRTHttpClient"; // Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model. class SDKAdaptingInputStream : public Crt::Io::StdIOStreamInputStream { public: - SDKAdaptingInputStream(Utils::RateLimits::RateLimiterInterface& rateLimiter, std::shared_ptr stream, + SDKAdaptingInputStream(const std::shared_ptr& rateLimiter, std::shared_ptr stream, const Http::HttpClient& client, const Http::HttpRequest& request, bool isStreaming, Aws::Crt::Allocator *allocator = Crt::ApiAllocator()) noexcept : Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), m_rateLimiter(rateLimiter), @@ -47,7 +47,7 @@ class SDKAdaptingInputStream : public Crt::Io::StdIOStreamInputStream { } // initial check to see if we should avoid reading for the moment. - if (m_rateLimiter.ApplyCost(0) == std::chrono::milliseconds(0)) { + if (!m_rateLimiter || (m_rateLimiter && m_rateLimiter->ApplyCost(0) == std::chrono::milliseconds(0))) { size_t currentPos = buffer.len; // now do the read. We may over read by an IO buffer size, but it's fine. The throttle will still @@ -114,16 +114,19 @@ class SDKAdaptingInputStream : public Crt::Io::StdIOStreamInputStream { sentHandler(&m_currentRequest, static_cast(amountRead)); } - // now actually reduce the window. - m_rateLimiter.ApplyCost(static_cast(newPos - currentPos)); - return retValue; + if (m_rateLimiter) + { + // now actually reduce the window. + m_rateLimiter->ApplyCost(static_cast(newPos - currentPos)); + return retValue; + } } return true; } private: - Utils::RateLimits::RateLimiterInterface& m_rateLimiter; + std::shared_ptr m_rateLimiter; const Http::HttpClient& m_client; const Http::HttpRequest& m_currentRequest; bool m_isStreaming; @@ -236,7 +239,7 @@ namespace Aws m_connectionPools.clear(); } - static void AddRequestMetadataToCrtRequest(const std::shared_ptr& request, const std::shared_ptr& crtRequest, const ClientConfiguration& clientConfig) + static void AddRequestMetadataToCrtRequest(const std::shared_ptr& request, const std::shared_ptr& crtRequest) { const char* methodStr = Aws::Http::HttpMethodMapper::GetNameForHttpMethod(request->GetMethod()); AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Making " << methodStr << " request to " << request->GetURIString()); @@ -251,40 +254,6 @@ namespace Aws crtRequest->AddHeader(crtHeader); } - // Explicitly send empty headers for some of the more quirky HTTP interop behaviors if they aren't set. - if (!request->HasHeader(Http::TRANSFER_ENCODING_HEADER)) - { - Crt::Http::HttpHeader transferEncodingHeader; - transferEncodingHeader.name = Crt::ByteCursorFromCString("transfer-encoding"); - transferEncodingHeader.value = Crt::ByteCursorFromCString(""); - crtRequest->AddHeader(transferEncodingHeader); - } - - if (!request->HasHeader(Http::CONTENT_LENGTH_HEADER)) - { - Crt::Http::HttpHeader contentLengthHeader; - contentLengthHeader.name = Crt::ByteCursorFromCString("content-length"); - contentLengthHeader.value = Crt::ByteCursorFromCString(""); - crtRequest->AddHeader(contentLengthHeader); - } - - if (!request->HasHeader(Http::CONTENT_TYPE_HEADER)) - { - Crt::Http::HttpHeader contentTypeHeader; - contentTypeHeader.name = Crt::ByteCursorFromCString("content-type"); - contentTypeHeader.value = Crt::ByteCursorFromCString(""); - crtRequest->AddHeader(contentTypeHeader); - } - - // Discard Expect header so as to avoid using multiple payloads to send a http request (header + body) - if (clientConfig.disableExpectHeader) - { - Crt::Http::HttpHeader expectHeader; - expectHeader.name = Crt::ByteCursorFromCString("expect"); - expectHeader.value = Crt::ByteCursorFromCString(""); - crtRequest->AddHeader(expectHeader); - } - // HTTP method, GET, PUT, DELETE, etc... auto methodCursor = Crt::ByteCursorFromCString(methodStr); crtRequest->SetMethod(methodCursor); @@ -311,7 +280,6 @@ namespace Aws } //TODO: handle the read rate limiter here, once backpressure is setup. - for (const auto& hashIterator : request->GetResponseValidationHashes()) { hashIterator.second->Update(reinterpret_cast(body.ptr), body.len); @@ -428,13 +396,13 @@ namespace Aws response->SetClientErrorType(CoreErrors::INVALID_PARAMETER_COMBINATION); return response; } - AddRequestMetadataToCrtRequest(request, crtRequest, m_configuration); + AddRequestMetadataToCrtRequest(request, crtRequest); // Set the request body stream on the crt request. Setup the write rate limiter if present if (request->GetContentBody()) { bool isStreaming = request->IsEventStreamRequest(); - crtRequest->SetBody(Aws::MakeShared(CRT_HTTP_CLIENT_TAG, *m_configuration.writeRateLimiter, request->GetContentBody(), *this, *request, isStreaming)); + crtRequest->SetBody(Aws::MakeShared(CRT_HTTP_CLIENT_TAG, m_configuration.writeRateLimiter, request->GetContentBody(), *this, *request, isStreaming)); } Crt::Http::HttpRequestOptions requestOptions; diff --git a/crt/aws-crt-cpp b/crt/aws-crt-cpp index 0a9e0ad7ab07..203ad6d373a4 160000 --- a/crt/aws-crt-cpp +++ b/crt/aws-crt-cpp @@ -1 +1 @@ -Subproject commit 0a9e0ad7ab07113c65b4846ece3a386407c9c0d3 +Subproject commit 203ad6d373a49600dad2dda2a6723773f84a28d4 From 18a8d22c1b5813adf66ac2a6a6414b82049dffc3 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Wed, 1 Feb 2023 15:35:28 -0800 Subject: [PATCH 16/19] I knew that wakeup didn't look right. --- aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 87f2462368a6..d30eb87935f2 100644 --- a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -364,12 +364,12 @@ namespace Aws connectionReference = connection; if (clientStream && clientStream->Activate()) { - AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Initiation of request failed because " << aws_error_debug_str(aws_last_error())); - waiter.Wakeup(); return; } finalErrorCode = aws_last_error(); + AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Initiation of request failed because " << aws_error_debug_str(finalErrorCode)); + } const char *errorMsg = aws_error_debug_str(finalErrorCode); From 48e2a592661b1ff77bc5ad4326e12eaeb1478741 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Wed, 1 Feb 2023 15:37:34 -0800 Subject: [PATCH 17/19] Merged from upstream. --- crt/aws-crt-cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-crt-cpp b/crt/aws-crt-cpp index f2adef31d778..203ad6d373a4 160000 --- a/crt/aws-crt-cpp +++ b/crt/aws-crt-cpp @@ -1 +1 @@ -Subproject commit f2adef31d778cfe90b8a5bb377425f825ebf92f0 +Subproject commit 203ad6d373a49600dad2dda2a6723773f84a28d4 From 860c0fa665af3b96c8052c9c31cb612b20deb503 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Wed, 1 Feb 2023 16:36:27 -0800 Subject: [PATCH 18/19] Handled directory structure move and tested that H2 works correctly. it does now. --- cmake/external_dependencies.cmake | 2 +- .../include/aws/core/http/crt/CRTHttpClient.h | 0 .../aws-cpp-sdk-core}/source/http/crt/CRTHttpClient.cpp | 0 .../TranscribeTests.cpp | 6 +++--- 4 files changed, 4 insertions(+), 4 deletions(-) rename {aws-cpp-sdk-core => src/aws-cpp-sdk-core}/include/aws/core/http/crt/CRTHttpClient.h (100%) rename {aws-cpp-sdk-core => src/aws-cpp-sdk-core}/source/http/crt/CRTHttpClient.cpp (100%) diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake index 99b6befff487..737ecf16c293 100644 --- a/cmake/external_dependencies.cmake +++ b/cmake/external_dependencies.cmake @@ -115,7 +115,7 @@ if(NOT NO_HTTP_CLIENT AND NOT USE_CRT_HTTP_CLIENT) message(FATAL_ERROR "No http client available for target platform and client injection not enabled (-DNO_HTTP_CLIENT=ON)") endif() elseif(USE_CRT_HTTP_CLIENT) - add_definitions("-DAWS_SDK_USE_CRT_HTTP") + add_definitions("-DAWS_SDK_USE_CRT_HTTP -DHAVE_H2_CLIENT") else() message(STATUS "You will need to inject an http client implementation before making any http requests!") endif() diff --git a/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/src/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h similarity index 100% rename from aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h rename to src/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h diff --git a/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp similarity index 100% rename from aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp rename to src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp diff --git a/tests/aws-cpp-sdk-transcribestreaming-integration-tests/TranscribeTests.cpp b/tests/aws-cpp-sdk-transcribestreaming-integration-tests/TranscribeTests.cpp index 786ec9f7af15..20b99fd2a18d 100644 --- a/tests/aws-cpp-sdk-transcribestreaming-integration-tests/TranscribeTests.cpp +++ b/tests/aws-cpp-sdk-transcribestreaming-integration-tests/TranscribeTests.cpp @@ -46,7 +46,7 @@ class TranscribeStreamingTests : public ::testing::Test #endif m_client = Aws::MakeUnique(ALLOC_TAG, config); m_clientWithWrongCreds = Aws::MakeUnique(ALLOC_TAG, Aws::Auth::AWSCredentials("a", "b"), config); - config.endpointOverride = "https://0xxxabcdefg123456789.com"; + //config.endpointOverride = "https://0xxxabcdefg123456789.com"; m_clientWithWrongEndpoint = Aws::MakeUnique(ALLOC_TAG, config); } @@ -61,7 +61,7 @@ class TranscribeStreamingTests : public ::testing::Test }; -#if 0 +//#if 0 // Temporarilly bypassing this test TEST_F(TranscribeStreamingTests, TranscribeAudioFile) { @@ -141,7 +141,7 @@ TEST_F(TranscribeStreamingTests, TranscribeAudioFile) semaphore.WaitOne(); ASSERT_EQ(0u, transcribedResult.find(EXPECTED_MESSAGE)); } -#endif +//#endif TEST_F(TranscribeStreamingTests, TranscribeAudioFileWithErrorServiceResponse) { From 66ab9fb8acb3533d5601a52d1f4c5d6ff60e9415 Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Wed, 1 Feb 2023 16:37:35 -0800 Subject: [PATCH 19/19] forgot to undef test back out. --- .../TranscribeTests.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/aws-cpp-sdk-transcribestreaming-integration-tests/TranscribeTests.cpp b/tests/aws-cpp-sdk-transcribestreaming-integration-tests/TranscribeTests.cpp index 20b99fd2a18d..4abc00aff4a3 100644 --- a/tests/aws-cpp-sdk-transcribestreaming-integration-tests/TranscribeTests.cpp +++ b/tests/aws-cpp-sdk-transcribestreaming-integration-tests/TranscribeTests.cpp @@ -61,7 +61,7 @@ class TranscribeStreamingTests : public ::testing::Test }; -//#if 0 +#if 0 // Temporarilly bypassing this test TEST_F(TranscribeStreamingTests, TranscribeAudioFile) { @@ -141,7 +141,7 @@ TEST_F(TranscribeStreamingTests, TranscribeAudioFile) semaphore.WaitOne(); ASSERT_EQ(0u, transcribedResult.find(EXPECTED_MESSAGE)); } -//#endif +#endif TEST_F(TranscribeStreamingTests, TranscribeAudioFileWithErrorServiceResponse) {