Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove OwnedTableReference and OwnedSchemaReference #9933

Merged
merged 4 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow_schema::Field;

use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, OwnedTableReference, Result, SchemaError};
use crate::{DFSchema, DataFusionError, Result, SchemaError, TableReference};
use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt;
Expand All @@ -32,7 +32,7 @@ use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
/// relation/table reference.
pub relation: Option<OwnedTableReference>,
pub relation: Option<TableReference>,
/// field/column name.
pub name: String,
}
Expand All @@ -45,7 +45,7 @@ impl Column {
///
/// [`TableReference::parse_str`]: crate::TableReference::parse_str
pub fn new(
relation: Option<impl Into<OwnedTableReference>>,
relation: Option<impl Into<TableReference>>,
name: impl Into<String>,
) -> Self {
Self {
Expand Down Expand Up @@ -74,20 +74,20 @@ impl Column {
let (relation, name) = match idents.len() {
1 => (None, idents.remove(0)),
2 => (
Some(OwnedTableReference::Bare {
Some(TableReference::Bare {
table: idents.remove(0).into(),
}),
idents.remove(0),
),
3 => (
Some(OwnedTableReference::Partial {
Some(TableReference::Partial {
schema: idents.remove(0).into(),
table: idents.remove(0).into(),
}),
idents.remove(0),
),
4 => (
Some(OwnedTableReference::Full {
Some(TableReference::Full {
catalog: idents.remove(0).into(),
schema: idents.remove(0).into(),
table: idents.remove(0).into(),
Expand Down Expand Up @@ -340,8 +340,8 @@ impl From<String> for Column {
}

/// Create a column, use qualifier and field name
impl From<(Option<&OwnedTableReference>, &Field)> for Column {
fn from((relation, field): (Option<&OwnedTableReference>, &Field)) -> Self {
impl From<(Option<&TableReference>, &Field)> for Column {
fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
Self::new(relation.cloned(), field.name())
}
}
Expand Down
47 changes: 19 additions & 28 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::sync::Arc;
use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
use crate::{
field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
OwnedTableReference, SchemaError, TableReference,
SchemaError, TableReference,
};

use arrow::compute::can_cast_types;
Expand Down Expand Up @@ -111,7 +111,7 @@ pub struct DFSchema {
inner: SchemaRef,
/// Optional qualifiers for each column in this schema. In the same order as
/// the `self.inner.fields()`
field_qualifiers: Vec<Option<OwnedTableReference>>,
field_qualifiers: Vec<Option<TableReference>>,
/// Stores functional dependencies in the schema.
functional_dependencies: FunctionalDependencies,
}
Expand All @@ -128,10 +128,10 @@ impl DFSchema {

/// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
pub fn new_with_metadata(
qualified_fields: Vec<(Option<OwnedTableReference>, Arc<Field>)>,
qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
metadata: HashMap<String, String>,
) -> Result<Self> {
let (qualifiers, fields): (Vec<Option<OwnedTableReference>>, Vec<Arc<Field>>) =
let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
qualified_fields.into_iter().unzip();

let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
Expand Down Expand Up @@ -170,10 +170,9 @@ impl DFSchema {
schema: &Schema,
) -> Result<Self> {
let qualifier = qualifier.into();
let owned_qualifier = qualifier.to_owned_reference();
let schema = DFSchema {
inner: schema.clone().into(),
field_qualifiers: vec![Some(owned_qualifier); schema.fields.len()],
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
schema.check_names()?;
Expand All @@ -182,16 +181,12 @@ impl DFSchema {

/// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
pub fn from_field_specific_qualified_schema(
qualifiers: Vec<Option<impl Into<TableReference>>>,
qualifiers: Vec<Option<TableReference>>,
schema: &SchemaRef,
) -> Result<Self> {
let owned_qualifiers = qualifiers
.into_iter()
.map(|qualifier| qualifier.map(|q| q.into().to_owned_reference()))
.collect();
let dfschema = Self {
inner: schema.clone(),
field_qualifiers: owned_qualifiers,
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Expand All @@ -216,7 +211,7 @@ impl DFSchema {
for (qualifier, name) in qualified_names {
if unqualified_names.contains(name) {
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new(Some(qualifier.to_owned_reference()), name)
field: Column::new(Some(qualifier.clone()), name)
});
}
}
Expand Down Expand Up @@ -270,7 +265,7 @@ impl DFSchema {
return;
}

let self_fields: HashSet<(Option<&OwnedTableReference>, &FieldRef)> =
let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
self.iter().collect();
let self_unqualified_names: HashSet<&str> = self
.inner
Expand Down Expand Up @@ -316,7 +311,7 @@ impl DFSchema {

/// Returns an immutable reference of a specific `Field` instance selected using an
/// offset within the internal `fields` vector and its qualifier
pub fn qualified_field(&self, i: usize) -> (Option<&OwnedTableReference>, &Field) {
pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
(self.field_qualifiers[i].as_ref(), self.field(i))
}

Expand Down Expand Up @@ -383,13 +378,11 @@ impl DFSchema {
&self,
qualifier: Option<&TableReference>,
name: &str,
) -> Result<(Option<&OwnedTableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &Field)> {
if let Some(qualifier) = qualifier {
let idx = self
.index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| {
field_not_found(Some(qualifier.to_string()), name, self)
})?;
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
} else {
self.qualified_field_with_unqualified_name(name)
Expand Down Expand Up @@ -428,7 +421,7 @@ impl DFSchema {
pub fn qualified_fields_with_unqualified_name(
&self,
name: &str,
) -> Vec<(Option<&OwnedTableReference>, &Field)> {
) -> Vec<(Option<&TableReference>, &Field)> {
self.iter()
.filter(|(_, field)| field.name() == name)
.map(|(qualifier, field)| (qualifier, field.as_ref()))
Expand Down Expand Up @@ -456,7 +449,7 @@ impl DFSchema {
pub fn qualified_field_with_unqualified_name(
&self,
name: &str,
) -> Result<(Option<&OwnedTableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &Field)> {
let matches = self.qualified_fields_with_unqualified_name(name);
match matches.len() {
0 => Err(unqualified_field_not_found(name, self)),
Expand Down Expand Up @@ -527,7 +520,7 @@ impl DFSchema {
) -> Result<&Field> {
let idx = self
.index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| field_not_found(Some(qualifier.to_string()), name, self))?;
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;

Ok(self.field(idx))
}
Expand All @@ -544,7 +537,7 @@ impl DFSchema {
pub fn qualified_field_from_column(
&self,
column: &Column,
) -> Result<(Option<&OwnedTableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &Field)> {
self.qualified_field_with_name(column.relation.as_ref(), &column.name)
}

Expand Down Expand Up @@ -750,7 +743,7 @@ impl DFSchema {
}

/// Replace all field qualifier with new value in schema
pub fn replace_qualifier(self, qualifier: impl Into<OwnedTableReference>) -> Self {
pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
let qualifier = qualifier.into();
DFSchema {
field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
Expand All @@ -777,9 +770,7 @@ impl DFSchema {
}

/// Iterate over the qualifiers and fields in the DFSchema
pub fn iter(
&self,
) -> impl Iterator<Item = (Option<&OwnedTableReference>, &FieldRef)> {
pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
self.field_qualifiers
.iter()
.zip(self.inner.fields().iter())
Expand Down Expand Up @@ -1065,7 +1056,7 @@ mod tests {
#[test]
fn test_from_field_specific_qualified_schema() -> Result<()> {
let schema = DFSchema::from_field_specific_qualified_schema(
vec![Some("t1"), None],
vec![Some("t1".into()), None],
&Arc::new(Schema::new(vec![
Field::new("c0", DataType::Boolean, true),
Field::new("c1", DataType::Boolean, true),
Expand Down
6 changes: 3 additions & 3 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::result;
use std::sync::Arc;

use crate::utils::quote_identifier;
use crate::{Column, DFSchema, OwnedTableReference};
use crate::{Column, DFSchema, TableReference};
#[cfg(feature = "avro")]
use apache_avro::Error as AvroError;
use arrow::error::ArrowError;
Expand Down Expand Up @@ -141,7 +141,7 @@ pub enum SchemaError {
AmbiguousReference { field: Column },
/// Schema contains duplicate qualified field name
DuplicateQualifiedField {
qualifier: Box<OwnedTableReference>,
qualifier: Box<TableReference>,
name: String,
},
/// Schema contains duplicate unqualified field name
Expand Down Expand Up @@ -606,7 +606,7 @@ pub use plan_err as _plan_err;
pub use schema_err as _schema_err;

/// Create a "field not found" DataFusion::SchemaError
pub fn field_not_found<R: Into<OwnedTableReference>>(
pub fn field_not_found<R: Into<TableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ pub use functional_dependencies::{
pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use param_value::ParamValues;
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use schema_reference::SchemaReference;
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};
pub use table_reference::{ResolvedTableReference, TableReference};
pub use unnest::UnnestOptions;
pub use utils::project_schema;

Expand Down
8 changes: 0 additions & 8 deletions datafusion/common/src/schema_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl SchemaReference {
}
}

pub type OwnedSchemaReference = SchemaReference;

impl std::fmt::Display for SchemaReference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand All @@ -43,9 +41,3 @@ impl std::fmt::Display for SchemaReference {
}
}
}

impl<'a> From<&'a OwnedSchemaReference> for SchemaReference {
fn from(value: &'a OwnedSchemaReference) -> Self {
value.clone()
}
}
37 changes: 6 additions & 31 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,6 @@ pub enum TableReference {
},
}

/// This is a [`TableReference`] that has 'static lifetime (aka it
/// owns the underlying string)
///
/// To convert a [`TableReference`] to an [`OwnedTableReference`], use
///
/// ```
/// # use datafusion_common::{OwnedTableReference, TableReference};
/// let table_reference = TableReference::from("mytable");
/// let owned_reference = table_reference.to_owned_reference();
/// ```
pub type OwnedTableReference = TableReference;

impl std::fmt::Display for TableReference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -244,12 +232,6 @@ impl TableReference {
}
}

/// Converts directly into an [`OwnedTableReference`] by cloning
/// the underlying data.
pub fn to_owned_reference(&self) -> OwnedTableReference {
self.clone()
}

/// Forms a string where the identifiers are quoted
///
/// # Example
Expand Down Expand Up @@ -322,19 +304,6 @@ impl TableReference {
}
}

/// Parse a `String` into a OwnedTableReference as a multipart SQL identifier.
impl From<String> for OwnedTableReference {
fn from(s: String) -> Self {
TableReference::parse_str(&s).to_owned_reference()
}
}

impl<'a> From<&'a OwnedTableReference> for TableReference {
fn from(value: &'a OwnedTableReference) -> Self {
value.clone()
}
}

/// Parse a string into a TableReference, normalizing where appropriate
///
/// See full details on [`TableReference::parse_str`]
Expand All @@ -350,6 +319,12 @@ impl<'a> From<&'a String> for TableReference {
}
}

impl From<String> for TableReference {
fn from(s: String) -> Self {
Self::parse_str(&s)
}
}

impl From<ResolvedTableReference> for TableReference {
fn from(resolved: ResolvedTableReference) -> Self {
Self::Full {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::datasource::TableProvider;
use crate::execution::context::SessionState;

use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{Constraints, DFSchema, DataFusionError, OwnedTableReference};
use datafusion_common::{Constraints, DFSchema, DataFusionError, TableReference};
use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;
Expand Down Expand Up @@ -129,7 +129,7 @@ impl ListingSchemaProvider {
if !self.table_exist(table_name) {
let table_url = format!("{}/{}", self.authority, table_path);

let name = OwnedTableReference::bare(table_name);
let name = TableReference::bare(table_name);
let provider = self
.factory
.create(
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ mod tests {
use crate::execution::context::SessionContext;

use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{Constraints, DFSchema, OwnedTableReference};
use datafusion_common::{Constraints, DFSchema, TableReference};

#[tokio::test]
async fn test_create_using_non_std_file_ext() {
Expand All @@ -184,7 +184,7 @@ mod tests {
let factory = ListingTableFactory::new();
let context = SessionContext::new();
let state = context.state();
let name = OwnedTableReference::bare("foo");
let name = TableReference::bare("foo");
let cmd = CreateExternalTable {
name,
location: csv_file.path().to_str().unwrap().to_string(),
Expand Down Expand Up @@ -222,7 +222,7 @@ mod tests {
let factory = ListingTableFactory::new();
let context = SessionContext::new();
let state = context.state();
let name = OwnedTableReference::bare("foo");
let name = TableReference::bare("foo");

let mut options = HashMap::new();
options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned());
Expand Down
Loading
Loading