Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datatypes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ postgres = ["postgres-types"]

[dependencies]
# simd compiles with nightly only, allow default features to build with arrow-flight once it is possible
arrow = {version = "2.0", features = ["simd"] }
# arrow = { version = "3.0", features = ["simd"] } TODO: activate when version is patched
arrow = { version = "3.0" }
chrono = "0.4"
float-cmp = "0.8"
geo = "0.17"
Expand Down
68 changes: 31 additions & 37 deletions datatypes/src/collections/batch_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,23 +226,21 @@ impl RawFeatureCollectionBuilder {
.len(num_features)
.add_buffer(feature_offsets)
.add_child_data(
ArrayData::builder(arrow::datatypes::DataType::List(
Coordinate2D::arrow_data_type().into(),
))
.len(num_lines)
.add_buffer(line_offsets)
.add_child_data(
ArrayData::builder(Coordinate2D::arrow_data_type())
.len(num_coords)
.add_child_data(
ArrayData::builder(DataType::Float64)
.len(num_floats)
.add_buffer(coords)
.build(),
)
.build(),
)
.build(),
ArrayData::builder(Coordinate2D::arrow_list_data_type())
.len(num_lines)
.add_buffer(line_offsets)
.add_child_data(
ArrayData::builder(Coordinate2D::arrow_data_type())
.len(num_coords)
.add_child_data(
ArrayData::builder(DataType::Float64)
.len(num_floats)
.add_buffer(coords)
.build(),
)
.build(),
)
.build(),
)
.build();

Expand All @@ -269,31 +267,27 @@ impl RawFeatureCollectionBuilder {
.len(num_features)
.add_buffer(feature_offsets)
.add_child_data(
ArrayData::builder(arrow::datatypes::DataType::List(
arrow::datatypes::DataType::List(Coordinate2D::arrow_data_type().into()).into(),
))
.len(num_polygons)
.add_buffer(polygon_offsets)
.add_child_data(
ArrayData::builder(arrow::datatypes::DataType::List(
Coordinate2D::arrow_data_type().into(),
))
.len(num_rings)
.add_buffer(ring_offsets)
ArrayData::builder(MultiLineString::arrow_data_type())
.len(num_polygons)
.add_buffer(polygon_offsets)
.add_child_data(
ArrayData::builder(Coordinate2D::arrow_data_type())
.len(num_coords)
ArrayData::builder(Coordinate2D::arrow_list_data_type())
.len(num_rings)
.add_buffer(ring_offsets)
.add_child_data(
ArrayData::builder(DataType::Float64)
.len(num_floats)
.add_buffer(coords)
ArrayData::builder(Coordinate2D::arrow_data_type())
.len(num_coords)
.add_child_data(
ArrayData::builder(DataType::Float64)
.len(num_floats)
.add_buffer(coords)
.build(),
)
.build(),
)
.build(),
)
.build(),
)
.build(),
)
.build();

Expand Down Expand Up @@ -462,7 +456,7 @@ mod tests {

let num_bytes = bit_util::ceil(numbers.len(), 8);
let mut null_buffer = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
let null_slice = null_buffer.data_mut();
let null_slice = null_buffer.as_slice_mut();

for (i, null) in nulls.iter().enumerate() {
if *null {
Expand All @@ -472,7 +466,7 @@ mod tests {

// nulls
builder
.set_column::<Float64Type>("foo", value_buffer, Some(null_buffer.freeze()))
.set_column::<Float64Type>("foo", value_buffer, Some(null_buffer.into()))
.unwrap();

builder.finish().unwrap();
Expand Down
191 changes: 17 additions & 174 deletions datatypes/src/collections/feature_collection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow::array::{
as_primitive_array, as_string_array, Array, ArrayData, ArrayRef, BooleanArray, Float64Array,
ListArray, PrimitiveArrayOps, StructArray,
as_primitive_array, as_string_array, Array, ArrayData, ArrayRef, BooleanArray, ListArray,
StructArray,
};
use arrow::datatypes::{DataType, Field, Float64Type, Int64Type};
use arrow::error::ArrowError;
Expand Down Expand Up @@ -498,7 +498,7 @@ where
downcast_array(array_a),
downcast_array(array_b),
)?),
_ => arrow::compute::concat(&[array_a.clone(), array_b.clone()])?,
_ => arrow::compute::concat(&[array_a.as_ref(), array_b.as_ref()])?,
},
));
}
Expand Down Expand Up @@ -833,7 +833,7 @@ where

