@util.experimental
def to_pyarrow_batches(
    self,
    expr: ir.Expr,
    /,
    *,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    chunk_size: int = 1_000_000,
    **_: Any,
) -> pa.ipc.RecordBatchReader:
    """Execute `expr` and return a reader of pyarrow record batches.

    Parameters
    ----------
    expr
        Ibis expression to execute.
    params
        Mapping of scalar parameter expressions to concrete values.
    limit
        An integer to effect a specific row limit, or the string
        ``"default"`` to use the backend's default limit behavior.
    chunk_size
        Maximum number of rows in each yielded record batch.

    Returns
    -------
    RecordBatchReader
        A lazy reader; rows are fetched from Oracle only as batches are
        consumed.
    """
    import pandas as pd
    import pyarrow as pa

    def handle_lob(value):
        # oracledb returns CLOB/BLOB/NCLOB columns as LOB locator
        # objects, which pyarrow cannot convert; materialize them as
        # str/bytes via LOB.read().
        if isinstance(value, oracledb.LOB):
            return value.read()
        return value

    # Run pre-execute hooks (UDF and in-memory table registration)
    # eagerly at call time — consistent with other backends — rather
    # than lazily on first batch pull, so required backend state exists
    # even if the caller delays (or never finishes) consuming the reader.
    self._run_pre_execute_hooks(expr)

    schema = expr.as_table().schema().to_pyarrow()
    query = self.compile(expr, limit=limit, params=params)

    def _batches():
        columns = schema.names
        with self.con.cursor() as cursor:
            cur = cursor.execute(query)
            # fetchmany returns an empty list when exhausted, ending
            # the loop (and the reader) cleanly.
            while rows := cur.fetchmany(chunk_size):
                # Route through pandas so LOB values can be normalized
                # cell-by-cell before conversion to arrow.
                df = pd.DataFrame(rows, columns=columns).map(handle_lob)
                yield pa.RecordBatch.from_pandas(df, schema=schema)

    return pa.RecordBatchReader.from_batches(schema, _batches())


def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
    """Backend-specific hooks to run before an expression is executed."""
    self._register_udfs(expr)
    self._register_in_memory_tables(expr)
def test_lob_pyarrow(con):
    """LOB columns (CLOB/BLOB/NCLOB) round-trip through `to_pyarrow`.

    Oracle LOB locators must be materialized to str/bytes before arrow
    conversion; this verifies both the resulting arrow schema and the
    values.
    """
    name = "lob_table"
    # Defensive drop in case a previous run left the table behind.
    con.drop_table(name, force=True)

    with con.begin() as bind:
        bind.execute(
            """CREATE TABLE "lob_table" ("clob" CLOB, "blob" BLOB, "nclob" NCLOB)"""
        )
        bind.execute(
            """INSERT INTO "lob_table" ("clob", "blob", "nclob") VALUES ('test clob', hextoraw('010203'), 'test nclob')"""
        )

    try:
        # Use the shared `con` fixture instead of hard-coding
        # credentials/host/service, so the test works against whatever
        # database the test environment provides.
        result = con.sql(
            'SELECT "clob", "blob", "nclob" FROM "lob_table"'
        ).to_pyarrow()

        expected_schema = pa.schema(
            [("clob", pa.string()), ("blob", pa.binary()), ("nclob", pa.string())]
        )
        assert result.schema == expected_schema

        assert result.column("clob").to_pylist() == ["test clob"]
        assert result.column("blob").to_pylist() == [b"\x01\x02\x03"]
        assert result.column("nclob").to_pylist() == ["test nclob"]
    finally:
        # Always clean up so reruns and sibling tests start fresh.
        con.drop_table(name, force=True)