diff --git a/crates/polars-plan/src/logical_plan/pyarrow.rs b/crates/polars-plan/src/logical_plan/pyarrow.rs
index 64cb77aa6f26..72394818efe1 100644
--- a/crates/polars-plan/src/logical_plan/pyarrow.rs
+++ b/crates/polars-plan/src/logical_plan/pyarrow.rs
@@ -145,6 +145,24 @@ pub(super) fn predicate_to_pa(
             let input = predicate_to_pa(*input, expr_arena, args)?;
             Some(format!("~({input}).is_null()"))
         }
+        AExpr::Function {
+            function: FunctionExpr::Boolean(BooleanFunction::IsNan),
+            input,
+            ..
+        } => {
+            let input = input.first().unwrap();
+            let input = predicate_to_pa(*input, expr_arena, args)?;
+            Some(format!("({input}).is_nan()"))
+        }
+        AExpr::Function {
+            function: FunctionExpr::Boolean(BooleanFunction::IsNotNan),
+            input,
+            ..
+        } => {
+            let input = input.first().unwrap();
+            let input = predicate_to_pa(*input, expr_arena, args)?;
+            Some(format!("~({input}).is_nan()"))
+        }
         #[cfg(feature = "is_in")]
         AExpr::Function {
             function: FunctionExpr::Boolean(BooleanFunction::IsIn),
diff --git a/py-polars/docs/source/reference/io.rst b/py-polars/docs/source/reference/io.rst
index ae00950734a9..8f339e7927c4 100644
--- a/py-polars/docs/source/reference/io.rst
+++ b/py-polars/docs/source/reference/io.rst
@@ -72,6 +72,13 @@ Excel
    read_excel
    DataFrame.write_excel
 
+Apache Iceberg
+~~~~~~~~~~~~~~
+.. autosummary::
+   :toctree: api/
+
+   scan_iceberg
+
 Delta Lake
 ~~~~~~~~~~
 .. autosummary::
diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py
index ddc26de3b43c..aaefc8401a6f 100644
--- a/py-polars/polars/__init__.py
+++ b/py-polars/polars/__init__.py
@@ -164,6 +164,7 @@
     scan_csv,
     scan_delta,
     scan_ds,
+    scan_iceberg,
     scan_ipc,
     scan_ndjson,
     scan_parquet,
@@ -268,6 +269,7 @@
     "scan_csv",
     "scan_delta",
     "scan_ds",
+    "scan_iceberg",
     "scan_ipc",
     "scan_ndjson",
     "scan_parquet",
diff --git a/py-polars/polars/dependencies.py b/py-polars/polars/dependencies.py
index fac71538f887..b6854173e548 100644
--- a/py-polars/polars/dependencies.py
+++ b/py-polars/polars/dependencies.py
@@ -10,6 +10,7 @@
 
 _DATAFRAME_API_COMPAT_AVAILABLE = True
 _DELTALAKE_AVAILABLE = True
+_PYICEBERG_AVAILABLE = True
 _FSSPEC_AVAILABLE = True
 _HYPOTHESIS_AVAILABLE = True
 _NUMPY_AVAILABLE = True
@@ -159,6 +160,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
     import pandas
     import pyarrow
     import pydantic
+    import pyiceberg
 
     if sys.version_info >= (3, 9):
         import zoneinfo
@@ -177,6 +179,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
         "dataframe_api_compat"
     )
    deltalake, _DELTALAKE_AVAILABLE = _lazy_import("deltalake")
+    pyiceberg, _PYICEBERG_AVAILABLE = _lazy_import("pyiceberg")
     fsspec, _FSSPEC_AVAILABLE = _lazy_import("fsspec")
     hypothesis, _HYPOTHESIS_AVAILABLE = _lazy_import("hypothesis")
     numpy, _NUMPY_AVAILABLE = _lazy_import("numpy")
@@ -240,6 +243,7 @@ def _check_for_pydantic(obj: Any) -> bool:
     "_LazyModule",
     # exported flags/guards
     "_DELTALAKE_AVAILABLE",
+    "_PYICEBERG_AVAILABLE",
     "_FSSPEC_AVAILABLE",
     "_HYPOTHESIS_AVAILABLE",
     "_NUMPY_AVAILABLE",
diff --git a/py-polars/polars/io/__init__.py b/py-polars/polars/io/__init__.py
index b7d25d6a64f7..5c8e788ad8d9 100644
--- a/py-polars/polars/io/__init__.py
+++ b/py-polars/polars/io/__init__.py
@@ -5,6 +5,7 @@
 from polars.io.database import read_database
 from polars.io.delta import read_delta, scan_delta
 from polars.io.excel import read_excel
+from polars.io.iceberg import scan_iceberg
 from polars.io.ipc import read_ipc, read_ipc_schema, scan_ipc
 from polars.io.json import read_json
 from polars.io.ndjson import read_ndjson, scan_ndjson
@@ -27,6 +28,7 @@
"scan_csv", "scan_delta", "scan_ds", + "scan_iceberg", "scan_ipc", "scan_ndjson", "scan_parquet", diff --git a/py-polars/polars/io/iceberg.py b/py-polars/polars/io/iceberg.py new file mode 100644 index 000000000000..e2821a0e943e --- /dev/null +++ b/py-polars/polars/io/iceberg.py @@ -0,0 +1,280 @@ +from __future__ import annotations + +import ast +from _ast import GtE, Lt, LtE +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pyiceberg.table import Table + + from polars import DataFrame, LazyFrame, Series + +from ast import ( + Attribute, + BinOp, + BitAnd, + BitOr, + Call, + Compare, + Constant, + Eq, + Gt, + Invert, + List, + UnaryOp, +) +from functools import partial, singledispatch +from typing import Any + +import polars._reexport as pl +from polars.dependencies import _PYICEBERG_AVAILABLE + +if _PYICEBERG_AVAILABLE: + from polars.dependencies.pyiceberg.expressions import ( + AlwaysFalse, + AlwaysTrue, + And, + EqualTo, + GreaterThan, + GreaterThanOrEqual, + In, + IsNaN, + IsNull, + LessThan, + LessThanOrEqual, + Not, + Or, + ) + + +def scan_iceberg( + source: str | Table, + *, + storage_options: dict[str, Any] | None = None, +) -> LazyFrame: + """ + Lazily read from an Apache Iceberg table. + + Parameters + ---------- + source + URI or Table to the root of the Delta lake table. + + Note: For Local filesystem, absolute and relative paths are supported but + for the supported object storages - GCS, Azure and S3 full URI must be provided. + storage_options + Extra options for the storage backends supported by `pyiceberg`. + For cloud storages, this may include configurations for authentication etc. + + More info is available `here`__. + + Returns + ------- + LazyFrame + + Examples + -------- + Creates a scan for a Iceberg table from local filesystem, or object store. + + >>> table_path = "file:/path/to/iceberg-table/metadata.json" + >>> pl.scan_iceberg(table_path).collect() # doctest: +SKIP + + Creates a scan for an Iceberg table from S3. + See a list of supported storage options for S3 `here + `__. + + >>> table_path = "s3://bucket/path/to/iceberg-table/metadata.json" + >>> storage_options = { + ... "s3.region": "eu-central-1", + ... "s3.access-key-id": "THE_AWS_ACCESS_KEY_ID", + ... "s3.secret-access-key": "THE_AWS_SECRET_ACCESS_KEY", + ... } + >>> pl.scan_iceberg( + ... table_path, storage_options=storage_options + ... ).collect() # doctest: +SKIP + + Creates a scan for a Delta table from Azure. + Supported options for Azure are available `here + `__. + + Following type of table paths are supported, + * az:////metadata.json + * adl:////metadata.json + * abfs[s]:////metadata.json + + >>> table_path = "az://container/path/to/iceberg-table/metadata.json" + >>> storage_options = { + ... "adlfs.account-name": "AZURE_STORAGE_ACCOUNT_NAME", + ... "adlfs.account-key": "AZURE_STORAGE_ACCOUNT_KEY", + ... } + >>> pl.scan_iceberg( + ... table_path, storage_options=storage_options + ... ).collect() # doctest: +SKIP + + Creates a scan for a Delta table with additional delta specific options. + In the below example, `without_files` option is used which loads the table without + file tracking information. + + >>> table_path = "/path/to/iceberg-table/metadata.json" + >>> delta_table_options = {"without_files": True} + >>> pl.scan_iceberg( + ... table_path, delta_table_options=delta_table_options + ... 
+
+
+@singledispatch
+def _convert_predicate(a: Any) -> Any:
+    """Walks the AST to convert the PyArrow expression to a PyIceberg expression."""
+    raise ValueError(f"Unexpected symbol: {a}")
+
+
+@_convert_predicate.register(Constant)
+def _(a: Constant) -> Any:
+    return a.value
+
+
+@_convert_predicate.register(UnaryOp)
+def _(a: UnaryOp) -> Any:
+    if isinstance(a.op, Invert):
+        return Not(_convert_predicate(a.operand))
+    else:
+        raise ValueError(f"Unexpected UnaryOp: {a}")
+
+
+@_convert_predicate.register(Call)
+def _(a: Call) -> Any:
+    args = [_convert_predicate(arg) for arg in a.args]
+    f = _convert_predicate(a.func)
+    if f == "field":
+        return args
+    else:
+        ref = _convert_predicate(a.func.value)[0]  # type: ignore[attr-defined]
+        if f == "isin":
+            return In(ref, args[0])
+        elif f == "is_null":
+            return IsNull(ref)
+        elif f == "is_nan":
+            return IsNaN(ref)
+        elif f == "scalar":
+            return AlwaysTrue() if args[0] else AlwaysFalse()
+
+        raise ValueError(f"Unknown call: {f}")
+
+
+@_convert_predicate.register(Attribute)
+def _(a: Attribute) -> Any:
+    return a.attr
+
+
+@_convert_predicate.register(BinOp)
+def _(a: BinOp) -> Any:
+    lhs = _convert_predicate(a.left)
+    rhs = _convert_predicate(a.right)
+
+    op = a.op
+    if isinstance(op, BitAnd):
+        return And(lhs, rhs)
+    if isinstance(op, BitOr):
+        return Or(lhs, rhs)
+    else:
+        raise ValueError(f"Unknown: {lhs} {op} {rhs}")
+
+
+@_convert_predicate.register(Compare)
+def _(a: Compare) -> Any:
+    op = a.ops[0]
+    lhs = _convert_predicate(a.left)[0]
+    rhs = _convert_predicate(a.comparators[0])
+
+    if isinstance(op, Gt):
+        return GreaterThan(lhs, rhs)
+    if isinstance(op, GtE):
+        return GreaterThanOrEqual(lhs, rhs)
+    if isinstance(op, Eq):
+        return EqualTo(lhs, rhs)
+    if isinstance(op, Lt):
+        return LessThan(lhs, rhs)
+    if isinstance(op, LtE):
+        return LessThanOrEqual(lhs, rhs)
+    else:
+        raise ValueError(f"Unknown comparison: {op}")
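+
+# Illustrative note: the `field` handler above returns a single-element list
+# (e.g. ['borough']) rather than a bare name, which is why the Call and
+# Compare handlers unwrap field references with `[0]`. The List handler
+# below exists because `isin` literals such as [1, 2, 3] also arrive as an
+# `ast.List` and are converted element-wise, e.g.:
+#
+#     _convert_predicate(_to_ast("(pa.compute.field('id')).isin([1, 2, 3])"))
+#     # -> In('id', {literal(1), literal(2), literal(3)})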
+
+
+@_convert_predicate.register(List)
+def _(a: List) -> Any:
+    return [_convert_predicate(e) for e in a.elts]
+
+
+def _check_if_pyiceberg_available() -> None:
+    if not _PYICEBERG_AVAILABLE:
+        raise ImportError(
+            "pyiceberg is not installed. Please run `pip install pyiceberg[pyarrow]>=0.4.0`."
+        )
diff --git a/py-polars/pyproject.toml b/py-polars/pyproject.toml
index 11bb61416009..68a7ff4ef90d 100644
--- a/py-polars/pyproject.toml
+++ b/py-polars/pyproject.toml
@@ -48,6 +48,7 @@ deltalake = ["deltalake >= 0.10.0"]
 timezone = ["backports.zoneinfo; python_version < '3.9'", "tzdata; platform_system == 'Windows'"]
 matplotlib = ["matplotlib"]
 pydantic = ["pydantic"]
+pyiceberg = ["pyiceberg >= 0.4.0"]
 sqlalchemy = ["sqlalchemy", "pandas"]
 xlsxwriter = ["xlsxwriter"]
 adbc = ["adbc_driver_sqlite"]
@@ -81,6 +82,7 @@ module = [
     "polars.polars",
     "pyarrow.*",
     "pydantic",
+    "pyiceberg.*",
     "sqlalchemy",
     "xlsx2csv",
     "xlsxwriter.*",
diff --git a/py-polars/requirements-dev.txt b/py-polars/requirements-dev.txt
index 1599eb24c3c3..34835a47d0fa 100644
--- a/py-polars/requirements-dev.txt
+++ b/py-polars/requirements-dev.txt
@@ -7,6 +7,7 @@
 # Dependencies
 dataframe-api-compat >= 0.1.6
 deltalake >= 0.10.0
+pyiceberg >= 0.4.0
 numpy
 pandas
 pyarrow
diff --git a/py-polars/tests/unit/io/files/iceberg-table/metadata/00001-55cdf97b-255c-4983-b9f3-0e468fadfe9e.metadata.json b/py-polars/tests/unit/io/files/iceberg-table/metadata/00001-55cdf97b-255c-4983-b9f3-0e468fadfe9e.metadata.json
new file mode 100644
index 000000000000..25d54f9edc88
--- /dev/null
+++ b/py-polars/tests/unit/io/files/iceberg-table/metadata/00001-55cdf97b-255c-4983-b9f3-0e468fadfe9e.metadata.json
@@ -0,0 +1,266 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "adeb1f9c-de6a-4605-8b48-029408ad3004",
+  "location" : "file:///tmp/iceberg/t1",
+  "last-updated-ms" : 1691498730402,
+  "last-column-id" : 19,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "VendorID",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 2,
+      "name" : "tpep_pickup_datetime",
+      "required" : false,
+      "type" : "timestamptz"
+    }, {
+      "id" : 3,
+      "name" : "tpep_dropoff_datetime",
+      "required" : false,
+      "type" : "timestamptz"
+    }, {
+      "id" : 4,
+      "name" : "passenger_count",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 5,
+      "name" : "trip_distance",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 6,
+      "name" : "RatecodeID",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 7,
+      "name" : "store_and_fwd_flag",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 8,
+      "name" : "PULocationID",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 9,
+      "name" : "DOLocationID",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 10,
+      "name" : "payment_type",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 11,
+      "name" : "fare_amount",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 12,
+      "name" : "extra",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 13,
+      "name" : "mta_tax",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 14,
+      "name" : "tip_amount",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 15,
+      "name" : "tolls_amount",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 16,
+      "name" : "improvement_surcharge",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 17,
+      "name" : "total_amount",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 18,
+      "name" : "congestion_surcharge",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 19,
"airport_fee", + "required" : false, + "type" : "double" + } ] + }, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "VendorID", + "required" : false, + "type" : "long" + }, { + "id" : 2, + "name" : "tpep_pickup_datetime", + "required" : false, + "type" : "timestamptz" + }, { + "id" : 3, + "name" : "tpep_dropoff_datetime", + "required" : false, + "type" : "timestamptz" + }, { + "id" : 4, + "name" : "passenger_count", + "required" : false, + "type" : "double" + }, { + "id" : 5, + "name" : "trip_distance", + "required" : false, + "type" : "double" + }, { + "id" : 6, + "name" : "RatecodeID", + "required" : false, + "type" : "double" + }, { + "id" : 7, + "name" : "store_and_fwd_flag", + "required" : false, + "type" : "string" + }, { + "id" : 8, + "name" : "PULocationID", + "required" : false, + "type" : "long" + }, { + "id" : 9, + "name" : "DOLocationID", + "required" : false, + "type" : "long" + }, { + "id" : 10, + "name" : "payment_type", + "required" : false, + "type" : "long" + }, { + "id" : 11, + "name" : "fare_amount", + "required" : false, + "type" : "double" + }, { + "id" : 12, + "name" : "extra", + "required" : false, + "type" : "double" + }, { + "id" : 13, + "name" : "mta_tax", + "required" : false, + "type" : "double" + }, { + "id" : 14, + "name" : "tip_amount", + "required" : false, + "type" : "double" + }, { + "id" : 15, + "name" : "tolls_amount", + "required" : false, + "type" : "double" + }, { + "id" : 16, + "name" : "improvement_surcharge", + "required" : false, + "type" : "double" + }, { + "id" : 17, + "name" : "total_amount", + "required" : false, + "type" : "double" + }, { + "id" : 18, + "name" : "congestion_surcharge", + "required" : false, + "type" : "double" + }, { + "id" : 19, + "name" : "airport_fee", + "required" : false, + "type" : "double" + } ] + } ], + "partition-spec" : [ { + "name" : "tpep_pickup_datetime_day", + "transform" : "day", + "source-id" : 2, + "field-id" : 1000 + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ { + "name" : "tpep_pickup_datetime_day", + "transform" : "day", + "source-id" : 2, + "field-id" : 1000 + } ] + } ], + "last-partition-id" : 1000, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "root" + }, + "current-snapshot-id" : 2532316387469898569, + "refs" : { + "main" : { + "snapshot-id" : 2532316387469898569, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 2532316387469898569, + "timestamp-ms" : 1691498730402, + "summary" : { + "operation" : "overwrite", + "spark.app.id" : "local-1691498272142", + "added-data-files" : "2", + "added-records" : "233822", + "added-files-size" : "3646886", + "changed-partition-count" : "2", + "total-records" : "233822", + "total-files-size" : "3646886", + "total-data-files" : "2", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "file:///tmp/iceberg/t1/metadata/snap-2532316387469898569-1-17f0c3f1-0072-4fb2-a3d7-eee89e606c9a.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1691498730402, + "snapshot-id" : 2532316387469898569 + } ], + "metadata-log" : [] +} \ No newline at end of file diff --git a/py-polars/tests/unit/io/files/iceberg-table/metadata/17f0c3f1-0072-4fb2-a3d7-eee89e606c9a-m0.avro 
diff --git a/py-polars/tests/unit/io/files/iceberg-table/metadata/snap-2532316387469898569-1-17f0c3f1-0072-4fb2-a3d7-eee89e606c9a.avro b/py-polars/tests/unit/io/files/iceberg-table/metadata/snap-2532316387469898569-1-17f0c3f1-0072-4fb2-a3d7-eee89e606c9a.avro
new file mode 100644
index 000000000000..7288ac75b1dc
Binary files /dev/null and b/py-polars/tests/unit/io/files/iceberg-table/metadata/snap-2532316387469898569-1-17f0c3f1-0072-4fb2-a3d7-eee89e606c9a.avro differ
diff --git a/py-polars/tests/unit/io/test_iceberg.py b/py-polars/tests/unit/io/test_iceberg.py
new file mode 100644
index 000000000000..2288878ba821
--- /dev/null
+++ b/py-polars/tests/unit/io/test_iceberg.py
@@ -0,0 +1,175 @@
+from __future__ import annotations
+
+import contextlib
+import os
+from pathlib import Path
+
+import pytest
+
+import polars as pl
+from polars.io.iceberg import _convert_predicate, _to_ast
+
+
+@pytest.fixture()
+def iceberg_path() -> str:
+    # Iceberg requires absolute paths, so we'll symlink
+    # the test table into /tmp/iceberg/t1/
+    Path("/tmp/iceberg").mkdir(parents=True, exist_ok=True)
+    # Resolve the fixture relative to this file so the test does not depend
+    # on the working directory pytest is invoked from.
+    current_path = Path(__file__).parent
+    with contextlib.suppress(FileExistsError):
+        os.symlink(f"{current_path}/files/iceberg-table", "/tmp/iceberg/t1")
+
+    return "file:///tmp/iceberg/t1/metadata/00001-55cdf97b-255c-4983-b9f3-0e468fadfe9e.metadata.json"
+
+
+def test_scan_iceberg_plain(iceberg_path: str) -> None:
+    df = pl.scan_iceberg(iceberg_path)
+    assert len(df.collect()) == 233822
+
+
+def test_scan_iceberg_filter_on_partition(iceberg_path: str) -> None:
+    df = pl.scan_iceberg(iceberg_path)
+    df = df.filter(pl.col("tpep_pickup_datetime") >= "2022-03-02T00:00:00+00:00")
+    assert len(df.collect()) == 119863
+
+
+def test_scan_iceberg_filter_on_column(iceberg_path: str) -> None:
+    df = pl.scan_iceberg(iceberg_path)
+    df = df.filter(pl.col("fare_amount") < 0.0)
+    assert len(df.collect()) == 1192
+
+
+def test_true_expression() -> None:
+    from pyiceberg.expressions import AlwaysTrue
+
+    expr = _to_ast("pa.compute.scalar(True)")
+    assert _convert_predicate(expr) == AlwaysTrue()
+
+
+def test_false_expression() -> None:
+    from pyiceberg.expressions import AlwaysFalse
+
+    expr = _to_ast("pa.compute.scalar(False)")
+    assert _convert_predicate(expr) == AlwaysFalse()
+
+
+def test_is_null_expression() -> None:
+    from pyiceberg.expressions import IsNull
+
+    expr = _to_ast("(pa.compute.field('borough')).is_null()")
+    assert _convert_predicate(expr) == IsNull("borough")
+
+
+def test_is_not_null_expression() -> None:
+    from pyiceberg.expressions import IsNull, Not
+
+    expr = _to_ast("~(pa.compute.field('location_id')).is_null()")
+    assert _convert_predicate(expr) == Not(IsNull("location_id"))
+
+
+def test_is_nan_expression() -> None:
+    from pyiceberg.expressions import IsNaN
+
+    expr = _to_ast("(pa.compute.field('borough')).is_nan()")
+    assert _convert_predicate(expr) == IsNaN("borough")
+
+
+def test_is_not_nan_expression() -> None:
+    from pyiceberg.expressions import IsNaN, Not
+
+    expr = _to_ast("~(pa.compute.field('location_id')).is_nan()")
+    assert _convert_predicate(expr) == Not(IsNaN("location_id"))
+
+
+def test_isin_expression() -> None:
+    from pyiceberg.expressions import In, literal
+
+    expr = _to_ast("(pa.compute.field('location_id')).isin([1,2,3])")
+    assert _convert_predicate(expr) == In(
+        "location_id", {literal(1), literal(2), literal(3)}
+    )
+
+
+def test_parse_combined_expression() -> None:
+    from pyiceberg.expressions import (
+        And,
+        EqualTo,
+        GreaterThan,
+        In,
+        Or,
+        Reference,
+        literal,
+    )
+
+    expr = _to_ast(
+        "(((pa.compute.field('borough') == 'Manhattan') & (pa.compute.field('location_id') > 10)) | (pa.compute.field('location_id')).isin([1,2,3]))"
+    )
+    assert _convert_predicate(expr) == Or(
+        left=And(
+            left=EqualTo(term=Reference(name="borough"), literal=literal("Manhattan")),
+            right=GreaterThan(term="location_id", literal=literal(10)),
+        ),
+        right=In("location_id", {literal(1), literal(2), literal(3)}),
+    )
+
+
+def test_parse_gt() -> None:
+    from pyiceberg.expressions import GreaterThan
+
+    expr = _to_ast("(pa.compute.field('dt') > '2023-08-08')")
+    assert _convert_predicate(expr) == GreaterThan("dt", "2023-08-08")
+
+
+def test_parse_gteq() -> None:
+    from pyiceberg.expressions import GreaterThanOrEqual
+
+    expr = _to_ast("(pa.compute.field('dt') >= '2023-08-08')")
+    assert _convert_predicate(expr) == GreaterThanOrEqual("dt", "2023-08-08")
+
+
+def test_parse_eq() -> None:
+    from pyiceberg.expressions import EqualTo
+
+    expr = _to_ast("(pa.compute.field('dt') == '2023-08-08')")
+    assert _convert_predicate(expr) == EqualTo("dt", "2023-08-08")
+
+
+def test_parse_lt() -> None:
+    from pyiceberg.expressions import LessThan
+
+    expr = _to_ast("(pa.compute.field('dt') < '2023-08-08')")
+    assert _convert_predicate(expr) == LessThan("dt", "2023-08-08")
+
+
+def test_parse_lteq() -> None:
+    from pyiceberg.expressions import LessThanOrEqual
+
+    expr = _to_ast("(pa.compute.field('dt') <= '2023-08-08')")
+    assert _convert_predicate(expr) == LessThanOrEqual("dt", "2023-08-08")
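
Taken together, a minimal usage sketch of what this patch enables (illustrative only: the metadata path below is hypothetical, and pyiceberg >= 0.4.0 with its pyarrow extra is assumed to be installed):

    import polars as pl

    # Lazy: only the Iceberg metadata file is read at this point.
    lf = pl.scan_iceberg("file:/path/to/iceberg-table/metadata.json")

    # Projection and predicate are both pushed into the pyiceberg scan: the
    # predicate travels as a pyarrow.compute expression string (emitted by
    # predicate_to_pa on the Rust side), is parsed by _to_ast, and is rebuilt
    # as a pyiceberg expression by _convert_predicate, so Iceberg can prune
    # data files (e.g. via partition statistics) before any rows are read.
    df = (
        lf.select("tpep_pickup_datetime", "fare_amount")
        .filter(pl.col("fare_amount") < 0.0)
        .collect()
    )
    print(df.shape)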