diff --git a/python/ray/data/_internal/datasource/parquet_datasource.py b/python/ray/data/_internal/datasource/parquet_datasource.py index 136c97445c07..a83852be7f7d 100644 --- a/python/ray/data/_internal/datasource/parquet_datasource.py +++ b/python/ray/data/_internal/datasource/parquet_datasource.py @@ -328,8 +328,12 @@ def __init__( self._local_scheduling = NodeAffinitySchedulingStrategy( ray.get_runtime_context().get_node_id(), soft=False ) - # Need this property for lineage tracking - self._source_paths = paths + + # Need this property for lineage tracking. We should not directly assign paths + # to self since it is captured every read_task_fn during serialization and + # causing this data being duplicated and excessive object store spilling. + self._source_paths_ref = ray.put(paths) + paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem) filesystem = RetryingPyFileSystem.wrap( self._filesystem, @@ -465,6 +469,10 @@ def __init__( sampled_file_infos, DataContext.get_current().target_max_block_size ) + @property + def _source_paths(self) -> List[str]: + return ray.get(self._source_paths_ref) + def estimate_inmemory_data_size(self) -> int: # In case of empty projections no data will be read if self._projection_map == {}: diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 3e7e8c137663..d1655e9912a4 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -172,8 +172,10 @@ def __init__( self._partitioning = partitioning self._ignore_missing_paths = ignore_missing_paths self._include_paths = include_paths - # Need this property for lineage tracking - self._source_paths = paths + # Need this property for lineage tracking. We should not directly assign paths + # to self since it is captured every read_task_fn during serialization and + # causing this data being duplicated and excessive object store spilling. + self._source_paths_ref = ray.put(paths) paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem) self._filesystem = RetryingPyFileSystem.wrap( self._filesystem, retryable_errors=self._data_context.retried_io_errors @@ -227,6 +229,10 @@ def __init__( self._paths_ref = ray.put(paths) self._file_sizes_ref = ray.put(file_sizes) + @property + def _source_paths(self) -> List[str]: + return ray.get(self._source_paths_ref) + def _paths(self) -> List[str]: return ray.get(self._paths_ref)