Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
351 changes: 220 additions & 131 deletions Cargo.lock

Large diffs are not rendered by default.

1,163 changes: 1,114 additions & 49 deletions bindings/python/Cargo.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ crate-type = ["cdylib"]

[dependencies]
iceberg = { path = "../../crates/iceberg" }
pyo3 = { version = "0.22.3", features = ["extension-module", "abi3-py39"] }
arrow = { version = "53", features = ["pyarrow"] }
pyo3 = { version = "0.22.4", features = ["extension-module", "abi3-py39"] }
arrow = { version = "53.3.0", features = ["pyarrow"] }
iceberg-datafusion = { path = "../../crates/integrations/datafusion" }
tokio = { version = "1", features = ["full"] }
datafusion-ffi = { path = "/Users/kevinliu/repos/datafusion/datafusion/ffi" }
4 changes: 2 additions & 2 deletions bindings/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ module-name = "pyiceberg_core.pyiceberg_core_rust"
ignore = ["F403", "F405"]

[tool.hatch.envs.dev]
dependencies = ["maturin>=1.0,<2.0", "pytest>=8.3.2", "pyarrow>=17.0.0"]
dependencies = ["maturin>=1.0,<2.0", "pytest>=8.3.2", "pyarrow>=17.0.0", "pyiceberg[sql-sqlite]>=0.8.1", "datafusion>=43.1.0"]

[tool.hatch.envs.dev.scripts]
develop = "maturin develop"
build = "maturin build --out dist --sdist"
test = "pytest"
test = "RUST_BACKTRACE=full pytest -vvv -s"
2 changes: 2 additions & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
use pyo3::prelude::*;

mod error;
mod table_provider;
mod transform;

#[pymodule]
fn pyiceberg_core_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
table_provider::register_module(py, m)?;
transform::register_module(py, m)?;
Ok(())
}
121 changes: 121 additions & 0 deletions bindings/python/src/table_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::ffi::CString;
use std::sync::Arc;

use datafusion_ffi::table_provider::FFI_TableProvider;
use iceberg::io::FileIO;
use iceberg::table::StaticTable;
use iceberg::TableIdent;
use iceberg_datafusion::table::IcebergTableProvider;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use tokio::runtime::Runtime;

#[pyclass(name = "IcebergTableProvider")]
#[derive(Clone)]
pub struct PyIcebergTableProvider {
inner: IcebergTableProvider,
runtime: Arc<Runtime>,
}

#[pymethods]
impl PyIcebergTableProvider {
#[new]
fn new(metadata_location: String) -> PyResult<Self> {
// Create TableIdent
let table_ident = TableIdent::from_strs(["myschema", "mytable"]).map_err(|e| {
PyErr::new::<PyRuntimeError, _>(format!("Failed to create table ident: {}", e))
})?;

// Create FileIO
let file_io = FileIO::from_path(&metadata_location)
.and_then(|builder| builder.build())
.map_err(|e| {
PyErr::new::<PyRuntimeError, _>(format!("Failed to create file IO: {}", e))
})?;

// Create a runtime for running async code
let runtime = Runtime::new().map_err(|e| {
PyErr::new::<PyRuntimeError, _>(format!("Failed to create runtime: {}", e))
})?;

// Run the async initialization in the runtime
let provider = runtime.block_on(async {
// Load static table
let static_table =
StaticTable::from_metadata_file(&metadata_location, table_ident, file_io)
.await
.map_err(|e| {
PyErr::new::<PyRuntimeError, _>(format!(
"Failed to load static table: {}",
e
))
})?;

// Convert to table and create schema
let table = static_table.into_table();

// Use the public try_new_from_table function
IcebergTableProvider::try_new_from_table(table)
.await
.map_err(|e| {
PyErr::new::<PyRuntimeError, _>(format!(
"Failed to create table provider: {}",
e
))
})
})?;

Ok(PyIcebergTableProvider {
inner: provider,
runtime: Arc::new(runtime),
})
}

/// Expose as a DataFusion table provider
fn __datafusion_table_provider__<'py>(
&self,
py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
let name = CString::new("datafusion_table_provider").unwrap();
let provider = FFI_TableProvider::new(Arc::new(self.inner.clone()), false, Some(self.runtime.clone()));
PyCapsule::new_bound(py, provider, Some(name.clone()))
}
}

/// Standalone function to create a table provider
#[pyfunction]
pub fn create_table_provider(metadata_location: String) -> PyResult<PyIcebergTableProvider> {
PyIcebergTableProvider::new(metadata_location)
}

/// Register the module
pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
let submod = PyModule::new_bound(py, "table_provider")?;
submod.add_function(wrap_pyfunction!(create_table_provider, &submod)?)?;

// Add as submodule and register in sys.modules
m.add_submodule(&submod)?;
py.import_bound("sys")?
.getattr("modules")?
.set_item("pyiceberg_core.table_provider", submod)?;

Ok(())
}
77 changes: 77 additions & 0 deletions bindings/python/tests/test_table_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


import pytest
from pyiceberg_core import table_provider
from datafusion import SessionContext
import tempfile
from pyiceberg.catalog.sql import SqlCatalog
import pyarrow as pa


def test_iceberg_table_provider():
with tempfile.TemporaryDirectory() as temp_dir:
catalog = SqlCatalog(
"default",
**{
"uri": f"sqlite:///{temp_dir}/pyiceberg_catalog.db",
"warehouse": f"file://{temp_dir}",
},
)
df = pa.Table.from_pydict(
{
"column1": [1, 2, 3],
"column2": ["A", "B", "C"],
}
)
catalog.create_namespace_if_not_exists("default")
table = catalog.create_table_if_not_exists(
"default.dataset",
schema=df.schema,
)
table.append(df)

metadata_location = table.metadata_location
iceberg_table_provider = table_provider.create_table_provider(
metadata_location=metadata_location
)

ctx = SessionContext()
ctx.register_table_provider("test", iceberg_table_provider)

# Get the registered table
table = ctx.table("test")

# Check that we got a table back
assert table is not None

# Instead of comparing table objects directly, let's verify functionality
# For example, check if we can get the schema
try:
schema = table.schema()
assert schema is not None
except Exception as e:
pytest.fail(f"Failed to get schema: {e}")

# Or try executing a simple query
try:
df = ctx.sql("SELECT * FROM test LIMIT 1")
assert df is not None
print(df)
except Exception as e:
pytest.fail(f"Failed to query table: {e}")
4 changes: 3 additions & 1 deletion crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ keywords = ["iceberg", "integrations", "datafusion"]
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
datafusion = { version = "43" }
datafusion = { path = "/Users/kevinliu/repos/datafusion/datafusion/core" }
futures = { workspace = true }
iceberg = { workspace = true }
tokio = { workspace = true }
pyo3 = { version = "0.22.4", features = ["extension-module", "abi3"] }
arrow = { version = "53.3.0", features = ["pyarrow"] }

[dev-dependencies]
iceberg-catalog-memory = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ pub use error::*;

mod physical_plan;
mod schema;
mod table;
pub mod table;
pub use table::table_provider_factory::IcebergTableProviderFactory;
pub use table::*;
6 changes: 4 additions & 2 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayAs, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
Expand Down Expand Up @@ -88,7 +89,8 @@ impl IcebergTableScan {
PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Incremental,
Boundedness::Bounded,
)
}
}
Expand Down
Loading