Skip to content

Commit 3901ec2

Browse files
committed
add partitions to range scan
1 parent 76eb9a2 commit 3901ec2

File tree

3 files changed

+24
-13
lines changed

3 files changed

+24
-13
lines changed

daft/io/_range.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,30 @@
1111
from daft.table.table import Table
1212

1313

14-
def _range_generators(start: int, end: int, step: int) -> Iterator[Callable[[], Iterator[Table]]]:
15-
def generator_for_value(value: int) -> Callable[[], Iterator[Table]]:
16-
def generator() -> Iterator[Table]:
17-
yield Table.from_pydict({"id": [value]})
14+
def _range_generators(start: int, end: int, step: int, partitions: int) -> Iterator[Callable[[], Iterator[Table]]]:
15+
# TODO: Partitioning with range scan is currently untested and unused.
16+
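# Known issues: partition bounds are computed without regard to `step`, so when `step` does not divide `partition_size` later partitions start off the step grid (e.g. start=0, end=10, step=3, partitions=2 yields [0, 3] and [5, 8] rather than 0, 3, 6, 9); the last partition also absorbs the entire remainder, so partitions can be unbalanced.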
1817

19-
return generator
18+
# Calculate partition bounds upfront
19+
partition_size = (end - start) // partitions
20+
partition_bounds = [
21+
(start + (i * partition_size), start + ((i + 1) * partition_size) if i < partitions - 1 else end)
22+
for i in range(partitions)
23+
]
2024

21-
for value in range(start, end, step):
22-
yield generator_for_value(value)
25+
def generator(partition_idx: int) -> Iterator[Table]:
26+
partition_start, partition_end = partition_bounds[partition_idx]
27+
values = list(range(partition_start, partition_end, step))
28+
yield Table.from_pydict({"id": values})
29+
30+
from functools import partial
31+
32+
for partition_idx in range(partitions):
33+
yield partial(generator, partition_idx)
2334

2435

2536
class RangeScanOperator(GeneratorScanOperator):
26-
def __init__(self, start: int, end: int, step: int = 1) -> None:
37+
def __init__(self, start: int, end: int, step: int = 1, partitions: int = 1) -> None:
2738
schema = Schema._from_field_name_and_types([("id", DataType.int64())])
2839

29-
super().__init__(schema=schema, generators=_range_generators(start, end, step))
40+
super().__init__(schema=schema, generators=_range_generators(start, end, step, partitions))

src/daft-connect/src/translation/logical_plan.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ fn range(range: Range) -> eyre::Result<LogicalPlanBuilder> {
3333
num_partitions,
3434
} = range;
3535

36-
if let Some(partitions) = num_partitions {
37-
warn!("{partitions} ignored");
38-
}
36+
let partitions = num_partitions.unwrap_or(1);
37+
38+
ensure!(partitions > 0, "num_partitions must be greater than 0");
3939

4040
let start = start.unwrap_or(0);
4141

@@ -51,7 +51,7 @@ fn range(range: Range) -> eyre::Result<LogicalPlanBuilder> {
5151
.wrap_err("Failed to get range function")?;
5252

5353
let range = range
54-
.call1((start, end, step))
54+
.call1((start, end, step, partitions))
5555
.wrap_err("Failed to create range scan operator")?
5656
.to_object(py);
5757

xyz

Whitespace-only changes.

0 commit comments

Comments
 (0)