unsafe {
std::slice::from_raw_parts(
timestamps.raw_values().cast::<TimeInterval>(),
timestamps.values().as_ptr().cast::<TimeInterval>(),
number_of_time_intervals,
)
}
Expand Down Expand Up @@ -1021,62 +1021,11 @@ impl<CollectionType> Clone for FeatureCollection<CollectionType> {
}
}

/// Exposes the vector data type of a `FeatureCollection` via its geometry type parameter.
impl<CollectionType> VectorDataTyped for FeatureCollection<CollectionType>
where
CollectionType: Geometry,
{
/// Returns `CollectionType::DATA_TYPE`.
///
/// The result is determined by the type parameter alone; the contents of `self` are not read.
fn vector_data_type(&self) -> VectorDataType {
CollectionType::DATA_TYPE
}
}

impl<CollectionType> PartialEq for FeatureCollection<CollectionType>
where
CollectionType: Geometry + ArrowTyped,
{
#[allow(clippy::too_many_lines)] // TODO: split function
fn eq(&self, other: &Self) -> bool {
/// Compares two `f64` typed columns for equality.
///
/// Two columns are equal if they have the same length, the same null count,
/// nulls at the same positions and approximately equal values (as decided by
/// `float_cmp::approx_eq!`) at all non-null positions.
/// `f64::NAN` values are treated as if they are equal to each other.
fn f64_column_equals(a: &Float64Array, b: &Float64Array) -> bool {
    /// NaN-aware comparison of two single values: two NaNs are equal,
    /// a NaN never equals a non-NaN, everything else is compared approximately.
    fn f64_value_equals(v1: f64, v2: f64) -> bool {
        match (v1.is_nan(), v2.is_nan()) {
            (true, true) => true,
            (false, false) => float_cmp::approx_eq!(f64, v1, v2),
            _ => false,
        }
    }

    if (a.len() != b.len()) || (a.null_count() != b.null_count()) {
        return false;
    }
    let number_of_values = a.len();

    if a.null_count() == 0 {
        // No nulls on either side (null counts are equal), so the raw value
        // slices can be compared directly without validity checks.
        let a_values: &[f64] = a.value_slice(0, number_of_values);
        // Must read from `b`: reading from `a` here would compare the column
        // with itself and report any two equally long columns as equal.
        let b_values: &[f64] = b.value_slice(0, number_of_values);

        a_values
            .iter()
            .zip(b_values)
            .all(|(&v1, &v2)| f64_value_equals(v1, v2))
    } else {
        (0..number_of_values).all(|i| match (a.is_null(i), b.is_null(i)) {
            // both null: equal without looking at the values
            (true, true) => true,
            // both valid: need to compare values
            (false, false) => f64_value_equals(a.value(i), b.value(i)),
            // null on one side only
            _ => false,
        })
    }
}

if self.types != other.types {
return false;
}
Expand All @@ -1091,129 +1040,24 @@ where
let c1 = self.table.column_by_name(key).expect("column must exist");
let c2 = other.table.column_by_name(key).expect("column must exist");

match (c1.data_type(), c2.data_type()) {
(DataType::Float64, DataType::Float64) => {
if !f64_column_equals(downcast_array(c1), downcast_array(c2)) {
return false;
}
}
(DataType::List(_), DataType::List(_)) => {
// TODO: remove special treatment for geometry types on next arrow version

match CollectionType::DATA_TYPE {
VectorDataType::Data => {}
VectorDataType::MultiPoint => {
if !c1.equals(c2.as_ref()) {
return false;
}
}
VectorDataType::MultiLineString => {
let c1_feature_offsets = c1.data();
let c2_feature_offsets = c2.data();
let c1_lines_offsets = c1_feature_offsets.child_data().first().unwrap();
let c2_lines_offsets = c2_feature_offsets.child_data().first().unwrap();
let c1_coordinates = c1_lines_offsets
.child_data()
.first()
.unwrap()
.child_data()
.first()
.unwrap();
let c2_coordinates = c2_lines_offsets
.child_data()
.first()
.unwrap()
.child_data()
.first()
.unwrap();

let feature_offsets_eq = || {
c1_feature_offsets.buffers()[0].data()
== c2_feature_offsets.buffers()[0].data()
};

let lines_offsets_eq = || {
c1_lines_offsets.buffers()[0].data()
== c2_lines_offsets.buffers()[0].data()
};

let coordinates_eq = || {
c1_coordinates.buffers()[0].data()
== c2_coordinates.buffers()[0].data()
};

if !feature_offsets_eq() || !lines_offsets_eq() || !coordinates_eq() {
return false;
}
}
VectorDataType::MultiPolygon => {
let c1_feature_offsets = c1.data();
let c2_feature_offsets = c2.data();
let c1_polygons_offsets =
c1_feature_offsets.child_data().first().unwrap();
let c2_polygons_offsets =
c2_feature_offsets.child_data().first().unwrap();
let c1_rings_offsets =
c1_polygons_offsets.child_data().first().unwrap();
let c2_rings_offsets =
c2_polygons_offsets.child_data().first().unwrap();
let c1_coordinates = c1_rings_offsets
.child_data()
.first()
.unwrap()
.child_data()
.first()
.unwrap();
let c2_coordinates = c2_rings_offsets
.child_data()
.first()
.unwrap()
.child_data()
.first()
.unwrap();

let feature_offsets_eq = || {
c1_feature_offsets.buffers()[0].data()
== c2_feature_offsets.buffers()[0].data()
};

let polygons_offsets_eq = || {
c1_polygons_offsets.buffers()[0].data()
== c2_polygons_offsets.buffers()[0].data()
};

let rings_offsets_eq = || {
c1_rings_offsets.buffers()[0].data()
== c2_rings_offsets.buffers()[0].data()
};

let coordinates_eq = || {
c1_coordinates.buffers()[0].data()
== c2_coordinates.buffers()[0].data()
};

if !feature_offsets_eq()
|| !polygons_offsets_eq()
|| !rings_offsets_eq()
|| !coordinates_eq()
{
return false;
}
}
}
}
_ => {
if !c1.equals(c2.as_ref()) {
return false;
}
}
if c1 != c2 {
return false;
}
}

true
}
}

