Skip to content

Commit

Permalink
Merge commit 'd2ba90109bc53ea705626ab21e5125363a19a5c4' into chunchun…
Browse files Browse the repository at this point in the history
…/df-upgrade-2024-04-02
  • Loading branch information
appletreeisyellow committed Apr 22, 2024
2 parents 784ff55 + d2ba901 commit 879a092
Show file tree
Hide file tree
Showing 102 changed files with 1,618 additions and 447 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,8 @@ impl TableOptions {
}
}

/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Global Parquet options that propagates to all columns.
Expand Down
8 changes: 4 additions & 4 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ impl DFSchema {
///
/// To create a schema from an Arrow schema without a qualifier, use
/// `DFSchema::try_from`.
pub fn try_from_qualified_schema<'a>(
qualifier: impl Into<TableReference<'a>>,
pub fn try_from_qualified_schema(
qualifier: impl Into<TableReference>,
schema: &Schema,
) -> Result<Self> {
let qualifier = qualifier.into();
Expand All @@ -181,8 +181,8 @@ impl DFSchema {
}

/// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
pub fn from_field_specific_qualified_schema<'a>(
qualifiers: Vec<Option<impl Into<TableReference<'a>>>>,
pub fn from_field_specific_qualified_schema(
qualifiers: Vec<Option<impl Into<TableReference>>>,
schema: &SchemaRef,
) -> Result<Self> {
let owned_qualifiers = qualifiers
Expand Down
31 changes: 9 additions & 22 deletions datafusion/common/src/schema_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::borrow::Cow;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SchemaReference<'a> {
Bare {
schema: Cow<'a, str>,
},
Full {
schema: Cow<'a, str>,
catalog: Cow<'a, str>,
},
pub enum SchemaReference {
Bare { schema: Arc<str> },
Full { schema: Arc<str>, catalog: Arc<str> },
}

impl SchemaReference<'_> {
impl SchemaReference {
/// Get only the schema name that this references.
pub fn schema_name(&self) -> &str {
match self {
Expand All @@ -38,9 +33,9 @@ impl SchemaReference<'_> {
}
}

pub type OwnedSchemaReference = SchemaReference<'static>;
pub type OwnedSchemaReference = SchemaReference;

impl std::fmt::Display for SchemaReference<'_> {
impl std::fmt::Display for SchemaReference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Bare { schema } => write!(f, "{schema}"),
Expand All @@ -49,16 +44,8 @@ impl std::fmt::Display for SchemaReference<'_> {
}
}

impl<'a> From<&'a OwnedSchemaReference> for SchemaReference<'a> {
impl<'a> From<&'a OwnedSchemaReference> for SchemaReference {
fn from(value: &'a OwnedSchemaReference) -> Self {
match value {
SchemaReference::Bare { schema } => SchemaReference::Bare {
schema: Cow::Borrowed(schema),
},
SchemaReference::Full { schema, catalog } => SchemaReference::Full {
schema: Cow::Borrowed(schema),
catalog: Cow::Borrowed(catalog),
},
}
value.clone()
}
}
131 changes: 49 additions & 82 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@
// under the License.

use crate::utils::{parse_identifiers_normalized, quote_identifier};
use std::borrow::Cow;
use std::sync::Arc;

/// A resolved path to a table of the form "catalog.schema.table"
#[derive(Debug, Clone)]
pub struct ResolvedTableReference<'a> {
pub struct ResolvedTableReference {
/// The catalog (aka database) containing the table
pub catalog: Cow<'a, str>,
pub catalog: Arc<str>,
/// The schema containing the table
pub schema: Cow<'a, str>,
pub schema: Arc<str>,
/// The table name
pub table: Cow<'a, str>,
pub table: Arc<str>,
}

impl<'a> std::fmt::Display for ResolvedTableReference<'a> {
impl std::fmt::Display for ResolvedTableReference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}.{}", self.catalog, self.schema, self.table)
}
Expand Down Expand Up @@ -68,27 +68,27 @@ impl<'a> std::fmt::Display for ResolvedTableReference<'a> {
/// assert_eq!(table_reference, TableReference::partial("myschema", "mytable"));
///```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum TableReference<'a> {
pub enum TableReference {
/// An unqualified table reference, e.g. "table"
Bare {
/// The table name
table: Cow<'a, str>,
table: Arc<str>,
},
/// A partially resolved table reference, e.g. "schema.table"
Partial {
/// The schema containing the table
schema: Cow<'a, str>,
schema: Arc<str>,
/// The table name
table: Cow<'a, str>,
table: Arc<str>,
},
/// A fully resolved table reference, e.g. "catalog.schema.table"
Full {
/// The catalog (aka database) containing the table
catalog: Cow<'a, str>,
catalog: Arc<str>,
/// The schema containing the table
schema: Cow<'a, str>,
schema: Arc<str>,
/// The table name
table: Cow<'a, str>,
table: Arc<str>,
},
}

Expand All @@ -102,9 +102,9 @@ pub enum TableReference<'a> {
/// let table_reference = TableReference::from("mytable");
/// let owned_reference = table_reference.to_owned_reference();
/// ```
pub type OwnedTableReference = TableReference<'static>;
pub type OwnedTableReference = TableReference;

impl std::fmt::Display for TableReference<'_> {
impl std::fmt::Display for TableReference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TableReference::Bare { table } => write!(f, "{table}"),
Expand All @@ -120,9 +120,9 @@ impl std::fmt::Display for TableReference<'_> {
}
}

