[CHORE] Rename vpartition -> micropartition (#2781)
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Sep 4, 2024
1 parent c5a4adc commit 60ebf82
Showing 7 changed files with 46 additions and 39 deletions.
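
The change is a mechanical rename: every internal accessor built around the old "vPartition" naming now says "micropartition", and the preview helper becomes plural. A minimal sketch of the renamed surface, assuming MicroPartition is importable from daft.table and LocalPartitionSet from daft.runners.pyrunner (the diff below uses both names but does not pin down these import paths):

from daft.table import MicroPartition                 # import path assumed
from daft.runners.pyrunner import LocalPartitionSet   # import path assumed

part = MicroPartition.from_pydict({"x": [1, 2, 3]})

pset = LocalPartitionSet()
pset.set_partition_from_table(0, part)

merged = pset._get_merged_micropartition()          # was: _get_merged_vpartition()
previews = pset._get_preview_micropartitions(2)     # was: _get_preview_vpartition(...)
print(merged.to_pydict())
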
20 changes: 10 additions & 10 deletions daft/dataframe/dataframe.py
@@ -390,11 +390,11 @@ def _populate_preview(self) -> None:
self._preview.preview_partition is None or len(self._preview.preview_partition) < self._num_preview_rows
)
if preview_partition_invalid:
preview_parts = self._result._get_preview_vpartition(self._num_preview_rows)
preview_parts = self._result._get_preview_micropartitions(self._num_preview_rows)
preview_results = LocalPartitionSet()
for i, part in enumerate(preview_parts):
preview_results.set_partition_from_table(i, part)
preview_partition = preview_results._get_merged_vpartition()
preview_partition = preview_results._get_merged_micropartition()
self._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=len(self),
@@ -436,8 +436,8 @@ def _from_pydict(cls, data: Dict[str, InputListType]) -> "DataFrame":
f"Expected all columns to be of the same length, but received columns with lengths: {column_lengths}"
)

data_vpartition = MicroPartition.from_pydict(data)
return cls._from_tables(data_vpartition)
data_micropartition = MicroPartition.from_pydict(data)
return cls._from_tables(data_micropartition)

@classmethod
def _from_arrow(cls, data: Union["pyarrow.Table", List["pyarrow.Table"], Iterable["pyarrow.Table"]]) -> "DataFrame":
@@ -446,16 +446,16 @@ def _from_arrow(cls, data: Union["pyarrow.Table", List["pyarrow.Table"], Iterabl
data = list(data)
if not isinstance(data, list):
data = [data]
data_vpartitions = [MicroPartition.from_arrow(table) for table in data]
return cls._from_tables(*data_vpartitions)
data_micropartitions = [MicroPartition.from_arrow(table) for table in data]
return cls._from_tables(*data_micropartitions)

@classmethod
def _from_pandas(cls, data: Union["pandas.DataFrame", List["pandas.DataFrame"]]) -> "DataFrame":
"""Creates a Daft DataFrame from a `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__."""
if not isinstance(data, list):
data = [data]
data_vpartitions = [MicroPartition.from_pandas(df) for df in data]
return cls._from_tables(*data_vpartitions)
data_micropartitions = [MicroPartition.from_pandas(df) for df in data]
return cls._from_tables(*data_micropartitions)

@classmethod
def _from_tables(cls, *parts: MicroPartition) -> "DataFrame":
@@ -2636,7 +2636,7 @@ def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame":
preview_results = partition_set

# set preview
preview_partition = preview_results._get_merged_vpartition()
preview_partition = preview_results._get_merged_micropartition()
df._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=dataframe_num_rows,
@@ -2728,7 +2728,7 @@ def _from_dask_dataframe(cls, ddf: "dask.DataFrame") -> "DataFrame":
preview_results = partition_set

# set preview
preview_partition = preview_results._get_merged_vpartition()
preview_partition = preview_results._get_merged_micropartition()
df._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=dataframe_num_rows,
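
For reference, the three ingest helpers above (_from_pydict, _from_arrow, _from_pandas) now name their intermediates data_micropartition(s) and all converge on the same MicroPartition constructors before calling _from_tables. A small sketch exercising those constructors directly, assuming the daft.table import path (pyarrow and pandas are used only to build inputs):

import pandas as pd
import pyarrow as pa

from daft.table import MicroPartition   # import path assumed

mp_dict = MicroPartition.from_pydict({"a": [1, 2]})
mp_arrow = MicroPartition.from_arrow(pa.table({"a": [1, 2]}))
mp_pandas = MicroPartition.from_pandas(pd.DataFrame({"a": [1, 2]}))

# Each path yields an in-memory MicroPartition, which _from_tables then wraps into a DataFrame.
assert mp_dict.to_pydict() == mp_arrow.to_pydict() == mp_pandas.to_pydict()
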
8 changes: 4 additions & 4 deletions daft/execution/execution_step.py
@@ -200,9 +200,9 @@ def partition_metadata(self) -> PartitionMetadata:
[partial_metadata] = self.partial_metadatas
return self.result().metadata().merge_with_partial(partial_metadata)

def vpartition(self) -> MicroPartition:
def micropartition(self) -> MicroPartition:
"""Get the raw vPartition of the result."""
return self.result().vpartition()
return self.result().micropartition()

def __str__(self) -> str:
return super().__str__()
@@ -248,10 +248,10 @@ def partition_metadatas(self) -> list[PartitionMetadata]:
for result, partial_metadata in zip(self._results, self.partial_metadatas)
]

def vpartition(self, index: int) -> MicroPartition:
def micropartition(self, index: int) -> MicroPartition:
"""Get the raw vPartition of the result."""
assert self._results is not None
return self._results[index].vpartition()
return self._results[index].micropartition()

def __str__(self) -> str:
return super().__str__()
4 changes: 3 additions & 1 deletion daft/execution/native_executor.py
@@ -34,7 +34,9 @@ def run(
) -> Iterator[PyMaterializedResult]:
from daft.runners.pyrunner import PyMaterializedResult

psets_mp = {part_id: [part.vpartition()._micropartition for part in parts] for part_id, parts in psets.items()}
psets_mp = {
part_id: [part.micropartition()._micropartition for part in parts] for part_id, parts in psets.items()
}
return (
PyMaterializedResult(MicroPartition._from_pymicropartition(part))
for part in self._executor.run(psets_mp, daft_execution_config, results_buffer_size)
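
The dict comprehension above unwraps each result's MicroPartition to the internal _micropartition handle consumed by the native executor, and MicroPartition._from_pymicropartition re-wraps the executor's output. A rough round-trip sketch of that wrapping, assuming the daft.table import path and the attribute/classmethod names taken from the hunk above:

from daft.table import MicroPartition   # import path assumed

mp = MicroPartition.from_pydict({"a": [1, 2, 3]})

inner = mp._micropartition                           # internal handle handed to the executor
roundtrip = MicroPartition._from_pymicropartition(inner)

assert roundtrip.to_pydict() == {"a": [1, 2, 3]}
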
4 changes: 2 additions & 2 deletions daft/execution/physical_plan.py
@@ -1024,7 +1024,7 @@ def sort_merge_join_aligned_boundaries(
]

# Execute a sorting reduce on it.
per_partition_bounds = _to_per_partition_bounds(boundaries.vpartition(), num_partitions)
per_partition_bounds = _to_per_partition_bounds(boundaries.micropartition(), num_partitions)
sorted_plans.append(
reduce(
fanout_plan=iter(range_fanout_plan),
@@ -1498,7 +1498,7 @@ def sort(
)
for source in consume_deque(source_materializations)
)
per_partition_bounds = _to_per_partition_bounds(boundaries.vpartition(), num_partitions)
per_partition_bounds = _to_per_partition_bounds(boundaries.micropartition(), num_partitions)

# Execute a sorting reduce on it.
yield from reduce(
16 changes: 8 additions & 8 deletions daft/runners/partitioning.py
@@ -175,12 +175,12 @@ class MaterializedResult(Generic[PartitionT]):

@abstractmethod
def partition(self) -> PartitionT:
"""Get the partition of this result."""
"""Get the result data as a generic PartitionT, which is an internal backend-specific representation of the result."""
...

@abstractmethod
def vpartition(self) -> MicroPartition:
"""Get the vPartition of this result."""
def micropartition(self) -> MicroPartition:
"""Get the result data as an in-memory MicroPartition."""
...

@abstractmethod
@@ -202,15 +202,15 @@ def _noop(self, _: PartitionT) -> None:


class PartitionSet(Generic[PartitionT]):
def _get_merged_vpartition(self) -> MicroPartition:
def _get_merged_micropartition(self) -> MicroPartition:
raise NotImplementedError()

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
def _get_preview_micropartitions(self, num_rows: int) -> list[MicroPartition]:
raise NotImplementedError()

def to_pydict(self) -> dict[str, list[Any]]:
"""Retrieves all the data in a PartitionSet as a Python dictionary. Values are the raw data from each Block."""
merged_partition = self._get_merged_vpartition()
merged_partition = self._get_merged_micropartition()
return merged_partition.to_pydict()

def to_pandas(
@@ -219,15 +219,15 @@ def to_pandas(
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
merged_partition = self._get_merged_vpartition()
merged_partition = self._get_merged_micropartition()
return merged_partition.to_pandas(
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
merged_partition = self._get_merged_vpartition()
merged_partition = self._get_merged_micropartition()
return merged_partition.to_arrow(cast_tensors_to_ray_tensor_dtype)

def items(self) -> list[tuple[PartID, MaterializedResult[PartitionT]]]:
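
A minimal sketch of how a concrete result type can satisfy the two renamed accessors, mirroring the local backend shown in pyrunner.py below, where the generic PartitionT is itself a MicroPartition. This is duck-typed and illustrative only; the real MaterializedResult interface also requires metadata() and other abstract methods not visible in this hunk:

from daft.table import MicroPartition   # import path assumed


class InMemoryResult:
    """Illustrative stand-in for MaterializedResult[MicroPartition]."""

    def __init__(self, part: MicroPartition) -> None:
        self._partition = part

    def partition(self) -> MicroPartition:
        # Backend-specific representation; for the local backend this is the MicroPartition itself.
        return self._partition

    def micropartition(self) -> MicroPartition:
        # Always the in-memory MicroPartition, whatever the backend stores internally.
        return self._partition
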
6 changes: 3 additions & 3 deletions daft/runners/pyrunner.py
@@ -41,13 +41,13 @@ def __init__(self) -> None:
def items(self) -> list[tuple[PartID, MaterializedResult[MicroPartition]]]:
return sorted(self._partitions.items())

def _get_merged_vpartition(self) -> MicroPartition:
def _get_merged_micropartition(self) -> MicroPartition:
ids_and_partitions = self.items()
assert ids_and_partitions[0][0] == 0
assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions)
return MicroPartition.concat([part.partition() for id, part in ids_and_partitions])

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
def _get_preview_micropartitions(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, mat_result in ids_and_partitions:
@@ -401,7 +401,7 @@ class PyMaterializedResult(MaterializedResult[MicroPartition]):
def partition(self) -> MicroPartition:
return self._partition

def vpartition(self) -> MicroPartition:
def micropartition(self) -> MicroPartition:
return self._partition

def metadata(self) -> PartitionMetadata:
27 changes: 16 additions & 11 deletions daft/runners/ray_runner.py
@@ -91,7 +91,7 @@ def _glob_path_into_file_infos(


@ray.remote
def _make_ray_block_from_vpartition(partition: MicroPartition) -> RayDatasetBlock:
def _make_ray_block_from_micropartition(partition: MicroPartition) -> RayDatasetBlock:
try:
return partition.to_arrow(cast_tensors_to_ray_tensor_dtype=True)
except pa.ArrowInvalid:
@@ -135,14 +135,14 @@ def __init__(self) -> None:
def items(self) -> list[tuple[PartID, MaterializedResult[ray.ObjectRef]]]:
return [(pid, result) for pid, result in sorted(self._results.items())]

def _get_merged_vpartition(self) -> MicroPartition:
def _get_merged_micropartition(self) -> MicroPartition:
ids_and_partitions = self.items()
assert ids_and_partitions[0][0] == 0
assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions)
all_partitions = ray.get([part.partition() for id, part in ids_and_partitions])
return MicroPartition.concat(all_partitions)

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
def _get_preview_micropartitions(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, mat_result in ids_and_partitions:
@@ -163,7 +163,9 @@ def to_ray_dataset(self) -> RayDataset:
"Unable to import `ray.data.from_arrow_refs`. Please ensure that you have a compatible version of Ray >= 1.10 installed."
)

blocks = [_make_ray_block_from_vpartition.remote(self._results[k].partition()) for k in self._results.keys()]
blocks = [
_make_ray_block_from_micropartition.remote(self._results[k].partition()) for k in self._results.keys()
]
# NOTE: although the Ray method is called `from_arrow_refs`, this method works also when the blocks are List[T] types
# instead of Arrow tables as the codepath for Dataset creation is the same.
return from_arrow_refs(blocks)
@@ -179,11 +181,12 @@ def to_dask_dataframe(
dask.config.set(scheduler=ray_dask_get)

@dask.delayed
def _make_dask_dataframe_partition_from_vpartition(partition: MicroPartition) -> pd.DataFrame:
def _make_dask_dataframe_partition_from_micropartition(partition: MicroPartition) -> pd.DataFrame:
return partition.to_pandas()

ddf_parts = [
_make_dask_dataframe_partition_from_vpartition(self._results[k].partition()) for k in self._results.keys()
_make_dask_dataframe_partition_from_micropartition(self._results[k].partition())
for k in self._results.keys()
]
return dd.from_delayed(ddf_parts, meta=meta)

@@ -266,12 +269,12 @@ def partition_set_from_ray_dataset(

# NOTE: This materializes the entire Ray Dataset - we could make this more intelligent by creating a new RayDatasetScan node
# which can iterate on Ray Dataset blocks and materialize as-needed
daft_vpartitions = [
daft_micropartitions = [
_make_daft_partition_from_ray_dataset_blocks.remote(block, daft_schema) for block in block_refs
]
pset = RayPartitionSet()

for i, obj in enumerate(daft_vpartitions):
for i, obj in enumerate(daft_micropartitions):
pset.set_partition(i, RayMaterializedResult(obj))
return (
pset,
@@ -290,7 +293,9 @@ def partition_set_from_dask_dataframe(
raise ValueError("Can't convert an empty Dask DataFrame (with no partitions) to a Daft DataFrame.")
persisted_partitions = dask.persist(*partitions, scheduler=ray_dask_get)
parts = [_to_pandas_ref(next(iter(part.dask.values()))) for part in persisted_partitions]
daft_vpartitions, schemas = zip(*(_make_daft_partition_from_dask_dataframe_partitions.remote(p) for p in parts))
daft_micropartitions, schemas = zip(
*(_make_daft_partition_from_dask_dataframe_partitions.remote(p) for p in parts)
)
schemas = ray.get(list(schemas))
# Dask shouldn't allow inconsistent schemas across partitions, but we double-check here.
if not all(schemas[0] == schema for schema in schemas[1:]):
Expand All @@ -301,7 +306,7 @@ def partition_set_from_dask_dataframe(

pset = RayPartitionSet()

for i, obj in enumerate(daft_vpartitions):
for i, obj in enumerate(daft_micropartitions):
pset.set_partition(i, RayMaterializedResult(obj))
return (
pset,
@@ -924,7 +929,7 @@ def __init__(
def partition(self) -> ray.ObjectRef:
return self._partition

def vpartition(self) -> MicroPartition:
def micropartition(self) -> MicroPartition:
return ray.get(self._partition)

def metadata(self) -> PartitionMetadata:
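
For contrast with PyMaterializedResult above: the Ray-backed result keeps a ray.ObjectRef as its generic partition and only materializes a MicroPartition when the renamed micropartition() accessor calls ray.get. A small sketch of that distinction using plain Ray calls, assuming the daft.table import path (constructing a RayMaterializedResult directly is not shown in this diff):

import ray

from daft.table import MicroPartition   # import path assumed

ray.init(ignore_reinit_error=True)

# partition() on the Ray backend returns the ObjectRef itself -- no data transfer.
ref = ray.put(MicroPartition.from_pydict({"a": [1, 2, 3]}))

# micropartition() materializes it locally, i.e. ray.get(self._partition).
local_table = ray.get(ref)
assert isinstance(local_table, MicroPartition)
print(local_table.to_pydict())
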
