Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions python/ray/data/_internal/datasource/parquet_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,12 @@ def __init__(
self._local_scheduling = NodeAffinitySchedulingStrategy(
ray.get_runtime_context().get_node_id(), soft=False
)
Copy link
Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lee1258561 We can't remove this attribute cause there are other dependencies that need it.

What I recommend instead is to call ray.put() on the paths and then have a property based function called _source_paths. That way only the ObjectRef is serialized and not all the paths.

You can look how we do this for _paths in file_based_datasource

Link:

def _paths(self) -> List[str]:
return ray.get(self._paths_ref)

Copy link
Contributor Author

@lee1258561 lee1258561 Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the PR based on your suggestion. However, I did not see any other place in this repo referencing _source_paths except the three places that is added in this PR #55978.

Please help me double check if any other places I need to be aware of. Thanks!

# Need this property for lineage tracking
self._source_paths = paths

# Need this property for lineage tracking. We should not directly assign paths
# to self since it is captured every read_task_fn during serialization and
# causing this data being duplicated and excessive object store spilling.
self._source_paths_ref = ray.put(paths)

paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
filesystem = RetryingPyFileSystem.wrap(
self._filesystem,
Expand Down Expand Up @@ -465,6 +469,10 @@ def __init__(
sampled_file_infos, DataContext.get_current().target_max_block_size
)

@property
def _source_paths(self) -> List[str]:
return ray.get(self._source_paths_ref)

def estimate_inmemory_data_size(self) -> int:
# In case of empty projections no data will be read
if self._projection_map == {}:
Expand Down
10 changes: 8 additions & 2 deletions python/ray/data/datasource/file_based_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ def __init__(
self._partitioning = partitioning
self._ignore_missing_paths = ignore_missing_paths
self._include_paths = include_paths
# Need this property for lineage tracking
self._source_paths = paths
# Need this property for lineage tracking. We should not directly assign paths
# to self since it is captured every read_task_fn during serialization and
# causing this data being duplicated and excessive object store spilling.
self._source_paths_ref = ray.put(paths)
paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
self._filesystem = RetryingPyFileSystem.wrap(
self._filesystem, retryable_errors=self._data_context.retried_io_errors
Expand Down Expand Up @@ -227,6 +229,10 @@ def __init__(
self._paths_ref = ray.put(paths)
self._file_sizes_ref = ray.put(file_sizes)

@property
def _source_paths(self) -> List[str]:
return ray.get(self._source_paths_ref)

def _paths(self) -> List[str]:
return ray.get(self._paths_ref)

Expand Down