Skip to content

Commit

Permalink
SpillConfig.storage
Browse files Browse the repository at this point in the history
  • Loading branch information
forsaken628 committed Jan 3, 2025
1 parent 0b3a1f6 commit 0503d61
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 85 deletions.
1 change: 0 additions & 1 deletion src/common/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub struct StorageConfig {
pub num_cpus: u64,
pub allow_insecure: bool,
pub params: StorageParams,
pub spill_bucket: Option<String>,
}

// TODO: This config should be moved out of common-storage crate.
Expand Down
74 changes: 21 additions & 53 deletions src/common/storage/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ pub struct DataOperator {
operator: Operator,
spill_operator: Option<Operator>,
params: StorageParams,
spill_bucket: Option<String>,
spill_params: Option<StorageParams>,
}

impl DataOperator {
Expand All @@ -422,62 +422,62 @@ impl DataOperator {
}
}

pub fn spill_bucket(&self) -> Option<&String> {
self.spill_bucket.as_ref()
pub fn spill_params(&self) -> Option<&StorageParams> {
self.spill_params.as_ref()
}

pub fn params(&self) -> StorageParams {
self.params.clone()
}

#[async_backtrace::framed]
pub async fn init(conf: &StorageConfig) -> databend_common_exception::Result<()> {
GlobalInstance::set(Self::try_create(conf).await?);
pub async fn init(
conf: &StorageConfig,
spill_params: Option<StorageParams>,
) -> databend_common_exception::Result<()> {
GlobalInstance::set(Self::try_create(conf, spill_params).await?);

Ok(())
}

/// Create a new data operator without check.
pub fn try_new(conf: &StorageConfig) -> databend_common_exception::Result<DataOperator> {
pub fn try_new(
conf: &StorageConfig,
spill_params: Option<StorageParams>,
) -> databend_common_exception::Result<DataOperator> {
let operator = init_operator(&conf.params)?;

let (spill_operator, spill_bucket) = match Self::spill_params(conf) {
Some(params) => {
let op = init_operator(&params)?;
(Some(op), conf.spill_bucket.clone())
}
None => (None, None),
};
let spill_operator = spill_params.as_ref().map(init_operator).transpose()?;

Ok(DataOperator {
operator,
params: conf.params.clone(),
spill_operator,
spill_bucket,
spill_params,
})
}

#[async_backtrace::framed]
pub async fn try_create(
conf: &StorageConfig,
spill_params: Option<StorageParams>,
) -> databend_common_exception::Result<DataOperator> {
let operator = init_operator(&conf.params)?;
check_operator(&operator, &conf.params).await?;

let (spill_operator, spill_bucket) = match Self::spill_params(conf) {
let spill_operator = match &spill_params {
Some(params) => {
let op = init_operator(&params)?;
check_operator(&op, &params).await?;
(Some(op), conf.spill_bucket.clone())
let op = init_operator(params)?;
check_operator(&op, params).await?;
Some(op)
}
None => (None, None),
None => None,
};

Ok(DataOperator {
operator,
params: conf.params.clone(),
spill_operator,
spill_bucket,
spill_params,
})
}

Expand All @@ -492,38 +492,6 @@ impl DataOperator {
pub fn instance() -> DataOperator {
GlobalInstance::get()
}

fn spill_params(conf: &StorageConfig) -> Option<StorageParams> {
let bucket = conf.spill_bucket.clone()?;

match &conf.params {
StorageParams::Azblob(c) => Some(StorageParams::Azblob(StorageAzblobConfig {
container: bucket,
..c.clone()
})),
StorageParams::Gcs(c) => Some(StorageParams::Gcs(StorageGcsConfig {
bucket,
..c.clone()
})),
StorageParams::Obs(c) => Some(StorageParams::Obs(StorageObsConfig {
bucket,
..c.clone()
})),
StorageParams::Oss(c) => Some(StorageParams::Oss(StorageOssConfig {
bucket,
..c.clone()
})),
StorageParams::S3(c) => Some(StorageParams::S3(StorageS3Config {
bucket,
..c.clone()
})),
StorageParams::Cos(c) => Some(StorageParams::Cos(StorageCosConfig {
bucket,
..c.clone()
})),
_ => None,
}
}
}

pub async fn check_operator(
Expand Down
50 changes: 21 additions & 29 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,6 @@ pub struct StorageConfig {
// COS storage backend config
#[clap(flatten)]
pub cos: CosStorageConfig,

// Spill config for any storage backend
#[clap(flatten)]
pub spill: StorageSpillConfig,
}

impl Default for StorageConfig {
Expand Down Expand Up @@ -340,9 +336,6 @@ impl From<InnerStorageConfig> for StorageConfig {
obs: Default::default(),
webhdfs: Default::default(),
cos: Default::default(),
spill: StorageSpillConfig {
spill_bucket: inner.spill_bucket.unwrap_or_default(),
},

// Deprecated fields
storage_type: None,
Expand Down Expand Up @@ -415,11 +408,6 @@ impl TryInto<InnerStorageConfig> for StorageConfig {
Ok(InnerStorageConfig {
num_cpus: self.storage_num_cpus,
allow_insecure: self.allow_insecure,
spill_bucket: if self.spill.spill_bucket.is_empty() {
None
} else {
Some(self.spill.spill_bucket)
},
params: {
match self.typ.as_str() {
"azblob" => StorageParams::Azblob(self.azblob.try_into()?),
Expand Down Expand Up @@ -1323,23 +1311,6 @@ impl TryFrom<CosStorageConfig> for InnerStorageCosConfig {
}
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Args)]
pub struct StorageSpillConfig {
#[clap(long = "storage-spill-bucket", value_name = "VALUE", default_value_t)]
#[serde(rename = "bucket")]
pub spill_bucket: String,
}

impl Default for StorageSpillConfig {
fn default() -> Self {
Self {
spill_bucket: InnerStorageConfig::default()
.spill_bucket
.unwrap_or_default(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SettingValue {
UInt64(u64),
Expand Down Expand Up @@ -3001,6 +2972,9 @@ pub struct SpillConfig {
#[clap(long, value_name = "VALUE", default_value = "18446744073709551615")]
/// Allow space in bytes to spill to local disk.
pub spill_local_disk_max_bytes: u64,

#[clap(skip)]
pub storage: Option<StorageConfig>,
}

impl Default for SpillConfig {
Expand Down Expand Up @@ -3158,20 +3132,38 @@ mod cache_config_converters {
None
};

let storage_params = spill
.storage
.map(|storage| {
let storage: InnerStorageConfig = storage.try_into()?;
Ok::<_, ErrorCode>(storage.params)
})
.transpose()?;

Ok(inner::LocalSpillConfig {
local_writeable_root,
path: spill.spill_local_disk_path,
reserved_disk_ratio: spill.spill_local_disk_reserved_space_percentage / 100.0,
global_bytes_limit: spill.spill_local_disk_max_bytes,
storage_params,
})
}

impl From<inner::LocalSpillConfig> for SpillConfig {
fn from(value: inner::LocalSpillConfig) -> Self {
let storage = value.storage_params.map(|params| {
InnerStorageConfig {
params,
..Default::default()
}
.into()
});

Self {
spill_local_disk_path: value.path,
spill_local_disk_reserved_space_percentage: value.reserved_disk_ratio * 100.0,
spill_local_disk_max_bytes: value.global_bytes_limit,
storage,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_common_exception::Result;
use databend_common_grpc::RpcClientConf;
use databend_common_grpc::RpcClientTlsConfig;
use databend_common_meta_app::principal::UserSettingValue;
use databend_common_meta_app::storage::StorageParams;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::tenant::TenantQuota;
use databend_common_storage::StorageConfig;
Expand Down Expand Up @@ -726,6 +727,8 @@ pub struct LocalSpillConfig {

/// Allow bytes use of disk space.
pub global_bytes_limit: u64,

pub storage_params: Option<StorageParams>,
}

impl LocalSpillConfig {
Expand All @@ -752,6 +755,7 @@ impl LocalSpillConfig {
path,
reserved_disk_ratio: OrderedFloat(reserved_disk_ratio),
global_bytes_limit,
storage_params: None,
}
}
}
Expand All @@ -763,6 +767,7 @@ impl Default for LocalSpillConfig {
path: "".to_string(),
reserved_disk_ratio: OrderedFloat(0.3),
global_bytes_limit: u64::MAX,
storage_params: None,
}
}
}
2 changes: 1 addition & 1 deletion src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl GlobalServices {

RoleCacheManager::init()?;

DataOperator::init(&config.storage).await?;
DataOperator::init(&config.storage, config.spill.storage_params.clone()).await?;
ShareTableConfig::init(
&config.query.share_endpoint_address,
&config.query.share_endpoint_auth_token_file,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'spill' | 'spill_local_disk_max_bytes' | '18446744073709551615' | '' |
| 'spill' | 'spill_local_disk_path' | '' | '' |
| 'spill' | 'spill_local_disk_reserved_space_percentage' | '30.0' | '' |
| 'spill' | 'storage' | 'null' | '' |
| 'storage' | 'allow_insecure' | 'true' | '' |
| 'storage' | 'azblob.account_key' | '' | '' |
| 'storage' | 'azblob.account_name' | '' | '' |
Expand Down Expand Up @@ -188,7 +189,6 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'storage' | 's3.root' | '' | '' |
| 'storage' | 's3.secret_access_key' | '' | '' |
| 'storage' | 's3.security_token' | '' | '' |
| 'storage' | 'spill.bucket' | '' | '' |
| 'storage' | 'storage_num_cpus' | 'null' | '' |
| 'storage' | 'storage_type' | 'null' | '' |
| 'storage' | 'type' | 'fs' | '' |
Expand Down

0 comments on commit 0503d61

Please sign in to comment.