feat(python): Add support for Iceberg
Resolves #6227
Fokko committed Aug 8, 2023
1 parent 989ac8a commit 5107b6e
Showing 12 changed files with 757 additions and 0 deletions.
18 changes: 18 additions & 0 deletions crates/polars-plan/src/logical_plan/pyarrow.rs
@@ -145,6 +145,24 @@ pub(super) fn predicate_to_pa(
            let input = predicate_to_pa(*input, expr_arena, args)?;
            Some(format!("~({input}).is_null()"))
        }
        AExpr::Function {
            function: FunctionExpr::Boolean(BooleanFunction::IsNan),
            input,
            ..
        } => {
            let input = input.first().unwrap();
            let input = predicate_to_pa(*input, expr_arena, args)?;
            Some(format!("({input}).is_nan()"))
        }
        AExpr::Function {
            function: FunctionExpr::Boolean(BooleanFunction::IsNotNan),
            input,
            ..
        } => {
            let input = input.first().unwrap();
            let input = predicate_to_pa(*input, expr_arena, args)?;
            Some(format!("~({input}).is_nan()"))
        }
        #[cfg(feature = "is_in")]
        AExpr::Function {
            function: FunctionExpr::Boolean(BooleanFunction::IsIn),
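For context (not part of the diff): a minimal sketch of the predicate strings these new match arms emit. Polars serializes pushed-down predicates as small pyarrow-compute expressions that the Python scan callback parses later; the `pa.compute.field(...)` column syntax is an assumption inferred from `py-polars/polars/io/iceberg.py` below.

# A hedged sketch, assuming a column named "a" and the pa.compute.field serialization:
# pl.col("a").is_nan() would be rendered roughly as the first string,
# pl.col("a").is_not_nan() as the second.
serialized_is_nan = "(pa.compute.field('a')).is_nan()"
serialized_is_not_nan = "~(pa.compute.field('a')).is_nan()"
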
7 changes: 7 additions & 0 deletions py-polars/docs/source/reference/io.rst
@@ -72,6 +72,13 @@ Excel
    read_excel
    DataFrame.write_excel

Apache Iceberg
~~~~~~~~~~~~~~
.. autosummary::
    :toctree: api/

    scan_iceberg

Delta Lake
~~~~~~~~~~
.. autosummary::
2 changes: 2 additions & 0 deletions py-polars/polars/__init__.py
@@ -164,6 +164,7 @@
    scan_csv,
    scan_delta,
    scan_ds,
    scan_iceberg,
    scan_ipc,
    scan_ndjson,
    scan_parquet,
@@ -268,6 +269,7 @@
"scan_csv",
"scan_delta",
"scan_ds",
"scan_iceberg",
"scan_ipc",
"scan_ndjson",
"scan_parquet",
4 changes: 4 additions & 0 deletions py-polars/polars/dependencies.py
@@ -10,6 +10,7 @@

_DATAFRAME_API_COMPAT_AVAILABLE = True
_DELTALAKE_AVAILABLE = True
_PYICEBERG_AVAILABLE = True
_FSSPEC_AVAILABLE = True
_HYPOTHESIS_AVAILABLE = True
_NUMPY_AVAILABLE = True
@@ -159,6 +160,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
    import pandas
    import pyarrow
    import pydantic
    import pyiceberg

    if sys.version_info >= (3, 9):
        import zoneinfo
@@ -177,6 +179,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
"dataframe_api_compat"
)
deltalake, _DELTALAKE_AVAILABLE = _lazy_import("deltalake")
pyiceberg, _PYICEBERG_AVAILABLE = _lazy_import("pyiceberg")
fsspec, _FSSPEC_AVAILABLE = _lazy_import("fsspec")
hypothesis, _HYPOTHESIS_AVAILABLE = _lazy_import("hypothesis")
numpy, _NUMPY_AVAILABLE = _lazy_import("numpy")
@@ -240,6 +243,7 @@ def _check_for_pydantic(obj: Any) -> bool:
"_LazyModule",
# exported flags/guards
"_DELTALAKE_AVAILABLE",
"_PYICEBERG_AVAILABLE",
"_FSSPEC_AVAILABLE",
"_HYPOTHESIS_AVAILABLE",
"_NUMPY_AVAILABLE",
2 changes: 2 additions & 0 deletions py-polars/polars/io/__init__.py
@@ -5,6 +5,7 @@
from polars.io.database import read_database
from polars.io.delta import read_delta, scan_delta
from polars.io.excel import read_excel
from polars.io.iceberg import scan_iceberg
from polars.io.ipc import read_ipc, read_ipc_schema, scan_ipc
from polars.io.json import read_json
from polars.io.ndjson import read_ndjson, scan_ndjson
@@ -27,6 +28,7 @@
"scan_csv",
"scan_delta",
"scan_ds",
"scan_iceberg",
"scan_ipc",
"scan_ndjson",
"scan_parquet",
280 changes: 280 additions & 0 deletions py-polars/polars/io/iceberg.py
@@ -0,0 +1,280 @@
from __future__ import annotations

import ast
from _ast import GtE, Lt, LtE
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pyiceberg.table import Table

    from polars import DataFrame, LazyFrame, Series

from ast import (
    Attribute,
    BinOp,
    BitAnd,
    BitOr,
    Call,
    Compare,
    Constant,
    Eq,
    Gt,
    Invert,
    List,
    UnaryOp,
)
from functools import partial, singledispatch
from typing import Any

import polars._reexport as pl
from polars.dependencies import _PYICEBERG_AVAILABLE

if _PYICEBERG_AVAILABLE:
    from pyiceberg.expressions import (
        AlwaysFalse,
        AlwaysTrue,
        And,
        EqualTo,
        GreaterThan,
        GreaterThanOrEqual,
        In,
        IsNaN,
        IsNull,
        LessThan,
        LessThanOrEqual,
        Not,
        Or,
    )


def scan_iceberg(
    source: str | Table,
    *,
    storage_options: dict[str, Any] | None = None,
) -> LazyFrame:
"""
Lazily read from an Apache Iceberg table.
Parameters
----------
source
URI or Table to the root of the Delta lake table.
Note: For Local filesystem, absolute and relative paths are supported but
for the supported object storages - GCS, Azure and S3 full URI must be provided.
storage_options
Extra options for the storage backends supported by `pyiceberg`.
For cloud storages, this may include configurations for authentication etc.
More info is available `here<https://py.iceberg.apache.org/configuration/>`__.
Returns
-------
LazyFrame
Examples
--------
Creates a scan for a Iceberg table from local filesystem, or object store.
>>> table_path = "file:/path/to/iceberg-table/metadata.json"
>>> pl.scan_iceberg(table_path).collect() # doctest: +SKIP
Creates a scan for an Iceberg table from S3.
See a list of supported storage options for S3 `here
<https://py.iceberg.apache.org/configuration/#fileio>`__.
>>> table_path = "s3://bucket/path/to/iceberg-table/metadata.json"
>>> storage_options = {
... "s3.region": "eu-central-1",
... "s3.access-key-id": "THE_AWS_ACCESS_KEY_ID",
... "s3.secret-access-key": "THE_AWS_SECRET_ACCESS_KEY",
... }
>>> pl.scan_iceberg(
... table_path, storage_options=storage_options
... ).collect() # doctest: +SKIP

    Creates a scan for an Iceberg table from Azure.
    Supported options for Azure are available `here
    <https://py.iceberg.apache.org/configuration/>`__.

    Following types of table paths are supported:

    * az://<container>/<path>/metadata.json
    * adl://<container>/<path>/metadata.json
    * abfs[s]://<container>/<path>/metadata.json

    >>> table_path = "az://container/path/to/iceberg-table/metadata.json"
    >>> storage_options = {
    ...     "adlfs.account-name": "AZURE_STORAGE_ACCOUNT_NAME",
    ...     "adlfs.account-key": "AZURE_STORAGE_ACCOUNT_KEY",
    ... }
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table using a plain local path (absolute and
    relative paths are both supported on the local filesystem).

    >>> table_path = "/path/to/iceberg-table/metadata.json"
    >>> pl.scan_iceberg(table_path).collect()  # doctest: +SKIP
    """
    from pyiceberg.io.pyarrow import schema_to_pyarrow
    from pyiceberg.table import StaticTable

    if isinstance(source, str):
        source = StaticTable.from_metadata(
            metadata_location=source, properties=storage_options or {}
        )

    func = partial(_scan_pyarrow_dataset_impl, source)
    arrow_schema = schema_to_pyarrow(source.schema())
    return pl.LazyFrame._scan_python_function(arrow_schema, func, pyarrow=True)
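
Because `source` also accepts a `pyiceberg.table.Table`, a table loaded from a catalog can be scanned without going through a metadata path. A minimal sketch, assuming a configured catalog named "default" and a hypothetical `db.events` table with an `event_id` column:

import polars as pl
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")      # assumes catalog settings exist (e.g. in ~/.pyiceberg.yaml)
tbl = catalog.load_table("db.events")  # hypothetical namespace.table identifier
lf = pl.scan_iceberg(tbl)              # Table objects bypass the StaticTable.from_metadata branch
df = lf.select("event_id").collect()   # hypothetical column; the projection is pushed into the scan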


def _to_ast(expr: str) -> Any:
    return ast.parse(expr, mode="eval").body


def _scan_pyarrow_dataset_impl(
    tbl: Table,
    with_columns: list[str] | None = None,
    predicate: str = "",
    n_rows: int | None = None,
    **kwargs: Any,
) -> DataFrame | Series:
"""
Take the projected columns and materialize an arrow table.
Parameters
----------
tbl
pyarrow dataset
with_columns
Columns that are projected
predicate
pyarrow expression that can be evaluated with eval
n_rows:
Materialize only n rows from the arrow dataset
batch_size
The maximum row count for scanned pyarrow record batches.
kwargs:
For backward compatibility
Returns
-------
DataFrame
"""
    scan = tbl.scan(limit=n_rows)

    if with_columns is not None:
        scan = scan.select(*with_columns)

    if predicate:
        try:
            expr_ast = _to_ast(predicate)
            pyiceberg_expr = _convert_predicate(expr_ast)
        except ValueError as e:
            raise ValueError(
                f"Could not convert predicate to PyIceberg: {predicate}"
            ) from e

        scan = scan.filter(pyiceberg_expr)

    from polars import from_arrow

    return from_arrow(scan.to_arrow())
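
A hedged sketch of how this callback is exercised; in normal use the lazy engine supplies these arguments during `collect()`, and the table path, column names and predicate below are illustrative only:

from pyiceberg.table import StaticTable

tbl = StaticTable.from_metadata("/path/to/iceberg-table/metadata.json")  # hypothetical path
df = _scan_pyarrow_dataset_impl(
    tbl,
    with_columns=["sold", "price"],                # hypothetical projected columns
    predicate="pa.compute.field('sold') > 100.0",  # serialized pyarrow-style predicate
    n_rows=10,
)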


@singledispatch
def _convert_predicate(a: Any) -> Any:
"""Walks the AST to convert the PyArrow expression to a PyIceberg expression."""
raise ValueError(f"Unexpected symbol: {a}")


@_convert_predicate.register(Constant)
def _(a: Constant) -> Any:
    return a.value


@_convert_predicate.register(UnaryOp)
def _(a: UnaryOp) -> Any:
    if isinstance(a.op, Invert):
        return Not(_convert_predicate(a.operand))
    else:
        raise ValueError(f"Unexpected UnaryOp: {a}")


@_convert_predicate.register(Call)
def _(a: Call) -> Any:
    args = [_convert_predicate(arg) for arg in a.args]
    f = _convert_predicate(a.func)
    if f == "field":
        return args
    else:
        ref = _convert_predicate(a.func.value)[0]  # type: ignore[attr-defined]
        if f == "isin":
            return In(ref, args[0])
        elif f == "is_null":
            return IsNull(ref)
        elif f == "is_nan":
            return IsNaN(ref)
        elif f == "scalar":
            return AlwaysTrue() if args[0] else AlwaysFalse()

    raise ValueError(f"Unknown call: {f}")


@_convert_predicate.register(Attribute)
def _(a: Attribute) -> Any:
    return a.attr


@_convert_predicate.register(BinOp)
def _(a: BinOp) -> Any:
    lhs = _convert_predicate(a.left)
    rhs = _convert_predicate(a.right)

    op = a.op
    if isinstance(op, BitAnd):
        return And(lhs, rhs)
    if isinstance(op, BitOr):
        return Or(lhs, rhs)
    else:
        raise ValueError(f"Unknown: {lhs} {op} {rhs}")


@_convert_predicate.register(Compare)
def _(a: Compare) -> Any:
    op = a.ops[0]
    lhs = _convert_predicate(a.left)[0]
    rhs = _convert_predicate(a.comparators[0])

    if isinstance(op, Gt):
        return GreaterThan(lhs, rhs)
    if isinstance(op, GtE):
        return GreaterThanOrEqual(lhs, rhs)
    if isinstance(op, Eq):
        return EqualTo(lhs, rhs)
    if isinstance(op, Lt):
        return LessThan(lhs, rhs)
    if isinstance(op, LtE):
        return LessThanOrEqual(lhs, rhs)
    else:
        raise ValueError(f"Unknown comparison: {op}")


@_convert_predicate.register(List)
def _(a: List) -> Any:
    return [_convert_predicate(e) for e in a.elts]
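
Taken together, the dispatchers above translate a serialized pyarrow predicate into a PyIceberg expression. A minimal round-trip sketch, assuming the `pa.compute.field(...)` serialization shown earlier:

from pyiceberg.expressions import And, GreaterThan, LessThan

from polars.io.iceberg import _convert_predicate, _to_ast

expr = _to_ast("(pa.compute.field('sold') > 100.0) & (pa.compute.field('price') < 2.0)")
assert _convert_predicate(expr) == And(GreaterThan("sold", 100.0), LessThan("price", 2.0))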


def _check_if_pyiceberg_available() -> None:
    if not _PYICEBERG_AVAILABLE:
        raise ImportError(
            "pyiceberg is not installed. Please run `pip install pyiceberg[pyarrow]>=0.4.0`."
        )
2 changes: 2 additions & 0 deletions py-polars/pyproject.toml
@@ -48,6 +48,7 @@ deltalake = ["deltalake >= 0.10.0"]
timezone = ["backports.zoneinfo; python_version < '3.9'", "tzdata; platform_system == 'Windows'"]
matplotlib = ["matplotlib"]
pydantic = ["pydantic"]
pyiceberg = ["pyiceberg >= 0.4.0"]
sqlalchemy = ["sqlalchemy", "pandas"]
xlsxwriter = ["xlsxwriter"]
adbc = ["adbc_driver_sqlite"]
@@ -81,6 +82,7 @@ module = [
"polars.polars",
"pyarrow.*",
"pydantic",
"pyiceberg.*",
"sqlalchemy",
"xlsx2csv",
"xlsxwriter.*",
