Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 90 additions & 21 deletions crates/polars-core/src/chunked_array/ops/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,33 +223,70 @@ impl ExplodeByOffsets for BooleanChunked {
}

/// Convert Arrow array offsets to indexes of the original list
pub(crate) fn offsets_to_indexes(offsets: &[i64], capacity: usize) -> Vec<IdxSize> {
pub(crate) fn offsets_to_indexes(
offsets: &[i64],
capacity: usize,
options: ExplodeOptions,
validity: Option<&Bitmap>,
) -> Vec<IdxSize> {
if offsets.is_empty() {
return vec![];
}

let mut idx = Vec::with_capacity(capacity);

let mut last_idx = 0;
for (offset_start, offset_end) in offsets.iter().zip(offsets[1..].iter()) {
if idx.len() >= capacity {
// significant speed-up in edge cases with many offsets,
// no measurable overhead in typical case due to branch prediction
break;
}
match validity {
None => {
for (offset_start, offset_end) in offsets.iter().zip(offsets[1..].iter()) {
if idx.len() >= capacity {
// significant speed-up in edge cases with many offsets,
// no measurable overhead in typical case due to branch prediction
break;
}

if offset_start == offset_end {
// if the previous offset is equal to the current offset, we have an empty
// list and we duplicate the previous index
idx.push(last_idx);
} else {
let width = (offset_end - offset_start) as usize;
for _ in 0..width {
idx.push(last_idx);
if offset_start == offset_end {
if options.empty_as_null {
// if the previous offset is equal to the current offset, we have an empty
// list and we duplicate the previous index
idx.push(last_idx);
}
} else {
let width = (offset_end - offset_start) as usize;
for _ in 0..width {
idx.push(last_idx);
}
}

last_idx += 1;
}
}
},
Some(validity) => {
for ((offset_start, offset_end), is_valid) in
offsets.iter().zip(offsets[1..].iter()).zip(validity.iter())
{
if idx.len() >= capacity {
// significant speed-up in edge cases with many offsets,
// no measurable overhead in typical case due to branch prediction
break;
}

last_idx += 1;
if offset_start == offset_end {
if (is_valid && options.empty_as_null) || (!is_valid && options.keep_nulls) {
// if the previous offset is equal to the current offset, we have an empty
// list and we duplicate the previous index
idx.push(last_idx);
}
} else {
let width = (offset_end - offset_start) as usize;
for _ in 0..width {
idx.push(last_idx);
}
}

last_idx += 1;
}
},
}

// take the remaining values
Expand Down Expand Up @@ -436,29 +473,61 @@ mod test {
#[test]
fn test_row_offsets() {
let offsets = &[0, 1, 2, 2, 3, 4, 4];
let out = offsets_to_indexes(offsets, 6);
let out = offsets_to_indexes(
offsets,
6,
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
None,
);
assert_eq!(out, &[0, 1, 2, 3, 4, 5]);
}

#[test]
fn test_empty_row_offsets() {
let offsets = &[0, 0];
let out = offsets_to_indexes(offsets, 0);
let out = offsets_to_indexes(
offsets,
0,
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
None,
);
let expected: Vec<IdxSize> = Vec::new();
assert_eq!(out, expected);
}

#[test]
fn test_row_offsets_over_capacity() {
let offsets = &[0, 1, 1, 2, 2];
let out = offsets_to_indexes(offsets, 2);
let out = offsets_to_indexes(
offsets,
2,
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
None,
);
assert_eq!(out, &[0, 1]);
}

#[test]
fn test_row_offsets_nonzero_first_offset() {
let offsets = &[3, 6, 8];
let out = offsets_to_indexes(offsets, 10);
let out = offsets_to_indexes(
offsets,
10,
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
None,
);
assert_eq!(out, &[0, 0, 0, 1, 1, 2, 2, 2, 2, 2]);
}
}
2 changes: 2 additions & 0 deletions crates/polars-core/src/chunked_array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ pub trait ChunkAnyValue {
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct ExplodeOptions {
/// Explode an empty list into a single `null` row.
///
/// When `false`, an empty list contributes no row to the exploded output.
pub empty_as_null: bool,
/// Explode a `null` list into a single `null` row.
///
/// When `false`, a `null` list contributes no row to the exploded output.
pub keep_nulls: bool,
}

Expand Down
75 changes: 52 additions & 23 deletions crates/polars-core/src/frame/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ use crate::chunked_array::ops::explode::offsets_to_indexes;
use crate::prelude::*;
use crate::series::IsSorted;

fn get_exploded(series: &Series) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
fn get_exploded(
series: &Series,
options: ExplodeOptions,
) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
match series.dtype() {
DataType::List(_) => series.list().unwrap().explode_and_offsets(ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
}),
DataType::List(_) => series.list().unwrap().explode_and_offsets(options),
#[cfg(feature = "dtype-array")]
DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
}),
DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(options),
_ => polars_bail!(opq = explode, series.dtype()),
}
}
Expand All @@ -35,15 +32,16 @@ pub struct UnpivotArgsIR {
}

impl DataFrame {
pub fn explode_impl(&self, mut columns: Vec<Column>) -> PolarsResult<DataFrame> {
pub fn explode_impl(
&self,
mut columns: Vec<Column>,
options: ExplodeOptions,
) -> PolarsResult<DataFrame> {
polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
let mut df = self.clone();
if self.is_empty() {
for s in &columns {
df.with_column(s.as_materialized_series().explode(ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
})?)?;
df.with_column(s.as_materialized_series().explode(options)?)?;
}
return Ok(df);
}
Expand All @@ -66,8 +64,7 @@ impl DataFrame {
let exploded_columns = POOL.install(|| {
columns
.par_iter()
.map(Column::as_materialized_series)
.map(get_exploded)
.map(|c| get_exploded(c.as_materialized_series(), options))
.map(|s| s.map(|(s, o)| (Column::from(s), o)))
.collect::<PolarsResult<Vec<_>>>()
})?;
Expand Down Expand Up @@ -110,9 +107,15 @@ impl DataFrame {
Ok(())
};
let process_first = || {
let validity = columns[0].rechunk_validity();
let (exploded, offsets) = &exploded_columns[0];

let row_idx = offsets_to_indexes(offsets.as_slice(), exploded.len());
let row_idx = offsets_to_indexes(
offsets.as_slice(),
exploded.len(),
options,
validity.as_ref(),
);
let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
row_idx.set_sorted_flag(IsSorted::Ascending);

Expand Down Expand Up @@ -191,15 +194,15 @@ impl DataFrame {
/// | 2 | 3 | 1 |
/// +-----+-----+-----+
/// ```
pub fn explode<I, S>(&self, columns: I) -> PolarsResult<DataFrame>
pub fn explode<I, S>(&self, columns: I, options: ExplodeOptions) -> PolarsResult<DataFrame>
where
I: IntoIterator<Item = S>,
S: Into<PlSmallStr>,
{
// We need to sort the column by order of original occurrence. Otherwise the insert by index
// below will panic
let columns = self.select_columns(columns)?;
self.explode_impl(columns)
self.explode_impl(columns, options)
}
}

Expand All @@ -219,7 +222,15 @@ mod test {
let s0 = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
let s1 = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);
let df = DataFrame::new(vec![list, s0, s1]).unwrap();
let exploded = df.explode(["foo"]).unwrap();
let exploded = df
.explode(
["foo"],
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
)
.unwrap();
assert_eq!(exploded.shape(), (9, 3));
assert_eq!(
exploded
Expand Down Expand Up @@ -266,7 +277,13 @@ mod test {
let s1 = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);
let df = DataFrame::new(vec![list, s0.clone(), s1.clone()])?;

let out = df.explode(["foo"])?;
let out = df.explode(
["foo"],
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
)?;
let expected = df![
"foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
"B" => [1, 1, 1, 2, 2, 2, 3],
Expand All @@ -284,7 +301,13 @@ mod test {
],
);
let df = DataFrame::new(vec![list, s0, s1])?;
let out = df.explode(["foo"])?;
let out = df.explode(
["foo"],
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
)?;
let expected = df![
"foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
"B" => [1, 1, 1, 2, 3, 3, 3],
Expand All @@ -303,7 +326,13 @@ mod test {
let list = Column::new(PlSmallStr::from_static("foo"), &[s0, s1]);
let df = DataFrame::new(vec![list])?;

let out = df.explode(["foo"])?;
let out = df.explode(
["foo"],
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
)?;
let out = out
.column("foo")?
.as_materialized_series()
Expand Down
33 changes: 25 additions & 8 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1769,16 +1769,21 @@ impl LazyFrame {
}

/// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
pub fn explode(self, columns: Selector) -> LazyFrame {
self.explode_impl(columns, false)
pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
self.explode_impl(columns, options, false)
}

/// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
fn explode_impl(
self,
columns: Selector,
options: ExplodeOptions,
allow_empty: bool,
) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.explode(columns, allow_empty)
.explode(columns, options, allow_empty)
.build();
Self::from_logical_plan(lp, opt_state)
}
Expand Down Expand Up @@ -2162,8 +2167,14 @@ impl LazyGroupBy {
.filter_map(|expr| expr_output_name(expr).ok())
.collect::<Vec<_>>();

self.agg([all().as_expr().head(n)])
.explode_impl(all() - by_name(keys.iter().cloned(), false), true)
self.agg([all().as_expr().head(n)]).explode_impl(
all() - by_name(keys.iter().cloned(), false),
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
true,
)
}

/// Return last n rows of each group
Expand All @@ -2174,8 +2185,14 @@ impl LazyGroupBy {
.filter_map(|expr| expr_output_name(expr).ok())
.collect::<Vec<_>>();

self.agg([all().as_expr().tail(n)])
.explode_impl(all() - by_name(keys.iter().cloned(), false), true)
self.agg([all().as_expr().tail(n)]).explode_impl(
all() - by_name(keys.iter().cloned(), false),
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
true,
)
}

/// Apply a function over the groups as a new DataFrame.
Expand Down
8 changes: 7 additions & 1 deletion crates/polars-lazy/src/tests/aggregations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,13 @@ fn test_shift_elementwise_issue_2509() -> PolarsResult<()> {
.sort(["x"], Default::default())
.collect()?;

let out = out.explode(["sum"])?;
let out = out.explode(
["sum"],
ExplodeOptions {
empty_as_null: true,
keep_nulls: true,
},
)?;
let out = out.column("sum")?;
assert_eq!(out.get(0)?, AnyValue::Int32(10));
assert_eq!(out.get(1)?, AnyValue::Int32(20));
Expand Down
Loading
Loading