/// Exposes the vector data type of a `FeatureCollection` via its geometry type parameter.
impl<CollectionType> VectorDataTyped for FeatureCollection<CollectionType>
where
CollectionType: Geometry,
{
/// Returns `CollectionType::DATA_TYPE`.
///
/// The result is determined by the type parameter alone; the contents of `self` are not read.
fn vector_data_type(&self) -> VectorDataType {
CollectionType::DATA_TYPE
}
}

/// This implements `IntoGeometryOptionsIterator` for `FeatureCollection`s that implement `IntoGeometryIterator`
impl<'i, CollectionType> IntoGeometryOptionsIterator<'i> for FeatureCollection<CollectionType>
where
Expand Down Expand Up @@ -1484,7 +1328,6 @@ mod tests {
}

#[test]
#[should_panic = "duplicate column"]
fn rename_columns_fails() {
let collection = DataCollection::from_data(
vec![],
Expand All @@ -1499,8 +1342,8 @@ mod tests {
)
.unwrap();

collection
assert!(collection
.rename_columns(&[("foo", "baz"), ("bar", "baz")])
.expect("duplicate column");
.is_err());
}
}
17 changes: 11 additions & 6 deletions datatypes/src/collections/feature_collection_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,18 @@ where
.builders
.values()
.map(|builder| {
let data_type_size = match builder.data_type() {
arrow::datatypes::DataType::Float64 => std::mem::size_of::<f64>(),
arrow::datatypes::DataType::Int64 => std::mem::size_of::<i64>(),
arrow::datatypes::DataType::UInt8 => std::mem::size_of::<u8>(),
arrow::datatypes::DataType::Utf8 => 0, // TODO: how to get this dynamic value
_ => unreachable!("This type is not an attribute type"),
let data_type_size = if builder.as_any().is::<Float64Builder>() {
std::mem::size_of::<f64>()
} else if builder.as_any().is::<Int64Builder>() {
std::mem::size_of::<i64>()
} else if builder.as_any().is::<UInt8Builder>() {
std::mem::size_of::<u8>()
} else if builder.as_any().is::<StringBuilder>() {
0 // TODO: how to get this dynamic value
} else {
unreachable!("This type is not an attribute type");
};

let values_size = builder.len() * data_type_size;
let null_size_estimate = builder.len() / 8;

Expand Down
Loading