-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix `compute_record_batch_statistics` wrong with `projection`
#8489
Changes from 38 commits
7afeb8b
6332bec
cc5e0c7
a114310
928c811
839093e
a836cde
5648dc7
a670409
22894a3
73a59d2
46409c2
8a86a4c
cf5c584
62ae9b9
da02fa2
d98eb2e
79e7216
ba51abd
2468f52
180c303
68980ba
9411940
ba28346
df0942f
edccb66
fb74b99
767b004
2e0eef5
749e0c8
5d43a94
71047f3
4b6921b
deefdd0
c00027e
d46a9f9
41a520f
632b460
d19294f
928cbb1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Statistics}; | |
use arrow::datatypes::Schema; | ||
use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; | ||
use arrow::record_batch::RecordBatch; | ||
use arrow_array::Array; | ||
use datafusion_common::stats::Precision; | ||
use datafusion_common::{plan_err, DataFusionError, Result}; | ||
use datafusion_execution::memory_pool::MemoryReservation; | ||
|
@@ -139,17 +140,24 @@ pub fn compute_record_batch_statistics( | |
) -> Statistics { | ||
let nb_rows = batches.iter().flatten().map(RecordBatch::num_rows).sum(); | ||
|
||
let total_byte_size = batches | ||
.iter() | ||
.flatten() | ||
.map(|b| b.get_array_memory_size()) | ||
.sum(); | ||
|
||
let projection = match projection { | ||
Some(p) => p, | ||
None => (0..schema.fields().len()).collect(), | ||
}; | ||
|
||
let total_byte_size = batches | ||
.iter() | ||
.flatten() | ||
.map(|b| { | ||
b.columns() | ||
.iter() | ||
.enumerate() | ||
.filter(|(index, _)| projection.contains(index)) | ||
Dandandan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.map(|(_, col)| col.get_array_memory_size()) | ||
.sum::<usize>() | ||
}) | ||
.sum(); | ||
|
||
let mut column_statistics = vec![ColumnStatistics::new_unknown(); projection.len()]; | ||
|
||
for partition in batches.iter() { | ||
|
@@ -388,6 +396,7 @@ mod tests { | |
datatypes::{DataType, Field, Schema}, | ||
record_batch::RecordBatch, | ||
}; | ||
use arrow_array::UInt64Array; | ||
use datafusion_expr::Operator; | ||
use datafusion_physical_expr::expressions::{col, Column}; | ||
|
||
|
@@ -685,20 +694,30 @@ mod tests { | |
let schema = Arc::new(Schema::new(vec![ | ||
Field::new("f32", DataType::Float32, false), | ||
Field::new("f64", DataType::Float64, false), | ||
Field::new("u64", DataType::UInt64, false), | ||
])); | ||
let batch = RecordBatch::try_new( | ||
Arc::clone(&schema), | ||
vec![ | ||
Arc::new(Float32Array::from(vec![1., 2., 3.])), | ||
Arc::new(Float64Array::from(vec![9., 8., 7.])), | ||
Arc::new(UInt64Array::from(vec![4, 5, 6])), | ||
], | ||
)?; | ||
|
||
// just select f32,f64 | ||
let select_projection = Some(vec![0, 1]); | ||
let byte_size = batch | ||
.project(&select_projection.clone().unwrap()) | ||
.unwrap() | ||
.get_array_memory_size(); | ||
|
||
let actual = | ||
compute_record_batch_statistics(&[vec![batch]], &schema, Some(vec![0, 1])); | ||
compute_record_batch_statistics(&[vec![batch]], &schema, select_projection); | ||
|
||
let mut expected = Statistics { | ||
let expected = Statistics { | ||
num_rows: Precision::Exact(3), | ||
total_byte_size: Precision::Exact(464), // this might change a bit if the way we compute the size changes | ||
total_byte_size: Precision::Exact(byte_size), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is appropriate; if you have any good suggestions, please leave a message.

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I think this is ok and a nice way to make the code less brittle to future changes in arrow's layout.

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I'm curious as to why the previous code was `464`.

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I think it happens to be the (current) size of the record batch in the test:

```rust
let batch = RecordBatch::try_new(
    Arc::clone(&schema),
    vec![
        Arc::new(Float32Array::from(vec![1., 2., 3.])),
        Arc::new(Float64Array::from(vec![9., 8., 7.])),
        Arc::new(UInt64Array::from(vec![4, 5, 6])),
    ],
)?;
```
||
column_statistics: vec![ | ||
ColumnStatistics { | ||
distinct_count: Precision::Absent, | ||
|
@@ -715,9 +734,6 @@ mod tests { | |
], | ||
}; | ||
|
||
// Prevent test flakiness due to undefined / changing implementation details | ||
expected.total_byte_size = actual.total_byte_size.clone(); | ||
|
||
assert_eq!(actual, expected); | ||
Ok(()) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `RecordBatch.project` method is not used, because there is a `clone` internally, so there is no need to generate a new RecordBatch here.