[FEAT] Add dataframe iteration on rows and change default buffer size (#2685)

1. Changes default `results_buffer_size` on our dataframe iteration
methods to use the total number of available CPUs on the current machine
instead of `1`, which was empirically observed by users to significantly
slow down processing speeds
2. Adds a new `df.iter_rows()` API which is used by `__iter__`, but
provides an alternative API if a user needs to configure
`results_buffer_size`
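
A minimal sketch of the new usage, assuming a Daft build that includes this change (the explicit buffer size below is illustrative, not a recommendation):

import daft

df = daft.from_pydict({"foo": [1, 2, 3], "bar": ["a", "b", "c"]})

# Plain iteration delegates to the new iter_rows() API.
for row in df:
    print(row)

# Callers can now tune the buffer explicitly: a small value caps memory
# use, while None lets Daft decide per-iteration.
for row in df.iter_rows(results_buffer_size=2):
    print(row)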

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Aug 19, 2024
1 parent 774a5d6 commit 9f5aee8
Showing 2 changed files with 93 additions and 9 deletions.
101 changes: 92 additions & 9 deletions daft/dataframe/dataframe.py
@@ -5,6 +5,7 @@
# For technical details, see https://github.com/Eventual-Inc/Daft/pull/630

import io
import multiprocessing
import os
import pathlib
import typing
@@ -64,6 +65,9 @@
ManyColumnsInputType = Union[ColumnInputType, Iterable[ColumnInputType]]


NUM_CPUS = multiprocessing.cpu_count()


class DataFrame:
"""A Daft DataFrame is a table of data. It has columns, where each column has a type and the same
number of items (rows) as all other columns.
@@ -218,11 +222,47 @@ def columns(self) -> List[Expression]:

@DataframePublicAPI
def __iter__(self) -> Iterator[Dict[str, Any]]:
"""Alias of `self.iter_rows()` with default arguments for convenient access of data."""
return self.iter_rows(results_buffer_size=None)

@DataframePublicAPI
def iter_rows(self, results_buffer_size: Optional[int] = NUM_CPUS) -> Iterator[Dict[str, Any]]:
"""Return an iterator of rows for this dataframe.
Each row will be a pydict of the form { "key" : value }.
"""
Each row will be a Python dictionary of the form { "key" : value, ... }. If you are instead looking to iterate over
entire partitions of data, see: :meth:`df.iter_partitions() <daft.DataFrame.iter_partitions>`.
.. NOTE::
A quick note on configuring asynchronous/parallel execution using `results_buffer_size`.
The `results_buffer_size` kwarg controls how many results Daft will allow to be in the buffer while iterating.
Once this buffer is filled, Daft will not run any more work until some partition is consumed from the buffer.
* Increasing this value means the iterator will consume more memory and CPU resources but have higher throughput
* Decreasing this value means the iterator will consume lower memory and CPU resources, but have lower throughput
* Setting this value to `None` means the iterator will consume as much resources as it deems appropriate per-iteration
The default value is the total number of CPUs available on the current machine.
Example:
>>> import daft
>>>
>>> df = daft.from_pydict({"foo": [1, 2, 3], "bar": ["a", "b", "c"]})
>>> for row in df.iter_rows():
... print(row)
{'foo': 1, 'bar': 'a'}
{'foo': 2, 'bar': 'b'}
{'foo': 3, 'bar': 'c'}
Args:
results_buffer_size: how many partitions to allow in the results buffer (defaults to the total number of CPUs
available on the machine).
.. seealso::
:meth:`df.iter_partitions() <daft.DataFrame.iter_partitions>`: iterator over entire partitions instead of single rows
"""
if self._result is not None:
# If the dataframe has already finished executing,
# use the precomputed results.
@@ -234,7 +274,7 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
else:
# Execute the dataframe in a streaming fashion.
context = get_context()
partitions_iter = context.runner().run_iter_tables(self._builder, results_buffer_size=1)
partitions_iter = context.runner().run_iter_tables(self._builder, results_buffer_size=results_buffer_size)

# Iterate through partitions.
for partition in partitions_iter:
@@ -268,19 +308,62 @@ def to_arrow_iter(self, results_buffer_size: Optional[int] = 1) -> Iterator["pya

@DataframePublicAPI
def iter_partitions(
self, results_buffer_size: Optional[int] = 1
self, results_buffer_size: Optional[int] = NUM_CPUS
) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]:
"""Begin executing this dataframe and return an iterator over the partitions.
Each partition will be returned as a daft.Table object (if using Python runner backend)
or a ray ObjectRef (if using Ray runner backend).
.. NOTE::
A quick note on configuring asynchronous/parallel execution using `results_buffer_size`.
The `results_buffer_size` kwarg controls how many results Daft will allow to be in the buffer while iterating.
Once this buffer is filled, Daft will not run any more work until some partition is consumed from the buffer.
* Increasing this value means the iterator will consume more memory and CPU resources but have higher throughput
* Decreasing this value means the iterator will consume lower memory and CPU resources, but have lower throughput
* Setting this value to `None` means the iterator will consume as much resources as it deems appropriate per-iteration
The default value is the total number of CPUs available on the current machine.
Args:
results_buffer_size: how many partitions to allow in the results buffer (defaults to 1).
Setting this value will buffer results up to the provided size and provide backpressure
to dataframe execution based on the rate of consumption from the returned iterator. Setting this to
`None` will result in a buffer of unbounded size, causing the dataframe to run asynchronously
to completion.
results_buffer_size: how many partitions to allow in the results buffer (defaults to the total number of CPUs
available on the machine).
>>> import daft
>>>
>>> df = daft.from_pydict({"foo": [1, 2, 3], "bar": ["a", "b", "c"]}).into_partitions(2)
>>> for part in df.iter_partitions():
... print(part)
MicroPartition with 2 rows:
TableState: Loaded. 1 tables
╭───────┬──────╮
│ foo ┆ bar │
│ --- ┆ --- │
│ Int64 ┆ Utf8 │
╞═══════╪══════╡
│ 1 ┆ a │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2 ┆ b │
╰───────┴──────╯
<BLANKLINE>
<BLANKLINE>
Statistics: missing
<BLANKLINE>
MicroPartition with 1 rows:
TableState: Loaded. 1 tables
╭───────┬──────╮
│ foo ┆ bar │
│ --- ┆ --- │
│ Int64 ┆ Utf8 │
╞═══════╪══════╡
│ 3 ┆ c │
╰───────┴──────╯
<BLANKLINE>
<BLANKLINE>
Statistics: missing
<BLANKLINE>
"""
if results_buffer_size is not None and not results_buffer_size > 0:
raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}")
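For the partition-level iterator, a hedged sketch of how the new default interacts with backpressure (it mirrors the docstring example above; the explicit buffer size is illustrative):

import daft

df = daft.from_pydict({"foo": [1, 2, 3], "bar": ["a", "b", "c"]}).into_partitions(2)

# Default: Daft buffers up to multiprocessing.cpu_count() partitions before
# pausing execution until the consumer catches up.
for part in df.iter_partitions():
    print(part)

# Bounding the buffer to 1 restores the old, more conservative behaviour;
# passing None removes the bound and lets execution run ahead of the consumer.
for part in df.iter_partitions(results_buffer_size=1):
    print(part)
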
1 change: 1 addition & 0 deletions docs/source/api_docs/dataframe.rst
@@ -124,6 +124,7 @@ These methods will run the dataframe and retrieve them to where the code is bein

DataFrame.to_pydict
DataFrame.iter_partitions
DataFrame.iter_rows

Materialization
***************
