From b896bdf3de844335c9186c85f7a3a36483e2893f Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Sat, 11 Dec 2021 19:08:15 -0500 Subject: [PATCH 1/4] Allow Schema and RecordBatch to project schemas on specific columns returning a new schema with those columns only --- arrow/src/datatypes/schema.rs | 33 ++++++++++++++++++++++++++++++++- arrow/src/record_batch.rs | 6 ++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/arrow/src/datatypes/schema.rs b/arrow/src/datatypes/schema.rs index 22b1ceb39a76..d2a0787ac138 100644 --- a/arrow/src/datatypes/schema.rs +++ b/arrow/src/datatypes/schema.rs @@ -87,6 +87,18 @@ impl Schema { Self { fields, metadata } } + + /// Returns a new schema with only the specified columns in the new schema + /// This carries metadata from the parent schema over as well + pub fn project(&self, indices: impl IntoIterator) -> Result { + let mut new_fields = vec![]; + for i in indices { + let f = self.fields[i].clone(); + new_fields.push(f); + } + Ok(Self::new_with_metadata(new_fields, self.metadata.clone())) + } + /// Merge schema into self if it is compatible. Struct fields will be merged recursively. /// /// Example: @@ -115,7 +127,7 @@ impl Schema { /// ]), /// ); /// ``` - pub fn try_merge(schemas: impl IntoIterator) -> Result { + pub fn try_merge(schemas: impl IntoIterator) -> Result { schemas .into_iter() .try_fold(Self::empty(), |mut merged, schema| { @@ -369,4 +381,23 @@ mod tests { assert_eq!(schema, de_schema); } + + #[test] + fn test_project() { + let mut metadata = HashMap::new(); + metadata.insert("meta".to_string(), "data".to_string()); + + let schema = Schema::new_with_metadata(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("address", DataType::Utf8, false), + Field::new("priority", DataType::UInt8, false), + ], metadata); + + let projected: Schema = schema.project(vec![0, 2]).unwrap(); + + assert_eq!(projected.fields().len(), 2); + assert_eq!(projected.fields()[0].name(), "name"); + assert_eq!(projected.fields()[1].name(), "priority"); + assert_eq!(projected.metadata.get("meta").unwrap(), "data") + } } diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index b441f6cf295e..ac7fe631611a 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -175,6 +175,12 @@ impl RecordBatch { self.schema.clone() } + + /// Projects the schema onto the specified columns + pub fn project(&self, indices: impl IntoIterator) -> Result { + self.schema.project(indices) + } + /// Returns the number of columns in the record batch. /// /// # Example From 753d40fe91b8cb34ff4cf5a33bf6d6e544290c19 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Tue, 14 Dec 2021 17:27:02 -0500 Subject: [PATCH 2/4] Addressing PR updates and adding a test for out of range projection --- arrow/src/datatypes/schema.rs | 59 ++++++++++++++++++++++++++--------- arrow/src/record_batch.rs | 22 +++++++++++-- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/arrow/src/datatypes/schema.rs b/arrow/src/datatypes/schema.rs index d2a0787ac138..516475e2b654 100644 --- a/arrow/src/datatypes/schema.rs +++ b/arrow/src/datatypes/schema.rs @@ -87,15 +87,21 @@ impl Schema { Self { fields, metadata } } - /// Returns a new schema with only the specified columns in the new schema /// This carries metadata from the parent schema over as well - pub fn project(&self, indices: impl IntoIterator) -> Result { - let mut new_fields = vec![]; - for i in indices { - let f = self.fields[i].clone(); - new_fields.push(f); - } + pub fn project(&self, indices: impl IntoIterator) -> Result { + let new_fields = indices + .into_iter() + .map(|i: usize| { + self.fields.get(i).cloned().ok_or_else(|| { + ArrowError::SchemaError(format!( + "project index {} out of bounds, max field {}", + i, + self.fields().len() + )) + }) + }) + .collect::>>()?; Ok(Self::new_with_metadata(new_fields, self.metadata.clone())) } @@ -127,7 +133,7 @@ impl Schema { /// ]), /// ); /// ``` - pub fn try_merge(schemas: impl IntoIterator) -> Result { + pub fn try_merge(schemas: impl IntoIterator) -> Result { schemas .into_iter() .try_fold(Self::empty(), |mut merged, schema| { @@ -383,15 +389,18 @@ mod tests { } #[test] - fn test_project() { + fn test_projection() { let mut metadata = HashMap::new(); metadata.insert("meta".to_string(), "data".to_string()); - let schema = Schema::new_with_metadata(vec![ - Field::new("name", DataType::Utf8, false), - Field::new("address", DataType::Utf8, false), - Field::new("priority", DataType::UInt8, false), - ], metadata); + let schema = Schema::new_with_metadata( + vec![ + Field::new("name", DataType::Utf8, false), + Field::new("address", DataType::Utf8, false), + Field::new("priority", DataType::UInt8, false), + ], + metadata, + ); let projected: Schema = schema.project(vec![0, 2]).unwrap(); @@ -400,4 +409,26 @@ mod tests { assert_eq!(projected.fields()[1].name(), "priority"); assert_eq!(projected.metadata.get("meta").unwrap(), "data") } + + #[test] + fn test_oob_projection() { + let mut metadata = HashMap::new(); + metadata.insert("meta".to_string(), "data".to_string()); + + let schema = Schema::new_with_metadata( + vec![ + Field::new("name", DataType::Utf8, false), + Field::new("address", DataType::Utf8, false), + Field::new("priority", DataType::UInt8, false), + ], + metadata, + ); + + let projected: Result = schema.project(vec![0, 3]); + + assert!(projected.is_err()); + if let Err(e) = projected { + assert_eq!(e.to_string(), "Schema error: project index 3 out of bounds, max field 3".to_string()) + } + } } diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index ac7fe631611a..1b171046cc9e 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -175,10 +175,26 @@ impl RecordBatch { self.schema.clone() } - /// Projects the schema onto the specified columns - pub fn project(&self, indices: impl IntoIterator) -> Result { - self.schema.project(indices) + pub fn project( + &self, + indices: impl IntoIterator + Clone, + ) -> Result { + let projected_schema = self.schema.project(indices.clone())?; + let batch_fields = indices + .into_iter() + .map(|f: usize| { + self.columns.get(f).cloned().ok_or_else(|| { + ArrowError::SchemaError(format!( + "project index {} out of bounds, max field {}", + f, + self.columns.len() + )) + }) + }) + .collect::>>()?; + + RecordBatch::try_new(SchemaRef::new(projected_schema), batch_fields) } /// Returns the number of columns in the record batch. From 824f8ff192e8ece8edf70bca5e27e84ca3e4b2cb Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Fri, 17 Dec 2021 19:30:31 -0500 Subject: [PATCH 3/4] switch to &[usize] --- arrow/src/datatypes/schema.rs | 15 +++++++-------- arrow/src/record_batch.rs | 30 +++++++++++++++++++++++------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/arrow/src/datatypes/schema.rs b/arrow/src/datatypes/schema.rs index 516475e2b654..cc2e153e24d8 100644 --- a/arrow/src/datatypes/schema.rs +++ b/arrow/src/datatypes/schema.rs @@ -89,11 +89,10 @@ impl Schema { /// Returns a new schema with only the specified columns in the new schema /// This carries metadata from the parent schema over as well - pub fn project(&self, indices: impl IntoIterator) -> Result { - let new_fields = indices - .into_iter() - .map(|i: usize| { - self.fields.get(i).cloned().ok_or_else(|| { + pub fn project(&self, indices: &[usize]) -> Result { + let new_fields = indices.into_iter() + .map(|i| { + self.fields.get(*i).cloned().ok_or_else(|| { ArrowError::SchemaError(format!( "project index {} out of bounds, max field {}", i, @@ -133,7 +132,7 @@ impl Schema { /// ]), /// ); /// ``` - pub fn try_merge(schemas: impl IntoIterator) -> Result { + pub fn try_merge(schemas: impl IntoIterator) -> Result { schemas .into_iter() .try_fold(Self::empty(), |mut merged, schema| { @@ -402,7 +401,7 @@ mod tests { metadata, ); - let projected: Schema = schema.project(vec![0, 2]).unwrap(); + let projected: Schema = schema.project(&[0, 2]).unwrap(); assert_eq!(projected.fields().len(), 2); assert_eq!(projected.fields()[0].name(), "name"); @@ -424,7 +423,7 @@ mod tests { metadata, ); - let projected: Result = schema.project(vec![0, 3]); + let projected: Result = schema.project(&vec![0, 3]); assert!(projected.is_err()); if let Err(e) = projected { diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index 1b171046cc9e..e60de7033c8b 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -176,15 +176,12 @@ impl RecordBatch { } /// Projects the schema onto the specified columns - pub fn project( - &self, - indices: impl IntoIterator + Clone, - ) -> Result { - let projected_schema = self.schema.project(indices.clone())?; + pub fn project(&self, indices: &[usize]) -> Result { + let projected_schema = self.schema.project(indices)?; let batch_fields = indices .into_iter() - .map(|f: usize| { - self.columns.get(f).cloned().ok_or_else(|| { + .map(|f| { + self.columns.get(*f).cloned().ok_or_else(|| { ArrowError::SchemaError(format!( "project index {} out of bounds, max field {}", f, @@ -922,4 +919,23 @@ mod tests { assert_ne!(batch1, batch2); } + + #[test] + fn project() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(3), + ])); + let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"])); + let c: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"])); + + let record_batch = RecordBatch::try_from_iter(vec![("a", a.clone()), ("b", b.clone()), ("c", c.clone())]) + .expect("valid conversion"); + + let expected = RecordBatch::try_from_iter(vec![("a", a), ("c", c)]) + .expect("valid conversion"); + + assert_eq!(expected, record_batch.project(&vec![0, 2]).unwrap()); + } } From 8ade651f1e67264a36bb68bd57cd4fa56f13684f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Dec 2021 11:18:02 -0500 Subject: [PATCH 4/4] fix: clippy and fmt --- arrow/src/datatypes/schema.rs | 12 ++++++++---- arrow/src/record_batch.rs | 18 +++++++++--------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/arrow/src/datatypes/schema.rs b/arrow/src/datatypes/schema.rs index cc2e153e24d8..8c3014490d35 100644 --- a/arrow/src/datatypes/schema.rs +++ b/arrow/src/datatypes/schema.rs @@ -90,7 +90,8 @@ impl Schema { /// Returns a new schema with only the specified columns in the new schema /// This carries metadata from the parent schema over as well pub fn project(&self, indices: &[usize]) -> Result { - let new_fields = indices.into_iter() + let new_fields = indices + .iter() .map(|i| { self.fields.get(*i).cloned().ok_or_else(|| { ArrowError::SchemaError(format!( @@ -132,7 +133,7 @@ impl Schema { /// ]), /// ); /// ``` - pub fn try_merge(schemas: impl IntoIterator) -> Result { + pub fn try_merge(schemas: impl IntoIterator) -> Result { schemas .into_iter() .try_fold(Self::empty(), |mut merged, schema| { @@ -423,11 +424,14 @@ mod tests { metadata, ); - let projected: Result = schema.project(&vec![0, 3]); + let projected: Result = schema.project(&[0, 3]); assert!(projected.is_err()); if let Err(e) = projected { - assert_eq!(e.to_string(), "Schema error: project index 3 out of bounds, max field 3".to_string()) + assert_eq!( + e.to_string(), + "Schema error: project index 3 out of bounds, max field 3".to_string() + ) } } } diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index e60de7033c8b..9faba7ddce1e 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -179,7 +179,7 @@ impl RecordBatch { pub fn project(&self, indices: &[usize]) -> Result { let projected_schema = self.schema.project(indices)?; let batch_fields = indices - .into_iter() + .iter() .map(|f| { self.columns.get(*f).cloned().ok_or_else(|| { ArrowError::SchemaError(format!( @@ -922,20 +922,20 @@ mod tests { #[test] fn project() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(1), - None, - Some(3), - ])); + let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"])); let c: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"])); - let record_batch = RecordBatch::try_from_iter(vec![("a", a.clone()), ("b", b.clone()), ("c", c.clone())]) - .expect("valid conversion"); + let record_batch = RecordBatch::try_from_iter(vec![ + ("a", a.clone()), + ("b", b.clone()), + ("c", c.clone()), + ]) + .expect("valid conversion"); let expected = RecordBatch::try_from_iter(vec![("a", a), ("c", c)]) .expect("valid conversion"); - assert_eq!(expected, record_batch.project(&vec![0, 2]).unwrap()); + assert_eq!(expected, record_batch.project(&[0, 2]).unwrap()); } }