Skip to content

Commit

Permalink
Enable using Buffer<T> for Serialize/Deserialize (#98)
Browse files Browse the repository at this point in the history
* Enable using `Buffer<T>` for Serialize/Deserialize

* cargo fmt

* disable fancy doc things on windows

---------

Co-authored-by: John Hughes <[email protected]>
Co-authored-by: Clement Rey <[email protected]>
  • Loading branch information
3 people authored Feb 11, 2023
1 parent 82ce45f commit a270bb6
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 6 deletions.
8 changes: 8 additions & 0 deletions arrow2_convert/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,15 @@ trybuild = "1.0"

[dev-dependencies]
arrow2_convert_derive = { version = "0.4.0", path = "../arrow2_convert_derive" }
criterion = "0.4"

[features]
default = ["derive"]
derive = ["arrow2_convert_derive"]

[lib]
bench = false

[[bench]]
name = "bench"
harness = false
68 changes: 68 additions & 0 deletions arrow2_convert/benches/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use arrow2::{array::Array, buffer::Buffer};
use arrow2_convert::{
deserialize::TryIntoCollection, serialize::TryIntoArrow, ArrowDeserialize, ArrowField,
ArrowSerialize,
};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};

/// Transparent newtype around an arrow2 `Buffer<u16>`; this is the "Buffer" case
/// measured by the benchmark functions in this file.
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
#[arrow_field(transparent)]
pub struct BufStruct(Buffer<u16>);

/// Transparent newtype around a `Vec<u16>`; this is the "Vec" case that the
/// benchmark functions in this file measure alongside the `Buffer` variant.
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
#[arrow_field(transparent)]
pub struct VecStruct(Vec<u16>);

/// Benchmarks serializing a single-row collection into a boxed Arrow array.
///
/// For each element count, two cases are measured in the `"serialize"` group:
/// a row wrapping `Buffer<u16>` (`BufStruct`) and a row wrapping `Vec<u16>`
/// (`VecStruct`). Throughput is reported in elements so the sizes are comparable.
pub fn bench_buffer_serialize(c: &mut Criterion) {
    let mut group = c.benchmark_group("serialize");
    for size in [1, 10, 100, 1000, 10000].iter() {
        group.throughput(Throughput::Elements(*size as u64));
        group.bench_with_input(BenchmarkId::new("Buffer", size), size, |b, &size| {
            // Input is built once, outside the timed closure. A range is already
            // an iterator, so it is collected directly.
            let data = [BufStruct((0..size as u16).collect())];
            b.iter(|| {
                let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
            });
        });
        group.bench_with_input(BenchmarkId::new("Vec", size), size, |b, &size| {
            let data = [VecStruct((0..size as u16).collect())];
            b.iter(|| {
                let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
            });
        });
    }
    // Finish the group explicitly instead of relying on it being dropped.
    group.finish();
}
/// Benchmarks deserializing a boxed Arrow array back into a `Vec` of rows.
///
/// For each element count, two cases are measured in the `"deserialize"` group:
/// rows wrapping `Buffer<u16>` (`BufStruct`) and rows wrapping `Vec<u16>`
/// (`VecStruct`). The Arrow array is built once per case; each timed iteration
/// receives its own clone through the `iter_batched` setup closure.
pub fn bench_buffer_deserialize(c: &mut Criterion) {
    let mut group = c.benchmark_group("deserialize");
    for size in [1, 10, 100, 1000, 10000].iter() {
        group.throughput(Throughput::Elements(*size as u64));
        group.bench_with_input(BenchmarkId::new("Buffer", size), size, |b, &size| {
            // A range is already an iterator, so it is collected directly.
            let data: Box<dyn Array> = [BufStruct((0..size as u16).collect())]
                .try_into_arrow()
                .unwrap();
            b.iter_batched(
                || data.clone(),
                |data| {
                    let _: Vec<BufStruct> =
                        TryIntoCollection::try_into_collection(black_box(data)).unwrap();
                },
                criterion::BatchSize::SmallInput,
            );
        });
        group.bench_with_input(BenchmarkId::new("Vec", size), size, |b, &size| {
            let data: Box<dyn Array> = [VecStruct((0..size as u16).collect())]
                .try_into_arrow()
                .unwrap();
            b.iter_batched(
                || data.clone(),
                |data| {
                    let _: Vec<VecStruct> =
                        TryIntoCollection::try_into_collection(black_box(data)).unwrap();
                },
                criterion::BatchSize::SmallInput,
            );
        });
    }
    // Finish the group explicitly instead of relying on it being dropped.
    group.finish();
}

