diff --git a/python/ray/data/_internal/datasource/parquet_datasource.py b/python/ray/data/_internal/datasource/parquet_datasource.py index feecbb846805..65d52fc80083 100644 --- a/python/ray/data/_internal/datasource/parquet_datasource.py +++ b/python/ray/data/_internal/datasource/parquet_datasource.py @@ -208,7 +208,8 @@ def __init__( self._local_scheduling = NodeAffinitySchedulingStrategy( ray.get_runtime_context().get_node_id(), soft=False ) - + # Need this property for lineage tracking + self._source_paths = paths paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem) filesystem = RetryingPyFileSystem.wrap( self._filesystem, diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index d3fd82b75ad2..256927931087 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -140,6 +140,8 @@ def __init__( self._partitioning = partitioning self._ignore_missing_paths = ignore_missing_paths self._include_paths = include_paths + # Need this property for lineage tracking + self._source_paths = paths paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem) self._filesystem = RetryingPyFileSystem.wrap( self._filesystem, retryable_errors=self._data_context.retried_io_errors diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 484e82cc2800..5ecc364b14fc 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -846,6 +846,7 @@ def udf(row): assert isinstance(logical_ops[0], Read) datasource = logical_ops[0]._datasource assert isinstance(datasource, ParquetDatasource) + assert datasource._source_paths == input_path assert isinstance(logical_ops[1], MapRows) assert logical_ops[1]._fn == udf