Skip to content

Commit e149dcd

Browse files
authored
feat: Support intersect all and except distinct/all in DataFrame API (#3537)
This commit adds complete support for intersect and except operations. It consists of: 1. Exposing the new methods `intersect_all`, `except_distinct` and `except_all` in the Python DataFrame API 2. Adding an Except operator in set_operations and converting the corresponding logical plans 3. Completing the intersect all branch with logic similar to except all 4. Adding a new scalar function, `list_fill(num, val)`, which creates a list of `num` elements, each equal to the provided `val`. The intersect/except all logic is somewhat complex: it requires list_fill and explode to correctly compute the intersected or excepted rows.
1 parent 39bb62c commit e149dcd

File tree

13 files changed

+798
-103
lines changed

13 files changed

+798
-103
lines changed

daft/daft/__init__.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1636,6 +1636,7 @@ class LogicalPlanBuilder:
16361636
) -> LogicalPlanBuilder: ...
16371637
def concat(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder: ...
16381638
def intersect(self, other: LogicalPlanBuilder, is_all: bool) -> LogicalPlanBuilder: ...
1639+
# Set-difference counterpart of `intersect`. Called by `LogicalPlanBuilder.except_distinct`
# (is_all=False) and `LogicalPlanBuilder.except_all` (is_all=True) in daft/logical/builder.py.
def except_(self, other: LogicalPlanBuilder, is_all: bool) -> LogicalPlanBuilder: ...
16391640
def add_monotonically_increasing_id(self, column_name: str | None) -> LogicalPlanBuilder: ...
16401641
def table_write(
16411642
self,

daft/dataframe/dataframe.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2557,6 +2557,94 @@ def intersect(self, other: "DataFrame") -> "DataFrame":
25572557
builder = self._builder.intersect(other._builder)
25582558
return DataFrame(builder)
25592559

2560+
@DataframePublicAPI
def intersect_all(self, other: "DataFrame") -> "DataFrame":
    """Computes the intersection of this DataFrame with ``other``, keeping duplicate rows.

    Example:
        >>> import daft
        >>> df1 = daft.from_pydict({"a": [1, 2, 2], "b": [4, 6, 6]})
        >>> df2 = daft.from_pydict({"a": [1, 1, 2, 2], "b": [4, 4, 6, 6]})
        >>> df1.intersect_all(df2).sort("a").collect()
        ╭───────┬───────╮
        │ a     ┆ b     │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 1     ┆ 4     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 2     ┆ 6     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 2     ┆ 6     │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

    Args:
        other (DataFrame): the DataFrame to intersect with

    Returns:
        DataFrame: the rows common to both DataFrames, duplicates included
    """
    # Delegate to the logical plan builder and wrap the resulting plan.
    return DataFrame(self._builder.intersect_all(other._builder))
2591+
2592+
@DataframePublicAPI
def except_distinct(self, other: "DataFrame") -> "DataFrame":
    """Computes the set difference between this DataFrame and ``other``.

    Example:
        >>> import daft
        >>> df1 = daft.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
        >>> df2 = daft.from_pydict({"a": [1, 2, 3], "b": [4, 8, 6]})
        >>> df1.except_distinct(df2).collect()
        ╭───────┬───────╮
        │ a     ┆ b     │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 2     ┆ 5     │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 1 of 1 rows)

    Args:
        other (DataFrame): the DataFrame whose rows are removed from this one

    Returns:
        DataFrame: the set difference of the two DataFrames
    """
    # Delegate to the logical plan builder and wrap the resulting plan.
    return DataFrame(self._builder.except_distinct(other._builder))
2619+
2620+
@DataframePublicAPI
def except_all(self, other: "DataFrame") -> "DataFrame":
    """Computes the set difference between this DataFrame and ``other``, taking duplicates into account.

    Example:
        >>> import daft
        >>> df1 = daft.from_pydict({"a": [1, 1, 2, 2], "b": [4, 4, 6, 6]})
        >>> df2 = daft.from_pydict({"a": [1, 2, 2], "b": [4, 6, 6]})
        >>> df1.except_all(df2).collect()
        ╭───────┬───────╮
        │ a     ┆ b     │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 1     ┆ 4     │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 1 of 1 rows)

    Args:
        other (DataFrame): the DataFrame whose rows are removed from this one

    Returns:
        DataFrame: the set difference of the two DataFrames, considering duplicates
    """
    # Delegate to the logical plan builder and wrap the resulting plan.
    return DataFrame(self._builder.except_all(other._builder))
2647+
25602648
def _materialize_results(self) -> None:
25612649
"""Materializes the results of for this DataFrame and hold a pointer to the results."""
25622650
context = get_context()

daft/logical/builder.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,18 @@ def intersect(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder:
279279
builder = self._builder.intersect(other._builder, False)
280280
return LogicalPlanBuilder(builder)
281281

282+
def intersect_all(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder:
    """Wraps the underlying builder's ``intersect`` called with its second argument set to ``True``."""
    return LogicalPlanBuilder(self._builder.intersect(other._builder, True))
285+
286+
def except_distinct(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder:
    """Wraps the underlying builder's ``except_`` called with its second argument set to ``False``."""
    return LogicalPlanBuilder(self._builder.except_(other._builder, False))
289+
290+
def except_all(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder:
    """Wraps the underlying builder's ``except_`` called with its second argument set to ``True``."""
    return LogicalPlanBuilder(self._builder.except_(other._builder, True))
293+
282294
def add_monotonically_increasing_id(self, column_name: str | None) -> LogicalPlanBuilder:
    """Forwards ``column_name`` (which may be ``None``) to the underlying builder and wraps the result."""
    return LogicalPlanBuilder(self._builder.add_monotonically_increasing_id(column_name))

src/daft-core/src/array/ops/list.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{iter::repeat, sync::Arc};
22

3-
use arrow2::offset::OffsetsBuffer;
3+
use arrow2::offset::{Offsets, OffsetsBuffer};
44
use common_error::DaftResult;
55
use indexmap::{
66
map::{raw_entry_v1::RawEntryMut, RawEntryApiV1},
@@ -255,6 +255,31 @@ fn list_sort_helper_fixed_size(
255255
.collect()
256256
}
257257

258+
fn general_list_fill_helper(element: &Series, num_array: &Int64Array) -> DaftResult<Vec<Series>> {
259+
let num_iter = create_iter(num_array, element.len());
260+
let mut result = Vec::with_capacity(element.len());
261+
let element_data = element.as_physical()?;
262+
for (row_index, num) in num_iter.enumerate() {
263+
let list_arr = if element.is_valid(row_index) {
264+
let mut list_growable = make_growable(
265+
element.name(),
266+
element.data_type(),
267+
vec![&element_data],
268+
false,
269+
num as usize,
270+
);
271+
for _ in 0..num {
272+
list_growable.extend(0, row_index, 1);
273+
}
274+
list_growable.build()?
275+
} else {
276+
Series::full_null(element.name(), element.data_type(), num as usize)
277+
};
278+
result.push(list_arr);
279+
}
280+
Ok(result)
281+
}
282+
258283
impl ListArray {
259284
pub fn value_counts(&self) -> DaftResult<MapArray> {
260285
struct IndexRef {
@@ -625,6 +650,25 @@ impl ListArray {
625650
self.validity().cloned(),
626651
))
627652
}
653+
654+
pub fn list_fill(elem: &Series, num_array: &Int64Array) -> DaftResult<Self> {
655+
let generated = general_list_fill_helper(elem, num_array)?;
656+
let generated_refs: Vec<&Series> = generated.iter().collect();
657+
let lengths = generated.iter().map(|arr| arr.len());
658+
let offsets = Offsets::try_from_lengths(lengths)?;
659+
let flat_child = if generated_refs.is_empty() {
660+
// when there's no output, we should create an empty series
661+
Series::empty(elem.name(), elem.data_type())
662+
} else {
663+
Series::concat(&generated_refs)?
664+
};
665+
Ok(Self::new(
666+
elem.field().to_list_field()?,
667+
flat_child,
668+
offsets.into(),
669+
None,
670+
))
671+
}
628672
}
629673

630674
impl FixedSizeListArray {

src/daft-core/src/series/ops/list.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use common_error::{DaftError, DaftResult};
22
use daft_schema::field::Field;
33

44
use crate::{
5+
array::ListArray,
56
datatypes::{DataType, UInt64Array, Utf8Array},
6-
prelude::CountMode,
7+
prelude::{CountMode, Int64Array},
78
series::{IntoSeries, Series},
89
};
910

@@ -217,4 +218,14 @@ impl Series {
217218
))),
218219
}
219220
}
221+
222+
/// Given a series of data T, repeat each data T with num times to create a list, returns
223+
/// a series of repeated list.
224+
/// # Example
225+
/// ```txt
226+
/// repeat([1, 2, 3], [2, 0, 1]) --> [[1, 1], [], [3]]
227+
/// ```
228+
pub fn list_fill(&self, num: &Int64Array) -> DaftResult<Self> {
229+
ListArray::list_fill(self, num).map(|arr| arr.into_series())
230+
}
220231
}

0 commit comments

Comments
 (0)