diff --git a/Cargo.toml b/Cargo.toml index cd13bfc816f..ab4f21a8647 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "rust/lance-test-macros", "rust/lance-testing", "rust/lance-tools", + "rust/lance-c", "rust/compression/fsst", "rust/compression/bitpacking", ] diff --git a/rust/lance-c/Cargo.toml b/rust/lance-c/Cargo.toml new file mode 100644 index 00000000000..303ddc5b549 --- /dev/null +++ b/rust/lance-c/Cargo.toml @@ -0,0 +1,39 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +[package] +name = "lance-c" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +description = "C/C++ bindings for the Lance columnar data format" +keywords = ["lance", "ffi", "c", "cpp", "arrow"] +categories = ["database-implementations", "external-ffi-bindings"] +rust-version.workspace = true + +[lib] +crate-type = ["cdylib", "staticlib", "rlib"] + +[dependencies] +lance = { workspace = true } +lance-core = { workspace = true } +lance-io = { workspace = true } +arrow = { workspace = true } +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +tokio = { version = "1", features = ["rt-multi-thread", "sync"] } +futures = { workspace = true } +log = { workspace = true } +pin-project = { workspace = true } +snafu = "0.8" + +[dev-dependencies] +lance = { workspace = true } +lance-datagen = { workspace = true } +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +tempfile = "3" diff --git a/rust/lance-c/build.rs b/rust/lance-c/build.rs new file mode 100644 index 00000000000..dd4c4da460c --- /dev/null +++ b/rust/lance-c/build.rs @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Build script for lance-c. +//! +//! 
Optionally generates `include/lance.h` via cbindgen. +//! If cbindgen is not available, the pre-committed header is used. + +fn main() { + // cbindgen header generation is optional. + // Run `cargo install cbindgen && cbindgen --crate lance-c -o include/lance.h` + // to regenerate the header manually. +} diff --git a/rust/lance-c/include/lance.h b/rust/lance-c/include/lance.h new file mode 100644 index 00000000000..ded56160bc7 --- /dev/null +++ b/rust/lance-c/include/lance.h @@ -0,0 +1,264 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/* SPDX-FileCopyrightText: Copyright The Lance Authors */ + +/** + * @file lance.h + * @brief C API for the Lance columnar data format. + * + * All data crosses this boundary via the Arrow C Data Interface + * (ArrowSchema, ArrowArray, ArrowArrayStream). + * + * Error handling uses thread-local storage: after any function returns + * NULL (pointer) or -1 (int), call lance_last_error_code() and + * lance_last_error_message() to get details. + */ + +#ifndef LANCE_H +#define LANCE_H + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* ─── Arrow C Data Interface forward declarations ─── */ +/* These match the canonical Arrow spec structs. If you already include + arrow/c/abi.h, guard with ARROW_C_DATA_INTERFACE. 
*/ + +#ifndef ARROW_C_DATA_INTERFACE +#define ARROW_C_DATA_INTERFACE + +struct ArrowSchema { + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct ArrowSchema** children; + struct ArrowSchema* dictionary; + void (*release)(struct ArrowSchema*); + void* private_data; +}; + +struct ArrowArray { + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct ArrowArray** children; + struct ArrowArray* dictionary; + void (*release)(struct ArrowArray*); + void* private_data; +}; + +struct ArrowArrayStream { + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + const char* (*get_last_error)(struct ArrowArrayStream*); + void (*release)(struct ArrowArrayStream*); + void* private_data; +}; + +#endif /* ARROW_C_DATA_INTERFACE */ + +/* ─── Error handling ─── */ + +typedef enum { + LANCE_OK = 0, + LANCE_ERR_INVALID_ARGUMENT = 1, + LANCE_ERR_IO = 2, + LANCE_ERR_NOT_FOUND = 3, + LANCE_ERR_DATASET_ALREADY_EXISTS = 4, + LANCE_ERR_INDEX = 5, + LANCE_ERR_INTERNAL = 6, + LANCE_ERR_NOT_SUPPORTED = 7, + LANCE_ERR_COMMIT_CONFLICT = 8, +} LanceErrorCode; + +/** Return the error code from the last failed operation on this thread. */ +LanceErrorCode lance_last_error_code(void); + +/** Return the error message. Caller must free with lance_free_string(). */ +const char* lance_last_error_message(void); + +/** Free a string returned by lance_last_error_message(). */ +void lance_free_string(const char* s); + +/* ─── Opaque handles ─── */ + +typedef struct LanceDataset LanceDataset; +typedef struct LanceScanner LanceScanner; +typedef struct LanceBatch LanceBatch; + +/* ─── Dataset lifecycle ─── */ + +/** + * Open a Lance dataset. + * + * @param uri Dataset path (file://, s3://, memory://, etc.) 
+ * @param storage_opts NULL-terminated key-value pairs ["k1","v1",NULL], or NULL + * @param version Version to open (0 = latest) + * @return Dataset handle, or NULL on error + */ +LanceDataset* lance_dataset_open( + const char* uri, + const char* const* storage_opts, + uint64_t version +); + +/** Close and free a dataset handle. Safe to call with NULL. */ +void lance_dataset_close(LanceDataset* dataset); + +/* ─── Dataset metadata (sync, in-memory) ─── */ + +/** Return the version number of this dataset snapshot. */ +uint64_t lance_dataset_version(const LanceDataset* dataset); + +/** Return the number of rows. Returns 0 on error. */ +uint64_t lance_dataset_count_rows(const LanceDataset* dataset); + +/** Return the latest version ID (I/O). Returns 0 on error. */ +uint64_t lance_dataset_latest_version(const LanceDataset* dataset); + +/** + * Export the dataset schema via Arrow C Data Interface. + * @param out Pointer to caller-allocated ArrowSchema struct + * @return 0 on success, -1 on error + */ +int32_t lance_dataset_schema( + const LanceDataset* dataset, + struct ArrowSchema* out +); + +/* ─── Random access ─── */ + +/** + * Take rows by indices. + * @param indices Array of 0-based row offsets + * @param num_indices Length of indices array + * @param columns NULL-terminated column names, or NULL for all + * @param out Pointer to caller-allocated ArrowArrayStream + * @return 0 on success, -1 on error + */ +int32_t lance_dataset_take( + const LanceDataset* dataset, + const uint64_t* indices, + size_t num_indices, + const char* const* columns, + struct ArrowArrayStream* out +); + +/* ─── Scanner builder ─── */ + +/** + * Create a scanner for the dataset. 
+ * @param dataset Open dataset (not consumed) + * @param columns NULL-terminated column names, or NULL for all + * @param filter SQL filter expression, or NULL + * @return Scanner handle, or NULL on error + */ +LanceScanner* lance_scanner_new( + const LanceDataset* dataset, + const char* const* columns, + const char* filter +); + +int32_t lance_scanner_set_limit(LanceScanner* scanner, int64_t limit); +int32_t lance_scanner_set_offset(LanceScanner* scanner, int64_t offset); +int32_t lance_scanner_set_batch_size(LanceScanner* scanner, int64_t batch_size); +int32_t lance_scanner_with_row_id(LanceScanner* scanner, bool enable); + +/** Close and free a scanner handle. */ +void lance_scanner_close(LanceScanner* scanner); + +/* ─── Sync scan: ArrowArrayStream ─── */ + +/** + * Materialize the scan as an ArrowArrayStream (blocking). + * @return 0 on success, -1 on error + */ +int32_t lance_scanner_to_arrow_stream( + LanceScanner* scanner, + struct ArrowArrayStream* out +); + +/* ─── Sync scan: batch iteration ─── */ + +/** + * Read the next batch (blocking). + * @param out Set to a LanceBatch* on success, NULL on end/error + * @return 0 = batch available, 1 = end of stream, -1 = error + */ +int32_t lance_scanner_next( + LanceScanner* scanner, + LanceBatch** out +); + +/* ─── Async scan: callback-based ─── */ + +/** + * Callback type for async operations. + * @param ctx Opaque pointer passed back from the caller + * @param status 0 = success, -1 = error + * @param result Operation-specific result (e.g., ArrowArrayStream*) + */ +typedef void (*LanceCallback)(void* ctx, int32_t status, void* result); + +/** + * Start an async scan. The callback fires on a dedicated dispatcher thread + * when the ArrowArrayStream is ready. 
+ */ +void lance_scanner_scan_async( + const LanceScanner* scanner, + LanceCallback callback, + void* callback_ctx +); + +/* ─── Poll-based scan (for cooperative async runtimes) ─── */ + +typedef enum { + LANCE_POLL_READY = 0, + LANCE_POLL_PENDING = 1, + LANCE_POLL_FINISHED = 2, + LANCE_POLL_ERROR = -1, +} LancePollStatus; + +/** Waker callback: called from a Tokio thread when data is ready. */ +typedef void (*LanceWaker)(void* ctx); + +/** + * Poll for the next batch without blocking. + * See RFC for usage pattern. + */ +LancePollStatus lance_scanner_poll_next( + LanceScanner* scanner, + LanceWaker waker, + void* waker_ctx, + LanceBatch** out +); + +/* ─── Batch (Arrow C Data Interface) ─── */ + +/** + * Export a batch as Arrow C Data Interface structs. + * @return 0 on success, -1 on error + */ +int32_t lance_batch_to_arrow( + const LanceBatch* batch, + struct ArrowArray* out_array, + struct ArrowSchema* out_schema +); + +/** Free a batch handle. */ +void lance_batch_free(LanceBatch* batch); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* LANCE_H */ diff --git a/rust/lance-c/include/lance.hpp b/rust/lance-c/include/lance.hpp new file mode 100644 index 00000000000..a22ba2d2e64 --- /dev/null +++ b/rust/lance-c/include/lance.hpp @@ -0,0 +1,247 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/* SPDX-FileCopyrightText: Copyright The Lance Authors */ + +/** + * @file lance.hpp + * @brief C++ RAII wrappers for the Lance C API. 
+ * + * Header-only library providing: + * - lance::Error exception class + * - lance::Dataset RAII handle with builder-pattern Scanner + * - lance::Scanner fluent API + * - All data exchange via Arrow C Data Interface + */ + +#ifndef LANCE_HPP +#define LANCE_HPP + +#include "lance.h" + +#include +#include +#include +#include +#include + +namespace lance { + +// ─── Error ─────────────────────────────────────────────────────────────────── + +class Error : public std::runtime_error { +public: + LanceErrorCode code; + + Error(LanceErrorCode code, std::string msg) + : std::runtime_error(std::move(msg)), code(code) {} +}; + +/// Check thread-local error and throw if non-OK. +inline void check_error() { + LanceErrorCode code = lance_last_error_code(); + if (code != LANCE_OK) { + const char* msg = lance_last_error_message(); + std::string owned(msg ? msg : "Unknown error"); + if (msg) lance_free_string(msg); + throw Error(code, std::move(owned)); + } +} + +// ─── RAII Handle Template ──────────────────────────────────────────────────── + +template +class Handle { + T* ptr_; + +public: + explicit Handle(T* ptr = nullptr) : ptr_(ptr) {} + ~Handle() { + if (ptr_) Deleter(ptr_); + } + + Handle(Handle&& o) noexcept : ptr_(o.ptr_) { o.ptr_ = nullptr; } + Handle& operator=(Handle&& o) noexcept { + if (this != &o) { + if (ptr_) Deleter(ptr_); + ptr_ = o.ptr_; + o.ptr_ = nullptr; + } + return *this; + } + + Handle(const Handle&) = delete; + Handle& operator=(const Handle&) = delete; + + T* get() const { return ptr_; } + T* release() { + auto p = ptr_; + ptr_ = nullptr; + return p; + } + explicit operator bool() const { return ptr_ != nullptr; } +}; + +// ─── Forward Declarations ──────────────────────────────────────────────────── + +class Scanner; + +// ─── Dataset ───────────────────────────────────────────────────────────────── + +class Dataset { + Handle handle_; + +public: + /// Open a dataset at the given URI. 
+ static Dataset open( + const std::string& uri, + const std::vector>& storage_opts = {}, + uint64_t version = 0) { + + // Build NULL-terminated key-value array for storage options. + std::vector kv; + for (auto& [k, v] : storage_opts) { + kv.push_back(k.c_str()); + kv.push_back(v.c_str()); + } + kv.push_back(nullptr); + + const char* const* opts_ptr = + storage_opts.empty() ? nullptr : kv.data(); + + auto* ds = lance_dataset_open(uri.c_str(), opts_ptr, version); + if (!ds) check_error(); + return Dataset(ds); + } + + /// Number of rows in the dataset. + uint64_t count_rows() const { + uint64_t n = lance_dataset_count_rows(handle_.get()); + if (lance_last_error_code() != LANCE_OK) check_error(); + return n; + } + + /// Version of this dataset snapshot. + uint64_t version() const { + return lance_dataset_version(handle_.get()); + } + + /// Latest version ID (queries object store). + uint64_t latest_version() const { + uint64_t v = lance_dataset_latest_version(handle_.get()); + if (lance_last_error_code() != LANCE_OK) check_error(); + return v; + } + + /// Export the schema as an Arrow C Data Interface struct. + void schema(ArrowSchema* out) const { + if (lance_dataset_schema(handle_.get(), out) != 0) { + check_error(); + } + } + + /// Take rows by indices. Results exported as ArrowArrayStream. + void take(const uint64_t* indices, size_t num_indices, + const std::vector& columns, + ArrowArrayStream* out) const { + std::vector col_ptrs; + for (auto& c : columns) col_ptrs.push_back(c.c_str()); + col_ptrs.push_back(nullptr); + const char* const* cols_ptr = columns.empty() ? nullptr : col_ptrs.data(); + + if (lance_dataset_take(handle_.get(), indices, num_indices, cols_ptr, out) != 0) { + check_error(); + } + } + + /// Take all columns. 
+ void take(const uint64_t* indices, size_t num_indices, + ArrowArrayStream* out) const { + if (lance_dataset_take(handle_.get(), indices, num_indices, nullptr, out) != 0) { + check_error(); + } + } + + /// Create a Scanner builder for this dataset. + Scanner scan() const; + + /// Access the underlying C handle (does not transfer ownership). + const LanceDataset* c_handle() const { return handle_.get(); } + +private: + explicit Dataset(LanceDataset* ptr) : handle_(ptr) {} +}; + +// ─── Scanner ───────────────────────────────────────────────────────────────── + +class Scanner { + Handle handle_; + +public: + explicit Scanner(LanceScanner* s) : handle_(s) {} + + /// Set the row limit. + Scanner& limit(int64_t n) { + if (lance_scanner_set_limit(handle_.get(), n) != 0) + check_error(); + return *this; + } + + /// Set the row offset. + Scanner& offset(int64_t n) { + if (lance_scanner_set_offset(handle_.get(), n) != 0) + check_error(); + return *this; + } + + /// Set the batch size. + Scanner& batch_size(int64_t n) { + if (lance_scanner_set_batch_size(handle_.get(), n) != 0) + check_error(); + return *this; + } + + /// Enable/disable row ID in output. + Scanner& with_row_id(bool enable = true) { + if (lance_scanner_with_row_id(handle_.get(), enable) != 0) + check_error(); + return *this; + } + + /// Materialize the scan as an ArrowArrayStream (blocking). + void to_arrow_stream(ArrowArrayStream* out) { + if (lance_scanner_to_arrow_stream(handle_.get(), out) != 0) + check_error(); + } + + /// Start an async scan. Callback fires when ArrowArrayStream is ready. + void scan_async(LanceCallback callback, void* ctx) const { + lance_scanner_scan_async(handle_.get(), callback, ctx); + } + + /// Access the underlying C handle. 
+ LanceScanner* c_handle() { return handle_.get(); } +}; + +inline Scanner Dataset::scan() const { + auto* s = lance_scanner_new(handle_.get(), nullptr, nullptr); + if (!s) check_error(); + return Scanner(s); +} + +// ─── Batch ─────────────────────────────────────────────────────────────────── + +class Batch { + Handle handle_; + +public: + explicit Batch(LanceBatch* b) : handle_(b) {} + + /// Export as Arrow C Data Interface structs. + void to_arrow(ArrowArray* out_array, ArrowSchema* out_schema) const { + if (lance_batch_to_arrow(handle_.get(), out_array, out_schema) != 0) + check_error(); + } +}; + +} // namespace lance + +#endif /* LANCE_HPP */ diff --git a/rust/lance-c/src/async_dispatcher.rs b/rust/lance-c/src/async_dispatcher.rs new file mode 100644 index 00000000000..e8fa432cb71 --- /dev/null +++ b/rust/lance-c/src/async_dispatcher.rs @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Async callback dispatcher for non-blocking scan operations. +//! +//! Inspired by the Java JNI dispatcher (PR #6102). A dedicated background thread +//! receives completion messages from Tokio tasks and invokes C callbacks +//! sequentially, avoiding reentrancy and Tokio thread blocking. + +use std::ffi::c_void; +use std::sync::{mpsc, LazyLock}; + +/// C callback function pointer type for async operations. +/// - `ctx`: opaque pointer passed back to the caller +/// - `status`: 0 = success, -1 = error (check `lance_last_error_*`) +/// - `result`: operation-specific result pointer (e.g., `*mut ArrowArrayStream`) +pub type LanceCallback = unsafe extern "C" fn(ctx: *mut c_void, status: i32, result: *mut c_void); + +// Safety: LanceCallback is a C function pointer (Send by definition for FFI). +// The ctx pointer is transferred to the dispatcher thread which calls the callback. 
+unsafe impl Send for DispatcherMessage {} + +pub(crate) struct DispatcherMessage { + pub callback: LanceCallback, + pub callback_ctx: *mut c_void, + pub status: i32, + pub result: *mut c_void, +} + +struct Dispatcher { + tx: mpsc::Sender, +} + +impl Dispatcher { + fn new() -> Self { + let (tx, rx) = mpsc::channel::(); + + std::thread::Builder::new() + .name("lance-c-dispatcher".to_string()) + .spawn(move || { + log::debug!("Lance C dispatcher thread started"); + while let Ok(msg) = rx.recv() { + // Invoke the C callback on this dedicated thread. + // This ensures callbacks are serialized and don't run on Tokio I/O threads. + unsafe { + (msg.callback)(msg.callback_ctx, msg.status, msg.result); + } + } + log::debug!("Lance C dispatcher thread shutting down"); + }) + .expect("Failed to spawn lance-c dispatcher thread"); + + Self { tx } + } + + fn send(&self, msg: DispatcherMessage) { + let _ = self.tx.send(msg); + } +} + +static DISPATCHER: LazyLock = LazyLock::new(Dispatcher::new); + +/// Send a completion message to the dispatcher thread, which will invoke the callback. +pub(crate) fn dispatch_callback( + callback: LanceCallback, + callback_ctx: *mut c_void, + status: i32, + result: *mut c_void, +) { + DISPATCHER.send(DispatcherMessage { + callback, + callback_ctx, + status, + result, + }); +} diff --git a/rust/lance-c/src/batch.rs b/rust/lance-c/src/batch.rs new file mode 100644 index 00000000000..3a6d57e6105 --- /dev/null +++ b/rust/lance-c/src/batch.rs @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! LanceBatch C API: Arrow C Data Interface export. + +use std::ptr; + +use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +use arrow_array::RecordBatch; +use lance_core::Result; + +use crate::error::ffi_try; + +/// Opaque handle wrapping an Arrow RecordBatch. +pub struct LanceBatch { + pub(crate) inner: RecordBatch, +} + +/// Export a `LanceBatch` as Arrow C Data Interface structs. 
+/// +/// Writes the array data to `out_array` and the schema to `out_schema`. +/// Returns 0 on success, -1 on error. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_batch_to_arrow( + batch: *const LanceBatch, + out_array: *mut FFI_ArrowArray, + out_schema: *mut FFI_ArrowSchema, +) -> i32 { + ffi_try!( + unsafe { batch_to_arrow_inner(batch, out_array, out_schema) }, + neg + ) +} + +unsafe fn batch_to_arrow_inner( + batch: *const LanceBatch, + out_array: *mut FFI_ArrowArray, + out_schema: *mut FFI_ArrowSchema, +) -> Result { + if batch.is_null() || out_array.is_null() || out_schema.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "batch, out_array, and out_schema must not be NULL".into(), + location: snafu::location!(), + }); + } + let b = unsafe { &*batch }; + let struct_array: arrow_array::StructArray = b.inner.clone().into(); + let (ffi_array, ffi_schema) = arrow::ffi::to_ffi(&struct_array.into())?; + unsafe { + ptr::write_unaligned(out_array, ffi_array); + ptr::write_unaligned(out_schema, ffi_schema); + } + Ok(0) +} + +/// Free a `LanceBatch` handle. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_batch_free(batch: *mut LanceBatch) { + if !batch.is_null() { + unsafe { + let _ = Box::from_raw(batch); + } + } +} diff --git a/rust/lance-c/src/dataset.rs b/rust/lance-c/src/dataset.rs new file mode 100644 index 00000000000..cc2fc32d452 --- /dev/null +++ b/rust/lance-c/src/dataset.rs @@ -0,0 +1,242 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Dataset C API: open, close, metadata, schema, take. 
+ +use std::ffi::c_char; +use std::sync::Arc; + +use arrow::ffi::FFI_ArrowSchema; +use arrow::ffi_stream::FFI_ArrowArrayStream; +use arrow_schema::Schema as ArrowSchema; +use lance::dataset::builder::DatasetBuilder; +use lance::Dataset; +use lance_core::Result; + +use crate::error::{ffi_try, set_last_error, LanceErrorCode}; +use crate::helpers; +use crate::runtime::block_on; + +/// Opaque handle representing an opened Lance dataset. +pub struct LanceDataset { + pub(crate) inner: Arc, +} + +// --------------------------------------------------------------------------- +// Dataset lifecycle +// --------------------------------------------------------------------------- + +/// Open a Lance dataset at the given URI. +/// +/// - `uri`: Dataset path (file://, s3://, az://, gs://, memory://) +/// - `storage_options`: NULL-terminated key-value pairs `["k1","v1","k2","v2",NULL]`, or NULL. +/// - `version`: Dataset version to open. Pass 0 for latest. +/// +/// Returns an opaque `LanceDataset*` on success, or NULL on error. +/// On error, call `lance_last_error_code()` / `lance_last_error_message()`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_open( + uri: *const c_char, + storage_options: *const *const c_char, + version: u64, +) -> *mut LanceDataset { + ffi_try!( + unsafe { open_dataset_inner(uri, storage_options, version) }, + null + ) +} + +unsafe fn open_dataset_inner( + uri: *const c_char, + storage_options: *const *const c_char, + version: u64, +) -> Result<*mut LanceDataset> { + let uri_str = unsafe { helpers::parse_c_string(uri)? }.ok_or_else(|| { + lance_core::Error::InvalidInput { + source: "uri must not be NULL".into(), + location: snafu::location!(), + } + })?; + + let opts = unsafe { helpers::parse_storage_options(storage_options)? 
}; + + let mut builder = DatasetBuilder::from_uri(uri_str); + if !opts.is_empty() { + builder = builder.with_storage_options(opts); + } + if version != 0 { + builder = builder.with_version(version); + } + + let dataset = block_on(builder.load())?; + let handle = LanceDataset { + inner: Arc::new(dataset), + }; + Ok(Box::into_raw(Box::new(handle))) +} + +/// Close and free a dataset handle. +/// Safe to call with NULL. Safe to call multiple times (subsequent calls are no-ops). +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_close(dataset: *mut LanceDataset) { + if !dataset.is_null() { + unsafe { + let _ = Box::from_raw(dataset); + } + } +} + +// --------------------------------------------------------------------------- +// Metadata (in-memory, sync only) +// --------------------------------------------------------------------------- + +/// Return the version number of this dataset snapshot. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_version(dataset: *const LanceDataset) -> u64 { + if dataset.is_null() { + set_last_error(LanceErrorCode::InvalidArgument, "dataset is NULL"); + return 0; + } + let ds = unsafe { &*dataset }; + ds.inner.version().version +} + +/// Return the number of rows in the dataset. +/// Returns 0 on error — check `lance_last_error_code()`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_count_rows(dataset: *const LanceDataset) -> u64 { + if dataset.is_null() { + set_last_error(LanceErrorCode::InvalidArgument, "dataset is NULL"); + return 0; + } + let ds = unsafe { &*dataset }; + match block_on(ds.inner.count_rows(None)) { + Ok(n) => { + crate::error::clear_last_error(); + n as u64 + } + Err(err) => { + crate::error::set_lance_error(&err); + 0 + } + } +} + +/// Return the latest version ID of the dataset. +/// Returns 0 on error — check `lance_last_error_code()`. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_latest_version(dataset: *const LanceDataset) -> u64 { + if dataset.is_null() { + set_last_error(LanceErrorCode::InvalidArgument, "dataset is NULL"); + return 0; + } + let ds = unsafe { &*dataset }; + match block_on(ds.inner.latest_version_id()) { + Ok(v) => { + crate::error::clear_last_error(); + v + } + Err(err) => { + crate::error::set_lance_error(&err); + 0 + } + } +} + +// --------------------------------------------------------------------------- +// Schema (Arrow C Data Interface) +// --------------------------------------------------------------------------- + +/// Export the dataset schema as an Arrow C Data Interface `ArrowSchema`. +/// +/// The caller must provide a pointer to a stack-allocated `ArrowSchema` struct. +/// Returns 0 on success, -1 on error. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_schema( + dataset: *const LanceDataset, + out: *mut FFI_ArrowSchema, +) -> i32 { + ffi_try!(unsafe { dataset_schema_inner(dataset, out) }, neg) +} + +unsafe fn dataset_schema_inner( + dataset: *const LanceDataset, + out: *mut FFI_ArrowSchema, +) -> Result { + if dataset.is_null() || out.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "dataset and out must not be NULL".into(), + location: snafu::location!(), + }); + } + let ds = unsafe { &*dataset }; + let lance_schema = ds.inner.schema(); + let arrow_schema: ArrowSchema = lance_schema.into(); + let ffi_schema = FFI_ArrowSchema::try_from(&arrow_schema)?; + unsafe { + std::ptr::write_unaligned(out, ffi_schema); + } + Ok(0) +} + +// --------------------------------------------------------------------------- +// Random access (take) +// --------------------------------------------------------------------------- + +/// Take rows by indices, returning results as an ArrowArrayStream. 
+/// +/// - `indices`: array of row indices (0-based offsets) +/// - `num_indices`: length of the indices array +/// - `columns`: NULL-terminated column name array, or NULL for all columns +/// - `out`: pointer to a stack-allocated `ArrowArrayStream` +/// +/// Returns 0 on success, -1 on error. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_take( + dataset: *const LanceDataset, + indices: *const u64, + num_indices: usize, + columns: *const *const c_char, + out: *mut FFI_ArrowArrayStream, +) -> i32 { + ffi_try!( + unsafe { dataset_take_inner(dataset, indices, num_indices, columns, out) }, + neg + ) +} + +unsafe fn dataset_take_inner( + dataset: *const LanceDataset, + indices: *const u64, + num_indices: usize, + columns: *const *const c_char, + out: *mut FFI_ArrowArrayStream, +) -> Result { + if dataset.is_null() || indices.is_null() || out.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "dataset, indices, and out must not be NULL".into(), + location: snafu::location!(), + }); + } + let ds = unsafe { &*dataset }; + let idx_slice = unsafe { std::slice::from_raw_parts(indices, num_indices) }; + let col_names = unsafe { helpers::parse_c_string_array(columns)? }; + + let projection = match &col_names { + Some(cols) => { + lance::dataset::ProjectionRequest::from_columns(cols.iter(), ds.inner.schema()) + } + None => lance::dataset::ProjectionRequest::from_schema(ds.inner.schema().clone()), + }; + + let batch = block_on(ds.inner.take(idx_slice, projection))?; + + // Wrap the single RecordBatch as a RecordBatchReader, then export as FFI stream. 
+ let schema = batch.schema(); + let reader = arrow::record_batch::RecordBatchIterator::new(vec![Ok(batch)], schema); + let ffi_stream = FFI_ArrowArrayStream::new(Box::new(reader)); + unsafe { + std::ptr::write_unaligned(out, ffi_stream); + } + Ok(0) +} diff --git a/rust/lance-c/src/error.rs b/rust/lance-c/src/error.rs new file mode 100644 index 00000000000..12a6bb89c45 --- /dev/null +++ b/rust/lance-c/src/error.rs @@ -0,0 +1,144 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Thread-local error handling for FFI. +//! +//! After any C function returns an error indicator (NULL pointer or negative int), +//! the caller retrieves the error code and message from thread-local storage. + +use std::cell::RefCell; +use std::ffi::{c_char, CString}; +use std::ptr; + +/// Error codes returned by `lance_last_error_code()`. +#[repr(C)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum LanceErrorCode { + Ok = 0, + InvalidArgument = 1, + IoError = 2, + NotFound = 3, + DatasetAlreadyExists = 4, + IndexError = 5, + Internal = 6, + NotSupported = 7, + CommitConflict = 8, +} + +struct LastError { + code: LanceErrorCode, + message: CString, +} + +thread_local! { + static LAST_ERROR: RefCell> = const { RefCell::new(None) }; +} + +pub fn clear_last_error() { + LAST_ERROR.with(|e| { + *e.borrow_mut() = None; + }); +} + +pub fn set_last_error(code: LanceErrorCode, message: impl AsRef) { + let message = match CString::new(message.as_ref()) { + Ok(v) => v, + Err(_) => CString::new(message.as_ref().replace('\0', "\\0")) + .unwrap_or_else(|_| CString::new("invalid error message").unwrap()), + }; + LAST_ERROR.with(|e| { + *e.borrow_mut() = Some(LastError { code, message }); + }); +} + +/// Map a `lance_core::Error` to an `LanceErrorCode`. +pub fn error_code_from_lance(err: &lance_core::Error) -> LanceErrorCode { + use lance_core::Error; + match err { + Error::InvalidInput { .. 
} => LanceErrorCode::InvalidArgument, + Error::DatasetAlreadyExists { .. } => LanceErrorCode::DatasetAlreadyExists, + Error::CommitConflict { .. } => LanceErrorCode::CommitConflict, + Error::DatasetNotFound { .. } | Error::NotFound { .. } => LanceErrorCode::NotFound, + Error::IO { .. } => LanceErrorCode::IoError, + Error::Index { .. } => LanceErrorCode::IndexError, + Error::NotSupported { .. } => LanceErrorCode::NotSupported, + _ => LanceErrorCode::Internal, + } +} + +/// Set the thread-local error from a `lance_core::Error`. +pub fn set_lance_error(err: &lance_core::Error) { + set_last_error(error_code_from_lance(err), err.to_string()); +} + +// --------------------------------------------------------------------------- +// Public C API +// --------------------------------------------------------------------------- + +/// Return the error code from the last failed operation on this thread. +/// Returns `LanceErrorCode::Ok` if no error is pending. +#[unsafe(no_mangle)] +pub extern "C" fn lance_last_error_code() -> LanceErrorCode { + LAST_ERROR.with(|e| { + e.borrow() + .as_ref() + .map(|v| v.code) + .unwrap_or(LanceErrorCode::Ok) + }) +} + +/// Return the error message from the last failed operation on this thread. +/// The caller must free the returned string with `lance_free_string()`. +/// Returns NULL if no error is pending. +#[unsafe(no_mangle)] +pub extern "C" fn lance_last_error_message() -> *const c_char { + LAST_ERROR.with(|e| match e.borrow_mut().take() { + Some(err) => err.message.into_raw() as *const c_char, + None => ptr::null(), + }) +} + +/// Free a string returned by `lance_last_error_message()`. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_free_string(s: *const c_char) { + if !s.is_null() { + unsafe { + let _ = CString::from_raw(s as *mut c_char); + } + } +} + +// --------------------------------------------------------------------------- +// Helper macro for FFI functions +// --------------------------------------------------------------------------- + +/// Wrap an FFI function body: on error, set thread-local error and return $err_val. +/// On success, clear the error and return the value. +macro_rules! ffi_try { + ($body:expr, null) => { + match $body { + Ok(val) => { + $crate::error::clear_last_error(); + val + } + Err(err) => { + $crate::error::set_lance_error(&err); + std::ptr::null_mut() + } + } + }; + ($body:expr, neg) => { + match $body { + Ok(val) => { + $crate::error::clear_last_error(); + val + } + Err(err) => { + $crate::error::set_lance_error(&err); + -1 + } + } + }; +} + +pub(crate) use ffi_try; diff --git a/rust/lance-c/src/helpers.rs b/rust/lance-c/src/helpers.rs new file mode 100644 index 00000000000..ce348400e9a --- /dev/null +++ b/rust/lance-c/src/helpers.rs @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! C string parsing utilities. + +use std::collections::HashMap; +use std::ffi::{c_char, CStr}; + +use lance_core::{Error, Result}; +use snafu::location; + +/// Parse a nullable C string pointer into an `Option<&str>`. +pub unsafe fn parse_c_string<'a>(ptr: *const c_char) -> Result> { + if ptr.is_null() { + return Ok(None); + } + let cstr = unsafe { CStr::from_ptr(ptr) }; + let s = cstr.to_str().map_err(|e| Error::InvalidInput { + source: Box::new(e), + location: location!(), + })?; + Ok(Some(s)) +} + +/// Parse a NULL-terminated array of C strings into `Option>`. +/// If `ptr` is NULL, returns `None` (meaning "all columns" / "no filter"). +/// The array must end with a NULL pointer sentinel. 
+pub unsafe fn parse_c_string_array(ptr: *const *const c_char) -> Result>> { + if ptr.is_null() { + return Ok(None); + } + let mut result = Vec::new(); + let mut i = 0; + loop { + let entry = unsafe { *ptr.add(i) }; + if entry.is_null() { + break; + } + let s = unsafe { parse_c_string(entry)? }; + match s { + Some(s) => result.push(s.to_string()), + None => break, + } + i += 1; + } + Ok(Some(result)) +} + +/// Parse NULL-terminated key-value pairs into a `HashMap`. +/// Format: `["key1", "val1", "key2", "val2", NULL]` +/// If `ptr` is NULL, returns an empty map. +pub unsafe fn parse_storage_options(ptr: *const *const c_char) -> Result> { + let mut map = HashMap::new(); + if ptr.is_null() { + return Ok(map); + } + let mut i = 0; + loop { + let key_ptr = unsafe { *ptr.add(i) }; + if key_ptr.is_null() { + break; + } + let val_ptr = unsafe { *ptr.add(i + 1) }; + if val_ptr.is_null() { + return Err(Error::InvalidInput { + source: "storage options must be key-value pairs; odd number of entries".into(), + location: location!(), + }); + } + let key = unsafe { parse_c_string(key_ptr)? }.unwrap().to_string(); + let val = unsafe { parse_c_string(val_ptr)? }.unwrap().to_string(); + map.insert(key, val); + i += 2; + } + Ok(map) +} diff --git a/rust/lance-c/src/lib.rs b/rust/lance-c/src/lib.rs new file mode 100644 index 00000000000..7f686d2ca49 --- /dev/null +++ b/rust/lance-c/src/lib.rs @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! C/C++ bindings for the Lance columnar data format. +//! +//! This crate exposes Lance's functionality through a stable C-ABI with +//! opaque handle patterns and Arrow C Data Interface for zero-copy data exchange. +//! +//! # Safety +//! +//! All `extern "C"` functions in this crate follow the C FFI safety contract: +//! - Pointers must be valid and non-null (unless documented as nullable). +//! - Opaque handles must have been created by the corresponding `lance_*_open` +//! 
or `lance_*_new` function and must not be used after `lance_*_close`/`lance_*_free`. +//! - The caller is responsible for freeing returned strings with `lance_free_string()`. +#![allow(clippy::missing_safety_doc)] + +mod async_dispatcher; +mod batch; +mod dataset; +mod error; +mod helpers; +pub mod runtime; +mod scanner; + +// Re-export all extern "C" symbols so they appear in the cdylib. +pub use batch::*; +pub use dataset::*; +pub use error::{ + lance_free_string, lance_last_error_code, lance_last_error_message, LanceErrorCode, +}; +pub use scanner::*; diff --git a/rust/lance-c/src/runtime.rs b/rust/lance-c/src/runtime.rs new file mode 100644 index 00000000000..0153d3ff3ce --- /dev/null +++ b/rust/lance-c/src/runtime.rs @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Global Tokio runtime for the C FFI layer. + +use std::sync::LazyLock; + +/// Global multi-threaded Tokio runtime, shared across all FFI calls. +/// Initialized lazily on first access. +pub static RT: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to create tokio runtime") +}); + +/// Block the current thread on an async future using the global runtime. +pub fn block_on(f: F) -> F::Output { + RT.block_on(f) +} diff --git a/rust/lance-c/src/scanner.rs b/rust/lance-c/src/scanner.rs new file mode 100644 index 00000000000..166c6ae2f72 --- /dev/null +++ b/rust/lance-c/src/scanner.rs @@ -0,0 +1,520 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Scanner C API: builder, sync iteration, async scan, poll-based iteration. 
+ +use std::ffi::{c_char, c_void}; +use std::pin::Pin; +use std::ptr; +use std::sync::Arc; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +use arrow::ffi_stream::FFI_ArrowArrayStream; +use arrow_schema::SchemaRef; +use futures::{Stream, StreamExt}; +use lance::dataset::scanner::DatasetRecordBatchStream; +use lance::Dataset; +use lance_core::Result; +use lance_io::ffi::to_ffi_arrow_array_stream; +use lance_io::stream::RecordBatchStream; + +use crate::async_dispatcher::{self, LanceCallback}; +use crate::batch::LanceBatch; +use crate::dataset::LanceDataset; +use crate::error::{clear_last_error, ffi_try, set_lance_error, set_last_error, LanceErrorCode}; +use crate::helpers; +use crate::runtime::{block_on, RT}; + +/// Opaque scanner handle. Stores configuration until stream materialization. +pub struct LanceScanner { + dataset: Arc, + columns: Option>, + filter: Option, + limit: Option, + offset: Option, + batch_size: Option, + with_row_id: bool, + // Materialized on first iteration call + stream: Option>>, + #[allow(dead_code)] + schema: Option, +} + +/// Poll status for `lance_scanner_poll_next`. +#[repr(C)] +#[derive(Debug, PartialEq, Eq)] +pub enum LancePollStatus { + /// Batch available in `*out`. + Ready = 0, + /// I/O in progress; waker will fire when ready. + Pending = 1, + /// End of stream. + Finished = 2, + /// Error occurred (check `lance_last_error_*`). + Error = -1, +} + +/// Waker callback type for poll-based iteration. +/// Called from a Tokio I/O thread when data becomes available. +/// Must be thread-safe and must NOT call back into `lance_scanner_*`. +pub type LanceWaker = unsafe extern "C" fn(ctx: *mut c_void); + +impl LanceScanner { + fn new(dataset: Arc) -> Self { + Self { + dataset, + columns: None, + filter: None, + limit: None, + offset: None, + batch_size: None, + with_row_id: false, + stream: None, + schema: None, + } + } + + /// Build the underlying Scanner and open a stream. 
+ fn materialize_stream(&mut self) -> Result<()> { + let mut scanner = self.dataset.scan(); + if let Some(cols) = &self.columns { + scanner.project(cols)?; + } + if let Some(filter) = &self.filter { + scanner.filter(filter)?; + } + if self.limit.is_some() || self.offset.is_some() { + scanner.limit(self.limit, self.offset)?; + } + if let Some(bs) = self.batch_size { + scanner.batch_size(bs); + } + if self.with_row_id { + scanner.with_row_id(); + } + let stream = block_on(scanner.try_into_stream())?; + self.schema = Some(stream.schema()); + self.stream = Some(Box::pin(stream)); + Ok(()) + } + + /// Build a Scanner (without materializing) and return it. + fn build_scanner(&self) -> Result { + let mut scanner = self.dataset.scan(); + if let Some(cols) = &self.columns { + scanner.project(cols)?; + } + if let Some(filter) = &self.filter { + scanner.filter(filter)?; + } + if self.limit.is_some() || self.offset.is_some() { + scanner.limit(self.limit, self.offset)?; + } + if let Some(bs) = self.batch_size { + scanner.batch_size(bs); + } + if self.with_row_id { + scanner.with_row_id(); + } + Ok(scanner) + } +} + +// --------------------------------------------------------------------------- +// Scanner lifecycle + builder +// --------------------------------------------------------------------------- + +/// Create a new scanner for the given dataset. +/// +/// - `dataset`: An open `LanceDataset*` (not consumed; remains valid). +/// - `columns`: NULL-terminated column name array, or NULL for all columns. +/// - `filter`: SQL filter expression, or NULL for no filter. +/// +/// Returns a `LanceScanner*` on success, or NULL on error. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_scanner_new( + dataset: *const LanceDataset, + columns: *const *const c_char, + filter: *const c_char, +) -> *mut LanceScanner { + ffi_try!(unsafe { scanner_new_inner(dataset, columns, filter) }, null) +} + +unsafe fn scanner_new_inner( + dataset: *const LanceDataset, + columns: *const *const c_char, + filter: *const c_char, +) -> Result<*mut LanceScanner> { + if dataset.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "dataset must not be NULL".into(), + location: snafu::location!(), + }); + } + let ds = unsafe { &*dataset }; + let col_names = unsafe { helpers::parse_c_string_array(columns)? }; + let filter_str = unsafe { helpers::parse_c_string(filter)? }.map(|s| s.to_string()); + + let mut scanner = LanceScanner::new(ds.inner.clone()); + scanner.columns = col_names; + scanner.filter = filter_str; + Ok(Box::into_raw(Box::new(scanner))) +} + +/// Set the row limit on the scanner. Returns 0. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_scanner_set_limit(scanner: *mut LanceScanner, limit: i64) -> i32 { + if scanner.is_null() { + set_last_error(LanceErrorCode::InvalidArgument, "scanner is NULL"); + return -1; + } + let s = unsafe { &mut *scanner }; + s.limit = Some(limit); + clear_last_error(); + 0 +} + +/// Set the row offset on the scanner. Returns 0. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_scanner_set_offset(scanner: *mut LanceScanner, offset: i64) -> i32 { + if scanner.is_null() { + set_last_error(LanceErrorCode::InvalidArgument, "scanner is NULL"); + return -1; + } + let s = unsafe { &mut *scanner }; + s.offset = Some(offset); + clear_last_error(); + 0 +} + +/// Set the batch size on the scanner. Returns 0. 
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn lance_scanner_set_batch_size(
+    scanner: *mut LanceScanner,
+    batch_size: i64,
+) -> i32 {
+    if scanner.is_null() {
+        set_last_error(LanceErrorCode::InvalidArgument, "scanner is NULL");
+        return -1;
+    }
+    let s = unsafe { &mut *scanner };
+    s.batch_size = usize::try_from(batch_size).ok(); // negative values are ignored rather than wrapping to a huge usize
+    clear_last_error();
+    0
+}
+
+/// Enable or disable row ID in scan output. Returns 0.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn lance_scanner_with_row_id(
+    scanner: *mut LanceScanner,
+    enable: bool,
+) -> i32 {
+    if scanner.is_null() {
+        set_last_error(LanceErrorCode::InvalidArgument, "scanner is NULL");
+        return -1;
+    }
+    let s = unsafe { &mut *scanner };
+    s.with_row_id = enable;
+    clear_last_error();
+    0
+}
+
+/// Close and free a scanner handle.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn lance_scanner_close(scanner: *mut LanceScanner) {
+    if !scanner.is_null() {
+        unsafe {
+            let _ = Box::from_raw(scanner);
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Sync stream: ArrowArrayStream export
+// ---------------------------------------------------------------------------
+
+/// Materialize the scan as an Arrow C Data Interface `ArrowArrayStream`.
+///
+/// This is the preferred API for simple integrations — blocks the calling thread.
+/// The scanner handle is NOT consumed: its configuration is read and a fresh stream is built, but the caller must still free it with `lance_scanner_close()`.
+///
+/// Returns 0 on success, -1 on error.
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_scanner_to_arrow_stream( + scanner: *mut LanceScanner, + out: *mut FFI_ArrowArrayStream, +) -> i32 { + ffi_try!(unsafe { scanner_to_arrow_stream_inner(scanner, out) }, neg) +} + +unsafe fn scanner_to_arrow_stream_inner( + scanner: *mut LanceScanner, + out: *mut FFI_ArrowArrayStream, +) -> Result { + if scanner.is_null() || out.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "scanner and out must not be NULL".into(), + location: snafu::location!(), + }); + } + let s = unsafe { &*scanner }; + let built_scanner = s.build_scanner()?; + let stream = block_on(built_scanner.try_into_stream())?; + let ffi_stream = to_ffi_arrow_array_stream(stream, RT.handle().clone())?; + unsafe { + ptr::write_unaligned(out, ffi_stream); + } + Ok(0) +} + +// --------------------------------------------------------------------------- +// Sync iteration: blocking batch-at-a-time +// --------------------------------------------------------------------------- + +/// Read the next batch from the scanner (blocking). +/// +/// Returns: +/// - `0` — batch available, `*out` is set. +/// - `1` — end of stream, `*out` is NULL. +/// - `-1` — error (check `lance_last_error_*`), `*out` is NULL. +/// +/// The caller must free each returned batch with `lance_batch_free()`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_scanner_next( + scanner: *mut LanceScanner, + out: *mut *mut LanceBatch, +) -> i32 { + if scanner.is_null() || out.is_null() { + set_last_error( + LanceErrorCode::InvalidArgument, + "scanner and out must not be NULL", + ); + return -1; + } + let s = unsafe { &mut *scanner }; + + // Lazily materialize the stream on first call. 
+    if s.stream.is_none() {
+        if let Err(err) = s.materialize_stream() {
+            set_lance_error(&err);
+            unsafe { *out = ptr::null_mut() };
+            return -1;
+        }
+    }
+
+    let stream = s.stream.as_mut().unwrap();
+    match block_on(stream.next()) {
+        Some(Ok(batch)) => {
+            clear_last_error();
+            let lance_batch = LanceBatch { inner: batch };
+            unsafe { *out = Box::into_raw(Box::new(lance_batch)) };
+            0
+        }
+        Some(Err(err)) => {
+            set_lance_error(&err);
+            unsafe { *out = ptr::null_mut() };
+            -1
+        }
+        None => {
+            // End of stream
+            clear_last_error();
+            unsafe { *out = ptr::null_mut() };
+            1
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Async scan: callback-based
+// ---------------------------------------------------------------------------
+
+/// Start an async scan. The callback is invoked on a dedicated dispatcher thread
+/// when the ArrowArrayStream is ready.
+///
+/// - `callback`: Called with `(ctx, 0, *mut ArrowArrayStream)` on success,
+///   or `(ctx, -1, NULL)` on error. NOTE(review): errors raised inside the spawned task are recorded in the *runtime worker thread's* thread-local, so `lance_last_error_*` called from the callback or caller thread may not observe them — confirm `dispatch_callback` propagates error state.
+/// - `callback_ctx`: Opaque pointer passed back to the callback.
+///
+/// The scanner configuration is captured at call time. The scanner handle
+/// can be closed immediately after this call.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn lance_scanner_scan_async(
+    scanner: *const LanceScanner,
+    callback: LanceCallback,
+    callback_ctx: *mut c_void,
+) {
+    if scanner.is_null() {
+        set_last_error(LanceErrorCode::InvalidArgument, "scanner is NULL");
+        async_dispatcher::dispatch_callback(callback, callback_ctx, -1, ptr::null_mut());
+        return;
+    }
+
+    let s = unsafe { &*scanner };
+    let built_scanner = match s.build_scanner() {
+        Ok(sc) => sc,
+        Err(err) => {
+            set_lance_error(&err);
+            async_dispatcher::dispatch_callback(callback, callback_ctx, -1, ptr::null_mut());
+            return;
+        }
+    };
+
+    let handle = RT.handle().clone();
+
+    // Wrap non-Send raw pointers for the async task.
+ // Safety: The C caller guarantees callback_ctx remains valid until callback fires. + struct SendCallback { + callback: LanceCallback, + ctx: *mut c_void, + } + unsafe impl Send for SendCallback {} + + impl SendCallback { + fn dispatch(&self, status: i32, result: *mut c_void) { + async_dispatcher::dispatch_callback(self.callback, self.ctx, status, result); + } + } + + let send_cb = SendCallback { + callback, + ctx: callback_ctx, + }; + + RT.spawn(async move { + let result = built_scanner.try_into_stream().await; + match result { + Ok(stream) => match to_ffi_arrow_array_stream(stream, handle) { + Ok(ffi_stream) => { + let ptr = Box::into_raw(Box::new(ffi_stream)); + send_cb.dispatch(0, ptr as *mut c_void); + } + Err(err) => { + set_lance_error(&err); + send_cb.dispatch(-1, std::ptr::null_mut()); + } + }, + Err(err) => { + set_lance_error(&err); + send_cb.dispatch(-1, std::ptr::null_mut()); + } + } + }); +} + +// --------------------------------------------------------------------------- +// Poll-based iteration (for cooperative async runtimes) +// --------------------------------------------------------------------------- + +/// Poll for the next batch without blocking. +/// +/// - If data is already buffered, returns `LANCE_POLL_READY` immediately. +/// - If I/O is needed, returns `LANCE_POLL_PENDING` and schedules the waker callback. +/// The caller should yield the thread and re-poll after the waker fires. +/// - The waker is single-use: it fires at most once per poll call that returns PENDING. +/// +/// The stream is lazily materialized on the first poll call (which will typically +/// return PENDING while the stream opens). 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_scanner_poll_next( + scanner: *mut LanceScanner, + waker: LanceWaker, + waker_ctx: *mut c_void, + out: *mut *mut LanceBatch, +) -> LancePollStatus { + if scanner.is_null() || out.is_null() { + set_last_error( + LanceErrorCode::InvalidArgument, + "scanner and out must not be NULL", + ); + return LancePollStatus::Error; + } + let s = unsafe { &mut *scanner }; + + // Lazily materialize the stream. + if s.stream.is_none() { + if let Err(err) = s.materialize_stream() { + set_lance_error(&err); + unsafe { *out = ptr::null_mut() }; + return LancePollStatus::Error; + } + } + + let stream = s.stream.as_mut().unwrap(); + + // Construct a std::task::Waker from the C function pointer. + let raw_waker = make_raw_waker(waker, waker_ctx); + let waker_obj = unsafe { Waker::from_raw(raw_waker) }; + let mut cx = Context::from_waker(&waker_obj); + + match stream.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(Ok(batch))) => { + clear_last_error(); + let lance_batch = LanceBatch { inner: batch }; + unsafe { *out = Box::into_raw(Box::new(lance_batch)) }; + LancePollStatus::Ready + } + Poll::Ready(Some(Err(err))) => { + set_lance_error(&err); + unsafe { *out = ptr::null_mut() }; + LancePollStatus::Error + } + Poll::Ready(None) => { + clear_last_error(); + unsafe { *out = ptr::null_mut() }; + LancePollStatus::Finished + } + Poll::Pending => { + clear_last_error(); + unsafe { *out = ptr::null_mut() }; + LancePollStatus::Pending + } + } +} + +// --------------------------------------------------------------------------- +// Waker construction from C function pointer +// --------------------------------------------------------------------------- + +/// Context for a C waker callback. +struct CWakerContext { + waker_fn: LanceWaker, + ctx: *mut c_void, +} + +// C function pointers + void* are Send by convention for FFI. 
+unsafe impl Send for CWakerContext {} +unsafe impl Sync for CWakerContext {} + +fn make_raw_waker(waker_fn: LanceWaker, ctx: *mut c_void) -> RawWaker { + let data = Box::into_raw(Box::new(CWakerContext { waker_fn, ctx })) as *const (); + + const VTABLE: RawWakerVTable = RawWakerVTable::new( + // clone + |data| { + let orig = unsafe { &*(data as *const CWakerContext) }; + let cloned = Box::new(CWakerContext { + waker_fn: orig.waker_fn, + ctx: orig.ctx, + }); + RawWaker::new(Box::into_raw(cloned) as *const (), &VTABLE) + }, + // wake (consumes) + |data| { + let ctx = unsafe { Box::from_raw(data as *mut CWakerContext) }; + unsafe { (ctx.waker_fn)(ctx.ctx) }; + }, + // wake_by_ref + |data| { + let ctx = unsafe { &*(data as *const CWakerContext) }; + unsafe { (ctx.waker_fn)(ctx.ctx) }; + }, + // drop + |data| { + unsafe { + let _ = Box::from_raw(data as *mut CWakerContext); + }; + }, + ); + + RawWaker::new(data, &VTABLE) +} diff --git a/rust/lance-c/tests/c_api_test.rs b/rust/lance-c/tests/c_api_test.rs new file mode 100644 index 00000000000..4973ee28210 --- /dev/null +++ b/rust/lance-c/tests/c_api_test.rs @@ -0,0 +1,1244 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Integration tests for the Lance C API. +//! +//! These tests call the `extern "C"` functions directly from Rust, +//! validating the C API contract without needing a C compiler. + +use std::ffi::CString; +use std::ptr; +use std::sync::Arc; + +use arrow::ffi::from_ffi; +use arrow::ffi::FFI_ArrowSchema; +use arrow::ffi_stream::ArrowArrayStreamReader; +use arrow::ffi_stream::FFI_ArrowArrayStream; +use arrow::record_batch::RecordBatchReader; +use arrow_array::{Float32Array, Int32Array, RecordBatch, StringArray}; +use arrow_schema::{DataType, Field, Schema}; +use lance::Dataset; +use lance_c::*; + +/// Helper: create a test dataset in a temp directory and return its path. 
+fn create_test_dataset() -> (tempfile::TempDir, String) { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("test_ds").to_str().unwrap().to_string(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec![ + "alice", "bob", "carol", "dave", "eve", + ])), + ], + ) + .unwrap(); + + // Use lance-c's internal runtime to write the dataset. + lance_c::runtime::block_on(async { + Dataset::write( + arrow::record_batch::RecordBatchIterator::new(vec![Ok(batch)], schema), + &uri, + None, + ) + .await + .unwrap(); + }); + + (tmp, uri) +} + +/// Helper: create a larger dataset with multiple columns and many rows. +fn create_large_dataset(num_rows: i32) -> (tempfile::TempDir, String) { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("large_ds").to_str().unwrap().to_string(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Float32, true), + Field::new("label", DataType::Utf8, true), + ])); + + let ids: Vec = (0..num_rows).collect(); + let values: Vec = (0..num_rows).map(|i| i as f32 * 0.5).collect(); + let labels: Vec = (0..num_rows).map(|i| format!("row_{i}")).collect(); + let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(Float32Array::from(values)), + Arc::new(StringArray::from(label_refs)), + ], + ) + .unwrap(); + + lance_c::runtime::block_on(async { + Dataset::write( + arrow::record_batch::RecordBatchIterator::new(vec![Ok(batch)], schema), + &uri, + None, + ) + .await + .unwrap(); + }); + + (tmp, uri) +} + +fn c_str(s: &str) -> CString { + CString::new(s).unwrap() +} + +/// Helper: scan to 
ArrowArrayStream and collect all rows. +fn scan_all_rows(ds: *const LanceDataset) -> Vec { + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) }; + assert!(!scanner.is_null()); + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) }; + assert_eq!(rc, 0); + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + unsafe { lance_scanner_close(scanner) }; + batches +} + +// --------------------------------------------------------------------------- +// Dataset tests +// --------------------------------------------------------------------------- + +#[test] +fn test_open_close() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null(), "dataset open should succeed"); + assert_eq!(lance_last_error_code(), LanceErrorCode::Ok); + + unsafe { lance_dataset_close(ds) }; + + // Closing NULL is safe. 
+ unsafe { lance_dataset_close(ptr::null_mut()) }; +} + +#[test] +fn test_open_nonexistent() { + let c_uri = c_str("memory://nonexistent_dataset_xyz"); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!( + ds.is_null(), + "opening nonexistent dataset should return NULL" + ); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); + + let msg = lance_last_error_message(); + assert!(!msg.is_null()); + unsafe { lance_free_string(msg) }; +} + +#[test] +fn test_version() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let version = unsafe { lance_dataset_version(ds) }; + assert!(version >= 1, "version should be >= 1, got {version}"); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_count_rows() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let count = unsafe { lance_dataset_count_rows(ds) }; + assert_eq!(count, 5); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_schema_export() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let mut ffi_schema = FFI_ArrowSchema::empty(); + let rc = unsafe { lance_dataset_schema(ds, &mut ffi_schema) }; + assert_eq!(rc, 0); + + // Import the schema back and verify fields. 
+ let schema = Schema::try_from(&ffi_schema).unwrap(); + assert_eq!(schema.fields().len(), 2); + assert_eq!(schema.field(0).name(), "id"); + assert_eq!(schema.field(1).name(), "name"); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Scanner tests +// --------------------------------------------------------------------------- + +#[test] +fn test_scanner_full_scan() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + // Create scanner (all columns, no filter). + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) }; + assert!(!scanner.is_null()); + + // Iterate via lance_scanner_next. + let mut total_rows = 0u64; + loop { + let mut batch: *mut LanceBatch = ptr::null_mut(); + let rc = unsafe { lance_scanner_next(scanner, &mut batch) }; + match rc { + 0 => { + assert!(!batch.is_null()); + // Export to Arrow and count rows. 
+ let mut ffi_array = arrow::ffi::FFI_ArrowArray::empty(); + let mut ffi_schema = FFI_ArrowSchema::empty(); + let rc2 = unsafe { lance_batch_to_arrow(batch, &mut ffi_array, &mut ffi_schema) }; + assert_eq!(rc2, 0); + let data = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap(); + total_rows += data.len() as u64; + unsafe { lance_batch_free(batch) }; + } + 1 => break, // end of stream + _ => panic!("scanner_next returned error: {rc}"), + } + } + assert_eq!(total_rows, 5); + + unsafe { lance_scanner_close(scanner) }; + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_scanner_to_arrow_stream() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) }; + assert!(!scanner.is_null()); + + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) }; + assert_eq!(rc, 0); + + // Read via Arrow's standard stream reader. 
+ let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 5); + + unsafe { lance_scanner_close(scanner) }; + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_scanner_with_filter() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let filter = c_str("id > 3"); + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), filter.as_ptr()) }; + assert!(!scanner.is_null()); + + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) }; + assert_eq!(rc, 0); + + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum(); + assert_eq!(total_rows, 2); // id=4 and id=5 + + unsafe { lance_scanner_close(scanner) }; + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_scanner_with_projection() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + // Project only "name" column. 
+    let col = c_str("name");
+    let columns: [*const std::ffi::c_char; 2] = [col.as_ptr(), ptr::null()];
+    let scanner = unsafe { lance_scanner_new(ds, columns.as_ptr(), ptr::null()) };
+    assert!(!scanner.is_null());
+
+    let mut ffi_stream = FFI_ArrowArrayStream::empty();
+    let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) };
+    assert_eq!(rc, 0);
+
+    let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap();
+    let schema = reader.schema();
+    assert_eq!(schema.fields().len(), 1);
+    assert_eq!(schema.field(0).name(), "name");
+
+    unsafe { lance_scanner_close(scanner) };
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_scanner_with_limit_offset() {
+    let (_tmp, uri) = create_test_dataset();
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert!(!ds.is_null());
+
+    let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) };
+    assert!(!scanner.is_null());
+    unsafe {
+        lance_scanner_set_limit(scanner, 2);
+        lance_scanner_set_offset(scanner, 1);
+    };
+
+    let mut ffi_stream = FFI_ArrowArrayStream::empty();
+    let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) };
+    assert_eq!(rc, 0);
+
+    let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap();
+    let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
+    assert_eq!(total_rows, 2);
+
+    unsafe { lance_scanner_close(scanner) };
+    unsafe { lance_dataset_close(ds) };
+}
+
+// ---------------------------------------------------------------------------
+// Take test
+// ---------------------------------------------------------------------------
+
+#[test]
+fn test_dataset_take() {
+    let (_tmp, uri) = create_test_dataset();
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert!(!ds.is_null());
+
+    let indices: [u64; 3] = [0, 2, 4];
+    let mut ffi_stream = FFI_ArrowArrayStream::empty();
+    let rc = unsafe {
lance_dataset_take(ds, indices.as_ptr(), 3, ptr::null(), &mut ffi_stream) }; + assert_eq!(rc, 0); + + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); + + // Verify the taken IDs. + let id_col = batches[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_col.values(), &[1, 3, 5]); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Error handling tests +// --------------------------------------------------------------------------- + +#[test] +fn test_null_inputs() { + // NULL dataset in version query. + let v = unsafe { lance_dataset_version(ptr::null()) }; + assert_eq!(v, 0); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); + + // NULL dataset in scanner creation. + let scanner = unsafe { lance_scanner_new(ptr::null(), ptr::null(), ptr::null()) }; + assert!(scanner.is_null()); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); +} + +// --------------------------------------------------------------------------- +// Async scan test +// --------------------------------------------------------------------------- + +#[test] +fn test_scanner_scan_async() { + use std::sync::{Condvar, Mutex}; + + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) }; + assert!(!scanner.is_null()); + + // Synchronization primitive for the async callback. 
+ struct CallbackResult { + status: i32, + stream_ptr: *mut std::ffi::c_void, + } + unsafe impl Send for CallbackResult {} + + let pair = Arc::new((Mutex::new(None::), Condvar::new())); + let pair_clone = pair.clone(); + + unsafe extern "C" fn on_complete( + ctx: *mut std::ffi::c_void, + status: i32, + result: *mut std::ffi::c_void, + ) { + let pair = unsafe { &*(ctx as *const (Mutex>, Condvar)) }; + let mut guard = pair.0.lock().unwrap(); + *guard = Some(CallbackResult { + status, + stream_ptr: result, + }); + pair.1.notify_one(); + } + + unsafe { + lance_scanner_scan_async( + scanner, + on_complete, + Arc::as_ptr(&pair_clone) as *mut std::ffi::c_void, + ); + } + + // Wait for callback. + let (lock, cvar) = &*pair; + let guard = cvar + .wait_while(lock.lock().unwrap(), |r| r.is_none()) + .unwrap(); + let result = guard.as_ref().unwrap(); + assert_eq!(result.status, 0, "async scan should succeed"); + assert!(!result.stream_ptr.is_null()); + + // Read the stream. + let ffi_stream = unsafe { &mut *(result.stream_ptr as *mut FFI_ArrowArrayStream) }; + let reader = unsafe { ArrowArrayStreamReader::from_raw(ffi_stream) }.unwrap(); + let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum(); + assert_eq!(total_rows, 5); + + unsafe { lance_scanner_close(scanner) }; + unsafe { lance_dataset_close(ds) }; +} + +// =========================================================================== +// Additional tests +// =========================================================================== + +// --------------------------------------------------------------------------- +// Schema field types validation +// --------------------------------------------------------------------------- + +#[test] +fn test_schema_field_types() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let mut ffi_schema = FFI_ArrowSchema::empty(); + let rc = unsafe { 
lance_dataset_schema(ds, &mut ffi_schema) }; + assert_eq!(rc, 0); + + let schema = Schema::try_from(&ffi_schema).unwrap(); + assert_eq!(*schema.field(0).data_type(), DataType::Int32); + assert_eq!(*schema.field(1).data_type(), DataType::Utf8); + assert!(!schema.field(0).is_nullable()); + assert!(schema.field(1).is_nullable()); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Latest version +// --------------------------------------------------------------------------- + +#[test] +fn test_latest_version() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let latest = unsafe { lance_dataset_latest_version(ds) }; + let current = unsafe { lance_dataset_version(ds) }; + assert!( + latest >= current, + "latest({latest}) should be >= current({current})" + ); + assert_eq!(lance_last_error_code(), LanceErrorCode::Ok); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Batch size control +// --------------------------------------------------------------------------- + +#[test] +fn test_scanner_batch_size() { + let (_tmp, uri) = create_large_dataset(100); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) }; + assert!(!scanner.is_null()); + let rc = unsafe { lance_scanner_set_batch_size(scanner, 10) }; + assert_eq!(rc, 0); + + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) }; + assert_eq!(rc, 0); + + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + + assert!( + batches.len() 
> 1,
+        "expected multiple batches, got {}",
+        batches.len()
+    );
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total_rows, 100);
+
+    for (i, b) in batches.iter().enumerate() {
+        assert!(
+            b.num_rows() <= 10,
+            "batch {i} has {} rows, expected <= 10",
+            b.num_rows()
+        );
+    }
+
+    unsafe { lance_scanner_close(scanner) };
+    unsafe { lance_dataset_close(ds) };
+}
+
+// ---------------------------------------------------------------------------
+// Combined filter + projection + limit
+// ---------------------------------------------------------------------------
+
+/// Filter + column projection + limit applied together: the stream schema
+/// must contain only the projected columns and the row count must honor the
+/// limit after filtering.
+#[test]
+fn test_scanner_combined_options() {
+    let (_tmp, uri) = create_large_dataset(50);
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert!(!ds.is_null());
+
+    let filter = c_str("id >= 10 AND id < 30");
+    let col_id = c_str("id");
+    let col_label = c_str("label");
+    // Use c_char rather than i8: `char` is unsigned on some targets (e.g.
+    // Linux aarch64), where `CString::as_ptr()` returns `*const u8` and an
+    // `[*const i8; 3]` array would fail to compile.
+    let columns: [*const std::os::raw::c_char; 3] =
+        [col_id.as_ptr(), col_label.as_ptr(), ptr::null()];
+
+    let scanner = unsafe { lance_scanner_new(ds, columns.as_ptr(), filter.as_ptr()) };
+    assert!(!scanner.is_null());
+    unsafe { lance_scanner_set_limit(scanner, 5) };
+
+    let mut ffi_stream = FFI_ArrowArrayStream::empty();
+    let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) };
+    assert_eq!(rc, 0);
+
+    let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap();
+    let schema = reader.schema();
+    assert_eq!(schema.fields().len(), 2);
+    assert_eq!(schema.field(0).name(), "id");
+    assert_eq!(schema.field(1).name(), "label");
+
+    let batches: Vec<RecordBatch> = reader.map(|r| r.unwrap()).collect();
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total_rows, 5);
+
+    unsafe { lance_scanner_close(scanner) };
+    unsafe { lance_dataset_close(ds) };
+}
+
+// ---------------------------------------------------------------------------
+// Take with column projection
+// 
--------------------------------------------------------------------------- + +#[test] +fn test_take_with_projection() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let indices: [u64; 2] = [1, 3]; + let col_name = c_str("name"); + let columns: [*const i8; 2] = [col_name.as_ptr(), ptr::null()]; + + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + let rc = + unsafe { lance_dataset_take(ds, indices.as_ptr(), 2, columns.as_ptr(), &mut ffi_stream) }; + assert_eq!(rc, 0); + + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + let schema = reader.schema(); + assert_eq!(schema.fields().len(), 1); + assert_eq!(schema.field(0).name(), "name"); + + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + assert_eq!(batches[0].num_rows(), 2); + + let names = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(names.value(0), "bob"); + assert_eq!(names.value(1), "dave"); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Multiple scanners on same dataset +// --------------------------------------------------------------------------- + +#[test] +fn test_multiple_scanners_same_dataset() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let filter1 = c_str("id <= 2"); + let filter2 = c_str("id > 3"); + let scanner1 = unsafe { lance_scanner_new(ds, ptr::null(), filter1.as_ptr()) }; + let scanner2 = unsafe { lance_scanner_new(ds, ptr::null(), filter2.as_ptr()) }; + assert!(!scanner1.is_null()); + assert!(!scanner2.is_null()); + + let mut stream1 = FFI_ArrowArrayStream::empty(); + let mut stream2 = FFI_ArrowArrayStream::empty(); + assert_eq!( + unsafe { 
lance_scanner_to_arrow_stream(scanner1, &mut stream1) }, + 0 + ); + assert_eq!( + unsafe { lance_scanner_to_arrow_stream(scanner2, &mut stream2) }, + 0 + ); + + let reader1 = unsafe { ArrowArrayStreamReader::from_raw(&mut stream1) }.unwrap(); + let reader2 = unsafe { ArrowArrayStreamReader::from_raw(&mut stream2) }.unwrap(); + let rows1: usize = reader1.map(|r| r.unwrap().num_rows()).sum(); + let rows2: usize = reader2.map(|r| r.unwrap().num_rows()).sum(); + assert_eq!(rows1, 2); // id=1,2 + assert_eq!(rows2, 2); // id=4,5 + + unsafe { lance_scanner_close(scanner1) }; + unsafe { lance_scanner_close(scanner2) }; + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Open with specific version +// --------------------------------------------------------------------------- + +#[test] +fn test_open_specific_version() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 1) }; + assert!(!ds.is_null()); + assert_eq!(unsafe { lance_dataset_version(ds) }, 1); + unsafe { lance_dataset_close(ds) }; + + // Non-existent version should fail. 
+    let ds2 = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 9999) };
+    assert!(ds2.is_null());
+    assert_ne!(lance_last_error_code(), LanceErrorCode::Ok);
+}
+
+// ---------------------------------------------------------------------------
+// Error: invalid filter / column
+// ---------------------------------------------------------------------------
+
+/// An unparseable filter expression must surface as an error either at
+/// scanner creation or when materializing the stream — never silently succeed.
+#[test]
+fn test_scanner_invalid_filter() {
+    let (_tmp, uri) = create_test_dataset();
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert!(!ds.is_null());
+
+    let bad_filter = c_str("NOT A VALID >>> FILTER ???");
+    let scanner = unsafe { lance_scanner_new(ds, ptr::null(), bad_filter.as_ptr()) };
+    if !scanner.is_null() {
+        // Filter validation may be deferred until the scan executes.
+        let mut ffi_stream = FFI_ArrowArrayStream::empty();
+        let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) };
+        assert_eq!(rc, -1);
+        assert_ne!(lance_last_error_code(), LanceErrorCode::Ok);
+        let msg = lance_last_error_message();
+        assert!(!msg.is_null());
+        unsafe { lance_free_string(msg) };
+        unsafe { lance_scanner_close(scanner) };
+    }
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+/// Projecting a column that does not exist must fail, either eagerly at
+/// scanner creation or lazily when the stream is materialized.
+#[test]
+fn test_scanner_invalid_column() {
+    let (_tmp, uri) = create_test_dataset();
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert!(!ds.is_null());
+
+    let col = c_str("nonexistent_column");
+    // c_char (not i8) so the array type matches `CString::as_ptr()` on
+    // targets where `char` is unsigned (e.g. Linux aarch64).
+    let columns: [*const std::os::raw::c_char; 2] = [col.as_ptr(), ptr::null()];
+    let scanner = unsafe { lance_scanner_new(ds, columns.as_ptr(), ptr::null()) };
+    if !scanner.is_null() {
+        let mut ffi_stream = FFI_ArrowArrayStream::empty();
+        let rc = unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) };
+        assert_eq!(rc, -1);
+        assert_ne!(lance_last_error_code(), LanceErrorCode::Ok);
+        unsafe { lance_scanner_close(scanner) };
+    } else {
+        assert_ne!(lance_last_error_code(), LanceErrorCode::Ok);
+    }
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+// 
--------------------------------------------------------------------------- +// Comprehensive NULL safety +// --------------------------------------------------------------------------- + +#[test] +fn test_null_safety_comprehensive() { + // Free functions with NULL should not crash. + unsafe { lance_free_string(ptr::null()) }; + unsafe { lance_batch_free(ptr::null_mut()) }; + unsafe { lance_scanner_close(ptr::null_mut()) }; + unsafe { lance_dataset_close(ptr::null_mut()) }; + + // Dataset functions with NULL. + assert_eq!(unsafe { lance_dataset_count_rows(ptr::null()) }, 0); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); + assert_eq!(unsafe { lance_dataset_latest_version(ptr::null()) }, 0); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); + + let mut ffi_schema = FFI_ArrowSchema::empty(); + assert_eq!( + unsafe { lance_dataset_schema(ptr::null(), &mut ffi_schema) }, + -1 + ); + + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + let indices: [u64; 1] = [0]; + assert_eq!( + unsafe { + lance_dataset_take( + ptr::null(), + indices.as_ptr(), + 1, + ptr::null(), + &mut ffi_stream, + ) + }, + -1 + ); + + // Scanner builder functions with NULL. + assert_eq!(unsafe { lance_scanner_set_limit(ptr::null_mut(), 10) }, -1); + assert_eq!(unsafe { lance_scanner_set_offset(ptr::null_mut(), 10) }, -1); + assert_eq!( + unsafe { lance_scanner_set_batch_size(ptr::null_mut(), 10) }, + -1 + ); + assert_eq!( + unsafe { lance_scanner_with_row_id(ptr::null_mut(), true) }, + -1 + ); + + // Scanner iteration with NULL. + let mut ffi_stream2 = FFI_ArrowArrayStream::empty(); + assert_eq!( + unsafe { lance_scanner_to_arrow_stream(ptr::null_mut(), &mut ffi_stream2) }, + -1 + ); + let mut batch_ptr: *mut LanceBatch = ptr::null_mut(); + assert_eq!( + unsafe { lance_scanner_next(ptr::null_mut(), &mut batch_ptr) }, + -1 + ); + + // Batch functions with NULL. 
+ let mut ffi_array = arrow::ffi::FFI_ArrowArray::empty(); + let mut ffi_schema2 = FFI_ArrowSchema::empty(); + assert_eq!( + unsafe { lance_batch_to_arrow(ptr::null(), &mut ffi_array, &mut ffi_schema2) }, + -1 + ); +} + +// --------------------------------------------------------------------------- +// Error message lifecycle +// --------------------------------------------------------------------------- + +#[test] +fn test_error_message_lifecycle() { + let c_uri = c_str("memory://does_not_exist_12345"); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(ds.is_null()); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); + + let msg = lance_last_error_message(); + assert!(!msg.is_null()); + let msg_str = unsafe { std::ffi::CStr::from_ptr(msg) }.to_str().unwrap(); + assert!(!msg_str.is_empty()); + unsafe { lance_free_string(msg) }; + + // Message consumed — next call returns NULL. + let msg2 = lance_last_error_message(); + assert!(msg2.is_null()); +} + +// --------------------------------------------------------------------------- +// Large dataset scan +// --------------------------------------------------------------------------- + +#[test] +fn test_large_dataset_scan() { + let (_tmp, uri) = create_large_dataset(10_000); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 10_000); + let batches = scan_all_rows(ds); + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, 10_000); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Equality filter with value verification +// --------------------------------------------------------------------------- + +#[test] +fn test_scanner_equality_filter() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { 
lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let filter = c_str("name = 'carol'"); + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), filter.as_ptr()) }; + assert!(!scanner.is_null()); + + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + assert_eq!( + unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) }, + 0 + ); + + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 1); + + let id_col = batches[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_col.value(0), 3); + + unsafe { lance_scanner_close(scanner) }; + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Limit only / Offset only +// --------------------------------------------------------------------------- + +#[test] +fn test_scanner_limit_only() { + let (_tmp, uri) = create_large_dataset(50); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) }; + unsafe { lance_scanner_set_limit(scanner, 7) }; + + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + assert_eq!( + unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) }, + 0 + ); + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + assert_eq!(reader.map(|r| r.unwrap().num_rows()).sum::(), 7); + + unsafe { lance_scanner_close(scanner) }; + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_scanner_offset_only() { + let (_tmp, uri) = create_large_dataset(20); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) }; + unsafe { 
lance_scanner_set_offset(scanner, 15) }; + + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + assert_eq!( + unsafe { lance_scanner_to_arrow_stream(scanner, &mut ffi_stream) }, + 0 + ); + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + assert_eq!(reader.map(|r| r.unwrap().num_rows()).sum::(), 5); + + unsafe { lance_scanner_close(scanner) }; + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Take edge cases +// --------------------------------------------------------------------------- + +#[test] +fn test_take_empty_indices() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let indices: [u64; 0] = []; + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + let rc = unsafe { lance_dataset_take(ds, indices.as_ptr(), 0, ptr::null(), &mut ffi_stream) }; + assert_eq!(rc, 0); + + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + assert_eq!(reader.map(|r| r.unwrap().num_rows()).sum::(), 0); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_take_large_dataset_values() { + let (_tmp, uri) = create_large_dataset(100); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let indices: [u64; 3] = [0, 50, 99]; + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + assert_eq!( + unsafe { lance_dataset_take(ds, indices.as_ptr(), 3, ptr::null(), &mut ffi_stream) }, + 0 + ); + + let reader = unsafe { ArrowArrayStreamReader::from_raw(&mut ffi_stream) }.unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + assert_eq!(batches[0].num_rows(), 3); + + let ids = batches[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.values(), &[0, 50, 99]); + + let labels 
= batches[0] + .column_by_name("label") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(labels.value(0), "row_0"); + assert_eq!(labels.value(1), "row_50"); + assert_eq!(labels.value(2), "row_99"); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Async scan with filter +// --------------------------------------------------------------------------- + +#[test] +fn test_async_scan_with_filter() { + use std::sync::{Condvar, Mutex}; + + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let filter = c_str("id <= 2"); + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), filter.as_ptr()) }; + + struct CallbackResult { + status: i32, + stream_ptr: *mut std::ffi::c_void, + } + unsafe impl Send for CallbackResult {} + + let pair = Arc::new((Mutex::new(None::), Condvar::new())); + let pair_clone = pair.clone(); + + unsafe extern "C" fn on_complete( + ctx: *mut std::ffi::c_void, + status: i32, + result: *mut std::ffi::c_void, + ) { + let pair = unsafe { &*(ctx as *const (Mutex>, Condvar)) }; + pair.0.lock().unwrap().replace(CallbackResult { + status, + stream_ptr: result, + }); + pair.1.notify_one(); + } + + unsafe { + lance_scanner_scan_async( + scanner, + on_complete, + Arc::as_ptr(&pair_clone) as *mut std::ffi::c_void, + ); + } + + let (lock, cvar) = &*pair; + let guard = cvar + .wait_while(lock.lock().unwrap(), |r| r.is_none()) + .unwrap(); + let result = guard.as_ref().unwrap(); + assert_eq!(result.status, 0); + + let ffi_stream = unsafe { &mut *(result.stream_ptr as *mut FFI_ArrowArrayStream) }; + let reader = unsafe { ArrowArrayStreamReader::from_raw(ffi_stream) }.unwrap(); + assert_eq!(reader.map(|r| r.unwrap().num_rows()).sum::(), 2); + + unsafe { lance_scanner_close(scanner) }; + unsafe { lance_dataset_close(ds) }; +} + +// 
--------------------------------------------------------------------------- +// Poll-based iteration +// --------------------------------------------------------------------------- + +#[test] +#[ignore = "poll_next requires running from a non-tokio thread; run manually"] +fn test_poll_next_basic() { + let (_tmp, uri) = create_test_dataset(); + let _c_uri = c_str(&uri); + + // poll_next calls materialize_stream() which uses block_on(). + // This must run on a non-tokio thread to avoid nested runtime panics. + let uri_clone = uri.clone(); + let handle = std::thread::spawn(move || { + let c_uri = c_str(&uri_clone); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let scanner = unsafe { lance_scanner_new(ds, ptr::null(), ptr::null()) }; + + use std::sync::atomic::{AtomicBool, Ordering}; + static WOKE: AtomicBool = AtomicBool::new(false); + unsafe extern "C" fn test_waker(_ctx: *mut std::ffi::c_void) { + WOKE.store(true, Ordering::SeqCst); + } + + let mut total_rows = 0usize; + let mut iterations = 0; + loop { + let mut batch: *mut LanceBatch = ptr::null_mut(); + let status = unsafe { + lance_scanner_poll_next(scanner, test_waker, ptr::null_mut(), &mut batch) + }; + match status { + LancePollStatus::Ready => { + assert!(!batch.is_null()); + let mut ffi_array = arrow::ffi::FFI_ArrowArray::empty(); + let mut ffi_schema = FFI_ArrowSchema::empty(); + unsafe { lance_batch_to_arrow(batch, &mut ffi_array, &mut ffi_schema) }; + let data = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap(); + total_rows += data.len(); + unsafe { lance_batch_free(batch) }; + } + LancePollStatus::Pending => { + std::thread::sleep(std::time::Duration::from_millis(1)); + } + LancePollStatus::Finished => break, + LancePollStatus::Error => panic!("poll_next returned error"), + } + iterations += 1; + assert!(iterations < 1000, "poll loop should not spin forever"); + } + assert_eq!(total_rows, 5); + + unsafe { lance_scanner_close(scanner) }; + unsafe { 
lance_dataset_close(ds) }; + }); + handle.join().unwrap(); +} + +// --------------------------------------------------------------------------- +// Scan data value verification +// --------------------------------------------------------------------------- + +#[test] +fn test_scan_data_values() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let batches = scan_all_rows(ds); + let mut all_ids = Vec::new(); + let mut all_names = Vec::new(); + for batch in &batches { + let ids = batch + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let names = batch + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + all_ids.push(ids.value(i)); + all_names.push(names.value(i).to_string()); + } + } + assert_eq!(all_ids, vec![1, 2, 3, 4, 5]); + assert_eq!(all_names, vec!["alice", "bob", "carol", "dave", "eve"]); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Reopen dataset / large dataset schema +// --------------------------------------------------------------------------- + +#[test] +fn test_reopen_dataset() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + + let ds1 = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert_eq!(unsafe { lance_dataset_count_rows(ds1) }, 5); + unsafe { lance_dataset_close(ds1) }; + + let ds2 = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert_eq!(unsafe { lance_dataset_count_rows(ds2) }, 5); + assert_eq!( + scan_all_rows(ds2) + .iter() + .map(|b| b.num_rows()) + .sum::(), + 5 + ); + + unsafe { lance_dataset_close(ds2) }; +} + +#[test] +fn test_large_dataset_schema() { + let (_tmp, uri) = create_large_dataset(10); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; 
+ + let mut ffi_schema = FFI_ArrowSchema::empty(); + assert_eq!(unsafe { lance_dataset_schema(ds, &mut ffi_schema) }, 0); + + let schema = Schema::try_from(&ffi_schema).unwrap(); + assert_eq!(schema.fields().len(), 3); + assert_eq!(schema.field(0).name(), "id"); + assert_eq!(schema.field(1).name(), "value"); + assert_eq!(schema.field(2).name(), "label"); + assert_eq!(*schema.field(1).data_type(), DataType::Float32); + + unsafe { lance_dataset_close(ds) }; +} + +// --------------------------------------------------------------------------- +// Tests with checked-in historical test datasets +// --------------------------------------------------------------------------- + +/// Helper: resolve path to a checked-in test dataset. +fn test_data_path(relative: &str) -> String { + let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("../../test_data"); + path.push(relative); + assert!(path.exists(), "Test data not found at {}", path.display()); + path.to_str().unwrap().to_string() +} + +#[test] +fn test_historical_dataset_v0_27_1() { + let uri = test_data_path("v0.27.1/pq_in_schema"); + let c_uri = c_str(&uri); + + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null(), "should open historical dataset"); + + let version = unsafe { lance_dataset_version(ds) }; + assert!(version >= 1); + + let count = unsafe { lance_dataset_count_rows(ds) }; + assert!(count > 0, "historical dataset should have rows"); + + let mut ffi_schema = FFI_ArrowSchema::empty(); + let rc = unsafe { lance_dataset_schema(ds, &mut ffi_schema) }; + assert_eq!(rc, 0); + let schema = Schema::try_from(&ffi_schema).unwrap(); + assert!(!schema.fields().is_empty(), "schema should have fields"); + + let batches = scan_all_rows(ds); + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, count as usize); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_historical_dataset_open_specific_version() { + let 
uri = test_data_path("v0.27.1/pq_in_schema"); + let c_uri = c_str(&uri); + + // This dataset has 2 versions. + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 1) }; + assert!(!ds.is_null()); + assert_eq!(unsafe { lance_dataset_version(ds) }, 1); + let count_v1 = unsafe { lance_dataset_count_rows(ds) }; + assert!(count_v1 > 0); + unsafe { lance_dataset_close(ds) }; + + let ds2 = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 2) }; + assert!(!ds2.is_null()); + assert_eq!(unsafe { lance_dataset_version(ds2) }, 2); + unsafe { lance_dataset_close(ds2) }; +} diff --git a/rust/lance-c/tests/compile_and_run_test.rs b/rust/lance-c/tests/compile_and_run_test.rs new file mode 100644 index 00000000000..9ca26fe47d1 --- /dev/null +++ b/rust/lance-c/tests/compile_and_run_test.rs @@ -0,0 +1,215 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Tests that compile and run actual C and C++ programs against the lance-c library. +//! +//! These tests: +//! 1. Create a test dataset on disk +//! 2. Build the lance-c shared library +//! 3. Compile C/C++ test programs linking against it +//! 4. Run the compiled binaries with the dataset path +//! +//! This validates that lance.h and lance.hpp are valid C/C++ and that +//! the API works end-to-end from a real C/C++ caller. + +use std::path::{Path, PathBuf}; +use std::process::Command; +use std::sync::Arc; + +use arrow_array::{Int32Array, RecordBatch, StringArray}; +use arrow_schema::{DataType, Field, Schema}; +use lance::Dataset; + +/// Build the lance-c cdylib and return the path to the shared library and include dir. +fn build_lance_c() -> (PathBuf, PathBuf) { + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let workspace_root = manifest_dir.parent().unwrap().parent().unwrap(); + + // Build the cdylib in debug mode. 
+ let status = Command::new("cargo") + .args(["build", "-p", "lance-c"]) + .current_dir(workspace_root) + .status() + .expect("Failed to run cargo build"); + assert!(status.success(), "cargo build -p lance-c failed"); + + let target_dir = workspace_root.join("target").join("debug"); + + // Find the shared library. + let lib_path = if cfg!(target_os = "macos") { + target_dir.join("liblance_c.dylib") + } else if cfg!(target_os = "linux") { + target_dir.join("liblance_c.so") + } else { + panic!("Unsupported OS for C/C++ link test"); + }; + + assert!( + lib_path.exists(), + "Shared library not found at {}", + lib_path.display() + ); + + let include_dir = manifest_dir.join("include"); + (lib_path, include_dir) +} + +/// Create a test dataset on disk and return (TempDir, path_string). +fn create_test_dataset_on_disk() -> (tempfile::TempDir, String) { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("c_test_ds").to_str().unwrap().to_string(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])), + Arc::new(StringArray::from(vec![ + "alice", "bob", "carol", "dave", "eve", "frank", "grace", "heidi", "ivan", "judy", + ])), + ], + ) + .unwrap(); + + lance_c::runtime::block_on(async { + Dataset::write( + arrow::record_batch::RecordBatchIterator::new(vec![Ok(batch)], schema), + &uri, + None, + ) + .await + .unwrap(); + }); + + (tmp, uri) +} + +/// Compile a C source file, linking against lance-c. 
+fn compile_c_test(source: &Path, output: &Path, include_dir: &Path, lib_path: &Path) -> bool { + let lib_dir = lib_path.parent().unwrap(); + let lib_name = "lance_c"; + + let status = Command::new("cc") + .args([ + "-std=c11", + "-Wall", + "-Wextra", + "-o", + output.to_str().unwrap(), + source.to_str().unwrap(), + &format!("-I{}", include_dir.display()), + &format!("-L{}", lib_dir.display()), + &format!("-l{lib_name}"), + // On macOS, set rpath so the dylib is found at runtime. + &format!("-Wl,-rpath,{}", lib_dir.display()), + ]) + .status(); + + match status { + Ok(s) => s.success(), + Err(e) => { + eprintln!("C compiler not available: {e}"); + false + } + } +} + +/// Compile a C++ source file, linking against lance-c. +fn compile_cpp_test(source: &Path, output: &Path, include_dir: &Path, lib_path: &Path) -> bool { + let lib_dir = lib_path.parent().unwrap(); + let lib_name = "lance_c"; + + let status = Command::new("c++") + .args([ + "-std=c++17", + "-Wall", + "-Wextra", + "-o", + output.to_str().unwrap(), + source.to_str().unwrap(), + &format!("-I{}", include_dir.display()), + &format!("-L{}", lib_dir.display()), + &format!("-l{lib_name}"), + &format!("-Wl,-rpath,{}", lib_dir.display()), + ]) + .status(); + + match status { + Ok(s) => s.success(), + Err(e) => { + eprintln!("C++ compiler not available: {e}"); + false + } + } +} + +/// Run a compiled test binary with the dataset URI. 
+fn run_test_binary(binary: &Path, dataset_uri: &str) { + let output = Command::new(binary) + .arg(dataset_uri) + .output() + .unwrap_or_else(|e| panic!("Failed to run {}: {e}", binary.display())); + + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + + println!("--- stdout ---\n{stdout}"); + if !stderr.is_empty() { + eprintln!("--- stderr ---\n{stderr}"); + } + + assert!( + output.status.success(), + "Test binary {} failed with exit code {:?}\nstdout: {}\nstderr: {}", + binary.display(), + output.status.code(), + stdout, + stderr + ); +} + +#[test] +#[ignore = "requires C compiler (cc); run with: cargo test -p lance-c -- --ignored test_c_compilation"] +fn test_c_compilation_and_execution() { + let (lib_path, include_dir) = build_lance_c(); + let (_tmp, dataset_uri) = create_test_dataset_on_disk(); + let build_dir = tempfile::tempdir().unwrap(); + + let source = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests") + .join("cpp") + .join("test_c_api.c"); + let binary = build_dir.path().join("test_c_api"); + + if !compile_c_test(&source, &binary, &include_dir, &lib_path) { + eprintln!("Skipping C test: compilation failed (C compiler may not be available)"); + return; + } + + run_test_binary(&binary, &dataset_uri); +} + +#[test] +#[ignore = "requires C++ compiler (c++); run with: cargo test -p lance-c -- --ignored test_cpp_compilation"] +fn test_cpp_compilation_and_execution() { + let (lib_path, include_dir) = build_lance_c(); + let (_tmp, dataset_uri) = create_test_dataset_on_disk(); + let build_dir = tempfile::tempdir().unwrap(); + + let source = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests") + .join("cpp") + .join("test_cpp_api.cpp"); + let binary = build_dir.path().join("test_cpp_api"); + + if !compile_cpp_test(&source, &binary, &include_dir, &lib_path) { + eprintln!("Skipping C++ test: compilation failed (C++ compiler may not be available)"); + return; + } + + run_test_binary(&binary, 
&dataset_uri); +} diff --git a/rust/lance-c/tests/cpp/test_c_api.c b/rust/lance-c/tests/cpp/test_c_api.c new file mode 100644 index 00000000000..bd639999528 --- /dev/null +++ b/rust/lance-c/tests/cpp/test_c_api.c @@ -0,0 +1,194 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/* SPDX-FileCopyrightText: Copyright The Lance Authors */ + +/** + * @file test_c_api.c + * @brief C compilation and functional test for lance.h + * + * This file is compiled by the Rust integration test to verify that + * lance.h is valid C and the API works end-to-end. + * + * Usage: test_c_api + */ + +#include "lance.h" +#include +#include +#include + +#define ASSERT(cond, msg) \ + do { \ + if (!(cond)) { \ + fprintf(stderr, "FAIL: %s (line %d)\n", msg, __LINE__); \ + exit(1); \ + } \ + } while (0) + +#define CHECK_OK() \ + do { \ + if (lance_last_error_code() != LANCE_OK) { \ + const char *msg = lance_last_error_message(); \ + fprintf(stderr, "FAIL: lance error: %s (line %d)\n", \ + msg ? msg : "unknown", __LINE__); \ + if (msg) lance_free_string(msg); \ + exit(1); \ + } \ + } while (0) + +static void test_open_and_metadata(const char *uri) { + printf(" test_open_and_metadata... "); + + LanceDataset *ds = lance_dataset_open(uri, NULL, 0); + ASSERT(ds != NULL, "dataset open failed"); + CHECK_OK(); + + uint64_t version = lance_dataset_version(ds); + ASSERT(version >= 1, "version should be >= 1"); + + uint64_t count = lance_dataset_count_rows(ds); + CHECK_OK(); + ASSERT(count > 0, "dataset should have rows"); + printf("version=%llu, rows=%llu... ", (unsigned long long)version, + (unsigned long long)count); + + /* Schema export */ + struct ArrowSchema schema; + memset(&schema, 0, sizeof(schema)); + int32_t rc = lance_dataset_schema(ds, &schema); + ASSERT(rc == 0, "schema export failed"); + ASSERT(schema.n_children > 0, "schema should have fields"); + printf("fields=%lld... 
", (long long)schema.n_children); + + /* Release the schema */ + if (schema.release) { + schema.release(&schema); + } + + lance_dataset_close(ds); + printf("OK\n"); +} + +static void test_scan(const char *uri) { + printf(" test_scan... "); + + LanceDataset *ds = lance_dataset_open(uri, NULL, 0); + ASSERT(ds != NULL, "dataset open failed"); + + uint64_t expected_rows = lance_dataset_count_rows(ds); + CHECK_OK(); + + /* Full scan via ArrowArrayStream */ + LanceScanner *scanner = lance_scanner_new(ds, NULL, NULL); + ASSERT(scanner != NULL, "scanner creation failed"); + + struct ArrowArrayStream stream; + memset(&stream, 0, sizeof(stream)); + int32_t rc = lance_scanner_to_arrow_stream(scanner, &stream); + ASSERT(rc == 0, "to_arrow_stream failed"); + + /* Read schema from stream */ + struct ArrowSchema schema; + memset(&schema, 0, sizeof(schema)); + rc = stream.get_schema(&stream, &schema); + ASSERT(rc == 0, "get_schema from stream failed"); + ASSERT(schema.n_children > 0, "stream schema should have fields"); + if (schema.release) schema.release(&schema); + + /* Read all batches */ + uint64_t total_rows = 0; + while (1) { + struct ArrowArray array; + memset(&array, 0, sizeof(array)); + rc = stream.get_next(&stream, &array); + ASSERT(rc == 0, "get_next failed"); + if (array.release == NULL) { + break; /* end of stream */ + } + total_rows += (uint64_t)array.length; + array.release(&array); + } + + ASSERT(total_rows == expected_rows, "row count mismatch"); + printf("rows=%llu... ", (unsigned long long)total_rows); + + if (stream.release) stream.release(&stream); + lance_scanner_close(scanner); + lance_dataset_close(ds); + printf("OK\n"); +} + +static void test_scan_with_limit(const char *uri) { + printf(" test_scan_with_limit... 
"); + + LanceDataset *ds = lance_dataset_open(uri, NULL, 0); + ASSERT(ds != NULL, "dataset open failed"); + + LanceScanner *scanner = lance_scanner_new(ds, NULL, NULL); + ASSERT(scanner != NULL, "scanner creation failed"); + + lance_scanner_set_limit(scanner, 3); + + struct ArrowArrayStream stream; + memset(&stream, 0, sizeof(stream)); + int32_t rc = lance_scanner_to_arrow_stream(scanner, &stream); + ASSERT(rc == 0, "to_arrow_stream failed"); + + uint64_t total_rows = 0; + while (1) { + struct ArrowArray array; + memset(&array, 0, sizeof(array)); + rc = stream.get_next(&stream, &array); + ASSERT(rc == 0, "get_next failed"); + if (array.release == NULL) break; + total_rows += (uint64_t)array.length; + array.release(&array); + } + + ASSERT(total_rows == 3, "limit should return exactly 3 rows"); + printf("rows=%llu... ", (unsigned long long)total_rows); + + if (stream.release) stream.release(&stream); + lance_scanner_close(scanner); + lance_dataset_close(ds); + printf("OK\n"); +} + +static void test_error_handling(void) { + printf(" test_error_handling... 
");
+
+  /* Open non-existent dataset */
+  LanceDataset *ds = lance_dataset_open("file:///nonexistent/path/xyz", NULL, 0);
+  ASSERT(ds == NULL, "should fail to open nonexistent dataset");
+  ASSERT(lance_last_error_code() != LANCE_OK, "error code should be set");
+
+  const char *msg = lance_last_error_message();
+  ASSERT(msg != NULL, "error message should be set");
+  ASSERT(strlen(msg) > 0, "error message should be non-empty");
+  lance_free_string(msg);
+
+  /* NULL safety */
+  lance_dataset_close(NULL);
+  lance_scanner_close(NULL);
+  lance_batch_free(NULL);
+  lance_free_string(NULL);
+
+  printf("OK\n");
+}
+
+int main(int argc, char **argv) {
+  if (argc < 2) {
+    fprintf(stderr, "Usage: %s <dataset_uri>\n", argv[0]);
+    return 1;
+  }
+
+  const char *uri = argv[1];
+  printf("Running C API tests with dataset: %s\n", uri);
+
+  test_open_and_metadata(uri);
+  test_scan(uri);
+  test_scan_with_limit(uri);
+  test_error_handling();
+
+  printf("All C tests passed!\n");
+  return 0;
+}
diff --git a/rust/lance-c/tests/cpp/test_cpp_api.cpp b/rust/lance-c/tests/cpp/test_cpp_api.cpp
new file mode 100644
index 00000000000..13788aae7f5
--- /dev/null
+++ b/rust/lance-c/tests/cpp/test_cpp_api.cpp
@@ -0,0 +1,178 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/* SPDX-FileCopyrightText: Copyright The Lance Authors */
+
+/**
+ * @file test_cpp_api.cpp
+ * @brief C++ compilation and functional test for lance.hpp
+ *
+ * Tests the RAII wrappers, exception handling, and builder pattern.
+ *
+ * Usage: test_cpp_api <dataset_uri>
+ */
+
+#include "lance.hpp"
+#include <cassert>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <string>
+#include <utility>
+
+#define TEST(name) printf(" %s... ", #name)
+#define PASS() printf("OK\n")
+
+static void test_dataset_open(const std::string& uri) {
+  TEST(test_dataset_open);
+
+  auto ds = lance::Dataset::open(uri);
+  assert(ds.version() >= 1);
+  assert(ds.count_rows() > 0);
+
+  printf("version=%llu, rows=%llu... 
", + (unsigned long long)ds.version(), + (unsigned long long)ds.count_rows()); + + PASS(); +} + +static void test_dataset_schema(const std::string& uri) { + TEST(test_dataset_schema); + + auto ds = lance::Dataset::open(uri); + + ArrowSchema schema; + memset(&schema, 0, sizeof(schema)); + ds.schema(&schema); + + assert(schema.n_children > 0); + printf("fields=%lld... ", (long long)schema.n_children); + + // Print field names + for (int64_t i = 0; i < schema.n_children; i++) { + if (i > 0) printf(", "); + printf("%s", schema.children[i]->name); + } + printf("... "); + + if (schema.release) schema.release(&schema); + + PASS(); +} + +static void test_scanner_fluent(const std::string& uri) { + TEST(test_scanner_fluent); + + auto ds = lance::Dataset::open(uri); + + // Fluent builder pattern. + auto scanner = ds.scan(); + scanner.limit(5).offset(0).batch_size(2); + + ArrowArrayStream stream; + memset(&stream, 0, sizeof(stream)); + scanner.to_arrow_stream(&stream); + + // Count rows from stream. + uint64_t total = 0; + while (true) { + ArrowArray arr; + memset(&arr, 0, sizeof(arr)); + int rc = stream.get_next(&stream, &arr); + assert(rc == 0); + if (!arr.release) break; + total += (uint64_t)arr.length; + arr.release(&arr); + } + + assert(total == 5); + printf("rows=%llu... ", (unsigned long long)total); + + if (stream.release) stream.release(&stream); + PASS(); +} + +static void test_dataset_take(const std::string& uri) { + TEST(test_dataset_take); + + auto ds = lance::Dataset::open(uri); + + uint64_t indices[] = {0, 1, 2}; + ArrowArrayStream stream; + memset(&stream, 0, sizeof(stream)); + ds.take(indices, 3, &stream); + + uint64_t total = 0; + while (true) { + ArrowArray arr; + memset(&arr, 0, sizeof(arr)); + int rc = stream.get_next(&stream, &arr); + assert(rc == 0); + if (!arr.release) break; + total += (uint64_t)arr.length; + arr.release(&arr); + } + + assert(total == 3); + printf("rows=%llu... 
", (unsigned long long)total);
+
+  if (stream.release) stream.release(&stream);
+  PASS();
+}
+
+static void test_raii_cleanup(const std::string& uri) {
+  TEST(test_raii_cleanup);
+
+  // Dataset and Scanner should clean up automatically.
+  {
+    auto ds = lance::Dataset::open(uri);
+    auto scanner = ds.scan();
+    scanner.limit(1);
+    // Goes out of scope — RAII cleanup.
+  }
+
+  // Move semantics.
+  {
+    auto ds1 = lance::Dataset::open(uri);
+    auto ds2 = std::move(ds1);
+    assert(ds2.count_rows() > 0);
+  }
+
+  PASS();
+}
+
+static void test_error_exception(const std::string& /*uri*/) {
+  TEST(test_error_exception);
+
+  bool caught = false;
+  try {
+    lance::Dataset::open("file:///nonexistent/path/xyz");
+  } catch (const lance::Error& e) {
+    caught = true;
+    assert(e.code != LANCE_OK);
+    assert(strlen(e.what()) > 0);
+    printf("caught: %s... ", e.what());
+  }
+  assert(caught);
+
+  PASS();
+}
+
+int main(int argc, char** argv) {
+  if (argc < 2) {
+    fprintf(stderr, "Usage: %s <dataset_uri>\n", argv[0]);
+    return 1;
+  }
+
+  std::string uri(argv[1]);
+  printf("Running C++ API tests with dataset: %s\n", uri.c_str());
+
+  test_dataset_open(uri);
+  test_dataset_schema(uri);
+  test_scanner_fluent(uri);
+  test_dataset_take(uri);
+  test_raii_cleanup(uri);
+  test_error_exception(uri);
+
+  printf("All C++ tests passed!\n");
+  return 0;
+}