diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index eb0c4b8e047b1..e2be2a0e240a7 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -37,7 +37,6 @@ use super::expressions::Column; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use async_trait::async_trait; - use futures::stream::Stream; use futures::stream::StreamExt; @@ -62,18 +61,22 @@ impl ProjectionExec { ) -> Result { let input_schema = input.schema(); - let fields: Result> = expr + let fields: Result> = expr .iter() - .map(|(e, name)| { - Ok(Field::new( - name, - e.data_type(&input_schema)?, - e.nullable(&input_schema)?, - )) + .map(|(e, name)| match input_schema.field_with_name(name) { + Ok(f) => Ok(f.clone()), + Err(_) => { + let dt = e.data_type(&input_schema)?; + let nullable = e.nullable(&input_schema)?; + Ok(Field::new(name, dt, nullable)) + } }) .collect(); - let schema = Arc::new(Schema::new(fields?)); + let schema = Arc::new(Schema::new_with_metadata( + fields?, + input_schema.metadata().clone(), + )); Ok(Self { expr, @@ -296,6 +299,11 @@ mod tests { Arc::new(csv), )?; + let col_field = projection.schema.field(0); + let col_metadata = col_field.metadata().clone().unwrap().clone(); + let data: &str = &col_metadata["testing"]; + assert_eq!(data, "test"); + let mut partition_count = 0; let mut row_count = 0; for partition in 0..projection.output_partitioning().partition_count() { diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs index 8548c628360e5..f1fb4dba015f1 100644 --- a/datafusion/src/test_util.rs +++ b/datafusion/src/test_util.rs @@ -17,6 +17,7 @@ //! Utility functions to make testing DataFusion based crates easier +use std::collections::BTreeMap; use std::{env, error::Error, path::PathBuf, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -229,8 +230,12 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Utf8, false), + let mut f1 = Field::new("c1", DataType::Utf8, false); + f1.set_metadata(Some(BTreeMap::from_iter( + vec![("testing".into(), "test".into())].into_iter(), + ))); + let schema = Schema::new(vec![ + f1, Field::new("c2", DataType::UInt32, false), Field::new("c3", DataType::Int8, false), Field::new("c4", DataType::Int16, false), @@ -243,7 +248,9 @@ pub fn aggr_test_schema() -> SchemaRef { Field::new("c11", DataType::Float32, false), Field::new("c12", DataType::Float64, false), Field::new("c13", DataType::Utf8, false), - ])) + ]); + + Arc::new(schema) } #[cfg(test)]