Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions python/ray/air/util/data_batch_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

from ray.air.constants import TENSOR_COLUMN_NAME
from ray.air.data_batch_type import DataBatchType
from ray.air.util.tensor_extensions.arrow import (
get_arrow_extension_fixed_shape_tensor_types,
)
from ray.util.annotations import Deprecated, DeveloperAPI

if TYPE_CHECKING:
Expand Down Expand Up @@ -217,14 +220,15 @@ def _convert_batch_type_to_numpy(
)
return data
elif pyarrow is not None and isinstance(data, pyarrow.Table):
from ray.air.util.tensor_extensions.arrow import ArrowTensorType
from ray.air.util.transform_pyarrow import (
_concatenate_extension_column,
_is_column_extension_type,
)

if data.column_names == [TENSOR_COLUMN_NAME] and (
isinstance(data.schema.types[0], ArrowTensorType)
isinstance(
data.schema.types[0], get_arrow_extension_fixed_shape_tensor_types()
)
):
# If representing a tensor dataset, return as a single numpy array.
# Example: ray.data.from_numpy(np.arange(12).reshape((3, 2, 2)))
Expand Down
29 changes: 23 additions & 6 deletions python/ray/air/util/tensor_extensions/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,26 @@ def get_arrow_extension_tensor_types():
"""Returns list of extension types of Arrow Array holding
multidimensional tensors
"""
return ArrowTensorType, ArrowTensorTypeV2, ArrowVariableShapedTensorType
return (
*get_arrow_extension_fixed_shape_tensor_types(),
*get_arrow_extension_variable_shape_tensor_types(),
)


@DeveloperAPI
def get_arrow_extension_fixed_shape_tensor_types():
"""Returns list of Arrow extension types holding multidimensional
tensors of *fixed* shape
"""
return ArrowTensorType, ArrowTensorTypeV2


@DeveloperAPI
def get_arrow_extension_variable_shape_tensor_types():
"""Returns list of Arrow extension types holding multidimensional
tensors of *fixed* shape
"""
return (ArrowVariableShapedTensorType,)


