diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index f5fffa2d4778..54771044f2ec 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -133,8 +133,7 @@ def __init__( self._lazy = lazy if not lazy: - # TODO(ekl) we should clear inputs once we have full lineage recorded. - self._plan.execute(clear_input_blocks=False) + self._plan.execute(allow_clear_input_blocks=False) @staticmethod def copy(dataset: "Dataset[T]") -> "Dataset[T]": @@ -2667,7 +2666,7 @@ def repeat(self, times: Optional[int] = None) -> "DatasetPipeline[T]": ctx = DatasetContext.get_current() if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages: - blocks, _ = self._plan._get_source_blocks() + blocks, _, _ = self._plan._get_source_blocks_and_stages() blocks.clear() blocks, outer_stats, read_stage = _rewrite_read_stage(blocks) else: @@ -2786,7 +2785,7 @@ def window( ctx = DatasetContext.get_current() if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages: - blocks, _ = self._plan._get_source_blocks() + blocks, _, _ = self._plan._get_source_blocks_and_stages() blocks.clear() blocks, outer_stats, read_stage = _rewrite_read_stage(blocks) else: @@ -2807,7 +2806,7 @@ def __next__(self) -> "Dataset[T]": def gen(): ds = Dataset( - ExecutionPlan(blocks, outer_stats), self._epoch, lazy=False + ExecutionPlan(blocks, outer_stats), self._epoch, lazy=True ) return ds @@ -2861,19 +2860,27 @@ def __iter__(self): ) return pipe - def fully_executed(self) -> "Dataset[T]": + def fully_executed(self, preserve_original: bool = True) -> "Dataset[T]": """Force full evaluation of the blocks of this dataset. This can be used to read all blocks into memory. By default, Datasets doesn't read blocks from the datasource until the first transform. + Args: + preserve_original: Whether the original unexecuted dataset should be + preserved. If False, this function will mutate the original dataset, + which can more efficiently reclaim memory. + Returns: A Dataset with all blocks fully materialized in memory. """ - plan = self._plan.deep_copy(preserve_uuid=True) - plan.execute(force_read=True) - ds = Dataset(plan, self._epoch, lazy=False) - ds._set_uuid(self._get_uuid()) + if preserve_original: + plan = self._plan.deep_copy(preserve_uuid=True) + ds = Dataset(plan, self._epoch, self._lazy) + ds._set_uuid(self._get_uuid()) + else: + ds = self + ds._plan.execute(force_read=True) return ds def is_fully_executed(self) -> bool: diff --git a/python/ray/data/impl/block_list.py b/python/ray/data/impl/block_list.py index 158f492335f8..b6df51be8fb0 100644 --- a/python/ray/data/impl/block_list.py +++ b/python/ray/data/impl/block_list.py @@ -34,9 +34,13 @@ def clear(self) -> None: """Erase references to the tasks tracked by the BlockList.""" self._blocks = None + def is_cleared(self) -> bool: + """Whether this BlockList has been cleared.""" + return self._blocks is None + def _check_if_cleared(self) -> None: """Raise an error if this BlockList has been previously cleared.""" - if self._blocks is None: + if self.is_cleared(): raise ValueError( "This Dataset's blocks have been moved, which means that you " "can no longer use this Dataset." diff --git a/python/ray/data/impl/lazy_block_list.py b/python/ray/data/impl/lazy_block_list.py index 28b48e106b84..90f7de8e70fb 100644 --- a/python/ray/data/impl/lazy_block_list.py +++ b/python/ray/data/impl/lazy_block_list.py @@ -134,6 +134,9 @@ def clear(self): ] self._cached_metadata = [None for _ in self._cached_metadata] + def is_cleared(self) -> bool: + return all(ref is None for ref in self._block_partition_refs) + def _check_if_cleared(self): pass # LazyBlockList can always be re-computed. @@ -158,7 +161,6 @@ def split(self, split_size: int) -> List["LazyBlockList"]: # Note: does not force execution prior to splitting. def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]: - self._check_if_cleared() output = [] cur_tasks, cur_blocks, cur_blocks_meta = [], [], [] cur_size = 0 diff --git a/python/ray/data/impl/pipeline_executor.py b/python/ray/data/impl/pipeline_executor.py index a052dafedf6f..a8eed8dd4ca6 100644 --- a/python/ray/data/impl/pipeline_executor.py +++ b/python/ray/data/impl/pipeline_executor.py @@ -19,7 +19,7 @@ def pipeline_stage(fn: Callable[[], Dataset[T]]) -> Dataset[T]: # Force eager evaluation of all blocks in the pipeline stage. This # prevents resource deadlocks due to overlapping stage execution (e.g., # task -> actor stage). - return fn().fully_executed() + return fn().fully_executed(preserve_original=False) class PipelineExecutor: diff --git a/python/ray/data/impl/plan.py b/python/ray/data/impl/plan.py index 2cdd3c9a6232..95eb6fc39b4a 100644 --- a/python/ray/data/impl/plan.py +++ b/python/ray/data/impl/plan.py @@ -47,6 +47,12 @@ def fuse(self, other: "Stage") -> "Stage": """Fuse this stage with a compatible stage.""" raise NotImplementedError + def __repr__(self): + return f'{type(self).__name__}("{self.name}")' + + def __str__(self): + return repr(self) + class ExecutionPlan: """A lazy execution plan for a Dataset.""" @@ -224,13 +230,15 @@ def meta_count(self) -> Optional[int]: return None def execute( - self, clear_input_blocks: bool = True, force_read: bool = False + self, + allow_clear_input_blocks: bool = True, + force_read: bool = False, ) -> BlockList: """Execute this plan. Args: - clear_input_blocks: Whether to assume ownership of the input blocks, - allowing them to be dropped from memory during execution. + allow_clear_input_blocks: Whether we should try to clear the input blocks + for each stage. force_read: Whether to force the read stage to fully execute. Returns: @@ -238,7 +246,13 @@ def execute( """ if not self.has_computed_output(): blocks, stats, stages = self._optimize() - for stage in stages: + for stage_idx, stage in enumerate(stages): + if allow_clear_input_blocks: + clear_input_blocks = self._should_clear_input_blocks( + blocks, stage_idx + ) + else: + clear_input_blocks = False stats_builder = stats.child_builder(stage.name) blocks, stage_info = stage(blocks, clear_input_blocks) if stage_info: @@ -275,13 +289,34 @@ def stats(self) -> DatasetStats: self.execute() return self._snapshot_stats + def _should_clear_input_blocks( + self, + blocks: BlockList, + stage_idx: int, + ): + """Whether the provided blocks should be cleared when passed into the stage. + + Args: + blocks: The blocks that we may want to clear. + stage_idx: The position of the stage in the optimized after-snapshot chain. + """ + if stage_idx != 0 or self._stages_before_snapshot: + # Not the first stage, always clear stage input blocks. + return True + elif isinstance(blocks, LazyBlockList): + # Always clear lazy input blocks since they can be recomputed. + return True + else: + # Otherwise, we have non-lazy input blocks that's the source of this + # execution plan, so we don't clear these. + return False + def _optimize(self) -> Tuple[BlockList, DatasetStats, List[Stage]]: """Apply stage fusion optimizations, returning an updated source block list and associated stats, and a set of optimized stages. """ context = DatasetContext.get_current() - blocks, stats = self._get_source_blocks() - stages = self._stages_after_snapshot.copy() + blocks, stats, stages = self._get_source_blocks_and_stages() if context.optimize_fuse_stages: if context.optimize_fuse_read_stages: # If using a lazy datasource, rewrite read stage into one-to-one stage @@ -293,20 +328,32 @@ def _optimize(self) -> Tuple[BlockList, DatasetStats, List[Stage]]: self._last_optimized_stages = stages return blocks, stats, stages - def _get_source_blocks(self) -> Tuple[BlockList, DatasetStats]: - """Get the source blocks (and corresponding stats) for plan execution. + def _get_source_blocks_and_stages( + self, + ) -> Tuple[BlockList, DatasetStats, List[Stage]]: + """Get the source blocks, corresponding stats, and the stages for plan + execution. - If a computed snapshot exists, return the snapshot blocks and stats; otherwise, - return the input blocks and stats that the plan was created with. + If a computed snapshot exists and has not been cleared, return the snapshot + blocks and stats; otherwise, return the input blocks and stats that the plan was + created with. """ + stages = self._stages_after_snapshot.copy() if self._snapshot_blocks is not None: - # If snapshot exists, we only have to execute the plan from the - # snapshot. - blocks = self._snapshot_blocks - stats = self._snapshot_stats - # Unlink the snapshot blocks from the plan so we can eagerly reclaim the - # snapshot block memory after the first stage is done executing. - self._snapshot_blocks = None + if not self._snapshot_blocks.is_cleared(): + # If snapshot exists, we only have to execute the plan from the + # snapshot. + blocks = self._snapshot_blocks + stats = self._snapshot_stats + # Unlink the snapshot blocks from the plan so we can eagerly reclaim the + # snapshot block memory after the first stage is done executing. + self._snapshot_blocks = None + else: + # Snapshot exists but has been cleared, so we need to recompute from the + # source (input blocks). + blocks = self._in_blocks + stats = self._in_stats + stages = self._stages_before_snapshot + self._stages_after_snapshot else: # If no snapshot exists, we have to execute the full plan from the # beginning. @@ -317,7 +364,7 @@ def _get_source_blocks(self) -> Tuple[BlockList, DatasetStats]: # can eagerly reclaim the input block memory after the first stage is # done executing. self._in_blocks = None - return blocks, stats + return blocks, stats, stages def has_lazy_input(self) -> bool: """Return whether this plan has lazy input blocks.""" @@ -335,7 +382,11 @@ def has_computed_output(self) -> bool: """Whether this plan has a computed snapshot for the final stage, i.e. for the output of this plan. """ - return self._snapshot_blocks is not None and not self._stages_after_snapshot + return ( + self._snapshot_blocks is not None + and not self._stages_after_snapshot + and not self._snapshot_blocks.is_cleared() + ) class OneToOneStage(Stage): diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py index b233f9ecec97..9bb31a4fcc6e 100644 --- a/python/ray/data/tests/test_optimize.py +++ b/python/ray/data/tests/test_optimize.py @@ -2,15 +2,45 @@ import numpy as np import pandas as pd import os +from typing import List import ray +from ray.data.block import BlockMetadata from ray.data.context import DatasetContext +from ray.data.datasource import Datasource, ReadTask from ray.data.datasource.csv_datasource import CSVDatasource from ray.internal.internal_api import memory_summary from ray.tests.conftest import * # noqa +@ray.remote +class Counter: + def __init__(self): + self.value = 0 + + def increment(self): + self.value += 1 + return self.value + + def get(self): + return self.value + + def reset(self): + self.value = 0 + + +class MySource(CSVDatasource): + def __init__(self, counter): + self.counter = counter + + def _read_stream(self, f, path: str, **reader_args): + count = self.counter.increment.remote() + ray.get(count) + for block in super()._read_stream(f, path, **reader_args): + yield block + + def expect_stages(pipe, num_stages_expected, stage_names): stats = pipe.stats() for name in stage_names: @@ -29,23 +59,92 @@ def test_memory_sanity(shutdown_only): assert "Spilled" in meminfo, meminfo -def test_memory_release_eager(shutdown_only): - info = ray.init(num_cpus=1, object_store_memory=1500e6) - ds = ray.data.range(10) +class OnesSource(Datasource): + def prepare_read( + self, + parallelism: int, + n_per_block: int, + ) -> List[ReadTask]: + read_tasks: List[ReadTask] = [] + meta = BlockMetadata( + num_rows=1, + size_bytes=n_per_block, + schema=None, + input_files=None, + exec_stats=None, + ) + + for _ in range(parallelism): + read_tasks.append( + ReadTask(lambda: [[np.ones(n_per_block, dtype=np.uint8)]], meta) + ) + return read_tasks + + +@pytest.mark.parametrize("lazy_input", [True, False]) +def test_memory_release_pipeline(shutdown_only, lazy_input): + context = DatasetContext.get_current() + # Disable stage fusion so we can keep reads and maps from being fused together, + # since we're trying to test multi-stage memory releasing here. + context.optimize_fuse_stages = False + # This object store allocation can hold at most 1 copy of the transformed dataset. + if lazy_input: + object_store_memory = 3000e6 + else: + object_store_memory = 3000e6 + + n = 10 + info = ray.init(num_cpus=n, object_store_memory=object_store_memory) + if lazy_input: + ds = ray.data.read_datasource( + OnesSource(), + parallelism=n, + n_per_block=100 * 1024 * 1024, + ) + else: + ds = ray.data.from_items(list(range(n)), parallelism=n) + + # Create a single-window pipeline. + pipe = ds.window(blocks_per_window=n) # Round 1. - ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8)) - meminfo = memory_summary(info.address_info["address"], stats_only=True) - assert "Spilled" not in meminfo, meminfo - - # Round 2. - ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8)) + def gen(x): + import time + + # TODO(Clark): Remove this sleep once we have fixed memory pressure handling. + time.sleep(2) + if isinstance(x, np.ndarray): + return x + else: + return np.ones(100 * 1024 * 1024, dtype=np.uint8) + + pipe = pipe.map(gen) + + def inc(x): + import time + + # TODO(Clark): Remove this sleep once we have fixed memory pressure handling. + time.sleep(2) + return x + 1 + + num_rounds = 10 + for _ in range(num_rounds): + pipe = pipe.map(inc) + + for block in pipe.iter_batches(batch_size=None): + for arr in block: + np.testing.assert_equal( + arr, + np.ones(100 * 1024 * 1024, dtype=np.uint8) + num_rounds, + ) meminfo = memory_summary(info["address"], stats_only=True) - # TODO(ekl) we can add this back once we clear inputs on eager exec as well. - # assert "Spilled" not in meminfo, meminfo + assert "Spilled" not in meminfo, meminfo def test_memory_release_lazy(shutdown_only): + context = DatasetContext.get_current() + # Ensure that stage fusion is enabled. + context.optimize_fuse_stages = True info = ray.init(num_cpus=1, object_store_memory=1500e6) ds = ray.data.range(10) @@ -83,6 +182,67 @@ def test_memory_release_lazy_shuffle(shutdown_only): raise error +def test_lazy_fanout(shutdown_only, local_path): + ray.init(num_cpus=1) + map_counter = Counter.remote() + + def inc(row): + map_counter.increment.remote() + row = row.as_pydict() + row["one"] += 1 + return row + + df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) + path = os.path.join(local_path, "test.csv") + df.to_csv(path, index=False, storage_options={}) + read_counter = Counter.remote() + source = MySource(read_counter) + + # Test that fan-out of a lazy dataset results in re-execution up to the datasource, + # due to block move semantics. + ds = ray.data.read_datasource(source, parallelism=1, paths=path) + ds = ds.experimental_lazy() + ds1 = ds.map(inc) + ds2 = ds1.map(inc) + ds3 = ds1.map(inc) + # Test content. + assert ds2.fully_executed().take() == [ + {"one": 3, "two": "a"}, + {"one": 4, "two": "b"}, + {"one": 5, "two": "c"}, + ] + assert ds3.fully_executed().take() == [ + {"one": 3, "two": "a"}, + {"one": 4, "two": "b"}, + {"one": 5, "two": "c"}, + ] + # Test that data is read twice (+ 1 extra for ramp-up read before converting to a + # lazy dataset). + assert ray.get(read_counter.get.remote()) == 3 + # Test that first map is executed twice. + assert ray.get(map_counter.get.remote()) == 2 * 3 + 3 + 3 + + # Test that fan-out of a lazy dataset with a non-lazy datasource results in + # re-execution up to the datasource, due to block move semantics. + ray.get(map_counter.reset.remote()) + + def inc(x): + map_counter.increment.remote() + return x + 1 + + # The source data shouldn't be cleared since it's non-lazy. + ds = ray.data.from_items(list(range(10))) + ds = ds.experimental_lazy() + ds1 = ds.map(inc) + ds2 = ds1.map(inc) + ds3 = ds1.map(inc) + # Test content. + assert ds2.fully_executed().take() == list(range(2, 12)) + assert ds3.fully_executed().take() == list(range(2, 12)) + # Test that first map is executed twice. + assert ray.get(map_counter.get.remote()) == 2 * 10 + 10 + 10 + + def test_spread_hint_inherit(ray_start_regular_shared): ds = ray.data.range(10).experimental_lazy() ds = ds.map(lambda x: x + 1) @@ -112,8 +272,7 @@ def test_execution_preserves_original(ray_start_regular_shared): # Check original lazy dataset content. assert ds1.take() == list(range(2, 12)) # Check source lazy dataset content. - # TODO(Clark): Uncomment this when we add new block clearing semantics. - # assert ds.take() == list(range(1, 11)) + assert ds.take() == list(range(1, 11)) def _assert_has_stages(stages, stage_names): @@ -253,33 +412,6 @@ def test_optimize_incompatible_stages(ray_start_regular_shared): ) -@ray.remote -class Counter: - def __init__(self): - self.value = 0 - - def increment(self): - self.value += 1 - return self.value - - def get(self): - return self.value - - def reset(self): - self.value = 0 - - -class MySource(CSVDatasource): - def __init__(self, counter): - self.counter = counter - - def _read_stream(self, f, path: str, **reader_args): - count = self.counter.increment.remote() - ray.get(count) - for block in super()._read_stream(f, path, **reader_args): - yield block - - def test_optimize_reread_base_data(ray_start_regular_shared, local_path): context = DatasetContext.get_current() context.optimize_fuse_stages = True