Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ class IOConfig:
gravitino: GravitinoConfig
cos: CosConfig
opendal_backends: dict[str, dict[str, str]]
protocol_aliases: dict[str, str]

def __init__(
self,
Expand All @@ -1006,6 +1007,7 @@ class IOConfig:
gravitino: GravitinoConfig | None = None,
cos: CosConfig | None = None,
opendal_backends: dict[str, dict[str, str]] | None = None,
protocol_aliases: dict[str, str] | None = None,
): ...
def replace(
self,
Expand All @@ -1020,6 +1022,7 @@ class IOConfig:
gravitino: GravitinoConfig | None = None,
cos: CosConfig | None = None,
opendal_backends: dict[str, dict[str, str]] | None = None,
protocol_aliases: dict[str, str] | None = None,
) -> IOConfig:
"""Replaces values if provided, returning a new IOConfig."""
...
Expand Down
23 changes: 23 additions & 0 deletions src/common/io-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct IOConfig {
/// Additional backends configured via OpenDAL.
/// Keys are scheme names (e.g. "oss", "cos"), values are key-value config maps.
pub opendal_backends: BTreeMap<String, BTreeMap<String, String>>,
/// Protocol aliases: maps custom scheme names to existing scheme names.
/// For example, {"my-s3": "s3"} rewrites "my-s3://bucket/path" to "s3://bucket/path".
pub protocol_aliases: BTreeMap<String, String>,
}

impl IOConfig {
Expand Down Expand Up @@ -74,8 +77,28 @@ impl IOConfig {
if !self.opendal_backends.is_empty() {
res.push(format!("OpenDAL backends = {:?}", self.opendal_backends));
}
if !self.protocol_aliases.is_empty() {
res.push(format!("Protocol aliases = {:?}", self.protocol_aliases));
}
res
}

/// Ensures that none of the configured protocol alias keys reuses the name of a
/// built-in scheme.
///
/// # Errors
/// Returns a descriptive message for the first alias key (in the map's key order)
/// that matches one of the built-in scheme names.
pub fn validate_protocol_aliases(&self) -> std::result::Result<(), String> {
    // Scheme names that an alias key is not allowed to take over.
    const BUILTIN_SCHEMES: &[&str] = &[
        "file", "http", "https", "s3", "s3a", "s3n", "az", "abfs", "abfss", "gcs", "gs", "hf",
        "tos", "cos", "cosn", "vol+dbfs", "dbfs", "gvfs",
    ];
    let conflicting = self
        .protocol_aliases
        .keys()
        .find(|alias| BUILTIN_SCHEMES.contains(&alias.as_str()));
    match conflicting {
        None => Ok(()),
        Some(key) => Err(format!(
            "Protocol alias key '{key}' conflicts with built-in scheme. \
            Aliases can only map new custom scheme names to existing schemes."
        )),
    }
}
Comment on lines +87 to +101
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alias target values are not validated against known schemes

`validate_protocol_aliases` only verifies that alias keys don't shadow built-in schemes. It does not validate that alias values (targets) actually refer to a known scheme or a registered OpenDAL backend. A config like `{"my-proto": "typo-schme"}` will be accepted at construction time and only fail at runtime when a URL is first resolved. Consider validating that alias targets are either built-in schemes or present in `opendal_backends`, so users catch configuration mistakes early.

}

impl Display for IOConfig {
Expand Down
156 changes: 91 additions & 65 deletions src/common/io-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ pub struct CosConfig {
#[pymethods]
impl IOConfig {
#[new]
#[must_use]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (
s3=None,
Expand All @@ -226,7 +225,8 @@ impl IOConfig {
tos=None,
gravitino=None,
cos=None,
opendal_backends=None
opendal_backends=None,
protocol_aliases=None
))]
#[allow(clippy::too_many_arguments)]
Comment on lines 216 to 231
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate #[allow(clippy::too_many_arguments)] suppression

The `#[allow(clippy::too_many_arguments)]` attribute appears twice for the `new` method — once at line 216 before the `#[pyo3(signature = (...))]` attribute and again at line 231 immediately before the `fn` declaration. The same duplication occurs on the `replace` method (lines 273 and 288). Per project rules, clippy warnings should be fixed rather than suppressed, but at minimum the redundant duplicate suppressions should be removed.

Context Used: Rule from dashboard - Fix clippy warnings instead of suppressing them with allow attributes in Rust code. (source)

pub fn new(
Expand All @@ -241,30 +241,36 @@ impl IOConfig {
gravitino: Option<GravitinoConfig>,
cos: Option<CosConfig>,
opendal_backends: Option<HashMap<String, HashMap<String, String>>>,
) -> Self {
Self {
config: config::IOConfig {
s3: s3.unwrap_or_default().config,
azure: azure.unwrap_or_default().config,
gcs: gcs.unwrap_or_default().config,
http: http.unwrap_or_default().config,
unity: unity.unwrap_or_default().config,
hf: hf.unwrap_or_default().config,
disable_suffix_range: disable_suffix_range.unwrap_or_default(),
tos: tos.unwrap_or_default().config,
gravitino: gravitino.unwrap_or_default().config,
cos: cos.unwrap_or_default().config,
opendal_backends: opendal_backends
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, v.into_iter().collect()))
.collect(),
},
}
protocol_aliases: Option<HashMap<String, String>>,
) -> PyResult<Self> {
let cfg = config::IOConfig {
s3: s3.unwrap_or_default().config,
azure: azure.unwrap_or_default().config,
gcs: gcs.unwrap_or_default().config,
http: http.unwrap_or_default().config,
unity: unity.unwrap_or_default().config,
hf: hf.unwrap_or_default().config,
disable_suffix_range: disable_suffix_range.unwrap_or_default(),
tos: tos.unwrap_or_default().config,
gravitino: gravitino.unwrap_or_default().config,
cos: cos.unwrap_or_default().config,
opendal_backends: opendal_backends
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, v.into_iter().collect()))
.collect(),
protocol_aliases: protocol_aliases
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k.to_lowercase(), v.to_lowercase()))
.collect(),
};
cfg.validate_protocol_aliases()
.map_err(pyo3::exceptions::PyValueError::new_err)?;
Ok(Self { config: cfg })
}