// Collect both benchmark functions into the `benches` group and pass that group
// to `criterion_main!`.
criterion_group!(benches, bench_buffer_serialize, bench_buffer_deserialize);
criterion_main!(benches);
25 changes: 24 additions & 1 deletion arrow2_convert/src/deserialize.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Implementation and traits for deserializing from Arrow.
use arrow2::array::*;
use arrow2::{array::*, buffer::Buffer, types::NativeType};
use chrono::{NaiveDate, NaiveDateTime};

use crate::field::*;
Expand Down Expand Up @@ -71,6 +71,7 @@ macro_rules! impl_arrow_array {
impl ArrowArray for $array {
type BaseArrayType = Self;

#[inline]
fn iter_from_array_ref(b: &dyn Array) -> <&Self as IntoIterator>::IntoIter {
b.as_any()
.downcast_ref::<Self::BaseArrayType>()
Expand Down Expand Up @@ -213,6 +214,28 @@ where
})
}

// Blanket implementation for Buffer
//
// Each list entry is read back as a `Buffer<T>` by downcasting the entry to
// `PrimitiveArray<T>` and cloning its values buffer. Only `values()` is read;
// the entry's validity bitmap is not consulted here.
impl<T> ArrowDeserialize for Buffer<T>
where
    T: ArrowDeserialize + NativeType + ArrowEnableVecForType,
    for<'b> &'b <T as ArrowDeserialize>::ArrayType: IntoIterator,
{
    type ArrayType = ListArray<i32>;

    #[inline]
    fn arrow_deserialize(
        v: <&Self::ArrayType as IntoIterator>::Item,
    ) -> Option<<Self as ArrowField>::Type> {
        // A missing list entry deserializes to `None`.
        let entry = v?;
        // Panics if the entry is not a `PrimitiveArray<T>`.
        let primitives = entry
            .as_any()
            .downcast_ref::<PrimitiveArray<T>>()
            .unwrap();
        Some(primitives.values().clone())
    }
}

// Blanket implementation for Vec
impl<T> ArrowDeserialize for Vec<T>
where
Expand Down
28 changes: 27 additions & 1 deletion arrow2_convert/src/field.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
//! Implementation and traits for mapping rust types to Arrow types
use arrow2::datatypes::{DataType, Field};
use arrow2::{
buffer::Buffer,
datatypes::{DataType, Field},
types::NativeType,
};
use chrono::{NaiveDate, NaiveDateTime};

/// Trait implemented by all types that can be used as an Arrow field.
Expand Down Expand Up @@ -169,6 +173,15 @@ impl ArrowField for NaiveDate {
}
}

impl ArrowField for Buffer<u8> {
type Type = Self;

#[inline]
fn data_type() -> arrow2::datatypes::DataType {
arrow2::datatypes::DataType::Binary
}
}

impl ArrowField for Vec<u8> {
type Type = Self;

Expand Down Expand Up @@ -202,6 +215,19 @@ impl<const SIZE: usize> ArrowField for FixedSizeBinary<SIZE> {
}
}

// Blanket implementation for Buffer
//
// A `Buffer<T>` is described as an Arrow `List` whose child field is named
// "item" and takes its definition from `T`'s own `ArrowField::field`.
impl<T> ArrowField for Buffer<T>
where
    T: ArrowField + NativeType + ArrowEnableVecForType,
{
    type Type = Self;

    #[inline]
    fn data_type() -> DataType {
        let item = <T as ArrowField>::field("item");
        DataType::List(Box::new(item))
    }
}

// Blanket implementation for Vec.
impl<T> ArrowField for Vec<T>
where
Expand Down
4 changes: 2 additions & 2 deletions arrow2_convert/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![doc = include_str!("../README.md")]
#![cfg_attr(not(target_os = "windows"), doc = include_str!("../README.md"))]
#![deny(missing_docs)]
#![forbid(unsafe_code)]

Expand All @@ -14,6 +14,6 @@ pub mod serialize;
pub use arrow2_convert_derive::{ArrowDeserialize, ArrowField, ArrowSerialize};

// Test README with doctests
#[doc = include_str!("../README.md")]
#[cfg_attr(not(target_os = "windows"), doc = include_str!("../README.md"))]
#[cfg(doctest)]
struct ReadmeDoctests;
45 changes: 44 additions & 1 deletion arrow2_convert/src/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Implementation and traits for serializing to Arrow.
use arrow2::array::Array;
use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::types::NativeType;
use arrow2::{array::Array, buffer::Buffer};
use chrono::{NaiveDate, NaiveDateTime};
use std::sync::Arc;

Expand Down Expand Up @@ -178,6 +179,20 @@ impl ArrowSerialize for NaiveDate {
}
}

