From 895765dc67730fe546e8b1445091167a293843b9 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Tue, 3 Sep 2024 14:22:10 -0700
Subject: [PATCH 1/6] [PERF] Use to_arrow_iter in to_arrow to avoid unnecessary array concats

---
 daft/dataframe/dataframe.py | 47 ++++++++++++++++++++++---------------
 1 file changed, 28 insertions(+), 19 deletions(-)

diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 874199ed8f..596e55b013 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -65,9 +65,6 @@
 
 ManyColumnsInputType = Union[ColumnInputType, Iterable[ColumnInputType]]
 
-NUM_CPUS = multiprocessing.cpu_count()
-
-
 class DataFrame:
     """A Daft DataFrame is a table of data. It has columns, where each column has a type and the
     same number of items (rows) as all other columns.
@@ -226,7 +223,7 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
         return self.iter_rows(results_buffer_size=None)
 
     @DataframePublicAPI
-    def iter_rows(self, results_buffer_size: Optional[int] = NUM_CPUS) -> Iterator[Dict[str, Any]]:
+    def iter_rows(self, results_buffer_size: Optional[int] | Literal["num_cpus"] = "num_cpus") -> Iterator[Dict[str, Any]]:
         """Return an iterator of rows for this dataframe.
 
         Each row will be a Python dictionary of the form { "key" : value, ... }. If you are instead looking to iterate over
@@ -271,6 +268,9 @@ def iter_rows(self, results_buffer_size: Optional[int] = NUM_CPUS) -> Iterator[D
                 row = {key: value[i] for (key, value) in pydict.items()}
                 yield row
 
+            if results_buffer_size == "num_cpus":
+                results_buffer_size = multiprocessing.cpu_count()
+
         else:
             # Execute the dataframe in a streaming fashion.
             context = get_context()
@@ -286,21 +286,35 @@ def iter_rows(self, results_buffer_size: Optional[int] = NUM_CPUS) -> Iterator[D
                 yield row
 
     @DataframePublicAPI
-    def to_arrow_iter(self, results_buffer_size: Optional[int] = 1) -> Iterator["pyarrow.RecordBatch"]:
+    def to_arrow_iter(
+        self, results_buffer_size: Optional[int] | Literal["num_cpus"] = "num_cpus",
+    ) -> Iterator["pyarrow.RecordBatch"]:
         """
         Return an iterator of pyarrow recordbatches for this dataframe.
         """
+        for name in self.schema().column_names():
+            if self.schema()[name].dtype._is_python_type():
+                raise ValueError(
+                    f"Cannot convert column {name} to Arrow type, found Python type: {self.schema()[name].dtype}"
+                )
+
+        if results_buffer_size == "num_cpus":
+            results_buffer_size = multiprocessing.cpu_count()
         if results_buffer_size is not None and not results_buffer_size > 0:
             raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}")
         if self._result is not None:
             # If the dataframe has already finished executing,
             # use the precomputed results.
-            yield from self.to_arrow().to_batches()
-
+            for _, result in self._result.items():
+                yield from (
+                    result.vpartition()
+                    .to_arrow()
+                    .to_batches()
+                )
         else:
             # Execute the dataframe in a streaming fashion.
             context = get_context()
-            partitions_iter = context.runner().run_iter_tables(self._builder, results_buffer_size)
+            partitions_iter = context.runner().run_iter_tables(self._builder, results_buffer_size=results_buffer_size)
 
             # Iterate through partitions.
             for partition in partitions_iter:
@@ -308,7 +322,7 @@ def to_arrow_iter(self, results_buffer_size: Optional[int] = 1) -> Iterator["pya
 
     @DataframePublicAPI
     def iter_partitions(
-        self, results_buffer_size: Optional[int] = NUM_CPUS
+        self, results_buffer_size: Optional[int] | Literal["num_cpus"] = "num_cpus"
     ) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]:
         """Begin executing this dataframe and return an iterator over the partitions.
 
@@ -367,6 +381,8 @@ def iter_partitions(
         """
         if results_buffer_size is not None and not results_buffer_size > 0:
             raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}")
+        elif results_buffer_size == "num_cpus":
+            results_buffer_size = multiprocessing.cpu_count()
 
         if self._result is not None:
             # If the dataframe has already finished executing,
@@ -2496,17 +2512,10 @@ def to_arrow(self) -> "pyarrow.Table":
         .. NOTE::
             This call is **blocking** and will execute the DataFrame when called
         """
-        for name in self.schema().column_names():
-            if self.schema()[name].dtype._is_python_type():
-                raise ValueError(
-                    f"Cannot convert column {name} to Arrow type, found Python type: {self.schema()[name].dtype}"
-                )
-
-        self.collect()
-        result = self._result
-        assert result is not None
+        import pyarrow as pa
 
-        return result.to_arrow()
+        arrow_rb_iter = self.to_arrow_iter(results_buffer_size=None)
+        return pa.Table.from_batches(arrow_rb_iter)
 
     @DataframePublicAPI
     def to_pydict(self) -> Dict[str, List[Any]]:
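[Reviewer note] The perf angle here: pyarrow's Table.from_batches accepts a
lazy iterator, so to_arrow() can now assemble the final Table in one pass over
streamed record batches instead of concatenating per-partition arrays up
front. A minimal standalone sketch of the pattern (plain pyarrow, with a toy
generator standing in for Daft's partition stream, not the actual execution
path):

    import pyarrow as pa

    def batch_stream():
        # Stand-in for to_arrow_iter(): record batches become available one
        # at a time and are never all materialized before Table construction.
        for lo in (0, 3):
            yield pa.record_batch({"x": list(range(lo, lo + 3))})

    # from_batches drains the iterator lazily and builds the Table once.
    table = pa.Table.from_batches(batch_stream())
    assert table.num_rows == 6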
From 3b727323614c4b941a7bed97badcb28481583240 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Tue, 3 Sep 2024 14:37:58 -0700
Subject: [PATCH 2/6] Explicitly pass in pyarrow schema during table construction

---
 daft/dataframe/dataframe.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 596e55b013..c0445fdaac 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -2515,7 +2515,7 @@ def to_arrow(self) -> "pyarrow.Table":
         import pyarrow as pa
 
         arrow_rb_iter = self.to_arrow_iter(results_buffer_size=None)
-        return pa.Table.from_batches(arrow_rb_iter)
+        return pa.Table.from_batches(arrow_rb_iter, schema=self.schema().to_pyarrow_schema())
 
     @DataframePublicAPI
     def to_pydict(self) -> Dict[str, List[Any]]:

From 0594db11bd7f1b892673147073dc5f721e277233 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Sun, 8 Sep 2024 18:31:47 -0700
Subject: [PATCH 3/6] Lints

---
 daft/dataframe/dataframe.py | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index c0445fdaac..00b2c6157f 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -223,7 +223,9 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
         return self.iter_rows(results_buffer_size=None)
 
     @DataframePublicAPI
-    def iter_rows(self, results_buffer_size: Optional[int] | Literal["num_cpus"] = "num_cpus") -> Iterator[Dict[str, Any]]:
+    def iter_rows(
+        self, results_buffer_size: Union[Optional[int], Literal["num_cpus"]] = "num_cpus"
+    ) -> Iterator[Dict[str, Any]]:
         """Return an iterator of rows for this dataframe.
 
         Each row will be a Python dictionary of the form { "key" : value, ... }. If you are instead looking to iterate over
@@ -287,7 +289,8 @@ def iter_rows(self, results_buffer_size: Optional[int] | Literal["num_cpus"] = "
 
     @DataframePublicAPI
     def to_arrow_iter(
-        self, results_buffer_size: Optional[int] | Literal["num_cpus"] = "num_cpus",
+        self,
+        results_buffer_size: Union[Optional[int], Literal["num_cpus"]] = "num_cpus",
     ) -> Iterator["pyarrow.RecordBatch"]:
         """
         Return an iterator of pyarrow recordbatches for this dataframe.
@@ -306,11 +309,7 @@ def to_arrow_iter(
             # If the dataframe has already finished executing,
             # use the precomputed results.
             for _, result in self._result.items():
-                yield from (
-                    result.vpartition()
-                    .to_arrow()
-                    .to_batches()
-                )
+                yield from (result.micropartition().to_arrow().to_batches())
         else:
             # Execute the dataframe in a streaming fashion.
             context = get_context()
@@ -322,7 +321,7 @@ def to_arrow_iter(
 
     @DataframePublicAPI
     def iter_partitions(
-        self, results_buffer_size: Optional[int] | Literal["num_cpus"] = "num_cpus"
+        self, results_buffer_size: Union[Optional[int], Literal["num_cpus"]] = "num_cpus"
     ) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]:
         """Begin executing this dataframe and return an iterator over the partitions.
 
@@ -379,10 +378,10 @@ def iter_partitions(
             Statistics: missing
 
         """
-        if results_buffer_size is not None and not results_buffer_size > 0:
-            raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}")
-        elif results_buffer_size == "num_cpus":
+        if results_buffer_size == "num_cpus":
             results_buffer_size = multiprocessing.cpu_count()
+        elif results_buffer_size is not None and not results_buffer_size > 0:
+            raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}")
 
         if self._result is not None:
             # If the dataframe has already finished executing,
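[Reviewer note] The explicit schema= added in PATCH 2 matters for this
streaming path: Table.from_batches otherwise infers the schema from the first
batch, which breaks down when the stream yields zero batches (e.g. an empty
dataframe). A small sketch of the case it guards against, using a hypothetical
single-column schema:

    import pyarrow as pa

    schema = pa.schema([pa.field("x", pa.int64())])

    # No schema and no batches: pyarrow has nothing to infer types from.
    try:
        pa.Table.from_batches(iter([]))
    except Exception:
        pass  # raises, since a schema cannot be inferred from zero batches

    # With schema=, an empty stream still yields a well-typed zero-row Table.
    empty = pa.Table.from_batches(iter([]), schema=schema)
    assert empty.num_rows == 0 and empty.schema == schema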
From 278fa0b83f23593f0be470240f01d5c3c821d840 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Sun, 8 Sep 2024 18:44:02 -0700
Subject: [PATCH 4/6] Fix if statement placement

---
 daft/dataframe/dataframe.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 00b2c6157f..c32e69ebe5 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -262,6 +262,9 @@ def iter_rows(
         .. seealso::
             :meth:`df.iter_partitions() <daft.DataFrame.iter_partitions>`: iterator over entire partitions instead of single rows
         """
+        if results_buffer_size == "num_cpus":
+            results_buffer_size = multiprocessing.cpu_count()
+
         if self._result is not None:
             # If the dataframe has already finished executing,
             # use the precomputed results.
@@ -269,10 +272,6 @@ def iter_rows(
             for i in range(len(self)):
                 row = {key: value[i] for (key, value) in pydict.items()}
                 yield row
-
-            if results_buffer_size == "num_cpus":
-                results_buffer_size = multiprocessing.cpu_count()
-
         else:
             # Execute the dataframe in a streaming fashion.
             context = get_context()
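[Reviewer note] The placement fix is the usual normalize-early pattern: resolve
the "num_cpus" sentinel before any branching, so both the precomputed-result
path and the streaming path see an int or None (previously the streaming
branch received the raw string). A hypothetical standalone helper showing the
intended ordering, not code from this patch:

    import multiprocessing
    from typing import Literal, Optional, Union

    def resolve_buffer_size(
        size: Union[Optional[int], Literal["num_cpus"]],
    ) -> Optional[int]:
        # Resolve the sentinel first and validate after, so downstream
        # code only ever sees None or a positive integer.
        if size == "num_cpus":
            return multiprocessing.cpu_count()
        if size is not None and size <= 0:
            raise ValueError(f"`results_buffer_size` must be > 0, received: {size}")
        return size

    assert resolve_buffer_size(None) is None
    assert resolve_buffer_size("num_cpus") >= 1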
From 8add10e203ed003b147c2f03dfc769491b1bd913 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Sun, 8 Sep 2024 19:07:20 -0700
Subject: [PATCH 5/6] Fix schema to_pyarrow_schema to convert datatype using Python

---
 src/daft-core/src/python/schema.rs | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/daft-core/src/python/schema.rs b/src/daft-core/src/python/schema.rs
index 33fce18df0..eaa2ff78ac 100644
--- a/src/daft-core/src/python/schema.rs
+++ b/src/daft-core/src/python/schema.rs
@@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize};
 use super::datatype::PyDataType;
 use super::field::PyField;
 use crate::datatypes;
-use crate::ffi::field_to_py;
 use crate::schema;
 use common_py_serde::impl_bincode_py_state_serialization;
 
@@ -29,7 +28,16 @@ impl PySchema {
             .schema
             .fields
             .iter()
-            .map(|(_, f)| field_to_py(&f.to_arrow()?, py, pyarrow))
+            .map(|(_, f)| {
+                // NOTE: Use PyDataType::to_arrow because we need to dip into Python to get
+                // the registered Arrow extension types
+                let py_dtype: PyDataType = f.dtype.clone().into();
+                let py_arrow_dtype = py_dtype.to_arrow(py)?;
+                pyarrow
+                    .getattr(pyo3::intern!(py, "field"))
+                    .unwrap()
+                    .call1((f.name.clone(), py_arrow_dtype))
+            })
             .collect::<PyResult<Vec<_>>>()?;
         pyarrow
             .getattr(pyo3::intern!(py, "schema"))

From f741a64373b87426618e611526876b8c9d338116 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Sun, 8 Sep 2024 19:27:15 -0700
Subject: [PATCH 6/6] Fix bug with type to pyarrow type

---
 src/daft-core/src/ffi.rs             | 5 +++--
 src/daft-core/src/python/datatype.rs | 5 ++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/daft-core/src/ffi.rs b/src/daft-core/src/ffi.rs
index 67ad5fb234..e7084c9a7a 100644
--- a/src/daft-core/src/ffi.rs
+++ b/src/daft-core/src/ffi.rs
@@ -69,7 +69,7 @@ pub fn field_to_py(
     Ok(field.to_object(py))
 }
 
-pub fn to_py_schema(
+pub fn dtype_to_py(
     dtype: &arrow2::datatypes::DataType,
     py: Python,
     pyarrow: &PyModule,
@@ -81,8 +81,9 @@
         pyo3::intern!(py, "_import_from_c"),
         (schema_ptr as Py_uintptr_t,),
     )?;
+    let dtype = field.getattr(pyo3::intern!(py, "type"))?.to_object(py);
 
-    Ok(field.to_object(py))
+    Ok(dtype.to_object(py))
 }
 
 fn fix_child_array_slice_offsets(array: ArrayRef) -> ArrayRef {
diff --git a/src/daft-core/src/python/datatype.rs b/src/daft-core/src/python/datatype.rs
index e50608b05d..e5615bd072 100644
--- a/src/daft-core/src/python/datatype.rs
+++ b/src/daft-core/src/python/datatype.rs
@@ -331,11 +331,10 @@
                 } else {
                     // Fall back to default Daft super extension representation if installed pyarrow doesn't have the
                     // canonical tensor extension type.
-                    ffi::to_py_schema(&self.dtype.to_arrow()?, py, pyarrow)?
+                    ffi::dtype_to_py(&self.dtype.to_arrow()?, py, pyarrow)?
                 },
             ),
-            _ => ffi::to_py_schema(&self.dtype.to_arrow()?, py, pyarrow)?
-                .getattr(py, pyo3::intern!(py, "type")),
+            _ => ffi::dtype_to_py(&self.dtype.to_arrow()?, py, pyarrow),
         }
     }
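[Reviewer note] PATCH 5 and PATCH 6 together route dtype conversion through
Python (so registered Arrow extension types survive) and make the FFI helper
return a pyarrow DataType rather than the Field wrapping it. Sketched in plain
Python, with placeholder dtypes standing in for whatever PyDataType.to_arrow
produces:

    import pyarrow as pa

    # Build each field from an already-converted dtype, then assemble the
    # schema: the same pyarrow.field(...) / pyarrow.schema(...) calls that
    # schema.rs now makes through pyo3.
    converted = {"id": pa.int64(), "scores": pa.list_(pa.float32())}
    fields = [pa.field(name, dtype) for name, dtype in converted.items()]
    schema = pa.schema(fields)
    assert schema.field("scores").type == pa.list_(pa.float32())

    # PATCH 6's bug in miniature: a Field is not a DataType, so the helper
    # must hand back field.type rather than the field itself.
    field = pa.field("", pa.int64())
    assert field.type == pa.int64()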