datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs (43 changes: 8 additions & 35 deletions)
@@ -33,7 +33,7 @@ use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::{
     ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
     Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
-    Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+    Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
     Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
     TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
     UInt8Type,
@@ -56,23 +56,17 @@ type RecordSlice<'a> = &'a [&'a Vec<(String, Value)>];
 pub struct AvroArrowArrayReader<'a, R: Read> {
     reader: AvroReader<'a, R>,
     schema: SchemaRef,
-    projection: Option<Vec<String>>,
     schema_lookup: BTreeMap<String, usize>,
 }
 
 impl<R: Read> AvroArrowArrayReader<'_, R> {
-    pub fn try_new(
-        reader: R,
-        schema: SchemaRef,
-        projection: Option<Vec<String>>,
-    ) -> Result<Self> {
+    pub fn try_new(reader: R, schema: SchemaRef) -> Result<Self> {
         let reader = AvroReader::new(reader)?;
         let writer_schema = reader.writer_schema().clone();
         let schema_lookup = Self::schema_lookup(writer_schema)?;
         Ok(Self {
             reader,
             schema,
-            projection,
             schema_lookup,
         })
     }
@@ -175,20 +169,9 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
         };
 
         let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
-        let projection = self.projection.clone().unwrap_or_default();
-        let arrays =
-            self.build_struct_array(&rows, "", self.schema.fields(), &projection);
-        let projected_fields = if projection.is_empty() {
-            self.schema.fields().clone()
-        } else {
-            projection
-                .iter()
-                .filter_map(|name| self.schema.column_with_name(name))
-                .map(|(_, field)| field.clone())
-                .collect()
-        };
-        let projected_schema = Arc::new(Schema::new(projected_fields));
-        Some(arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)))
+        let arrays = self.build_struct_array(&rows, "", self.schema.fields());
+
+        Some(arrays.and_then(|arr| RecordBatch::try_new(Arc::clone(&self.schema), arr)))
     }
 
     fn build_boolean_array(&self, rows: RecordSlice, col_name: &str) -> ArrayRef {
Expand Down Expand Up @@ -615,7 +598,7 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
let sub_parent_field_name =
format!("{}.{}", parent_field_name, list_field.name());
let arrays =
self.build_struct_array(&rows, &sub_parent_field_name, fields, &[])?;
self.build_struct_array(&rows, &sub_parent_field_name, fields)?;
let data_type = DataType::Struct(fields.clone());
ArrayDataBuilder::new(data_type)
.len(rows.len())
Expand Down Expand Up @@ -645,20 +628,14 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
/// The function does not construct the StructArray as some callers would want the child arrays.
///
/// *Note*: The function is recursive, and will read nested structs.
///
/// If `projection` is not empty, then all values are returned. The first level of projection
/// occurs at the `RecordBatch` level. No further projection currently occurs, but would be
/// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`.
fn build_struct_array(
&self,
rows: RecordSlice,
parent_field_name: &str,
struct_fields: &Fields,
projection: &[String],
) -> ArrowResult<Vec<ArrayRef>> {
let arrays: ArrowResult<Vec<ArrayRef>> = struct_fields
.iter()
.filter(|field| projection.is_empty() || projection.contains(field.name()))
.map(|field| {
let field_path = if parent_field_name.is_empty() {
field.name().to_string()
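The doc comment above notes that `build_struct_array` returns the child arrays rather than a finished `StructArray`, so callers can choose how to assemble them. A minimal illustration of that assembly step using the public arrow-rs API (field names and values are illustrative, not taken from this PR, and this assumes the current `From<Vec<(FieldRef, ArrayRef)>>` impl for `StructArray`):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array, StructArray};
use arrow::datatypes::{DataType, Field};

fn main() {
    // Child arrays, in the shape a helper like `build_struct_array` returns them.
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
    let flags: ArrayRef = Arc::new(BooleanArray::from(vec![true, false]));

    // A caller that does want the StructArray can zip fields with children.
    let s = StructArray::from(vec![
        (Arc::new(Field::new("id", DataType::Int32, false)), ids),
        (Arc::new(Field::new("flag", DataType::Boolean, false)), flags),
    ]);
    assert_eq!(2, s.len());
    assert_eq!(2, s.num_columns());
}
```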
Expand Down Expand Up @@ -840,12 +817,8 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
}
})
.collect::<Vec<&Vec<(String, Value)>>>();
let arrays = self.build_struct_array(
&struct_rows,
&field_path,
fields,
&[],
)?;
let arrays =
self.build_struct_array(&struct_rows, &field_path, fields)?;
// construct a struct array's data in order to set null buffer
let data_type = DataType::Struct(fields.clone());
let data = ArrayDataBuilder::new(data_type)
Expand Down
datafusion/datasource-avro/src/avro_to_arrow/reader.rs (87 changes: 80 additions & 7 deletions)
@@ -16,7 +16,7 @@
 // under the License.
 
 use super::arrow_array_reader::AvroArrowArrayReader;
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Fields, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
@@ -133,19 +133,35 @@ impl<R: Read> Reader<'_, R> {
     ///
     /// If reading a `File`, you can customise the Reader, such as to enable schema
     /// inference, use `ReaderBuilder`.
+    ///
+    /// If a projection is provided, the reader uses a schema containing only the projected fields, respecting their order.
+    /// Only the first level of projection is handled. No further projection currently occurs, but would be
+    /// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`.
     pub fn try_new(
         reader: R,
         schema: SchemaRef,
         batch_size: usize,
         projection: Option<Vec<String>>,
     ) -> Result<Self> {
+        let projected_schema = projection.as_ref().filter(|p| !p.is_empty()).map_or_else(
+            || Arc::clone(&schema),
+            |proj| {
+                Arc::new(arrow::datatypes::Schema::new(
+                    proj.iter()
+                        .filter_map(|name| {
+                            schema.column_with_name(name).map(|(_, f)| f.clone())
+                        })
+                        .collect::<Fields>(),
+                ))
+            },
+        );
+
         Ok(Self {
             array_reader: AvroArrowArrayReader::try_new(
                 reader,
-                Arc::clone(&schema),
-                projection,
+                Arc::clone(&projected_schema),
             )?,
-            schema,
+            schema: projected_schema,
             batch_size,
         })
     }
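The schema-narrowing step added to `try_new` can be exercised in isolation. Below is a minimal, self-contained sketch of the same logic using only the `arrow` crate; `project_schema` is a hypothetical helper name and the columns are illustrative:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};

/// Build a schema containing only the named columns, in projection order.
/// Unknown names are silently skipped, mirroring the `filter_map` in
/// `Reader::try_new`; an empty projection leaves the schema unchanged.
fn project_schema(schema: &SchemaRef, projection: &[String]) -> SchemaRef {
    if projection.is_empty() {
        return Arc::clone(schema);
    }
    Arc::new(Schema::new(
        projection
            .iter()
            .filter_map(|name| schema.column_with_name(name).map(|(_, f)| f.clone()))
            .collect::<Fields>(),
    ))
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("bool_col", DataType::Boolean, true),
        Field::new("string_col", DataType::Binary, true),
    ]));
    // Projection both filters and reorders: string_col now comes first.
    let projected = project_schema(
        &schema,
        &["string_col".to_string(), "bool_col".to_string()],
    );
    assert_eq!("string_col", projected.field(0).name());
    assert_eq!("bool_col", projected.field(1).name());
    assert_eq!(2, projected.fields().len());
}
```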
@@ -179,10 +195,13 @@ mod tests {
     use arrow::datatypes::{DataType, Field};
     use std::fs::File;
 
-    fn build_reader(name: &str) -> Reader<File> {
+    fn build_reader(name: &str, projection: Option<Vec<String>>) -> Reader<File> {
         let testdata = datafusion_common::test_util::arrow_test_data();
         let filename = format!("{testdata}/avro/{name}");
-        let builder = ReaderBuilder::new().read_schema().with_batch_size(64);
+        let mut builder = ReaderBuilder::new().read_schema().with_batch_size(64);
+        if let Some(projection) = projection {
+            builder = builder.with_projection(projection);
+        }
         builder.build(File::open(filename).unwrap()).unwrap()
     }
 
@@ -195,7 +214,7 @@ mod tests {
 
     #[test]
     fn test_avro_basic() {
-        let mut reader = build_reader("alltypes_dictionary.avro");
+        let mut reader = build_reader("alltypes_dictionary.avro", None);
         let batch = reader.next().unwrap().unwrap();
 
         assert_eq!(11, batch.num_columns());
@@ -281,4 +300,58 @@ mod tests {
         assert_eq!(1230768000000000, col.value(0));
         assert_eq!(1230768060000000, col.value(1));
     }
+
+    #[test]
+    fn test_avro_with_projection() {
+        // Test projection to filter and reorder columns
+        let projection = Some(vec![
+            "string_col".to_string(),
+            "double_col".to_string(),
+            "bool_col".to_string(),
+        ]);
+        let mut reader = build_reader("alltypes_dictionary.avro", projection);
+        let batch = reader.next().unwrap().unwrap();
+
+        // Only 3 columns should be present (not all 11)
+        assert_eq!(3, batch.num_columns());
+        assert_eq!(2, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        // Verify columns are in the order specified in projection
+        // First column should be string_col (was at index 9 in original)
+        assert_eq!("string_col", schema.field(0).name());
+        assert_eq!(&DataType::Binary, schema.field(0).data_type());
+        let col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        assert_eq!("0".as_bytes(), col.value(0));
+        assert_eq!("1".as_bytes(), col.value(1));
+
+        // Second column should be double_col (was at index 7 in original)
+        assert_eq!("double_col", schema.field(1).name());
+        assert_eq!(&DataType::Float64, schema.field(1).data_type());
+        let col = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap();
+        assert_eq!(0.0, col.value(0));
+        assert_eq!(10.1, col.value(1));
+
+        // Third column should be bool_col (was at index 1 in original)
+        assert_eq!("bool_col", schema.field(2).name());
+        assert_eq!(&DataType::Boolean, schema.field(2).data_type());
+        let col = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .unwrap();
+        assert!(col.value(0));
+        assert!(!col.value(1));
+    }
 }
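End to end, builder usage mirrors the new test. A rough sketch, assuming `ReaderBuilder` is importable from this crate's `avro_to_arrow` module and that an Avro file with the `alltypes_dictionary` columns exists at the given (illustrative) path:

```rust
use std::fs::File;

use datafusion_datasource_avro::avro_to_arrow::ReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read only two columns, in this order, from an Avro file.
    let file = File::open("testing/data/avro/alltypes_dictionary.avro")?;
    let reader = ReaderBuilder::new()
        .read_schema()
        .with_batch_size(64)
        .with_projection(vec!["string_col".to_string(), "bool_col".to_string()])
        .build(file)?;

    for batch in reader {
        let batch = batch?;
        // Each batch's schema follows the projection order, not file order.
        assert_eq!("string_col", batch.schema().field(0).name());
        assert_eq!("bool_col", batch.schema().field(1).name());
    }
    Ok(())
}
```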
datafusion/sqllogictest/test_files/avro.slt (10 changes: 10 additions & 0 deletions)
@@ -253,3 +253,13 @@ physical_plan
 04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, file_type=avro
+
+# test column projection order from avro file
+query ITII
+SELECT id, string_col, int_col, bigint_col FROM alltypes_plain ORDER BY id LIMIT 5
+----
+0 0 0 0
+1 1 1 10
+2 0 0 0
+3 1 1 10
+4 0 0 0