// `Buffer<u8>` is serialized as one entry of a `MutableBinaryArray<i32>`.
impl ArrowSerialize for Buffer<u8> {
    type MutableArrayType = MutableBinaryArray<i32>;

    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Default::default()
    }

    #[inline]
    fn arrow_serialize(v: &Self, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
        // Push the buffer's bytes as a single non-null binary value.
        let bytes = v.as_slice();
        array.try_push(Some(bytes))
    }
}

impl ArrowSerialize for Vec<u8> {
type MutableArrayType = MutableBinaryArray<i32>;

Expand Down Expand Up @@ -226,6 +241,34 @@ impl<const SIZE: usize> ArrowSerialize for FixedSizeBinary<SIZE> {
}
}

// Blanket implementation for Buffer
//
// A `Buffer<T>` is serialized as one valid entry of a list array whose child
// is a `MutablePrimitiveArray<T>` with a field named "item".
impl<T> ArrowSerialize for Buffer<T>
where
    T: NativeType + ArrowSerialize + ArrowEnableVecForType,
{
    type MutableArrayType = MutableListArray<i32, MutablePrimitiveArray<T>>;

    #[inline]
    fn new_array() -> Self::MutableArrayType {
        let nullable = <T as ArrowField>::is_nullable();
        Self::MutableArrayType::new_with_field(MutablePrimitiveArray::new(), "item", nullable)
    }

    #[inline]
    fn arrow_serialize(
        v: &<Self as ArrowField>::Type,
        array: &mut Self::MutableArrayType,
    ) -> arrow2::error::Result<()> {
        let items = v.as_slice();
        // Append the buffer's elements to the child array.
        let child = array.mut_values();
        child.reserve(items.len());
        child.extend_from_slice(items);
        // Close the list entry over the elements just appended.
        array.try_push_valid()
    }
}

// Blanket implementation for Vec
impl<T> ArrowSerialize for Vec<T>
where
Expand Down
12 changes: 11 additions & 1 deletion arrow2_convert/tests/test_deserialize.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow2::array::*;
use arrow2::error::Result;
use arrow2::{array::*, buffer::Buffer};
use arrow2_convert::{deserialize::*, serialize::*, ArrowDeserialize, ArrowField, ArrowSerialize};

#[test]
Expand Down Expand Up @@ -77,3 +77,13 @@ fn test_deserialize_large_types_schema_mismatch_error() {
let result: Result<Vec<S2>> = arr1.try_into_collection();
assert!(result.is_err());
}

/// Round-trips two `Buffer<u16>` values through an Arrow array and checks that
/// deserialization returns exactly the original sequence.
#[test]
fn test_deserialize_buffer() {
    let original_array = [Buffer::from_iter(0u16..5), Buffer::from_iter(7..9)];
    let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
    // Collect instead of zipping: `zip` stops at the shorter side, so a
    // deserializer that yielded too few items would otherwise pass unnoticed.
    let round_trip: Vec<Buffer<u16>> = arrow_array_deserialize_iterator::<Buffer<u16>>(b.as_ref())
        .unwrap()
        .collect();
    assert_eq!(round_trip.as_slice(), &original_array[..]);
}
18 changes: 18 additions & 0 deletions arrow2_convert/tests/test_serialize.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow2::array::Array;
use arrow2::buffer::Buffer;
use arrow2::chunk::Chunk;
use arrow2_convert::field::{ArrowField, FixedSizeBinary};
use arrow2_convert::serialize::*;
Expand Down Expand Up @@ -69,6 +70,23 @@ fn test_array() {
assert_eq!(r.data_type(), &<Vec<u8> as ArrowField>::data_type());
}

/// Checks that `Buffer<u8>` and `Buffer<u16>` serialize to a one-row array
/// whose data type matches the one reported for the corresponding `Vec`.
#[test]
fn test_buffer() {
    // Buffer<u8> and Vec<u8> should serialize into BinaryArray
    let bytes: Vec<Buffer<u8>> = vec![(0..10).collect()];
    let binary: Box<dyn Array> = bytes.try_into_arrow().unwrap();
    assert_eq!(binary.len(), 1);
    assert_eq!(binary.data_type(), &<Buffer<u8> as ArrowField>::data_type());
    assert_eq!(binary.data_type(), &<Vec<u8> as ArrowField>::data_type());

    // Buffer<u16> and Vec<u16> should serialize into ListArray
    let shorts: Vec<Buffer<u16>> = vec![(0..10).collect()];
    let list: Box<dyn Array> = shorts.try_into_arrow().unwrap();
    assert_eq!(list.len(), 1);
    assert_eq!(list.data_type(), &<Buffer<u16> as ArrowField>::data_type());
    assert_eq!(list.data_type(), &<Vec<u16> as ArrowField>::data_type());
}

#[test]
fn test_field_serialize_error() {
pub struct CustomType(u64);
Expand Down

0 comments on commit a270bb6

Please sign in to comment.