Add repartition by target number of rows per block (#50179)
## Why are these changes needed?
Add repartition by a target number of rows per block.
Addresses #36724.
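
A minimal usage sketch of the new option. This assumes the knob is exposed as a `target_num_rows_per_block` keyword on `Dataset.repartition`, as requested in the linked issue; the exact public surface lives in API files of this commit that are not all shown below.

```python
import ray

# Hypothetical usage: bound each output block by a row count instead of
# asking for a fixed number of blocks or relying on the byte-size target.
ds = ray.data.range(10_000)
ds = ds.repartition(target_num_rows_per_block=1_000)

# Each output block should now hold roughly 1,000 rows.
print(ds.materialize().num_blocks())  # expected: about 10
```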

---------

Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: srinathk10 <[email protected]>
srinathk10 authored Feb 11, 2025
1 parent 1021d7f commit db4bcbd
Showing 10 changed files with 354 additions and 62 deletions.
7 changes: 5 additions & 2 deletions python/ray/data/_internal/datasource/avro_datasource.py
@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING, Iterator, List, Union

from ray.data._internal.output_buffer import BlockOutputBuffer
from ray.data._internal.output_buffer import BlockOutputBuffer, OutputBlockSizeOption
from ray.data._internal.util import _check_import
from ray.data.block import Block
from ray.data.context import DataContext
@@ -31,7 +31,10 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
reader = fastavro.reader(f)

ctx = DataContext.get_current()
output_buffer = BlockOutputBuffer(ctx.target_max_block_size)
output_block_size_option = OutputBlockSizeOption(
target_max_block_size=ctx.target_max_block_size
)
output_buffer = BlockOutputBuffer(output_block_size_option)
for record in reader:
output_buffer.add(record)
while output_buffer.has_next():
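Judging from the call sites in this commit, `OutputBlockSizeOption` is a small container holding exactly one of two block-size budgets. A plausible sketch, with field names taken from the diff (the dataclass shape itself is an assumption; the real definition lives in `python/ray/data/_internal/output_buffer.py`):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class OutputBlockSizeOption:
    """Sketch: carries either a byte budget or a row budget for output blocks."""

    # Cap on the in-memory size of each output block, in bytes.
    target_max_block_size: Optional[int] = None
    # Cap on the number of rows in each output block.
    target_num_rows_per_block: Optional[int] = None
```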
@@ -15,6 +15,8 @@
from ray.data._internal.logical.interfaces import LogicalOperator, Operator
from ray.data._internal.stats import StatsDict
from ray.data.context import DataContext
from ray.data._internal.output_buffer import OutputBlockSizeOption


# TODO(hchen): Ray Core should have a common interface for these two types.
Waitable = Union[ray.ObjectRef, ObjectRefGenerator]
@@ -191,7 +193,8 @@ def __init__(
for x in input_dependencies:
assert isinstance(x, PhysicalOperator), x
self._inputs_complete = not input_dependencies
self._target_max_block_size = target_max_block_size
self._output_block_size_option = None
self.set_target_max_block_size(target_max_block_size)
self._started = False
self._in_task_submission_backpressure = False
self._in_task_output_backpressure = False
@@ -236,20 +239,28 @@ def target_max_block_size(self) -> Optional[int]:
Target max block size output by this operator. If this returns None,
then the default from DataContext should be used.
"""
return self._target_max_block_size
if self._output_block_size_option is None:
return None
else:
return self._output_block_size_option.target_max_block_size

@property
def actual_target_max_block_size(self) -> int:
"""
The actual target max block size output by this operator.
"""
target_max_block_size = self._target_max_block_size
target_max_block_size = self.target_max_block_size
if target_max_block_size is None:
target_max_block_size = self.data_context.target_max_block_size
return target_max_block_size

def set_target_max_block_size(self, target_max_block_size: Optional[int]):
self._target_max_block_size = target_max_block_size
if target_max_block_size is not None:
self._output_block_size_option = OutputBlockSizeOption(
target_max_block_size=target_max_block_size
)
elif self._output_block_size_option is not None:
self._output_block_size_option = None

def mark_execution_completed(self):
"""Manually mark this operator has completed execution."""
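The accessor pair above implements a simple resolution order: a per-operator override wins, otherwise the `DataContext` default applies. The same logic as a standalone sketch (values illustrative):

```python
from typing import Optional


def resolve_target_max_block_size(
    operator_override: Optional[int], context_default: int
) -> int:
    """Per-operator target wins; otherwise fall back to the DataContext default."""
    return operator_override if operator_override is not None else context_default


# set_target_max_block_size(None) clears the override, so the default applies.
assert resolve_target_max_block_size(None, 128 * 1024**2) == 128 * 1024**2
assert resolve_target_max_block_size(64 * 1024**2, 128 * 1024**2) == 64 * 1024**2
```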
86 changes: 72 additions & 14 deletions python/ray/data/_internal/execution/operators/map_transformer.py
@@ -6,7 +6,7 @@

from ray.data._internal.block_batching.block_batching import batch_blocks
from ray.data._internal.execution.interfaces.task_context import TaskContext
from ray.data._internal.output_buffer import BlockOutputBuffer
from ray.data._internal.output_buffer import BlockOutputBuffer, OutputBlockSizeOption
from ray.data.block import Block, BlockAccessor, DataBatch

# Allowed input/output data types for a MapTransformFn.
@@ -62,7 +62,7 @@ def __init__(
self._input_type = input_type
self._output_type = output_type
self._category = category
self._target_max_block_size = None
self._output_block_size_option = None
self._is_udf = is_udf

@abstractmethod
@@ -85,8 +85,40 @@ def output_type(self) -> MapTransformFnDataType:
def category(self) -> MapTransformFnCategory:
return self._category

@property
def output_block_size_option(self):
return self._output_block_size_option

def set_target_max_block_size(self, target_max_block_size: int):
self._target_max_block_size = target_max_block_size
assert (
self._output_block_size_option is None and target_max_block_size is not None
)
self._output_block_size_option = OutputBlockSizeOption(
target_max_block_size=target_max_block_size
)

@property
def target_max_block_size(self):
if self._output_block_size_option is None:
return None
else:
return self._output_block_size_option.target_max_block_size

def set_target_num_rows_per_block(self, target_num_rows_per_block: int):
assert (
self._output_block_size_option is None
and target_num_rows_per_block is not None
)
self._output_block_size_option = OutputBlockSizeOption(
target_num_rows_per_block=target_num_rows_per_block
)

@property
def target_num_rows_per_block(self):
if self._output_block_size_option is None:
return None
else:
return self._output_block_size_option.target_num_rows_per_block


class MapTransformer:
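Note how the asserts in `set_target_max_block_size` and `set_target_num_rows_per_block` make the two budgets mutually exclusive per transform fn: whichever is set first wins, and setting a second budget is a programming error. The intended contract, as hypothetical driver code (`make_transform_fn` is an illustrative factory, not part of this diff):

```python
fn = make_transform_fn()                    # hypothetical factory
fn.set_target_num_rows_per_block(1_000)     # ok: no budget was set yet
assert fn.target_num_rows_per_block == 1_000
assert fn.target_max_block_size is None     # the other budget stays unset
fn.set_target_max_block_size(64 * 1024**2)  # AssertionError: already configured
```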
@@ -113,7 +145,7 @@ def __init__(
self.set_transform_fns(transform_fns)

self._init_fn = init_fn if init_fn is not None else lambda: None
self._target_max_block_size = None
self._output_block_size_option = None
self._udf_time = 0

def set_transform_fns(self, transform_fns: List[MapTransformFn]) -> None:
@@ -138,7 +170,35 @@ def get_transform_fns(self) -> List[MapTransformFn]:
return self._transform_fns

def set_target_max_block_size(self, target_max_block_size: int):
self._target_max_block_size = target_max_block_size
if target_max_block_size is not None:
self._output_block_size_option = OutputBlockSizeOption(
target_max_block_size=target_max_block_size
)
elif self._output_block_size_option is not None:
self._output_block_size_option = None

@property
def target_max_block_size(self):
if self._output_block_size_option is None:
return None
else:
return self._output_block_size_option.target_max_block_size

def set_target_num_rows_per_block(self, target_num_rows_per_block: int):
assert (
self._output_block_size_option is None
and target_num_rows_per_block is not None
)
self._output_block_size_option = OutputBlockSizeOption(
target_num_rows_per_block=target_num_rows_per_block
)

@property
def target_num_rows_per_block(self):
if self._output_block_size_option is None:
return None
else:
return self._output_block_size_option.target_num_rows_per_block

def init(self) -> None:
"""Initialize the transformer.
@@ -166,10 +226,11 @@ def apply_transform(
) -> Iterable[Block]:
"""Apply the transform functions to the input blocks."""
assert (
self._target_max_block_size is not None
self.target_max_block_size is not None
), "target_max_block_size must be set before running"
for transform_fn in self._transform_fns:
transform_fn.set_target_max_block_size(self._target_max_block_size)
if not transform_fn.output_block_size_option:
transform_fn.set_target_max_block_size(self.target_max_block_size)

iter = input_blocks
# Apply the transform functions sequentially to the input iterable.
@@ -181,11 +242,11 @@

def fuse(self, other: "MapTransformer") -> "MapTransformer":
"""Fuse two `MapTransformer`s together."""
assert self._target_max_block_size == other._target_max_block_size or (
self._target_max_block_size is None or other._target_max_block_size is None
assert self.target_max_block_size == other.target_max_block_size or (
self.target_max_block_size is None or other.target_max_block_size is None
)
target_max_block_size = (
self._target_max_block_size or other._target_max_block_size
self.target_max_block_size or other.target_max_block_size
)

# Define them as standalone variables to avoid fused_init_fn capturing the
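The fuse rule for block-size targets, isolated for clarity: the two transformers must agree, or at most one may have a target set, and the fused transformer takes whichever is present. A standalone restatement of the logic above:

```python
from typing import Optional


def fused_target_max_block_size(a: Optional[int], b: Optional[int]) -> Optional[int]:
    """Mirror of the fuse() resolution: equal targets, or at most one set."""
    assert a == b or (a is None or b is None)
    return a or b


assert fused_target_max_block_size(None, 64 * 1024**2) == 64 * 1024**2
assert fused_target_max_block_size(64 * 1024**2, 64 * 1024**2) == 64 * 1024**2
assert fused_target_max_block_size(None, None) is None
```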
@@ -434,10 +495,7 @@ def __call__(
iter: the iterable of UDF-returned data, whose type
must match self._input_type.
"""
assert (
self._target_max_block_size is not None
), "target_max_block_size must be set before running"
output_buffer = BlockOutputBuffer(self._target_max_block_size)
output_buffer = BlockOutputBuffer(self.output_block_size_option)
if self._input_type == MapTransformFnDataType.Block:
add_fn = output_buffer.add_block
elif self._input_type == MapTransformFnDataType.Batch:
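With `BlockOutputBuffer` now taking the whole `OutputBlockSizeOption`, the old assert that a byte target must be set can go: the buffer presumably cuts a block when whichever budget is configured is reached. A simplified sketch of that cut-off predicate (assumed; the real test lives in `BlockOutputBuffer`):

```python
from typing import Optional


def block_is_full(
    current_bytes: int,
    current_rows: int,
    target_max_block_size: Optional[int],
    target_num_rows_per_block: Optional[int],
) -> bool:
    """Trip on whichever budget is configured; both may be unset for no limit."""
    if target_max_block_size is not None and current_bytes >= target_max_block_size:
        return True
    if (
        target_num_rows_per_block is not None
        and current_rows >= target_num_rows_per_block
    ):
        return True
    return False
```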
24 changes: 24 additions & 0 deletions python/ray/data/_internal/logical/operators/map_operator.py
@@ -313,3 +313,27 @@
@property
def can_modify_num_rows(self) -> bool:
return True


class StreamingRepartition(AbstractMap):
"""Logical operator for streaming repartition operation.
Args:
target_num_rows_per_block: The targetr number of rows per block granularity for
streaming repartition.
"""

def __init__(
self,
input_op: LogicalOperator,
target_num_rows_per_block: int,
):
super().__init__("StreamingRepartition", input_op)
self._target_num_rows_per_block = target_num_rows_per_block

@property
def target_num_rows_per_block(self) -> int:
return self._target_num_rows_per_block

@property
def can_modify_num_rows(self) -> bool:
return False
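
`can_modify_num_rows` is `False` because streaming repartition only re-chunks data across blocks; it never drops or duplicates rows. A hedged sketch of the row-based re-chunking the corresponding physical transform could perform, using the public `BlockAccessor` API (the plan-level wiring is not shown in this diff, and a real implementation would also coalesce undersized slices across block boundaries):

```python
from typing import Iterable, Iterator

from ray.data.block import Block, BlockAccessor


def rechunk_by_rows(
    blocks: Iterable[Block], target_num_rows_per_block: int
) -> Iterator[Block]:
    """Sketch: split each block into slices of at most the target row count."""
    for block in blocks:
        accessor = BlockAccessor.for_block(block)
        num_rows = accessor.num_rows()
        for start in range(0, num_rows, target_num_rows_per_block):
            end = min(start + target_num_rows_per_block, num_rows)
            yield accessor.slice(start, end, copy=True)
```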