From 0503d613b63eacc9590e2906442eeb60e0133e7d Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 3 Jan 2025 15:56:59 +0800 Subject: [PATCH] SpillConfig.storage --- src/common/storage/src/config.rs | 1 - src/common/storage/src/operator.rs | 74 ++++++------------- src/query/config/src/config.rs | 50 ++++++------- src/query/config/src/inner.rs | 5 ++ src/query/service/src/global_services.rs | 2 +- .../storages/testdata/configs_table_basic.txt | 2 +- 6 files changed, 49 insertions(+), 85 deletions(-) diff --git a/src/common/storage/src/config.rs b/src/common/storage/src/config.rs index 22e1a771bdbbb..7a0c832fc3bd7 100644 --- a/src/common/storage/src/config.rs +++ b/src/common/storage/src/config.rs @@ -44,7 +44,6 @@ pub struct StorageConfig { pub num_cpus: u64, pub allow_insecure: bool, pub params: StorageParams, - pub spill_bucket: Option, } // TODO: This config should be moved out of common-storage crate. diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs index 3d9f465e8f568..aae9ff83f38ee 100644 --- a/src/common/storage/src/operator.rs +++ b/src/common/storage/src/operator.rs @@ -406,7 +406,7 @@ pub struct DataOperator { operator: Operator, spill_operator: Option, params: StorageParams, - spill_bucket: Option, + spill_params: Option, } impl DataOperator { @@ -422,8 +422,8 @@ impl DataOperator { } } - pub fn spill_bucket(&self) -> Option<&String> { - self.spill_bucket.as_ref() + pub fn spill_params(&self) -> Option<&StorageParams> { + self.spill_params.as_ref() } pub fn params(&self) -> StorageParams { @@ -431,53 +431,53 @@ impl DataOperator { } #[async_backtrace::framed] - pub async fn init(conf: &StorageConfig) -> databend_common_exception::Result<()> { - GlobalInstance::set(Self::try_create(conf).await?); + pub async fn init( + conf: &StorageConfig, + spill_params: Option, + ) -> databend_common_exception::Result<()> { + GlobalInstance::set(Self::try_create(conf, spill_params).await?); Ok(()) } /// Create a new data operator without check. - pub fn try_new(conf: &StorageConfig) -> databend_common_exception::Result { + pub fn try_new( + conf: &StorageConfig, + spill_params: Option, + ) -> databend_common_exception::Result { let operator = init_operator(&conf.params)?; - - let (spill_operator, spill_bucket) = match Self::spill_params(conf) { - Some(params) => { - let op = init_operator(¶ms)?; - (Some(op), conf.spill_bucket.clone()) - } - None => (None, None), - }; + let spill_operator = spill_params.as_ref().map(init_operator).transpose()?; Ok(DataOperator { operator, params: conf.params.clone(), spill_operator, - spill_bucket, + spill_params, }) } #[async_backtrace::framed] pub async fn try_create( conf: &StorageConfig, + spill_params: Option, ) -> databend_common_exception::Result { let operator = init_operator(&conf.params)?; check_operator(&operator, &conf.params).await?; - let (spill_operator, spill_bucket) = match Self::spill_params(conf) { + let spill_operator = match &spill_params { Some(params) => { - let op = init_operator(¶ms)?; - check_operator(&op, ¶ms).await?; - (Some(op), conf.spill_bucket.clone()) + let op = init_operator(params)?; + check_operator(&op, params).await?; + Some(op) } - None => (None, None), + None => None, }; Ok(DataOperator { operator, params: conf.params.clone(), spill_operator, - spill_bucket, + spill_params, }) } @@ -492,38 +492,6 @@ impl DataOperator { pub fn instance() -> DataOperator { GlobalInstance::get() } - - fn spill_params(conf: &StorageConfig) -> Option { - let bucket = conf.spill_bucket.clone()?; - - match &conf.params { - StorageParams::Azblob(c) => Some(StorageParams::Azblob(StorageAzblobConfig { - container: bucket, - ..c.clone() - })), - StorageParams::Gcs(c) => Some(StorageParams::Gcs(StorageGcsConfig { - bucket, - ..c.clone() - })), - StorageParams::Obs(c) => Some(StorageParams::Obs(StorageObsConfig { - bucket, - ..c.clone() - })), - StorageParams::Oss(c) => Some(StorageParams::Oss(StorageOssConfig { - bucket, - ..c.clone() - })), - StorageParams::S3(c) => Some(StorageParams::S3(StorageS3Config { - bucket, - ..c.clone() - })), - StorageParams::Cos(c) => Some(StorageParams::Cos(StorageCosConfig { - bucket, - ..c.clone() - })), - _ => None, - } - } } pub async fn check_operator( diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 2b003a6175aff..f30538a7aafff 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -309,10 +309,6 @@ pub struct StorageConfig { // COS storage backend config #[clap(flatten)] pub cos: CosStorageConfig, - - // Spill config for any storage backend - #[clap(flatten)] - pub spill: StorageSpillConfig, } impl Default for StorageConfig { @@ -340,9 +336,6 @@ impl From for StorageConfig { obs: Default::default(), webhdfs: Default::default(), cos: Default::default(), - spill: StorageSpillConfig { - spill_bucket: inner.spill_bucket.unwrap_or_default(), - }, // Deprecated fields storage_type: None, @@ -415,11 +408,6 @@ impl TryInto for StorageConfig { Ok(InnerStorageConfig { num_cpus: self.storage_num_cpus, allow_insecure: self.allow_insecure, - spill_bucket: if self.spill.spill_bucket.is_empty() { - None - } else { - Some(self.spill.spill_bucket) - }, params: { match self.typ.as_str() { "azblob" => StorageParams::Azblob(self.azblob.try_into()?), @@ -1323,23 +1311,6 @@ impl TryFrom for InnerStorageCosConfig { } } -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Args)] -pub struct StorageSpillConfig { - #[clap(long = "storage-spill-bucket", value_name = "VALUE", default_value_t)] - #[serde(rename = "bucket")] - pub spill_bucket: String, -} - -impl Default for StorageSpillConfig { - fn default() -> Self { - Self { - spill_bucket: InnerStorageConfig::default() - .spill_bucket - .unwrap_or_default(), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq)] pub enum SettingValue { UInt64(u64), @@ -3001,6 +2972,9 @@ pub struct SpillConfig { #[clap(long, value_name = "VALUE", default_value = "18446744073709551615")] /// Allow space in bytes to spill to local disk. pub spill_local_disk_max_bytes: u64, + + #[clap(skip)] + pub storage: Option, } impl Default for SpillConfig { @@ -3158,20 +3132,38 @@ mod cache_config_converters { None }; + let storage_params = spill + .storage + .map(|storage| { + let storage: InnerStorageConfig = storage.try_into()?; + Ok::<_, ErrorCode>(storage.params) + }) + .transpose()?; + Ok(inner::LocalSpillConfig { local_writeable_root, path: spill.spill_local_disk_path, reserved_disk_ratio: spill.spill_local_disk_reserved_space_percentage / 100.0, global_bytes_limit: spill.spill_local_disk_max_bytes, + storage_params, }) } impl From for SpillConfig { fn from(value: inner::LocalSpillConfig) -> Self { + let storage = value.storage_params.map(|params| { + InnerStorageConfig { + params, + ..Default::default() + } + .into() + }); + Self { spill_local_disk_path: value.path, spill_local_disk_reserved_space_percentage: value.reserved_disk_ratio * 100.0, spill_local_disk_max_bytes: value.global_bytes_limit, + storage, } } } diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index b46f3f306afaa..61a4b0d9b0260 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -29,6 +29,7 @@ use databend_common_exception::Result; use databend_common_grpc::RpcClientConf; use databend_common_grpc::RpcClientTlsConfig; use databend_common_meta_app::principal::UserSettingValue; +use databend_common_meta_app::storage::StorageParams; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::tenant::TenantQuota; use databend_common_storage::StorageConfig; @@ -726,6 +727,8 @@ pub struct LocalSpillConfig { /// Allow bytes use of disk space. pub global_bytes_limit: u64, + + pub storage_params: Option, } impl LocalSpillConfig { @@ -752,6 +755,7 @@ impl LocalSpillConfig { path, reserved_disk_ratio: OrderedFloat(reserved_disk_ratio), global_bytes_limit, + storage_params: None, } } } @@ -763,6 +767,7 @@ impl Default for LocalSpillConfig { path: "".to_string(), reserved_disk_ratio: OrderedFloat(0.3), global_bytes_limit: u64::MAX, + storage_params: None, } } } diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 263a7b5b86f3d..7be4edeea5827 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -139,7 +139,7 @@ impl GlobalServices { RoleCacheManager::init()?; - DataOperator::init(&config.storage).await?; + DataOperator::init(&config.storage, config.spill.storage_params.clone()).await?; ShareTableConfig::init( &config.query.share_endpoint_address, &config.query.share_endpoint_auth_token_file, diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index b6ae4787b47aa..28d1ec00f7bea 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -145,6 +145,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'spill' | 'spill_local_disk_max_bytes' | '18446744073709551615' | '' | | 'spill' | 'spill_local_disk_path' | '' | '' | | 'spill' | 'spill_local_disk_reserved_space_percentage' | '30.0' | '' | +| 'spill' | 'storage' | 'null' | '' | | 'storage' | 'allow_insecure' | 'true' | '' | | 'storage' | 'azblob.account_key' | '' | '' | | 'storage' | 'azblob.account_name' | '' | '' | @@ -188,7 +189,6 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'storage' | 's3.root' | '' | '' | | 'storage' | 's3.secret_access_key' | '' | '' | | 'storage' | 's3.security_token' | '' | '' | -| 'storage' | 'spill.bucket' | '' | '' | | 'storage' | 'storage_num_cpus' | 'null' | '' | | 'storage' | 'storage_type' | 'null' | '' | | 'storage' | 'type' | 'fs' | '' |