diff --git a/internal/core/conanfile.py b/internal/core/conanfile.py index de9cb8a345c81..b07f2aa34856a 100644 --- a/internal/core/conanfile.py +++ b/internal/core/conanfile.py @@ -15,7 +15,7 @@ class MilvusConan(ConanFile): "lzo/2.10#9517fc1bcc4d4cc229a79806003a1baa", "arrow/17.0.0#8cea917a6e06ca17c28411966d6fcdd7", "openssl/3.1.2#02594c4c0a6e2b4feb3cd15119993597", - "aws-sdk-cpp/1.9.234#28d6d2c175975900ce292bafe8022c88", + "aws-sdk-cpp/1.11.352@milvus/dev", "googleapis/cci.20221108#65604e1b3b9a6b363044da625b201a2a", "benchmark/1.7.0#459f3bb1a64400a886ba43047576df3c", "gtest/1.13.0#f9548be18a41ccc6367efcb8146e92be", @@ -68,6 +68,7 @@ class MilvusConan(ConanFile): "arrow:with_jemalloc": True, "arrow:shared": False, "arrow:with_s3": True, + "aws-sdk-cpp:s3-crt": True, "aws-sdk-cpp:config": True, "aws-sdk-cpp:text-to-speech": False, "aws-sdk-cpp:transfer": False, diff --git a/internal/core/src/storage/AliyunSTSClient.cpp b/internal/core/src/storage/AliyunSTSClient.cpp index cf54f9d4c92f4..169938cace07d 100644 --- a/internal/core/src/storage/AliyunSTSClient.cpp +++ b/internal/core/src/storage/AliyunSTSClient.cpp @@ -39,11 +39,6 @@ class HttpRequest; enum class HttpResponseCode; } // namespace Http -namespace Client { -Aws::String -ComputeUserAgentString(); -} - namespace Internal { static const char STS_RESOURCE_CLIENT_LOG_TAG[] = diff --git a/internal/core/src/storage/ChunkManager.cpp b/internal/core/src/storage/ChunkManager.cpp index c6d8908625e0e..44e156d2faa08 100644 --- a/internal/core/src/storage/ChunkManager.cpp +++ b/internal/core/src/storage/ChunkManager.cpp @@ -17,8 +17,6 @@ #include #include #include -#include -#include #include #include #include @@ -30,215 +28,13 @@ #include "storage/MinioChunkManager.h" #include "storage/AliyunSTSClient.h" -#include "storage/TencentCloudSTSClient.h" #include "storage/AliyunCredentialsProvider.h" #include "storage/TencentCloudCredentialsProvider.h" #include "common/Consts.h" #include "common/EasyAssert.h" #include "log/Log.h" -#include "signal.h" -namespace milvus::storage { - -Aws::String -ConvertToAwsString(const std::string& str) { - return Aws::String(str.c_str(), str.size()); -} - -Aws::Client::ClientConfiguration -generateConfig(const StorageConfig& storage_config) { - // The ClientConfiguration default constructor will take a long time. - // For more details, please refer to https://github.com/aws/aws-sdk-cpp/issues/1440 - static Aws::Client::ClientConfiguration g_config; - Aws::Client::ClientConfiguration config = g_config; - config.endpointOverride = ConvertToAwsString(storage_config.address); - - // Three cases: - // 1. no ssl, verifySSL=false - // 2. self-signed certificate, verifySSL=false - // 3. CA-signed certificate, verifySSL=true - if (storage_config.useSSL) { - config.scheme = Aws::Http::Scheme::HTTPS; - config.verifySSL = true; - if (!storage_config.sslCACert.empty()) { - config.caPath = ConvertToAwsString(storage_config.sslCACert); - config.verifySSL = false; - } - } else { - config.scheme = Aws::Http::Scheme::HTTP; - config.verifySSL = false; - } - - if (!storage_config.region.empty()) { - config.region = ConvertToAwsString(storage_config.region); - } - - config.requestTimeoutMs = storage_config.requestTimeoutMs == 0 - ? DEFAULT_CHUNK_MANAGER_REQUEST_TIMEOUT_MS - : storage_config.requestTimeoutMs; - - return config; -} - -AwsChunkManager::AwsChunkManager(const StorageConfig& storage_config) { - default_bucket_name_ = storage_config.bucket_name; - remote_root_path_ = storage_config.root_path; - - InitSDKAPIDefault(storage_config.log_level); - - Aws::Client::ClientConfiguration config = generateConfig(storage_config); - if (storage_config.useIAM) { - auto provider = - std::make_shared(); - auto aws_credentials = provider->GetAWSCredentials(); - AssertInfo(!aws_credentials.GetAWSAccessKeyId().empty(), - "if use iam, access key id should not be empty"); - AssertInfo(!aws_credentials.GetAWSSecretKey().empty(), - "if use iam, secret key should not be empty"); - AssertInfo(!aws_credentials.GetSessionToken().empty(), - "if use iam, token should not be empty"); - - client_ = std::make_shared( - provider, - config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - storage_config.useVirtualHost); - } else { - BuildAccessKeyClient(storage_config, config); - } - - PreCheck(storage_config); - - LOG_INFO( - "init AwsChunkManager with " - "parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]", - storage_config.address, - storage_config.bucket_name, - storage_config.root_path, - storage_config.useSSL); -} - -GcpChunkManager::GcpChunkManager(const StorageConfig& storage_config) { - default_bucket_name_ = storage_config.bucket_name; - remote_root_path_ = storage_config.root_path; - - if (storage_config.useIAM) { - sdk_options_.httpOptions.httpClientFactory_create_fn = []() { - auto credentials = std::make_shared< - google::cloud::oauth2_internal::GOOGLE_CLOUD_CPP_NS:: - ComputeEngineCredentials>(); - return Aws::MakeShared( - GOOGLE_CLIENT_FACTORY_ALLOCATION_TAG, credentials); - }; - } - - InitSDKAPIDefault(storage_config.log_level); - Aws::Client::ClientConfiguration config = generateConfig(storage_config); - if (storage_config.useIAM) { - // Using S3 client instead of google client because of compatible protocol - client_ = std::make_shared( - config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - storage_config.useVirtualHost); - } else { - BuildAccessKeyClient(storage_config, config); - } - - PreCheck(storage_config); - - LOG_INFO( - "init GcpChunkManager with " - "parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]", - storage_config.address, - storage_config.bucket_name, - storage_config.root_path, - storage_config.useSSL); -} - -AliyunChunkManager::AliyunChunkManager(const StorageConfig& storage_config) { - default_bucket_name_ = storage_config.bucket_name; - remote_root_path_ = storage_config.root_path; - - InitSDKAPIDefault(storage_config.log_level); - - Aws::Client::ClientConfiguration config = generateConfig(storage_config); - - // For aliyun oss, support use virtual host mode - StorageConfig mutable_config = storage_config; - mutable_config.useVirtualHost = true; - if (storage_config.useIAM) { - auto aliyun_provider = Aws::MakeShared< - Aws::Auth::AliyunSTSAssumeRoleWebIdentityCredentialsProvider>( - "AliyunSTSAssumeRoleWebIdentityCredentialsProvider"); - auto aliyun_credentials = aliyun_provider->GetAWSCredentials(); - AssertInfo(!aliyun_credentials.GetAWSAccessKeyId().empty(), - "if use iam, access key id should not be empty"); - AssertInfo(!aliyun_credentials.GetAWSSecretKey().empty(), - "if use iam, secret key should not be empty"); - AssertInfo(!aliyun_credentials.GetSessionToken().empty(), - "if use iam, token should not be empty"); - client_ = std::make_shared( - aliyun_provider, - config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - mutable_config.useVirtualHost); - } else { - BuildAccessKeyClient(mutable_config, config); - } - - PreCheck(storage_config); - - LOG_INFO( - "init AliyunChunkManager with " - "parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]", - storage_config.address, - storage_config.bucket_name, - storage_config.root_path, - storage_config.useSSL); -} - -TencentCloudChunkManager::TencentCloudChunkManager( - const StorageConfig& storage_config) { - default_bucket_name_ = storage_config.bucket_name; - remote_root_path_ = storage_config.root_path; - - InitSDKAPIDefault(storage_config.log_level); - - Aws::Client::ClientConfiguration config = generateConfig(storage_config); - - StorageConfig mutable_config = storage_config; - mutable_config.useVirtualHost = true; - if (storage_config.useIAM) { - auto tencent_cloud_provider = Aws::MakeShared< - Aws::Auth::TencentCloudSTSAssumeRoleWebIdentityCredentialsProvider>( - "TencentCloudSTSAssumeRoleWebIdentityCredentialsProvider"); - auto tencent_cloud_credentials = - tencent_cloud_provider->GetAWSCredentials(); - AssertInfo(!tencent_cloud_credentials.GetAWSAccessKeyId().empty(), - "if use iam, access key id should not be empty"); - AssertInfo(!tencent_cloud_credentials.GetAWSSecretKey().empty(), - "if use iam, secret key should not be empty"); - AssertInfo(!tencent_cloud_credentials.GetSessionToken().empty(), - "if use iam, token should not be empty"); - client_ = std::make_shared( - tencent_cloud_provider, - config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - mutable_config.useVirtualHost); - } else { - BuildAccessKeyClient(mutable_config, config); - } - - PreCheck(storage_config); - - LOG_INFO( - "init TencentCloudChunkManager with " - "parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]", - storage_config.address, - storage_config.bucket_name, - storage_config.root_path, - storage_config.useSSL); -} +namespace milvus::storage { } // namespace milvus::storage diff --git a/internal/core/src/storage/MinioChunkManager.cpp b/internal/core/src/storage/MinioChunkManager.cpp index d1a26917bcb3d..16131d682accf 100644 --- a/internal/core/src/storage/MinioChunkManager.cpp +++ b/internal/core/src/storage/MinioChunkManager.cpp @@ -19,26 +19,30 @@ #include #include #include -#include #include #include #include #include #include -#include #include #include #include - +#include +#include +#include +#include +#include +#include +#include +#include #include "storage/AliyunSTSClient.h" #include "storage/AliyunCredentialsProvider.h" -#include "storage/TencentCloudSTSClient.h" #include "storage/TencentCloudCredentialsProvider.h" #include "monitor/prometheus_client.h" #include "common/EasyAssert.h" #include "log/Log.h" #include "signal.h" -#include "common/Consts.h" +#include namespace milvus::storage { @@ -74,6 +78,31 @@ ConvertToAwsString(const std::string& str) { return Aws::String(str.c_str(), str.size()); } +Aws::S3Crt::ClientConfiguration +generateConfig(const StorageConfig& storage_config) { + // The ClientConfiguration default constructor will take a long time. + // For more details, please refer to https://github.com/aws/aws-sdk-cpp/issues/1440 + static Aws::S3Crt::ClientConfiguration g_config; + + Aws::S3Crt::ClientConfiguration s3CrtConfig = g_config; + s3CrtConfig.endpointOverride = ConvertToAwsString(storage_config.address); + + if (storage_config.useSSL) { + s3CrtConfig.scheme = Aws::Http::Scheme::HTTPS; + } else { + s3CrtConfig.scheme = Aws::Http::Scheme::HTTP; + s3CrtConfig.verifySSL = false; + } + + if (!storage_config.region.empty()) { + s3CrtConfig.region = storage_config.region; + } + s3CrtConfig.useVirtualAddressing = storage_config.useVirtualHost; + s3CrtConfig.throughputTargetGbps = 10; + s3CrtConfig.partSize = 20 * 1024 * 1024; + return s3CrtConfig; +} + /** * @brief convert Aws::string to std::string * @param aws_str @@ -198,23 +227,9 @@ MinioChunkManager::ShutdownSDKAPI() { void MinioChunkManager::BuildS3Client( const StorageConfig& storage_config, - const Aws::Client::ClientConfiguration& config) { + const Aws::S3Crt::ClientConfiguration& config) { if (storage_config.useIAM) { - auto provider = - std::make_shared(); - auto aws_credentials = provider->GetAWSCredentials(); - AssertInfo(!aws_credentials.GetAWSAccessKeyId().empty(), - "if use iam, access key id should not be empty"); - AssertInfo(!aws_credentials.GetAWSSecretKey().empty(), - "if use iam, secret key should not be empty"); - AssertInfo(!aws_credentials.GetSessionToken().empty(), - "if use iam, token should not be empty"); - - client_ = std::make_shared( - provider, - config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - storage_config.useVirtualHost); + BuildIAMClient(storage_config, config, ""); } else { BuildAccessKeyClient(storage_config, config); } @@ -244,25 +259,25 @@ MinioChunkManager::PreCheck(const StorageConfig& config) { void MinioChunkManager::BuildAccessKeyClient( const StorageConfig& storage_config, - const Aws::Client::ClientConfiguration& config) { + const Aws::S3Crt::ClientConfiguration& config) { AssertInfo(!storage_config.access_key_id.empty(), "if not use iam, access key should not be empty"); AssertInfo(!storage_config.access_key_value.empty(), "if not use iam, access value should not be empty"); - client_ = std::make_shared( - Aws::Auth::AWSCredentials( - ConvertToAwsString(storage_config.access_key_id), - ConvertToAwsString(storage_config.access_key_value)), - config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - storage_config.useVirtualHost); + // hc--init crt client + Aws::Auth::AWSCredentials credentials(ConvertToAwsString(storage_config.access_key_id), + ConvertToAwsString(storage_config.access_key_value)); + LOG_INFO("hc===start to create s3_crt_client_1"); + crt_client_ = Aws::MakeShared("S3-Crt-client", credentials, config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never); + LOG_INFO("hc===finished created s3_crt_client"); } void MinioChunkManager::BuildAliyunCloudClient( const StorageConfig& storage_config, - const Aws::Client::ClientConfiguration& config) { + const Aws::S3Crt::ClientConfiguration& config) { // For aliyun oss, support use virtual host mode StorageConfig mutable_config = storage_config; mutable_config.useVirtualHost = true; @@ -277,7 +292,7 @@ MinioChunkManager::BuildAliyunCloudClient( "if use iam, secret key should not be empty"); AssertInfo(!aliyun_credentials.GetSessionToken().empty(), "if use iam, token should not be empty"); - client_ = std::make_shared( + crt_client_ = std::make_shared( aliyun_provider, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, @@ -290,10 +305,10 @@ MinioChunkManager::BuildAliyunCloudClient( void MinioChunkManager::BuildGoogleCloudClient( const StorageConfig& storage_config, - const Aws::Client::ClientConfiguration& config) { + const Aws::S3Crt::ClientConfiguration& config) { if (storage_config.useIAM) { // Using S3 client instead of google client because of compatible protocol - client_ = std::make_shared( + crt_client_ = std::make_shared( config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, storage_config.useVirtualHost); @@ -318,40 +333,17 @@ MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config) // The ClientConfiguration default constructor will take a long time. // For more details, please refer to https://github.com/aws/aws-sdk-cpp/issues/1440 - static Aws::Client::ClientConfiguration g_config; - Aws::Client::ClientConfiguration config = g_config; - config.endpointOverride = ConvertToAwsString(storage_config.address); - - // Three cases: - // 1. no ssl, verifySSL=false - // 2. self-signed certificate, verifySSL=false - // 3. CA-signed certificate, verifySSL=true - if (storage_config.useSSL) { - config.scheme = Aws::Http::Scheme::HTTPS; - config.verifySSL = true; - if (!storage_config.sslCACert.empty()) { - config.caPath = ConvertToAwsString(storage_config.sslCACert); - config.verifySSL = false; - } - } else { - config.scheme = Aws::Http::Scheme::HTTP; - config.verifySSL = false; - } - - config.requestTimeoutMs = storage_config.requestTimeoutMs == 0 - ? DEFAULT_CHUNK_MANAGER_REQUEST_TIMEOUT_MS - : storage_config.requestTimeoutMs; - - if (!storage_config.region.empty()) { - config.region = ConvertToAwsString(storage_config.region); + Aws::S3Crt::ClientConfiguration s3CrtConfig = generateConfig(storage_config); + if(!storage_config.useSSL){ + Aws::SetDefaultTlsConnectionOptions(nullptr); } if (storageType == RemoteStorageType::S3) { - BuildS3Client(storage_config, config); + BuildS3Client(storage_config, s3CrtConfig); } else if (storageType == RemoteStorageType::ALIYUN_CLOUD) { - BuildAliyunCloudClient(storage_config, config); + BuildAliyunCloudClient(storage_config, s3CrtConfig); } else if (storageType == RemoteStorageType::GOOGLE_CLOUD) { - BuildGoogleCloudClient(storage_config, config); + BuildGoogleCloudClient(storage_config, s3CrtConfig); } PreCheck(storage_config); @@ -366,7 +358,7 @@ MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config) } MinioChunkManager::~MinioChunkManager() { - client_.reset(); + crt_client_.reset(); ShutdownSDKAPI(); } @@ -404,19 +396,19 @@ MinioChunkManager::Write(const std::string& filepath, bool MinioChunkManager::BucketExists(const std::string& bucket_name) { - Aws::S3::Model::HeadBucketRequest request; + Aws::S3Crt::Model::HeadBucketRequest request; request.SetBucket(bucket_name.c_str()); - auto outcome = client_->HeadBucket(request); + auto outcome = crt_client_->HeadBucket(request); if (!outcome.IsSuccess()) { const auto& err = outcome.GetError(); auto error_type = err.GetErrorType(); // only throw if the error is not nosuchbucket // if bucket not exist, HeadBucket return errorType RESOURCE_NOT_FOUND - if (error_type != Aws::S3::S3Errors::NO_SUCH_BUCKET && - error_type != Aws::S3::S3Errors::RESOURCE_NOT_FOUND) { - ThrowS3Error("BucketExists", err, "params, bucket={}", bucket_name); + if (error_type != Aws::S3Crt::S3CrtErrors::NO_SUCH_BUCKET && + error_type != Aws::S3Crt::S3CrtErrors::RESOURCE_NOT_FOUND) { + ThrowS3CrtError("BucketExists", err, "params, bucket={}", bucket_name); } return false; } @@ -426,11 +418,11 @@ MinioChunkManager::BucketExists(const std::string& bucket_name) { std::vector MinioChunkManager::ListBuckets() { std::vector buckets; - auto outcome = client_->ListBuckets(); + auto outcome = crt_client_->ListBuckets(); if (!outcome.IsSuccess()) { const auto& err = outcome.GetError(); - ThrowS3Error("ListBuckets", err, "params"); + ThrowS3CrtError("ListBuckets", err, "params"); } for (auto&& b : outcome.GetResult().GetBuckets()) { buckets.emplace_back(b.GetName()); @@ -440,16 +432,16 @@ MinioChunkManager::ListBuckets() { bool MinioChunkManager::CreateBucket(const std::string& bucket_name) { - Aws::S3::Model::CreateBucketRequest request; + Aws::S3Crt::Model::CreateBucketRequest request; request.SetBucket(bucket_name.c_str()); - auto outcome = client_->CreateBucket(request); + auto outcome = crt_client_->CreateBucket(request); if (!outcome.IsSuccess()) { const auto& err = outcome.GetError(); if (err.GetErrorType() != - Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU) { - ThrowS3Error("CreateBucket", err, "params, bucket={}", bucket_name); + Aws::S3Crt::S3CrtErrors::BUCKET_ALREADY_OWNED_BY_YOU) { + ThrowS3CrtError("CreateBucket", err, "params, bucket={}", bucket_name); } return false; } @@ -458,17 +450,17 @@ MinioChunkManager::CreateBucket(const std::string& bucket_name) { bool MinioChunkManager::DeleteBucket(const std::string& bucket_name) { - Aws::S3::Model::DeleteBucketRequest request; + Aws::S3Crt::Model::DeleteBucketRequest request; request.SetBucket(bucket_name.c_str()); - auto outcome = client_->DeleteBucket(request); + auto outcome = crt_client_->DeleteBucket(request); if (!outcome.IsSuccess()) { const auto& err = outcome.GetError(); auto error_type = err.GetErrorType(); - if (error_type != Aws::S3::S3Errors::NO_SUCH_BUCKET && - error_type != Aws::S3::S3Errors::RESOURCE_NOT_FOUND) { - ThrowS3Error("DeleteBucket", err, "params, bucket={}", bucket_name); + if (error_type != Aws::S3Crt::S3CrtErrors::NO_SUCH_BUCKET && + error_type != Aws::S3Crt::S3CrtErrors::RESOURCE_NOT_FOUND) { + ThrowS3CrtError("DeleteBucket", err, "params, bucket={}", bucket_name); } return false; } @@ -478,12 +470,12 @@ MinioChunkManager::DeleteBucket(const std::string& bucket_name) { bool MinioChunkManager::ObjectExists(const std::string& bucket_name, const std::string& object_name) { - Aws::S3::Model::HeadObjectRequest request; + Aws::S3Crt::Model::HeadObjectRequest request; request.SetBucket(bucket_name.c_str()); request.SetKey(object_name.c_str()); auto start = std::chrono::system_clock::now(); - auto outcome = client_->HeadObject(request); + auto outcome = crt_client_->HeadObject(request); monitor::internal_storage_request_latency_stat.Observe( std::chrono::duration_cast( std::chrono::system_clock::now() - start) @@ -493,7 +485,7 @@ MinioChunkManager::ObjectExists(const std::string& bucket_name, const auto& err = outcome.GetError(); if (!IsNotFound(err.GetErrorType())) { monitor::internal_storage_op_count_stat_fail.Increment(); - ThrowS3Error("ObjectExists", + ThrowS3CrtError("ObjectExists", err, "params, bucket={}, object={}", bucket_name, @@ -509,12 +501,12 @@ MinioChunkManager::ObjectExists(const std::string& bucket_name, uint64_t MinioChunkManager::GetObjectSize(const std::string& bucket_name, const std::string& object_name) { - Aws::S3::Model::HeadObjectRequest request; + Aws::S3Crt::Model::HeadObjectRequest request; request.SetBucket(bucket_name.c_str()); request.SetKey(object_name.c_str()); auto start = std::chrono::system_clock::now(); - auto outcome = client_->HeadObject(request); + auto outcome = crt_client_->HeadObject(request); monitor::internal_storage_request_latency_stat.Observe( std::chrono::duration_cast( std::chrono::system_clock::now() - start) @@ -522,7 +514,7 @@ MinioChunkManager::GetObjectSize(const std::string& bucket_name, if (!outcome.IsSuccess()) { monitor::internal_storage_op_count_stat_fail.Increment(); const auto& err = outcome.GetError(); - ThrowS3Error("GetObjectSize", + ThrowS3CrtError("GetObjectSize", err, "params, bucket={}, object={}", bucket_name, @@ -535,12 +527,12 @@ MinioChunkManager::GetObjectSize(const std::string& bucket_name, bool MinioChunkManager::DeleteObject(const std::string& bucket_name, const std::string& object_name) { - Aws::S3::Model::DeleteObjectRequest request; + Aws::S3Crt::Model::DeleteObjectRequest request; request.SetBucket(bucket_name.c_str()); request.SetKey(object_name.c_str()); auto start = std::chrono::system_clock::now(); - auto outcome = client_->DeleteObject(request); + auto outcome = crt_client_->DeleteObject(request); monitor::internal_storage_request_latency_remove.Observe( std::chrono::duration_cast( std::chrono::system_clock::now() - start) @@ -550,7 +542,7 @@ MinioChunkManager::DeleteObject(const std::string& bucket_name, const auto& err = outcome.GetError(); if (!IsNotFound(err.GetErrorType())) { monitor::internal_storage_op_count_remove_fail.Increment(); - ThrowS3Error("DeleteObject", + ThrowS3CrtError("DeleteObject", err, "params, bucket={}, object={}", bucket_name, @@ -568,7 +560,7 @@ MinioChunkManager::PutObjectBuffer(const std::string& bucket_name, const std::string& object_name, void* buf, uint64_t size) { - Aws::S3::Model::PutObjectRequest request; + Aws::S3Crt::Model::PutObjectRequest request; request.SetBucket(bucket_name.c_str()); request.SetKey(object_name.c_str()); @@ -579,7 +571,7 @@ MinioChunkManager::PutObjectBuffer(const std::string& bucket_name, request.SetBody(input_data); auto start = std::chrono::system_clock::now(); - auto outcome = client_->PutObject(request); + auto outcome = crt_client_->PutObject(request); monitor::internal_storage_request_latency_put.Observe( std::chrono::duration_cast( std::chrono::system_clock::now() - start) @@ -589,7 +581,7 @@ MinioChunkManager::PutObjectBuffer(const std::string& bucket_name, if (!outcome.IsSuccess()) { monitor::internal_storage_op_count_put_fail.Increment(); const auto& err = outcome.GetError(); - ThrowS3Error("PutObjectBuffer", + ThrowS3CrtError("PutObjectBuffer", err, "params, bucket={}, object={}", bucket_name, @@ -643,10 +635,8 @@ MinioChunkManager::GetObjectBuffer(const std::string& bucket_name, const std::string& object_name, void* buf, uint64_t size) { - Aws::S3::Model::GetObjectRequest request; - request.SetBucket(bucket_name.c_str()); - request.SetKey(object_name.c_str()); - + Aws::S3Crt::Model::GetObjectRequest request; + request.WithBucket(bucket_name).WithKey(object_name); request.SetResponseStreamFactory([buf, size]() { // For macOs, pubsetbuf interface not implemented #ifdef __linux__ @@ -660,7 +650,7 @@ MinioChunkManager::GetObjectBuffer(const std::string& bucket_name, return stream.release(); }); auto start = std::chrono::system_clock::now(); - auto outcome = client_->GetObject(request); + auto outcome = crt_client_->GetObject(request); monitor::internal_storage_request_latency_get.Observe( std::chrono::duration_cast( std::chrono::system_clock::now() - start) @@ -670,12 +660,13 @@ MinioChunkManager::GetObjectBuffer(const std::string& bucket_name, if (!outcome.IsSuccess()) { monitor::internal_storage_op_count_get_fail.Increment(); const auto& err = outcome.GetError(); - ThrowS3Error("GetObjectBuffer", + ThrowS3CrtError("GetObjectBuffer", err, "params, bucket={}, object={}", bucket_name, object_name); } + LOG_INFO("hc===GetObjectBuffer ok"); monitor::internal_storage_op_count_get_suc.Increment(); return size; } @@ -684,14 +675,14 @@ std::vector MinioChunkManager::ListObjects(const std::string& bucket_name, const std::string& prefix) { std::vector objects_vec; - Aws::S3::Model::ListObjectsRequest request; + Aws::S3Crt::Model::ListObjectsRequest request; request.WithBucket(bucket_name); if (prefix != "") { request.SetPrefix(prefix); } auto start = std::chrono::system_clock::now(); - auto outcome = client_->ListObjects(request); + auto outcome = crt_client_->ListObjects(request); monitor::internal_storage_request_latency_list.Observe( std::chrono::duration_cast( std::chrono::system_clock::now() - start) @@ -700,7 +691,7 @@ MinioChunkManager::ListObjects(const std::string& bucket_name, if (!outcome.IsSuccess()) { monitor::internal_storage_op_count_list_fail.Increment(); const auto& err = outcome.GetError(); - ThrowS3Error("ListObjects", + ThrowS3CrtError("ListObjects", err, "params, bucket={}, prefix={}", bucket_name, @@ -714,4 +705,127 @@ MinioChunkManager::ListObjects(const std::string& bucket_name, return objects_vec; } +AwsChunkManager::AwsChunkManager(const StorageConfig& storage_config) { + default_bucket_name_ = storage_config.bucket_name; + remote_root_path_ = storage_config.root_path; + + InitSDKAPIDefault(storage_config.log_level); + if(!storage_config.useSSL){ + Aws::SetDefaultTlsConnectionOptions(nullptr); + } + + Aws::S3Crt::ClientConfiguration config = generateConfig(storage_config); + if (storage_config.useIAM) { + BuildIAMClient(storage_config, config, ""); + } else { + BuildAccessKeyClient(storage_config, config); + } + + PreCheck(storage_config); + + LOG_INFO( + "init AwsChunkManager with " + "parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]", + storage_config.address, + storage_config.bucket_name, + storage_config.root_path, + storage_config.useSSL); +} + +GcpChunkManager::GcpChunkManager(const StorageConfig& storage_config) { + default_bucket_name_ = storage_config.bucket_name; + remote_root_path_ = storage_config.root_path; + + if (storage_config.useIAM) { + sdk_options_.httpOptions.httpClientFactory_create_fn = []() { + auto credentials = std::make_shared< + google::cloud::oauth2_internal::GOOGLE_CLOUD_CPP_NS:: + ComputeEngineCredentials>(); + return Aws::MakeShared( + GOOGLE_CLIENT_FACTORY_ALLOCATION_TAG, credentials); + }; + } + + InitSDKAPIDefault(storage_config.log_level); + + Aws::S3Crt::ClientConfiguration config = generateConfig(storage_config); + if (storage_config.useIAM) { + // Using S3 client instead of google client because of compatible protocol + crt_client_ = std::make_shared( + config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + storage_config.useVirtualHost); + } else { + BuildAccessKeyClient(storage_config, config); + } + + PreCheck(storage_config); + + LOG_INFO( + "init GcpChunkManager with " + "parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]", + storage_config.address, + storage_config.bucket_name, + storage_config.root_path, + storage_config.useSSL); +} + +AliyunChunkManager::AliyunChunkManager(const StorageConfig& storage_config) { + default_bucket_name_ = storage_config.bucket_name; + remote_root_path_ = storage_config.root_path; + + InitSDKAPIDefault(storage_config.log_level); + + Aws::S3Crt::ClientConfiguration config = generateConfig(storage_config); + + // For aliyun oss, support use virtual host mode + StorageConfig mutable_config = storage_config; + mutable_config.useVirtualHost = true; + if (storage_config.useIAM) { + BuildIAMClient(storage_config, config, "AliyunSTSAssumeRoleWebIdentityCredentialsProvider"); + } else { + BuildAccessKeyClient(mutable_config, config); + } + + PreCheck(storage_config); + + LOG_INFO( + "init AliyunChunkManager with " + "parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]", + storage_config.address, + storage_config.bucket_name, + storage_config.root_path, + storage_config.useSSL); +} + +TencentCloudChunkManager::TencentCloudChunkManager( + const StorageConfig& storage_config) { + default_bucket_name_ = storage_config.bucket_name; + remote_root_path_ = storage_config.root_path; + + InitSDKAPIDefault(storage_config.log_level); + + Aws::S3Crt::ClientConfiguration config = generateConfig(storage_config); + + StorageConfig mutable_config = storage_config; + mutable_config.useVirtualHost = true; + if (storage_config.useIAM) { + BuildIAMClient + (storage_config, config, "TencentCloudSTSAssumeRoleWebIdentityCredentialsProvider"); + + } else { + BuildAccessKeyClient(mutable_config, config); + } + + PreCheck(storage_config); + + LOG_INFO( + "init TencentCloudChunkManager with " + "parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]", + storage_config.address, + storage_config.bucket_name, + storage_config.root_path, + storage_config.useSSL); +} + } // namespace milvus::storage diff --git a/internal/core/src/storage/MinioChunkManager.h b/internal/core/src/storage/MinioChunkManager.h index 5c8e4a2722a51..05a1b5a35cd87 100644 --- a/internal/core/src/storage/MinioChunkManager.h +++ b/internal/core/src/storage/MinioChunkManager.h @@ -30,6 +30,8 @@ #include #include #include +#include +#include #include #include #include @@ -77,12 +79,34 @@ ThrowS3Error(const std::string& func, throw SegcoreError(S3Error, error_message); } + +template +static SegcoreError +ThrowS3CrtError(const std::string& func, + const Aws::S3Crt::S3CrtError& err, + const std::string& fmtString, + Args&&... args){ + std::ostringstream oss; + const auto& message = fmt::format(fmtString, std::forward(args)...); + oss << "Error in " << func << "[errcode:" << int(err.GetResponseCode()) + << ", exception:" << err.GetExceptionName() + << ", errmessage:" << err.GetMessage() << ", params:" << message << "]"; + throw SegcoreError(S3Error, oss.str()); +} + + static bool IsNotFound(const Aws::S3::S3Errors& s3err) { return (s3err == Aws::S3::S3Errors::NO_SUCH_KEY || s3err == Aws::S3::S3Errors::RESOURCE_NOT_FOUND); } +static bool +IsNotFound(const Aws::S3Crt::S3CrtErrors& s3CrtErr){ + return (s3CrtErr == Aws::S3Crt::S3CrtErrors::NO_SUCH_KEY || + s3CrtErr == Aws::S3Crt::S3CrtErrors::RESOURCE_NOT_FOUND); +} + /** * @brief user defined aws logger, redirect aws log to segcore log */ @@ -224,23 +248,44 @@ class MinioChunkManager : public ChunkManager { ShutdownSDKAPI(); void BuildS3Client(const StorageConfig& storage_config, - const Aws::Client::ClientConfiguration& config); + const Aws::S3Crt::ClientConfiguration& config); void BuildAliyunCloudClient(const StorageConfig& storage_config, - const Aws::Client::ClientConfiguration& config); + const Aws::S3Crt::ClientConfiguration& config); void BuildGoogleCloudClient(const StorageConfig& storage_config, - const Aws::Client::ClientConfiguration& config); + const Aws::S3Crt::ClientConfiguration& config); protected: void BuildAccessKeyClient(const StorageConfig& storage_config, - const Aws::Client::ClientConfiguration& config); + const Aws::S3Crt::ClientConfiguration& config); + + template + void + BuildIAMClient(const StorageConfig& storage_config, const Aws::S3Crt::ClientConfiguration& config, const char * description) { + auto provider = + Aws::MakeShared(description); + auto aws_credentials = provider->GetAWSCredentials(); + AssertInfo(!aws_credentials.GetAWSAccessKeyId().empty(), + "if use iam, access key id should not be empty"); + AssertInfo(!aws_credentials.GetAWSSecretKey().empty(), + "if use iam, secret key should not be empty"); + AssertInfo(!aws_credentials.GetSessionToken().empty(), + "if use iam, token should not be empty"); + + crt_client_ = std::make_shared( + provider, + config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + storage_config.useVirtualHost); + } Aws::SDKOptions sdk_options_; static std::atomic init_count_; static std::mutex client_mutex_; std::shared_ptr client_; + std::shared_ptr crt_client_; std::string default_bucket_name_; std::string remote_root_path_; }; diff --git a/internal/core/src/storage/TencentCloudSTSClient.cpp b/internal/core/src/storage/TencentCloudSTSClient.cpp index 18915c2bb42f9..299dda66fd0b0 100644 --- a/internal/core/src/storage/TencentCloudSTSClient.cpp +++ b/internal/core/src/storage/TencentCloudSTSClient.cpp @@ -26,6 +26,7 @@ #include #include #include "TencentCloudSTSClient.h" +#include namespace Aws { namespace Http { @@ -34,11 +35,6 @@ class HttpRequest; enum class HttpResponseCode; } // namespace Http -namespace Client { -Aws::String -ComputeUserAgentString(); -} - namespace Internal { static const char STS_RESOURCE_CLIENT_LOG_TAG[] = diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt index 1d63892807ecc..3290bda893b38 100644 --- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt @@ -14,8 +14,8 @@ # Update milvus-storage_VERSION for the first occurrence milvus_add_pkg_config("milvus-storage") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( milvus-storage_VERSION d143229 ) -set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git") +set( milvus-storage_VERSION a8b4f84) +set( GIT_REPOSITORY "https://github.com/MrPresent-Han/milvus-storage.git") message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}") message(STATUS "milvus-storage version: ${milvus-storage_VERSION}") diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index da95a1226bbdb..b559ca1e734ad 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -100,6 +100,8 @@ set(MILVUS_TEST_FILES test_types.cpp test_growing_storage_v2.cpp test_memory_planner.cpp + test_data_codec.cpp + test_minio_chunk_manager.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) @@ -112,7 +114,7 @@ if ( BUILD_DISK_ANN STREQUAL "ON" ) set(MILVUS_TEST_FILES ${MILVUS_TEST_FILES} #need update aws-sdk-cpp, see more from https://github.com/aws/aws-sdk-cpp/issues/1757 - #test_minio_chunk_manager.cpp + test_minio_chunk_manager.cpp ) endif() diff --git a/internal/core/unittest/test_minio_chunk_manager.cpp b/internal/core/unittest/test_minio_chunk_manager.cpp index 9361ff4f021d2..6f2f5a4e1bbca 100644 --- a/internal/core/unittest/test_minio_chunk_manager.cpp +++ b/internal/core/unittest/test_minio_chunk_manager.cpp @@ -235,7 +235,7 @@ TEST_F(MinioChunkManagerTest, ReadNotExist) { chunk_manager_->DeleteBucket(testBucketName); } -TEST_F(MinioChunkManagerTest, RemovePositive) { +/*TEST_F(MinioChunkManagerTest, RemovePositive) { string testBucketName = "test-remove"; chunk_manager_->SetBucketName(testBucketName); EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName); @@ -261,7 +261,7 @@ TEST_F(MinioChunkManagerTest, RemovePositive) { EXPECT_EQ(exist, false); chunk_manager_->DeleteBucket(testBucketName); -} +}*/ TEST_F(MinioChunkManagerTest, ListWithPrefixPositive) { string testBucketName = "test-listprefix"; @@ -301,6 +301,117 @@ TEST_F(MinioChunkManagerTest, ListWithPrefixPositive) { chunk_manager_->DeleteBucket(testBucketName); } +#include +#include +#include +#include +#include +#include +#include "segcore/segcore_init_c.h" + +Aws::String +ConvertToAwsString(const std::string& str) { + return Aws::String(str.c_str(), str.size()); +} + +TEST(AWSClient, Init){ + SegcoreInit("/home/hanchun/Documents/project/milvus-master-temp/milvus/configs/glog.conf"); + Aws::SDKOptions options; + auto log_level = Aws::Utils::Logging::LogLevel::Trace; + options.loggingOptions.logLevel = log_level; + options.loggingOptions.logger_create_fn = [log_level]() { + return std::make_shared(log_level); + }; + Aws::InitAPI(options); + Aws::SetDefaultTlsConnectionOptions(nullptr); + + static const char* ALLOCATION_TAG = "BucketAndObjectOperationTest"; + Aws::S3Crt::ClientConfiguration s3ClientConfig; + s3ClientConfig.region = "us-east-1"; + s3ClientConfig.scheme = Aws::Http::Scheme::HTTP; + s3ClientConfig.executor = Aws::MakeShared(ALLOCATION_TAG, 4); + s3ClientConfig.throughputTargetGbps = 2.0; + s3ClientConfig.partSize = 5 * 1024 * 1024; + s3ClientConfig.endpointOverride = ConvertToAwsString("http://minio.local:9000"); + + Aws::Auth::AWSCredentials credentials(ConvertToAwsString("minioadmin"), + ConvertToAwsString("minioadmin")); + + std::cout << "hc==start to construct client" << std::endl; + auto client = Aws::MakeShared(ALLOCATION_TAG, + credentials, + s3ClientConfig, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always /*signPayloads*/, + false); + + std::cout << "hc==construct client ok" << std::endl; + + { + auto res = client->ListBuckets(); + std::cout << "hc==list buckets isSuccess:" << res.IsSuccess() << ",errCode:" + << int(res.GetError().GetResponseCode()) << ", exception:" + << res.GetError().GetExceptionName() << ", errMsg:" + << res.GetError().GetMessage() + << std::endl; + + for(auto bucket: res.GetResult().GetBuckets()){ + std::cout << "hc==bucket:" << string(bucket.GetName().c_str(), bucket.GetName().length()) << std::endl; + } + + Aws::S3Crt::Model::ListObjectsRequest request; + request.SetBucket("a-bucket"); + request.SetPrefix("files/insert_log/456616016241632061/456616016241632062/456616016241632078/1/456616016241430263"); + auto listRes = client->ListObjects(request); + for(auto obj: listRes.GetResult().GetContents()){ + std::cout << "hc==List obj:" << obj.GetKey().c_str() << std::endl; + } + } + { + Aws::String objectKey = "files/insert_log/456616016241632061/456616016241632062/456616016241632078/1/456616016241430263"; + Aws::String encodedKey = Aws::Utils::StringUtils::URLEncode(objectKey.c_str()); + Aws::S3Crt::Model::GetObjectRequest request; + request.SetBucket("a-bucket"); + request.SetKey(encodedKey); + auto res = client->GetObject(request); + std::cout << "hc==get isSuccess:" << res.IsSuccess() << ",errCode:" + << int(res.GetError().GetResponseCode()) << ", exception:" + << res.GetError().GetExceptionName() << ", errMsg:" + << res.GetError().GetMessage() + << ", contentLength:" << res.GetResult().GetContentLength() + << std::endl; + } + + client.reset(); + Aws::ShutdownAPI(options); +} + + /* + * { + Aws::S3Crt::Model::GetObjectRequest request; + request.SetBucket("a-bucket"); + request.SetKey("/files/insert_log/456616016241632061/456616016241632062/456616016241632078/1/456616016241430263"); + auto size = 128; + std::vector buf(size); + char* buf_data = reinterpret_cast(buf.data()); + request.SetResponseStreamFactory([buf_data, size]() { + std::cout << "hc===write stream is called" << std::endl; + std::unique_ptr stream( + Aws::New("")); + auto str_buf = stream->rdbuf(); + auto str = str_buf->str(); + std::cout << "hc===data in content.size: " << str.size() << ", str:" << str << std::endl; + return stream.release(); + }); + auto res = client->GetObject(request); + std::cout << "hc===get isSuccess:" << res.IsSuccess() << ",errCode:" + << int(res.GetError().GetResponseCode()) << ", exception:" + << res.GetError().GetExceptionName() << ", errMsg:" + << res.GetError().GetMessage() + << ", contentLength:" << res.GetResult().GetContentLength() + << std::endl; + } + * */ + //TEST_F(AliyunChunkManagerTest, ReadPositive) { // string testBucketName = "vdc-infra-poc"; // chunk_manager_->SetBucketName(testBucketName);