Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/core/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MilvusConan(ConanFile):
"lzo/2.10#9517fc1bcc4d4cc229a79806003a1baa",
"arrow/17.0.0#8cea917a6e06ca17c28411966d6fcdd7",
"openssl/3.1.2#02594c4c0a6e2b4feb3cd15119993597",
"aws-sdk-cpp/1.9.234#28d6d2c175975900ce292bafe8022c88",
"aws-sdk-cpp/1.11.352@milvus/dev",
"googleapis/cci.20221108#65604e1b3b9a6b363044da625b201a2a",
"benchmark/1.7.0#459f3bb1a64400a886ba43047576df3c",
"gtest/1.13.0#f9548be18a41ccc6367efcb8146e92be",
Expand Down Expand Up @@ -68,6 +68,7 @@ class MilvusConan(ConanFile):
"arrow:with_jemalloc": True,
"arrow:shared": False,
"arrow:with_s3": True,
"aws-sdk-cpp:s3-crt": True,
"aws-sdk-cpp:config": True,
"aws-sdk-cpp:text-to-speech": False,
"aws-sdk-cpp:transfer": False,
Expand Down
5 changes: 0 additions & 5 deletions internal/core/src/storage/AliyunSTSClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ class HttpRequest;
enum class HttpResponseCode;
} // namespace Http

namespace Client {
Aws::String
ComputeUserAgentString();
}