impl<'a> TableReference<'a> {
impl TableReference {
/// Convenience method for creating a typed none `None`
pub fn none() -> Option<TableReference<'a>> {
pub fn none() -> Option<TableReference> {
None
}

Expand All @@ -131,7 +131,7 @@ impl<'a> TableReference<'a> {
/// As described on [`TableReference`] this does *NO* parsing at
/// all, so "Foo.Bar" stays as a reference to the table named
/// "Foo.Bar" (rather than "foo"."bar")
pub fn bare(table: impl Into<Cow<'a, str>>) -> TableReference<'a> {
pub fn bare(table: impl Into<Arc<str>>) -> TableReference {
TableReference::Bare {
table: table.into(),
}
Expand All @@ -141,9 +141,9 @@ impl<'a> TableReference<'a> {
///
/// As described on [`TableReference`] this does *NO* parsing at all.
pub fn partial(
schema: impl Into<Cow<'a, str>>,
table: impl Into<Cow<'a, str>>,
) -> TableReference<'a> {
schema: impl Into<Arc<str>>,
table: impl Into<Arc<str>>,
) -> TableReference {
TableReference::Partial {
schema: schema.into(),
table: table.into(),
Expand All @@ -154,10 +154,10 @@ impl<'a> TableReference<'a> {
///
/// As described on [`TableReference`] this does *NO* parsing at all.
pub fn full(
catalog: impl Into<Cow<'a, str>>,
schema: impl Into<Cow<'a, str>>,
table: impl Into<Cow<'a, str>>,
) -> TableReference<'a> {
catalog: impl Into<Arc<str>>,
schema: impl Into<Arc<str>>,
table: impl Into<Arc<str>>,
) -> TableReference {
TableReference::Full {
catalog: catalog.into(),
schema: schema.into(),
Expand Down Expand Up @@ -198,28 +198,29 @@ impl<'a> TableReference<'a> {
/// fully qualified [`TableReference::Full`] if the table names match.
pub fn resolved_eq(&self, other: &Self) -> bool {
match self {
TableReference::Bare { table } => table == other.table(),
TableReference::Bare { table } => **table == *other.table(),
TableReference::Partial { schema, table } => {
table == other.table() && other.schema().map_or(true, |s| s == schema)
**table == *other.table()
&& other.schema().map_or(true, |s| *s == **schema)
}
TableReference::Full {
catalog,
schema,
table,
} => {
table == other.table()
&& other.schema().map_or(true, |s| s == schema)
&& other.catalog().map_or(true, |c| c == catalog)
**table == *other.table()
&& other.schema().map_or(true, |s| *s == **schema)
&& other.catalog().map_or(true, |c| *c == **catalog)
}
}
}

/// Given a default catalog and schema, ensure this table reference is fully resolved
pub fn resolve(
self,
default_catalog: &'a str,
default_schema: &'a str,
) -> ResolvedTableReference<'a> {
default_catalog: &str,
default_schema: &str,
) -> ResolvedTableReference {
match self {
Self::Full {
catalog,
Expand All @@ -246,24 +247,7 @@ impl<'a> TableReference<'a> {
/// Converts directly into an [`OwnedTableReference`] by cloning
/// the underlying data.
pub fn to_owned_reference(&self) -> OwnedTableReference {
match self {
Self::Full {
catalog,
schema,
table,
} => OwnedTableReference::Full {
catalog: catalog.to_string().into(),
schema: schema.to_string().into(),
table: table.to_string().into(),
},
Self::Partial { schema, table } => OwnedTableReference::Partial {
schema: schema.to_string().into(),
table: table.to_string().into(),
},
Self::Bare { table } => OwnedTableReference::Bare {
table: table.to_string().into(),
},
}
self.clone()
}

/// Forms a string where the identifiers are quoted
Expand Down Expand Up @@ -298,7 +282,7 @@ impl<'a> TableReference<'a> {

/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
/// identifier. See docs on [`TableReference`] for more details.
pub fn parse_str(s: &'a str) -> Self {
pub fn parse_str(s: &str) -> Self {
let mut parts = parse_identifiers_normalized(s, false);

match parts.len() {
Expand Down Expand Up @@ -345,46 +329,29 @@ impl From<String> for OwnedTableReference {
}
}

impl<'a> From<&'a OwnedTableReference> for TableReference<'a> {
impl<'a> From<&'a OwnedTableReference> for TableReference {
fn from(value: &'a OwnedTableReference) -> Self {
match value {
OwnedTableReference::Bare { table } => TableReference::Bare {
table: Cow::Borrowed(table),
},
OwnedTableReference::Partial { schema, table } => TableReference::Partial {
schema: Cow::Borrowed(schema),
table: Cow::Borrowed(table),
},
OwnedTableReference::Full {
catalog,
schema,
table,
} => TableReference::Full {
catalog: Cow::Borrowed(catalog),
schema: Cow::Borrowed(schema),
table: Cow::Borrowed(table),
},
}
value.clone()
}
}

/// Parse a string into a TableReference, normalizing where appropriate
///
/// See full details on [`TableReference::parse_str`]
impl<'a> From<&'a str> for TableReference<'a> {
impl<'a> From<&'a str> for TableReference {
fn from(s: &'a str) -> Self {
Self::parse_str(s)
}
}

impl<'a> From<&'a String> for TableReference<'a> {
impl<'a> From<&'a String> for TableReference {
fn from(s: &'a String) -> Self {
Self::parse_str(s)
}
}

impl<'a> From<ResolvedTableReference<'a>> for TableReference<'a> {
fn from(resolved: ResolvedTableReference<'a>) -> Self {
impl From<ResolvedTableReference> for TableReference {
fn from(resolved: ResolvedTableReference) -> Self {
Self::Full {
catalog: resolved.catalog,
schema: resolved.schema,
Expand All @@ -400,29 +367,29 @@ mod tests {
#[test]
fn test_table_reference_from_str_normalizes() {
let expected = TableReference::Full {
catalog: Cow::Owned("catalog".to_string()),
schema: Cow::Owned("FOO\".bar".to_string()),
table: Cow::Owned("table".to_string()),
catalog: "catalog".into(),
schema: "FOO\".bar".into(),
table: "table".into(),
};
let actual = TableReference::from("catalog.\"FOO\"\".bar\".TABLE");
assert_eq!(expected, actual);

let expected = TableReference::Partial {
schema: Cow::Owned("FOO\".bar".to_string()),
table: Cow::Owned("table".to_string()),
schema: "FOO\".bar".into(),
table: "table".into(),
};
let actual = TableReference::from("\"FOO\"\".bar\".TABLE");
assert_eq!(expected, actual);

let expected = TableReference::Bare {
table: Cow::Owned("table".to_string()),
table: "table".into(),
};
let actual = TableReference::from("TABLE");
assert_eq!(expected, actual);

// if fail to parse, take entire input string as identifier
let expected = TableReference::Bare {
table: Cow::Owned("TABLE()".to_string()),
table: "TABLE()".into(),
};
let actual = TableReference::from("TABLE()");
assert_eq!(expected, actual);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl ListingSchemaProvider {
if !self.table_exist(table_name) {
let table_url = format!("{}/{}", self.authority, table_path);

let name = OwnedTableReference::bare(table_name.to_string());
let name = OwnedTableReference::bare(table_name);
let provider = self
.factory
.create(
Expand Down
Loading

0 comments on commit 879a092

Please sign in to comment.