#[allow(clippy::too_many_arguments)]
#[must_use]
#[pyo3(signature = (
s3=None,
azure=None,
Expand All @@ -276,7 +282,8 @@ impl IOConfig {
tos=None,
gravitino=None,
cos=None,
opendal_backends=None
opendal_backends=None,
protocol_aliases=None
))]
#[allow(clippy::too_many_arguments)]
pub fn replace(
Expand All @@ -292,47 +299,55 @@ impl IOConfig {
gravitino: Option<GravitinoConfig>,
cos: Option<CosConfig>,
opendal_backends: Option<HashMap<String, HashMap<String, String>>>,
) -> Self {
Self {
config: config::IOConfig {
s3: s3
.map(|s3| s3.config)
.unwrap_or_else(|| self.config.s3.clone()),
azure: azure
.map(|azure| azure.config)
.unwrap_or_else(|| self.config.azure.clone()),
gcs: gcs
.map(|gcs| gcs.config)
.unwrap_or_else(|| self.config.gcs.clone()),
http: http
.map(|http| http.config)
.unwrap_or_else(|| self.config.http.clone()),
unity: unity
.map(|unity| unity.config)
.unwrap_or_else(|| self.config.unity.clone()),
hf: hf
.map(|hf| hf.config)
.unwrap_or_else(|| self.config.hf.clone()),
disable_suffix_range: disable_suffix_range
.unwrap_or(self.config.disable_suffix_range),
tos: tos
.map(|tos| tos.config)
.unwrap_or_else(|| self.config.tos.clone()),
gravitino: gravitino
.map(|gravitino| gravitino.config)
.unwrap_or_else(|| self.config.gravitino.clone()),
cos: cos
.map(|cos| cos.config)
.unwrap_or_else(|| self.config.cos.clone()),
opendal_backends: opendal_backends
.map(|b| {
b.into_iter()
.map(|(k, v)| (k, v.into_iter().collect()))
.collect()
})
.unwrap_or_else(|| self.config.opendal_backends.clone()),
},
}
protocol_aliases: Option<HashMap<String, String>>,
) -> PyResult<Self> {
let cfg = config::IOConfig {
s3: s3
.map(|s3| s3.config)
.unwrap_or_else(|| self.config.s3.clone()),
azure: azure
.map(|azure| azure.config)
.unwrap_or_else(|| self.config.azure.clone()),
gcs: gcs
.map(|gcs| gcs.config)
.unwrap_or_else(|| self.config.gcs.clone()),
http: http
.map(|http| http.config)
.unwrap_or_else(|| self.config.http.clone()),
unity: unity
.map(|unity| unity.config)
.unwrap_or_else(|| self.config.unity.clone()),
hf: hf
.map(|hf| hf.config)
.unwrap_or_else(|| self.config.hf.clone()),
disable_suffix_range: disable_suffix_range.unwrap_or(self.config.disable_suffix_range),
tos: tos
.map(|tos| tos.config)
.unwrap_or_else(|| self.config.tos.clone()),
gravitino: gravitino
.map(|gravitino| gravitino.config)
.unwrap_or_else(|| self.config.gravitino.clone()),
cos: cos
.map(|cos| cos.config)
.unwrap_or_else(|| self.config.cos.clone()),
opendal_backends: opendal_backends
.map(|b| {
b.into_iter()
.map(|(k, v)| (k, v.into_iter().collect()))
.collect()
})
.unwrap_or_else(|| self.config.opendal_backends.clone()),
protocol_aliases: protocol_aliases
.map(|a| {
a.into_iter()
.map(|(k, v)| (k.to_lowercase(), v.to_lowercase()))
.collect()
})
.unwrap_or_else(|| self.config.protocol_aliases.clone()),
};
cfg.validate_protocol_aliases()
.map_err(pyo3::exceptions::PyValueError::new_err)?;
Ok(Self { config: cfg })
}

pub fn __repr__(&self) -> PyResult<String> {
Expand Down Expand Up @@ -415,6 +430,17 @@ impl IOConfig {
.collect())
}

/// Returns a copy of the configured protocol aliases, keyed by custom scheme name
/// with the scheme each alias maps to as the value.
#[getter]
pub fn protocol_aliases(&self) -> PyResult<HashMap<String, String>> {
    let source = &self.config.protocol_aliases;
    // Copy every entry into a fresh hash map for the caller.
    let mut aliases = HashMap::with_capacity(source.len());
    aliases.extend(
        source
            .iter()
            .map(|(scheme, target)| (scheme.clone(), target.clone())),
    );
    Ok(aliases)
}

/// Configuration to be used when accessing COS URLs
#[getter]
pub fn cos(&self) -> PyResult<CosConfig> {
Expand Down
Loading
Loading