namespace Internal {

static const char STS_RESOURCE_CLIENT_LOG_TAG[] =
Expand Down
206 changes: 1 addition & 205 deletions internal/core/src/storage/ChunkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
#include <fstream>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/auth/STSCredentialsProvider.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
Expand All @@ -30,215 +28,13 @@

#include "storage/MinioChunkManager.h"
#include "storage/AliyunSTSClient.h"
#include "storage/TencentCloudSTSClient.h"
#include "storage/AliyunCredentialsProvider.h"
#include "storage/TencentCloudCredentialsProvider.h"
#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "log/Log.h"
#include "signal.h"

namespace milvus::storage {

Aws::String
ConvertToAwsString(const std::string& str) {
return Aws::String(str.c_str(), str.size());
}

Aws::Client::ClientConfiguration
generateConfig(const StorageConfig& storage_config) {
// The ClientConfiguration default constructor will take a long time.
// For more details, please refer to https://github.com/aws/aws-sdk-cpp/issues/1440
static Aws::Client::ClientConfiguration g_config;
Aws::Client::ClientConfiguration config = g_config;
config.endpointOverride = ConvertToAwsString(storage_config.address);

// Three cases:
// 1. no ssl, verifySSL=false
// 2. self-signed certificate, verifySSL=false
// 3. CA-signed certificate, verifySSL=true
if (storage_config.useSSL) {
config.scheme = Aws::Http::Scheme::HTTPS;
config.verifySSL = true;
if (!storage_config.sslCACert.empty()) {
config.caPath = ConvertToAwsString(storage_config.sslCACert);
config.verifySSL = false;
}
} else {
config.scheme = Aws::Http::Scheme::HTTP;
config.verifySSL = false;
}

if (!storage_config.region.empty()) {
config.region = ConvertToAwsString(storage_config.region);
}

config.requestTimeoutMs = storage_config.requestTimeoutMs == 0
? DEFAULT_CHUNK_MANAGER_REQUEST_TIMEOUT_MS
: storage_config.requestTimeoutMs;

return config;
}

AwsChunkManager::AwsChunkManager(const StorageConfig& storage_config) {
default_bucket_name_ = storage_config.bucket_name;
remote_root_path_ = storage_config.root_path;

InitSDKAPIDefault(storage_config.log_level);

Aws::Client::ClientConfiguration config = generateConfig(storage_config);
if (storage_config.useIAM) {
auto provider =
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
auto aws_credentials = provider->GetAWSCredentials();
AssertInfo(!aws_credentials.GetAWSAccessKeyId().empty(),
"if use iam, access key id should not be empty");
AssertInfo(!aws_credentials.GetAWSSecretKey().empty(),
"if use iam, secret key should not be empty");
AssertInfo(!aws_credentials.GetSessionToken().empty(),
"if use iam, token should not be empty");

client_ = std::make_shared<Aws::S3::S3Client>(
provider,
config,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
storage_config.useVirtualHost);
} else {
BuildAccessKeyClient(storage_config, config);
}

PreCheck(storage_config);

LOG_INFO(
"init AwsChunkManager with "
"parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]",
storage_config.address,
storage_config.bucket_name,
storage_config.root_path,
storage_config.useSSL);
}

GcpChunkManager::GcpChunkManager(const StorageConfig& storage_config) {
default_bucket_name_ = storage_config.bucket_name;
remote_root_path_ = storage_config.root_path;

if (storage_config.useIAM) {
sdk_options_.httpOptions.httpClientFactory_create_fn = []() {
auto credentials = std::make_shared<
google::cloud::oauth2_internal::GOOGLE_CLOUD_CPP_NS::
ComputeEngineCredentials>();
return Aws::MakeShared<GoogleHttpClientFactory>(
GOOGLE_CLIENT_FACTORY_ALLOCATION_TAG, credentials);
};
}

InitSDKAPIDefault(storage_config.log_level);

Aws::Client::ClientConfiguration config = generateConfig(storage_config);
if (storage_config.useIAM) {
// Using S3 client instead of google client because of compatible protocol
client_ = std::make_shared<Aws::S3::S3Client>(
config,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
storage_config.useVirtualHost);
} else {
BuildAccessKeyClient(storage_config, config);
}

PreCheck(storage_config);

LOG_INFO(
"init GcpChunkManager with "
"parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]",
storage_config.address,
storage_config.bucket_name,
storage_config.root_path,
storage_config.useSSL);
}

AliyunChunkManager::AliyunChunkManager(const StorageConfig& storage_config) {
default_bucket_name_ = storage_config.bucket_name;
remote_root_path_ = storage_config.root_path;

InitSDKAPIDefault(storage_config.log_level);

Aws::Client::ClientConfiguration config = generateConfig(storage_config);

// For aliyun oss, support use virtual host mode
StorageConfig mutable_config = storage_config;
mutable_config.useVirtualHost = true;
if (storage_config.useIAM) {
auto aliyun_provider = Aws::MakeShared<
Aws::Auth::AliyunSTSAssumeRoleWebIdentityCredentialsProvider>(
"AliyunSTSAssumeRoleWebIdentityCredentialsProvider");
auto aliyun_credentials = aliyun_provider->GetAWSCredentials();
AssertInfo(!aliyun_credentials.GetAWSAccessKeyId().empty(),
"if use iam, access key id should not be empty");
AssertInfo(!aliyun_credentials.GetAWSSecretKey().empty(),
"if use iam, secret key should not be empty");
AssertInfo(!aliyun_credentials.GetSessionToken().empty(),
"if use iam, token should not be empty");
client_ = std::make_shared<Aws::S3::S3Client>(
aliyun_provider,
config,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
mutable_config.useVirtualHost);
} else {
BuildAccessKeyClient(mutable_config, config);
}

PreCheck(storage_config);

LOG_INFO(
"init AliyunChunkManager with "
"parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]",
storage_config.address,
storage_config.bucket_name,
storage_config.root_path,
storage_config.useSSL);
}

TencentCloudChunkManager::TencentCloudChunkManager(
const StorageConfig& storage_config) {
default_bucket_name_ = storage_config.bucket_name;
remote_root_path_ = storage_config.root_path;

InitSDKAPIDefault(storage_config.log_level);

Aws::Client::ClientConfiguration config = generateConfig(storage_config);

StorageConfig mutable_config = storage_config;
mutable_config.useVirtualHost = true;
if (storage_config.useIAM) {
auto tencent_cloud_provider = Aws::MakeShared<
Aws::Auth::TencentCloudSTSAssumeRoleWebIdentityCredentialsProvider>(
"TencentCloudSTSAssumeRoleWebIdentityCredentialsProvider");
auto tencent_cloud_credentials =
tencent_cloud_provider->GetAWSCredentials();
AssertInfo(!tencent_cloud_credentials.GetAWSAccessKeyId().empty(),
"if use iam, access key id should not be empty");
AssertInfo(!tencent_cloud_credentials.GetAWSSecretKey().empty(),
"if use iam, secret key should not be empty");
AssertInfo(!tencent_cloud_credentials.GetSessionToken().empty(),
"if use iam, token should not be empty");
client_ = std::make_shared<Aws::S3::S3Client>(
tencent_cloud_provider,
config,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
mutable_config.useVirtualHost);
} else {
BuildAccessKeyClient(mutable_config, config);
}

PreCheck(storage_config);

LOG_INFO(
"init TencentCloudChunkManager with "
"parameter[endpoint={}][bucket_name={}][root_path={}][use_secure={}]",
storage_config.address,
storage_config.bucket_name,
storage_config.root_path,
storage_config.useSSL);
}
namespace milvus::storage {

} // namespace milvus::storage
Loading
Loading