class _BaseFixedShapeArrowTensorType(pa.ExtensionType, abc.ABC):
Expand Down Expand Up @@ -225,7 +244,7 @@ def _need_variable_shaped_tensor_array(
# short-circuit since we require a variable-shaped representation.
if isinstance(arr_type, ArrowVariableShapedTensorType):
return True
if not isinstance(arr_type, (ArrowTensorType, ArrowTensorTypeV2)):
if not isinstance(arr_type, get_arrow_extension_fixed_shape_tensor_types()):
raise ValueError(
"All provided array types must be an instance of either "
"ArrowTensorType or ArrowVariableShapedTensorType, but "
Expand Down Expand Up @@ -469,9 +488,7 @@ def _from_numpy(

from ray.data import DataContext

should_use_tensor_v2 = DataContext.get_current().use_arrow_tensor_v2

if should_use_tensor_v2:
if DataContext.get_current().use_arrow_tensor_v2:
pa_type_ = ArrowTensorTypeV2(element_shape, scalar_dtype)
else:
pa_type_ = ArrowTensorType(element_shape, scalar_dtype)
Expand Down Expand Up @@ -635,7 +652,7 @@ def _chunk_tensor_arrays(
if ArrowTensorType._need_variable_shaped_tensor_array(arrs_types):
new_arrs = []
for a in arrs:
if isinstance(a.type, (ArrowTensorType, ArrowTensorTypeV2)):
if isinstance(a.type, get_arrow_extension_fixed_shape_tensor_types()):
a = a.to_variable_shaped_tensor_array()
assert isinstance(a.type, ArrowVariableShapedTensorType)
new_arrs.append(a)
Expand Down
17 changes: 9 additions & 8 deletions python/ray/data/_internal/arrow_ops/transform_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from packaging.version import parse as parse_version

from ray._private.utils import _get_pyarrow_version
from ray.air.util.tensor_extensions.arrow import ArrowTensorTypeV2

try:
import pyarrow
Expand Down Expand Up @@ -90,11 +89,14 @@ def unify_schemas(
cols_with_null_list.add(col_name)
all_columns.add(col_name)

arrow_tensor_types = (
ArrowVariableShapedTensorType,
ArrowTensorType,
ArrowTensorTypeV2,
from ray.air.util.tensor_extensions.arrow import (
get_arrow_extension_fixed_shape_tensor_types,
get_arrow_extension_tensor_types,
)

arrow_tensor_types = get_arrow_extension_tensor_types()
arrow_fixed_shape_tensor_types = get_arrow_extension_fixed_shape_tensor_types()

columns_with_objects = set()
columns_with_tensor_array = set()
for col_name in all_columns:
Expand Down Expand Up @@ -124,12 +126,11 @@ def unify_schemas(
for s in schemas
if isinstance(s.field(col_name).type, arrow_tensor_types)
]

if ArrowTensorType._need_variable_shaped_tensor_array(tensor_array_types):
if isinstance(tensor_array_types[0], ArrowVariableShapedTensorType):
new_type = tensor_array_types[0]
elif isinstance(
tensor_array_types[0], (ArrowTensorType, ArrowTensorTypeV2)
):
elif isinstance(tensor_array_types[0], arrow_fixed_shape_tensor_types):
new_type = ArrowVariableShapedTensorType(
dtype=tensor_array_types[0].scalar_type,
ndim=len(tensor_array_types[0].shape),
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
# total cumulative size (due to it internally utilizing int32 offsets)
#
# V2 in turn relies on int64 offsets, therefore having a limit of ~9Eb (exabytes)
DEFAULT_USE_ARROW_TENSOR_V2 = env_bool("RAY_DATA_USE_ARROW_TENSOR_V2", False)
DEFAULT_USE_ARROW_TENSOR_V2 = env_bool("RAY_DATA_USE_ARROW_TENSOR_V2", True)

DEFAULT_AUTO_LOG_STATS = False

Expand Down
13 changes: 9 additions & 4 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import ray.cloudpickle as pickle
from ray._private.thirdparty.tabulate.tabulate import tabulate
from ray._private.usage import usage_lib
from ray.air.util.tensor_extensions.arrow import ArrowTensorTypeV2
from ray.air.util.tensor_extensions.arrow import (
ArrowTensorTypeV2,
get_arrow_extension_fixed_shape_tensor_types,
)
from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray
from ray.data._internal.aggregate import Max, Mean, Min, Std, Sum
from ray.data._internal.compute import ComputeStrategy
Expand Down Expand Up @@ -4478,15 +4481,17 @@ def block_to_df(block_ref: ObjectRef[Block]) -> pd.DataFrame:
}
)
elif pa is not None and isinstance(schema, pa.Schema):
from ray.data.extensions import ArrowTensorType
arrow_tensor_ext_types = get_arrow_extension_fixed_shape_tensor_types()

if any(isinstance(type_, ArrowTensorType) for type_ in schema.types):
if any(
isinstance(type_, arrow_tensor_ext_types) for type_ in schema.types
):
meta = pd.DataFrame(
{
col: pd.Series(
dtype=(
dtype.to_pandas_dtype()
if not isinstance(dtype, ArrowTensorType)
if not isinstance(dtype, arrow_tensor_ext_types)
else np.object_
)
)
Expand Down
6 changes: 4 additions & 2 deletions python/ray/data/tests/test_ecosystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import pytest

import ray
from ray.air.util.tensor_extensions.arrow import (
get_arrow_extension_fixed_shape_tensor_types,
)
from ray.data.extensions.tensor_extension import (
ArrowTensorArray,
ArrowTensorType,
TensorArray,
TensorDtype,
)
Expand Down Expand Up @@ -119,7 +121,7 @@ def test_to_dask_tensor_column_cast_arrow(ray_start_regular_shared):
in_table = pa.table({"a": ArrowTensorArray.from_numpy(data)})
ds = ray.data.from_arrow(in_table)
dtype = ds.schema().base_schema.field(0).type
assert isinstance(dtype, ArrowTensorType)
assert isinstance(dtype, get_arrow_extension_fixed_shape_tensor_types())
out_df = ds.to_dask().compute()
assert out_df["a"].dtype.type is np.object_
expected_df = pd.DataFrame({"a": list(data)})
Expand Down
8 changes: 5 additions & 3 deletions python/ray/data/tests/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
from PIL import Image

import ray
from ray.air.util.tensor_extensions.arrow import (
get_arrow_extension_fixed_shape_tensor_types,
)
from ray.data._internal.datasource.image_datasource import (
ImageDatasource,
ImageFileMetadataProvider,
)
from ray.data.datasource import Partitioning
from ray.data.datasource.file_meta_provider import FastFileMetadataProvider
from ray.data.extensions import ArrowTensorType
from ray.data.tests.conftest import * # noqa
from ray.data.tests.mock_http_server import * # noqa
from ray.tests.conftest import * # noqa
Expand All @@ -27,7 +29,7 @@ def test_basic(self, ray_start_regular_shared):
ds = ray.data.read_images("example://image-datasets/simple")
assert ds.schema().names == ["image"]
column_type = ds.schema().types[0]
assert isinstance(column_type, ArrowTensorType)
assert isinstance(column_type, get_arrow_extension_fixed_shape_tensor_types())
assert all(record["image"].shape == (32, 32, 3) for record in ds.take())

@pytest.mark.parametrize("num_threads", [-1, 0, 1, 2, 4])
Expand Down Expand Up @@ -139,7 +141,7 @@ def test_partitioning(
assert ds.schema().names == ["image", "label"]

image_type, label_type = ds.schema().types
assert isinstance(image_type, ArrowTensorType)
assert isinstance(image_type, get_arrow_extension_fixed_shape_tensor_types())
assert pa.types.is_string(label_type)

df = ds.to_pandas()
Expand Down
39 changes: 24 additions & 15 deletions python/ray/data/tests/test_numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from pytest_lazyfixture import lazy_fixture

import ray
from ray.data import Schema
from ray.air.util.tensor_extensions.arrow import ArrowTensorTypeV2
from ray.data import DataContext, Schema
from ray.data.datasource import (
BaseFileMetadataProvider,
FastFileMetadataProvider,
Expand All @@ -23,6 +24,14 @@
from ray.tests.conftest import * # noqa


def _get_tensor_type():
return (
ArrowTensorTypeV2
if DataContext.get_current().use_arrow_tensor_v2
else ArrowTensorType
)


def test_numpy_read_partitioning(ray_start_regular_shared, tmp_path):
path = os.path.join(tmp_path, "country=us", "data.npy")
os.mkdir(os.path.dirname(path))
Expand Down Expand Up @@ -113,27 +122,27 @@ def test_to_numpy_refs(ray_start_regular_shared):
],
)
def test_numpy_roundtrip(ray_start_regular_shared, fs, data_path):
tensor_type = _get_tensor_type()

ds = ray.data.range_tensor(10, override_num_blocks=2)
ds.write_numpy(data_path, filesystem=fs, column="data")
ds = ray.data.read_numpy(data_path, filesystem=fs)
assert ds.count() == 10
assert ds.schema() == Schema(
pa.schema([("data", ArrowTensorType((1,), pa.int64()))])
)
assert ds.schema() == Schema(pa.schema([("data", tensor_type((1,), pa.int64()))]))
assert sorted(ds.take_all(), key=lambda row: row["data"]) == [
{"data": np.array([i])} for i in range(10)
]


def test_numpy_read(ray_start_regular_shared, tmp_path):
def test_numpy_read_x(ray_start_regular_shared, tmp_path):
tensor_type = _get_tensor_type()

path = os.path.join(tmp_path, "test_np_dir")
os.mkdir(path)
np.save(os.path.join(path, "test.npy"), np.expand_dims(np.arange(0, 10), 1))
ds = ray.data.read_numpy(path, override_num_blocks=1)
assert ds.count() == 10
assert ds.schema() == Schema(
pa.schema([("data", ArrowTensorType((1,), pa.int64()))])
)
assert ds.schema() == Schema(pa.schema([("data", tensor_type((1,), pa.int64()))]))
np.testing.assert_equal(
extract_values("data", ds.take(2)), [np.array([0]), np.array([1])]
)
Expand All @@ -145,9 +154,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path):
ds = ray.data.read_numpy(path, override_num_blocks=1)
assert ds._plan.initial_num_blocks() == 1
assert ds.count() == 10
assert ds.schema() == Schema(
pa.schema([("data", ArrowTensorType((1,), pa.int64()))])
)
assert ds.schema() == Schema(pa.schema([("data", tensor_type((1,), pa.int64()))]))
assert [v["data"].item() for v in ds.take(2)] == [0, 1]


Expand All @@ -174,6 +181,8 @@ def test_numpy_read_ignore_missing_paths(


def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path):
tensor_type = _get_tensor_type()

path = os.path.join(tmp_path, "test_np_dir")
os.mkdir(path)
path = os.path.join(path, "test.npy")
Expand All @@ -182,9 +191,7 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path):
path, meta_provider=FastFileMetadataProvider(), override_num_blocks=1
)
assert ds.count() == 10
assert ds.schema() == Schema(
pa.schema([("data", ArrowTensorType((1,), pa.int64()))])
)
assert ds.schema() == Schema(pa.schema([("data", tensor_type((1,), pa.int64()))]))
np.testing.assert_equal(
extract_values("data", ds.take(2)), [np.array([0]), np.array([1])]
)
Expand All @@ -204,6 +211,8 @@ def test_numpy_read_partitioned_with_filter(
write_partitioned_df,
assert_base_partitioned_ds,
):
tensor_type = _get_tensor_type()

def df_to_np(dataframe, path, **kwargs):
np.save(path, dataframe.to_numpy(dtype=np.dtype(np.int8)), **kwargs)

Expand Down Expand Up @@ -245,7 +254,7 @@ def sorted_values_transform_fn(sorted_values):
val_str = "".join(f"array({v}, dtype=int8), " for v in vals)[:-2]
assert_base_partitioned_ds(
ds,
schema=Schema(pa.schema([("data", ArrowTensorType((2,), pa.int8()))])),
schema=Schema(pa.schema([("data", tensor_type((2,), pa.int8()))])),
sorted_values=f"[[{val_str}]]",
ds_take_transform_fn=lambda taken: [extract_values("data", taken)],
sorted_values_transform_fn=sorted_values_transform_fn,
Expand Down
7 changes: 5 additions & 2 deletions python/ray/data/tests/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import pytest

import ray
from ray.air.util.tensor_extensions.arrow import (
get_arrow_extension_fixed_shape_tensor_types,
)
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
from ray.data.block import Block
from ray.data.extensions import ArrowTensorArray, ArrowTensorType, TensorDtype
from ray.data.extensions import ArrowTensorArray, TensorDtype
from ray.data.tests.conftest import * # noqa
from ray.data.tests.mock_http_server import * # noqa
from ray.tests.conftest import * # noqa
Expand Down Expand Up @@ -186,7 +189,7 @@ def test_to_pandas_tensor_column_cast_arrow(ray_start_regular_shared):
in_table = pa.table({"a": ArrowTensorArray.from_numpy(data)})
ds = ray.data.from_arrow(in_table)
dtype = ds.schema().base_schema.field(0).type
assert isinstance(dtype, ArrowTensorType)
assert isinstance(dtype, get_arrow_extension_fixed_shape_tensor_types())
out_df = ds.to_pandas()
assert out_df["a"].dtype.type is np.object_
expected_df = pd.DataFrame({"a": list(data)})
Expand Down
8 changes: 6 additions & 2 deletions python/ray/data/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
from pytest_lazyfixture import lazy_fixture

import ray
from ray.air.util.tensor_extensions.arrow import ArrowTensorType, ArrowTensorTypeV2
from ray.air.util.tensor_extensions.arrow import (
ArrowTensorTypeV2,
get_arrow_extension_fixed_shape_tensor_types,
)
from ray.data import Schema
from ray.data._internal.datasource.parquet_bulk_datasource import ParquetBulkDatasource
from ray.data._internal.datasource.parquet_datasource import (
Expand Down Expand Up @@ -1245,7 +1248,8 @@ def test_tensors_in_tables_parquet(
)

assert isinstance(
ds.schema().base_schema.field_by_name(tensor_col_name).type, ArrowTensorType
ds.schema().base_schema.field_by_name(tensor_col_name).type,
get_arrow_extension_fixed_shape_tensor_types(),
)

expected_tuples = list(zip(id_vals, group_vals, arr))
Expand Down
Loading