Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "datafusion-catalog-listing"
description = "datafusion-catalog-listing"
readme = "README.md"
authors.workspace = true
edition.workspace = true
edition = "2024"
homepage.workspace = true
license.workspace = true
repository.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use crate::options::ListingOptions;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_catalog::Session;
use datafusion_common::{config_err, internal_err};
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::ListingTableUrl;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use std::str::FromStr;
use std::sync::Arc;
Expand Down
21 changes: 8 additions & 13 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use std::mem;
use std::sync::Arc;

use datafusion_catalog::Session;
use datafusion_common::{assert_or_internal_err, HashMap, Result, ScalarValue};
use datafusion_common::{HashMap, Result, ScalarValue, assert_or_internal_err};
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::PartitionedFile;
use datafusion_expr::{lit, utils, BinaryExpr, Operator};
use datafusion_expr::{BinaryExpr, Operator, lit, utils};

use arrow::{
array::AsArray,
Expand All @@ -33,7 +33,7 @@ use arrow::{
};
use datafusion_expr::execution_props::ExecutionProps;
use futures::stream::FuturesUnordered;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use log::{debug, trace};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
Expand All @@ -51,7 +51,7 @@ use object_store::{ObjectMeta, ObjectStore};
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| match expr {
Expr::Column(Column { ref name, .. }) => {
Expr::Column(Column { name, .. }) => {
is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
Expand Down Expand Up @@ -247,16 +247,11 @@ fn populate_partition_values<'a>(
partition_values: &mut HashMap<&'a str, PartitionValue>,
filter: &'a Expr,
) {
if let Expr::BinaryExpr(BinaryExpr {
ref left,
op,
ref right,
}) = filter
{
if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
match op {
Operator::Eq => match (left.as_ref(), right.as_ref()) {
(Expr::Column(Column { ref name, .. }), Expr::Literal(val, _))
| (Expr::Literal(val, _), Expr::Column(Column { ref name, .. })) => {
(Expr::Column(Column { name, .. }), Expr::Literal(val, _))
| (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => {
if partition_values
.insert(name, PartitionValue::Single(val.to_string()))
.is_some()
Expand Down Expand Up @@ -466,7 +461,7 @@ mod tests {
use std::ops::Not;

use super::*;
use datafusion_expr::{case, col, lit, Expr};
use datafusion_expr::{Expr, case, col, lit};

#[test]
fn test_split_files() {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/catalog-listing/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_catalog::Session;
use datafusion_common::plan_err;
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::file_format::FileFormat;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::SortExpr;
use futures::StreamExt;
use futures::{future, TryStreamExt};
use futures::{TryStreamExt, future};
use itertools::Itertools;
use std::sync::Arc;

Expand Down
43 changes: 21 additions & 22 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use async_trait::async_trait;
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_datafusion_err, plan_err, project_schema, Constraints, DataFusionError,
SchemaExt, Statistics,
Constraints, DataFusionError, SchemaExt, Statistics, internal_datafusion_err,
plan_err, project_schema,
};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
Expand All @@ -34,7 +34,7 @@ use datafusion_datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
};
use datafusion_datasource::{
compute_all_files_statistics, ListingTableUrl, PartitionedFile, TableSchema,
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
Expand All @@ -44,9 +44,9 @@ use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::ExecutionPlan;
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use datafusion_physical_plan::empty::EmptyExec;
use futures::{Stream, StreamExt, TryStreamExt, future, stream};
use object_store::ObjectStore;
use std::any::Any;
use std::collections::HashMap;
Expand Down Expand Up @@ -478,7 +478,9 @@ impl TableProvider for ListingTable {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
log::debug!(
"attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered"
)
}
}
None => {} // no ordering required
Expand Down Expand Up @@ -765,28 +767,25 @@ async fn get_files_with_limit(
let file = file_result?;

// Update file statistics regardless of state
if collect_stats {
if let Some(file_stats) = &file.statistics {
num_rows = if file_group.is_empty() {
// For the first file, just take its row count
file_stats.num_rows
} else {
// For subsequent files, accumulate the counts
num_rows.add(&file_stats.num_rows)
};
}
if collect_stats && let Some(file_stats) = &file.statistics {
num_rows = if file_group.is_empty() {
// For the first file, just take its row count
file_stats.num_rows
} else {
// For subsequent files, accumulate the counts
num_rows.add(&file_stats.num_rows)
};
}

// Always add the file to our group
file_group.push(file);

// Check if we've hit the limit (if one was specified)
if let Some(limit) = limit {
if let Precision::Exact(row_count) = num_rows {
if row_count > limit {
state = ProcessingState::ReachedLimit;
}
}
if let Some(limit) = limit
&& let Precision::Exact(row_count) = num_rows
&& row_count > limit
{
state = ProcessingState::ReachedLimit;
}
}
// If we still have files in the stream, it means that the limit kicked
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "datafusion-catalog"
description = "datafusion-catalog"
readme = "README.md"
authors.workspace = true
edition.workspace = true
edition = "2024"
homepage.workspace = true
license.workspace = true
repository.workspace = true
Expand Down
10 changes: 6 additions & 4 deletions datafusion/catalog/src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference};
use datafusion_common::{HashMap, TableReference, error::Result, not_impl_err};
use datafusion_execution::config::SessionConfig;

use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
Expand Down Expand Up @@ -60,7 +60,9 @@ impl SchemaProvider for ResolvedSchemaProvider {
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
not_impl_err!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported")
not_impl_err!(
"Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported"
)
}

fn table_exist(&self, name: &str) -> bool {
Expand Down Expand Up @@ -425,14 +427,14 @@ mod tests {
use std::{
any::Any,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
atomic::{AtomicU32, Ordering},
},
};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::{error::Result, Statistics, TableReference};
use datafusion_common::{Statistics, TableReference, error::Result};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::{Expr, TableType};
use datafusion_physical_plan::ExecutionPlan;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use std::fmt::Debug;
use std::sync::Arc;

pub use crate::schema::SchemaProvider;
use datafusion_common::not_impl_err;
use datafusion_common::Result;
use datafusion_common::not_impl_err;

/// Represents a catalog, comprising a number of named schemas.
///
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{any::Any, borrow::Cow};
use crate::TableProvider;

use arrow::datatypes::SchemaRef;
use datafusion_common::{internal_err, Constraints};
use datafusion_common::{Constraints, internal_err};
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType};

/// Implements [`TableSource`] for a [`TableProvider`]
Expand Down
20 changes: 11 additions & 9 deletions datafusion/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ use arrow::{
record_batch::RecordBatch,
};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_common::config::{ConfigEntry, ConfigOptions};
use datafusion_common::error::Result;
use datafusion_common::types::NativeType;
use datafusion_common::DataFusionError;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use datafusion_expr::{TableType, Volatility};
use datafusion_physical_plan::SendableRecordBatchStream;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::streaming::PartitionStream;
use datafusion_physical_plan::SendableRecordBatchStream;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::Debug;
use std::{any::Any, sync::Arc};
Expand Down Expand Up @@ -138,11 +138,11 @@ impl InformationSchemaConfig {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
if let Some(schema) = catalog.schema(&schema_name) {
let schema_owner = schema.owner_name();
builder.add_schemata(&catalog_name, &schema_name, schema_owner);
}
if schema_name != INFORMATION_SCHEMA
&& let Some(schema) = catalog.schema(&schema_name)
{
let schema_owner = schema.owner_name();
builder.add_schemata(&catalog_name, &schema_name, schema_owner);
}
}
}
Expand Down Expand Up @@ -1408,7 +1408,9 @@ mod tests {
// InformationSchemaConfig::make_tables used this before `table_type`
// existed but should not, as it may be expensive.
async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
panic!("InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type")
panic!(
"InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type"
)
}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ mod dynamic_file;
mod schema;
mod table;

pub use r#async::*;
pub use catalog::*;
pub use datafusion_session::Session;
pub use dynamic_file::catalog::*;
pub use memory::{
MemTable, MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
};
pub use r#async::*;
pub use schema::*;
pub use table::*;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{SchemaProvider, TableProvider, TableProviderFactory};

use crate::Session;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, HashMap, TableReference,
DFSchema, DataFusionError, HashMap, TableReference, internal_datafusion_err,
};
use datafusion_expr::CreateExternalTable;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/memory/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::{SchemaProvider, TableProvider};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::{exec_err, DataFusionError};
use datafusion_common::{DataFusionError, exec_err};
use std::any::Any;
use std::sync::Arc;

Expand Down
6 changes: 3 additions & 3 deletions datafusion/catalog/src/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ use crate::TableProvider;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::error::Result;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::memory::{MemSink, MemorySourceConfig};
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{Expr, SortExpr, TableType};
use datafusion_physical_expr::{create_physical_sort_exprs, LexOrdering};
use datafusion_physical_expr::{LexOrdering, create_physical_sort_exprs};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::{
common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
ExecutionPlan, ExecutionPlanProperties, Partitioning, common,
};
use datafusion_session::Session;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! representing collections of named tables.

use async_trait::async_trait;
use datafusion_common::{exec_err, DataFusionError};
use datafusion_common::{DataFusionError, exec_err};
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;
use crate::{Session, TableProvider, TableProviderFactory};
use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
use arrow::datatypes::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common::{Constraints, DataFusionError, Result, config_err, plan_err};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
Expand Down
6 changes: 3 additions & 3 deletions datafusion/catalog/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use crate::Session;
use crate::TableProvider;

use arrow::datatypes::SchemaRef;
use datafusion_common::{plan_err, DFSchema, Result};
use datafusion_common::{DFSchema, Result, plan_err};
use datafusion_expr::{Expr, SortExpr, TableType};
use datafusion_physical_expr::{create_physical_sort_exprs, LexOrdering};
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_expr::{LexOrdering, create_physical_sort_exprs};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};

use async_trait::async_trait;
use log::debug;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::session::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_common::{Constraints, Statistics, not_impl_err};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::TableProvider;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::error::Result;
use datafusion_common::Column;
use datafusion_common::error::Result;
use datafusion_expr::TableType;
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
Expand Down
Loading