From 60ebf82ea9c8f07ba8912d9adcdf35efd1165cef Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Tue, 3 Sep 2024 21:38:45 -0700
Subject: [PATCH] [CHORE] Rename vpartition -> micropartition (#2781)

Co-authored-by: Jay Chia
---
 daft/dataframe/dataframe.py       | 20 ++++++++++----------
 daft/execution/execution_step.py  |  8 ++++----
 daft/execution/native_executor.py |  4 +++-
 daft/execution/physical_plan.py   |  4 ++--
 daft/runners/partitioning.py      | 16 ++++++++--------
 daft/runners/pyrunner.py          |  6 +++---
 daft/runners/ray_runner.py        | 27 ++++++++++++++++-----------
 7 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 2f4fa800f8..418b03bd97 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -390,11 +390,11 @@ def _populate_preview(self) -> None:
             self._preview.preview_partition is None or len(self._preview.preview_partition) < self._num_preview_rows
         )
         if preview_partition_invalid:
-            preview_parts = self._result._get_preview_vpartition(self._num_preview_rows)
+            preview_parts = self._result._get_preview_micropartitions(self._num_preview_rows)
             preview_results = LocalPartitionSet()
             for i, part in enumerate(preview_parts):
                 preview_results.set_partition_from_table(i, part)
-            preview_partition = preview_results._get_merged_vpartition()
+            preview_partition = preview_results._get_merged_micropartition()
             self._preview = DataFramePreview(
                 preview_partition=preview_partition,
                 dataframe_num_rows=len(self),
@@ -436,8 +436,8 @@ def _from_pydict(cls, data: Dict[str, InputListType]) -> "DataFrame":
                 f"Expected all columns to be of the same length, but received columns with lengths: {column_lengths}"
             )
 
-        data_vpartition = MicroPartition.from_pydict(data)
-        return cls._from_tables(data_vpartition)
+        data_micropartition = MicroPartition.from_pydict(data)
+        return cls._from_tables(data_micropartition)
 
     @classmethod
     def _from_arrow(cls, data: Union["pyarrow.Table", List["pyarrow.Table"], Iterable["pyarrow.Table"]]) -> "DataFrame":
@@ -446,16 +446,16 @@ def _from_arrow(cls, data: Union["pyarrow.Table", List["pyarrow.Table"], Iterabl
             data = list(data)
         if not isinstance(data, list):
             data = [data]
-        data_vpartitions = [MicroPartition.from_arrow(table) for table in data]
-        return cls._from_tables(*data_vpartitions)
+        data_micropartitions = [MicroPartition.from_arrow(table) for table in data]
+        return cls._from_tables(*data_micropartitions)
 
     @classmethod
     def _from_pandas(cls, data: Union["pandas.DataFrame", List["pandas.DataFrame"]]) -> "DataFrame":
         """Creates a Daft DataFrame from a `pandas DataFrame `__."""
         if not isinstance(data, list):
             data = [data]
-        data_vpartitions = [MicroPartition.from_pandas(df) for df in data]
-        return cls._from_tables(*data_vpartitions)
+        data_micropartitions = [MicroPartition.from_pandas(df) for df in data]
+        return cls._from_tables(*data_micropartitions)
 
     @classmethod
     def _from_tables(cls, *parts: MicroPartition) -> "DataFrame":
@@ -2636,7 +2636,7 @@ def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame":
            preview_results = partition_set
 
         # set preview
-        preview_partition = preview_results._get_merged_vpartition()
+        preview_partition = preview_results._get_merged_micropartition()
         df._preview = DataFramePreview(
             preview_partition=preview_partition,
             dataframe_num_rows=dataframe_num_rows,
@@ -2728,7 +2728,7 @@ def _from_dask_dataframe(cls, ddf: "dask.DataFrame") -> "DataFrame":
            preview_results = partition_set
 
         # set preview
-        preview_partition = preview_results._get_merged_vpartition()
+        preview_partition = preview_results._get_merged_micropartition()
         df._preview = DataFramePreview(
             preview_partition=preview_partition,
             dataframe_num_rows=dataframe_num_rows,
diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py
index 0eb0f1cf75..36250ca5c7 100644
--- a/daft/execution/execution_step.py
+++ b/daft/execution/execution_step.py
@@ -200,9 +200,9 @@ def partition_metadata(self) -> PartitionMetadata:
         [partial_metadata] = self.partial_metadatas
         return self.result().metadata().merge_with_partial(partial_metadata)
 
-    def vpartition(self) -> MicroPartition:
+    def micropartition(self) -> MicroPartition:
         """Get the raw vPartition of the result."""
-        return self.result().vpartition()
+        return self.result().micropartition()
 
     def __str__(self) -> str:
         return super().__str__()
@@ -248,10 +248,10 @@ def partition_metadatas(self) -> list[PartitionMetadata]:
             for result, partial_metadata in zip(self._results, self.partial_metadatas)
         ]
 
-    def vpartition(self, index: int) -> MicroPartition:
+    def micropartition(self, index: int) -> MicroPartition:
         """Get the raw vPartition of the result."""
         assert self._results is not None
-        return self._results[index].vpartition()
+        return self._results[index].micropartition()
 
     def __str__(self) -> str:
         return super().__str__()
diff --git a/daft/execution/native_executor.py b/daft/execution/native_executor.py
index 3c790fbfb5..2fe5cf9eba 100644
--- a/daft/execution/native_executor.py
+++ b/daft/execution/native_executor.py
@@ -34,7 +34,9 @@ def run(
     ) -> Iterator[PyMaterializedResult]:
         from daft.runners.pyrunner import PyMaterializedResult
 
-        psets_mp = {part_id: [part.vpartition()._micropartition for part in parts] for part_id, parts in psets.items()}
+        psets_mp = {
+            part_id: [part.micropartition()._micropartition for part in parts] for part_id, parts in psets.items()
+        }
         return (
             PyMaterializedResult(MicroPartition._from_pymicropartition(part))
             for part in self._executor.run(psets_mp, daft_execution_config, results_buffer_size)
diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py
index d0d529075a..e5a8e424cb 100644
--- a/daft/execution/physical_plan.py
+++ b/daft/execution/physical_plan.py
@@ -1024,7 +1024,7 @@ def sort_merge_join_aligned_boundaries(
         ]
 
         # Execute a sorting reduce on it.
-        per_partition_bounds = _to_per_partition_bounds(boundaries.vpartition(), num_partitions)
+        per_partition_bounds = _to_per_partition_bounds(boundaries.micropartition(), num_partitions)
         sorted_plans.append(
             reduce(
                 fanout_plan=iter(range_fanout_plan),
@@ -1498,7 +1498,7 @@ def sort(
         )
         for source in consume_deque(source_materializations)
     )
-    per_partition_bounds = _to_per_partition_bounds(boundaries.vpartition(), num_partitions)
+    per_partition_bounds = _to_per_partition_bounds(boundaries.micropartition(), num_partitions)
 
     # Execute a sorting reduce on it.
     yield from reduce(
diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py
index ff395594ff..0cf68e4cbe 100644
--- a/daft/runners/partitioning.py
+++ b/daft/runners/partitioning.py
@@ -175,12 +175,12 @@ class MaterializedResult(Generic[PartitionT]):
 
     @abstractmethod
     def partition(self) -> PartitionT:
-        """Get the partition of this result."""
+        """Get the result data as a generic PartitionT, which is an internal backend-specific representation of the result."""
         ...
 
     @abstractmethod
-    def vpartition(self) -> MicroPartition:
-        """Get the vPartition of this result."""
+    def micropartition(self) -> MicroPartition:
+        """Get the result data as an in-memory MicroPartition."""
         ...
 
     @abstractmethod
@@ -202,15 +202,15 @@ def _noop(self, _: PartitionT) -> None:
 
 
 class PartitionSet(Generic[PartitionT]):
-    def _get_merged_vpartition(self) -> MicroPartition:
+    def _get_merged_micropartition(self) -> MicroPartition:
         raise NotImplementedError()
 
-    def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
+    def _get_preview_micropartitions(self, num_rows: int) -> list[MicroPartition]:
         raise NotImplementedError()
 
     def to_pydict(self) -> dict[str, list[Any]]:
         """Retrieves all the data in a PartitionSet as a Python dictionary. Values are the raw data from each Block."""
-        merged_partition = self._get_merged_vpartition()
+        merged_partition = self._get_merged_micropartition()
         return merged_partition.to_pydict()
 
     def to_pandas(
@@ -219,7 +219,7 @@ def to_pandas(
         cast_tensors_to_ray_tensor_dtype: bool = False,
         coerce_temporal_nanoseconds: bool = False,
     ) -> pd.DataFrame:
-        merged_partition = self._get_merged_vpartition()
+        merged_partition = self._get_merged_micropartition()
         return merged_partition.to_pandas(
             schema=schema,
             cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
@@ -227,7 +227,7 @@ def to_pandas(
         )
 
     def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
-        merged_partition = self._get_merged_vpartition()
+        merged_partition = self._get_merged_micropartition()
         return merged_partition.to_arrow(cast_tensors_to_ray_tensor_dtype)
 
     def items(self) -> list[tuple[PartID, MaterializedResult[PartitionT]]]:
diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py
index 04f67b9eb2..7e5bef14b9 100644
--- a/daft/runners/pyrunner.py
+++ b/daft/runners/pyrunner.py
@@ -41,13 +41,13 @@ def __init__(self) -> None:
     def items(self) -> list[tuple[PartID, MaterializedResult[MicroPartition]]]:
         return sorted(self._partitions.items())
 
-    def _get_merged_vpartition(self) -> MicroPartition:
+    def _get_merged_micropartition(self) -> MicroPartition:
         ids_and_partitions = self.items()
         assert ids_and_partitions[0][0] == 0
         assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions)
         return MicroPartition.concat([part.partition() for id, part in ids_and_partitions])
 
-    def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
+    def _get_preview_micropartitions(self, num_rows: int) -> list[MicroPartition]:
         ids_and_partitions = self.items()
         preview_parts = []
         for _, mat_result in ids_and_partitions:
@@ -401,7 +401,7 @@ class PyMaterializedResult(MaterializedResult[MicroPartition]):
     def partition(self) -> MicroPartition:
         return self._partition
 
-    def vpartition(self) -> MicroPartition:
+    def micropartition(self) -> MicroPartition:
         return self._partition
 
     def metadata(self) -> PartitionMetadata:
diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py
index 88044b95d3..d57fc220d6 100644
--- a/daft/runners/ray_runner.py
+++ b/daft/runners/ray_runner.py
@@ -91,7 +91,7 @@ def _glob_path_into_file_infos(
 
 
 @ray.remote
-def _make_ray_block_from_vpartition(partition: MicroPartition) -> RayDatasetBlock:
+def _make_ray_block_from_micropartition(partition: MicroPartition) -> RayDatasetBlock:
     try:
         return partition.to_arrow(cast_tensors_to_ray_tensor_dtype=True)
     except pa.ArrowInvalid:
@@ -135,14 +135,14 @@ def __init__(self) -> None:
     def items(self) -> list[tuple[PartID, MaterializedResult[ray.ObjectRef]]]:
         return [(pid, result) for pid, result in sorted(self._results.items())]
 
-    def _get_merged_vpartition(self) -> MicroPartition:
+    def _get_merged_micropartition(self) -> MicroPartition:
         ids_and_partitions = self.items()
         assert ids_and_partitions[0][0] == 0
         assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions)
         all_partitions = ray.get([part.partition() for id, part in ids_and_partitions])
         return MicroPartition.concat(all_partitions)
 
-    def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
+    def _get_preview_micropartitions(self, num_rows: int) -> list[MicroPartition]:
         ids_and_partitions = self.items()
         preview_parts = []
         for _, mat_result in ids_and_partitions:
@@ -163,7 +163,9 @@ def to_ray_dataset(self) -> RayDataset:
                 "Unable to import `ray.data.from_arrow_refs`. Please ensure that you have a compatible version of Ray >= 1.10 installed."
             )
 
-        blocks = [_make_ray_block_from_vpartition.remote(self._results[k].partition()) for k in self._results.keys()]
+        blocks = [
+            _make_ray_block_from_micropartition.remote(self._results[k].partition()) for k in self._results.keys()
+        ]
         # NOTE: although the Ray method is called `from_arrow_refs`, this method works also when the blocks are List[T] types
         # instead of Arrow tables as the codepath for Dataset creation is the same.
         return from_arrow_refs(blocks)
@@ -179,11 +181,12 @@ def to_dask_dataframe(
         dask.config.set(scheduler=ray_dask_get)
 
         @dask.delayed
-        def _make_dask_dataframe_partition_from_vpartition(partition: MicroPartition) -> pd.DataFrame:
+        def _make_dask_dataframe_partition_from_micropartition(partition: MicroPartition) -> pd.DataFrame:
             return partition.to_pandas()
 
         ddf_parts = [
-            _make_dask_dataframe_partition_from_vpartition(self._results[k].partition()) for k in self._results.keys()
+            _make_dask_dataframe_partition_from_micropartition(self._results[k].partition())
+            for k in self._results.keys()
         ]
         return dd.from_delayed(ddf_parts, meta=meta)
 
@@ -266,12 +269,12 @@ def partition_set_from_ray_dataset(
 
         # NOTE: This materializes the entire Ray Dataset - we could make this more intelligent by creating a new RayDatasetScan node
         # which can iterate on Ray Dataset blocks and materialize as-needed
-        daft_vpartitions = [
+        daft_micropartitions = [
            _make_daft_partition_from_ray_dataset_blocks.remote(block, daft_schema) for block in block_refs
        ]
         pset = RayPartitionSet()
 
-        for i, obj in enumerate(daft_vpartitions):
+        for i, obj in enumerate(daft_micropartitions):
             pset.set_partition(i, RayMaterializedResult(obj))
         return (
             pset,
@@ -290,7 +293,9 @@ def partition_set_from_dask_dataframe(
             raise ValueError("Can't convert an empty Dask DataFrame (with no partitions) to a Daft DataFrame.")
         persisted_partitions = dask.persist(*partitions, scheduler=ray_dask_get)
         parts = [_to_pandas_ref(next(iter(part.dask.values()))) for part in persisted_partitions]
-        daft_vpartitions, schemas = zip(*(_make_daft_partition_from_dask_dataframe_partitions.remote(p) for p in parts))
+        daft_micropartitions, schemas = zip(
+            *(_make_daft_partition_from_dask_dataframe_partitions.remote(p) for p in parts)
+        )
         schemas = ray.get(list(schemas))
         # Dask shouldn't allow inconsistent schemas across partitions, but we double-check here.
         if not all(schemas[0] == schema for schema in schemas[1:]):
@@ -301,7 +306,7 @@ def partition_set_from_dask_dataframe(
 
         pset = RayPartitionSet()
 
-        for i, obj in enumerate(daft_vpartitions):
+        for i, obj in enumerate(daft_micropartitions):
             pset.set_partition(i, RayMaterializedResult(obj))
         return (
             pset,
@@ -924,7 +929,7 @@ def __init__(
     def partition(self) -> ray.ObjectRef:
         return self._partition
 
-    def vpartition(self) -> MicroPartition:
+    def micropartition(self) -> MicroPartition:
         return ray.get(self._partition)
 
     def metadata(self) -> PartitionMetadata:
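Reviewer note (not part of the patch): below is a minimal sketch of the renamed surface this change establishes, with `micropartition()` replacing `vpartition()` on materialized results and `_get_merged_micropartition()` replacing `_get_merged_vpartition()` on partition sets. The toy classes are illustrative only, not Daft's real implementations; the `daft.table.MicroPartition` import path is an assumption, while `MicroPartition.from_pydict`, `MicroPartition.concat`, and `to_pydict` are taken from the patch itself.

from daft.table import MicroPartition  # assumed import path for MicroPartition


class ToyMaterializedResult:
    """Illustrative stand-in for a backend's MaterializedResult after the rename."""

    def __init__(self, part: MicroPartition) -> None:
        self._part = part

    def partition(self) -> MicroPartition:
        # Backend-specific handle; for this in-memory toy it is the MicroPartition itself.
        return self._part

    def micropartition(self) -> MicroPartition:
        # Renamed from vpartition(): always returns an in-memory MicroPartition.
        return self._part


class ToyPartitionSet:
    """Illustrative stand-in for a PartitionSet keyed by partition id."""

    def __init__(self) -> None:
        self._results: dict[int, ToyMaterializedResult] = {}

    def set_partition(self, idx: int, result: ToyMaterializedResult) -> None:
        self._results[idx] = result

    def _get_merged_micropartition(self) -> MicroPartition:
        # Renamed from _get_merged_vpartition(): concatenate partitions in id order.
        parts = [res.micropartition() for _, res in sorted(self._results.items())]
        return MicroPartition.concat(parts)


if __name__ == "__main__":
    pset = ToyPartitionSet()
    pset.set_partition(0, ToyMaterializedResult(MicroPartition.from_pydict({"x": [1, 2]})))
    pset.set_partition(1, ToyMaterializedResult(MicroPartition.from_pydict({"x": [3]})))
    print(pset._get_merged_micropartition().to_pydict())  # expected: {'x': [1, 2, 3]}

Because the patch is a pure rename, callers only need the new method names; the merge-in-id-order behavior shown above mirrors the existing assertions in pyrunner.py and ray_runner.py.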