Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion protos/index.proto
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,6 @@ message JsonIndexDetails {
string path = 1;
google.protobuf.Any target_details = 2;
}
message BloomFilterIndexDetails {}
message BloomFilterIndexDetails {}

message GeoIndexDetails {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should call this more specifically like "KdbTreeIndex" instead of a generic Geo index, so that we can have different types of geo index that can be used for different Geo columns if necessary

1 change: 1 addition & 0 deletions protos/index_old.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ message BitmapIndexDetails {}
message LabelListIndexDetails {}
message NGramIndexDetails {}
message ZoneMapIndexDetails {}
message GeoIndexDetails {}
message InvertedIndexDetails {
// Marking this field as optional as old versions of the index store blank details and we
// need to make sure we have a proper optional field to detect this.
Expand Down
27 changes: 22 additions & 5 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2345,6 +2345,7 @@ def create_scalar_index(
"LABEL_LIST",
"INVERTED",
"BLOOMFILTER",
"GEO"
]:
raise NotImplementedError(
(
Expand Down Expand Up @@ -2392,11 +2393,27 @@ def create_scalar_index(
f"INVERTED index column {column} must be string, large string"
" or list of strings, but got {value_type}"
)

if pa.types.is_duration(field_type):
raise TypeError(
f"Scalar index column {column} cannot currently be a duration"
)
elif index_type == "GEO":
# Accept struct<x: double, y: double> for GeoArrow point data
if pa.types.is_struct(field_type):
field_names = [field.name for field in field_type]
if set(field_names) == {"x", "y"}:
# This is geoarrow point data - allow it
pass
else:
raise TypeError(
f"GEO index column {column} must be a struct with x,y fields for point data. "
f"Got struct with fields: {field_names}"
)
else:
raise TypeError(
f"GEO index column {column} must be a struct<x: double, y: double> type. "
f"Got field type: {field_type}"
)
if pa.types.is_duration(field_type):
raise TypeError(
f"Scalar index column {column} cannot currently be a duration"
)
elif isinstance(index_type, IndexConfig):
config = json.dumps(index_type.parameters)
kwargs["config"] = indices.IndexConfig(index_type.index_type, config)
Expand Down
5 changes: 5 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,7 @@ impl Dataset {
"BLOOMFILTER" => IndexType::BloomFilter,
"LABEL_LIST" => IndexType::LabelList,
"INVERTED" | "FTS" => IndexType::Inverted,
"GEO" => IndexType::Geo,
"IVF_FLAT" | "IVF_PQ" | "IVF_SQ" | "IVF_RQ" | "IVF_HNSW_FLAT" | "IVF_HNSW_PQ"
| "IVF_HNSW_SQ" => IndexType::Vector,
_ => {
Expand Down Expand Up @@ -1702,6 +1703,10 @@ impl Dataset {
}
Box::new(params)
}
"GEO" => Box::new(ScalarIndexParams {
index_type: "geo".to_string(),
params: None,
}),
_ => {
let column_type = match self.ds.schema().field(columns[0]) {
Some(f) => f.data_type().clone(),
Expand Down
108 changes: 108 additions & 0 deletions rust/lance-datafusion/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,116 @@
ctx.register_udf(json::json_get_bool_udf());
ctx.register_udf(json::json_array_contains_udf());
ctx.register_udf(json::json_array_length_udf());

// GEO functions
ctx.register_udf(ST_INTERSECTS_UDF.clone());
ctx.register_udf(ST_WITHIN_UDF.clone());
ctx.register_udf(BBOX_UDF.clone());
}

static ST_WITHIN_UDF: LazyLock<ScalarUDF> = LazyLock::new(st_within);
static BBOX_UDF: LazyLock<ScalarUDF> = LazyLock::new(bbox);
static ST_INTERSECTS_UDF: LazyLock<ScalarUDF> = LazyLock::new(st_intersects);

fn st_intersects() -> ScalarUDF {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should directly use functions in geodatafusion

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if that Pr is merged its just rebase. Also I remember there was some issue in datafusion python api where mixed case functions couldn't be used.

let function = Arc::new(make_scalar_function(
|_args: &[ArrayRef]| {
// Throw an error indicating that a spatial index is required
Err(datafusion::error::DataFusionError::Execution(
"st_intersects requires a spatial index. Please create a spatial index on the geometry column using dataset.create_scalar_index(column='your_column', index_type='GEO') before running spatial queries.".to_string(),
))
},
vec![],
));

create_udf(

Check warning on line 51 in rust/lance-datafusion/src/udf.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-datafusion/src/udf.rs
"st_intersects",
vec![
DataType::Struct(vec![
Arc::new(arrow_schema::Field::new("x", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("y", DataType::Float64, false)),
].into()),
DataType::Struct(vec![
Arc::new(arrow_schema::Field::new("xmin", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("ymin", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("xmax", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("ymax", DataType::Float64, false)),
].into())
], // GeoArrow Point struct, GeoArrow Box struct
DataType::Boolean,
Volatility::Immutable,
function,
)

Check warning on line 68 in rust/lance-datafusion/src/udf.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-datafusion/src/udf.rs
}


fn st_within() -> ScalarUDF {
let function = Arc::new(make_scalar_function(
|_args: &[ArrayRef]| {
// Throw an error indicating that a spatial index is required
Err(datafusion::error::DataFusionError::Execution(
"st_within requires a spatial index. Please create a spatial index on the geometry column using dataset.create_scalar_index(column='your_column', index_type='RTREE') before running spatial queries.".to_string(),
))
},
vec![],
));

create_udf(

Check warning on line 83 in rust/lance-datafusion/src/udf.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-datafusion/src/udf.rs
"st_within",
vec![
DataType::Struct(vec![
Arc::new(arrow_schema::Field::new("x", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("y", DataType::Float64, false)),
].into()),
DataType::Struct(vec![
Arc::new(arrow_schema::Field::new("xmin", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("ymin", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("xmax", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("ymax", DataType::Float64, false)),
].into())
], // GeoArrow Point struct, GeoArrow Box struct
DataType::Boolean,
Volatility::Immutable,
function,
)

Check warning on line 100 in rust/lance-datafusion/src/udf.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-datafusion/src/udf.rs
}


/// BBOX function that creates a bounding box from four numeric arguments.
/// This function is used internally by spatial queries and doesn't perform actual computation.
/// It's intercepted by Lance's geo query parser for index optimization.
///
/// Usage in SQL:
/// ```sql
/// SELECT * FROM table WHERE ST_Intersects(geometry_column, BBOX(-180, -90, 180, 90))
/// ```
fn bbox() -> ScalarUDF {
let function = Arc::new(make_scalar_function(
|_args: &[ArrayRef]| {
// This UDF should never be called because BBOX functions are intercepted by the query parser
// If this executes, it means no spatial index exists
Err(datafusion::error::DataFusionError::Execution(
"BBOX function requires a spatial index. Please create a spatial index on the geometry column using dataset.create_scalar_index(column='your_column', index_type='RTREE') before running spatial queries.".to_string(),
))
},
vec![],
));

Check warning on line 123 in rust/lance-datafusion/src/udf.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-datafusion/src/udf.rs
create_udf(
"bbox",
vec![DataType::Float64, DataType::Float64, DataType::Float64, DataType::Float64], // min_x, min_y, max_x, max_y
DataType::Struct(vec![
Arc::new(arrow_schema::Field::new("xmin", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("ymin", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("xmax", DataType::Float64, false)),
Arc::new(arrow_schema::Field::new("ymax", DataType::Float64, false)),
].into()), // Returns a GeoArrow Box struct
Volatility::Immutable,
function,
)
}

Check warning on line 136 in rust/lance-datafusion/src/udf.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-datafusion/src/udf.rs


/// This method checks whether a string contains all specified tokens. The tokens are separated by
/// punctuations and white spaces.
///
Expand Down
7 changes: 6 additions & 1 deletion rust/lance-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub enum IndexType {

BloomFilter = 9, // Bloom filter

Geo = 10, // Geo

// 100+ and up for vector index.
/// Flat vector index.
Vector = 100, // Legacy vector index, alias to IvfPq
Expand All @@ -130,6 +132,7 @@ impl std::fmt::Display for IndexType {
Self::NGram => write!(f, "NGram"),
Self::FragmentReuse => write!(f, "FragmentReuse"),
Self::MemWal => write!(f, "MemWal"),
Self::Geo => write!(f, "Geo"),
Self::ZoneMap => write!(f, "ZoneMap"),
Self::BloomFilter => write!(f, "BloomFilter"),
Self::Vector | Self::IvfPq => write!(f, "IVF_PQ"),
Expand All @@ -156,6 +159,7 @@ impl TryFrom<i32> for IndexType {
v if v == Self::Inverted as i32 => Ok(Self::Inverted),
v if v == Self::FragmentReuse as i32 => Ok(Self::FragmentReuse),
v if v == Self::MemWal as i32 => Ok(Self::MemWal),
v if v == Self::Geo as i32 => Ok(Self::Geo),
v if v == Self::ZoneMap as i32 => Ok(Self::ZoneMap),
v if v == Self::BloomFilter as i32 => Ok(Self::BloomFilter),
v if v == Self::Vector as i32 => Ok(Self::Vector),
Expand Down Expand Up @@ -214,6 +218,7 @@ impl IndexType {
| Self::NGram
| Self::ZoneMap
| Self::BloomFilter
| Self::Geo
)
}

Expand Down Expand Up @@ -252,7 +257,7 @@ impl IndexType {
Self::MemWal => 0,
Self::ZoneMap => 0,
Self::BloomFilter => 0,

Self::Geo => 0,
// for now all vector indices are built by the same builder,
// so they share the same version.
Self::Vector
Expand Down
44 changes: 44 additions & 0 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ use crate::metrics::MetricsCollector;
use crate::scalar::registry::TrainingCriteria;
use crate::{Index, IndexParams, IndexType};

pub mod bkd;
pub mod bitmap;
pub mod bloomfilter;
pub mod btree;
pub mod expression;
pub mod flat;
pub mod geoindex;
pub mod inverted;
pub mod json;
pub mod label_list;
Expand Down Expand Up @@ -61,6 +63,7 @@ pub enum BuiltinIndexType {
ZoneMap,
BloomFilter,
Inverted,
Geo,
Comment thread
jackye1995 marked this conversation as resolved.
Outdated
}

impl BuiltinIndexType {
Expand All @@ -73,6 +76,7 @@ impl BuiltinIndexType {
Self::ZoneMap => "zonemap",
Self::Inverted => "inverted",
Self::BloomFilter => "bloomfilter",
Self::Geo => "geo",
}
}
}
Expand All @@ -89,6 +93,7 @@ impl TryFrom<IndexType> for BuiltinIndexType {
IndexType::ZoneMap => Ok(Self::ZoneMap),
IndexType::Inverted => Ok(Self::Inverted),
IndexType::BloomFilter => Ok(Self::BloomFilter),
IndexType::Geo => Ok(Self::Geo),
_ => Err(Error::Index {
message: "Invalid index type".to_string(),
location: location!(),
Expand Down Expand Up @@ -587,6 +592,45 @@ pub enum TokenQuery {
TokensContains(String),
}

/// A query that a GeoIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum GeoQuery {
/// Retrieve all row ids where the geometry intersects with the given bounding box
/// Format: (min_x, min_y, max_x, max_y)
Intersects(f64, f64, f64, f64),
}

impl AnyQuery for GeoQuery {
fn as_any(&self) -> &dyn Any {
self
}

fn format(&self, col: &str) -> String {
match self {
Self::Intersects(min_x, min_y, max_x, max_y) => {
format!("st_intersects({}, bbox({}, {}, {}, {}))", col, min_x, min_y, max_x, max_y)
}
}
}

fn to_expr(&self, _col: String) -> Expr {
match self {
Self::Intersects(_min_x, _min_y, _max_x, _max_y) => {
// For now, return a placeholder expression
// This would need to be a proper st_intersects UDF call
Expr::Literal(ScalarValue::Boolean(Some(true)), None)
}
}
}

fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
match other.as_any().downcast_ref::<Self>() {
Some(o) => self == o,
None => false,
}
}
}

/// A query that a BloomFilter index can satisfy
///
/// This is a subset of SargableQuery that only includes operations that bloom filters
Expand Down
Loading
Loading