Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions rust/arrow/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use std::fmt::{Debug, Formatter};
use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write};
use std::mem;
use std::ops::{BitAnd, BitOr, Not};
use std::slice::from_raw_parts;
#[cfg(feature = "simd")]
use std::slice::from_raw_parts_mut;
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::sync::Arc;

use crate::array::{BufferBuilderTrait, UInt8BufferBuilder};
Expand Down Expand Up @@ -68,14 +66,11 @@ struct BufferData {

impl PartialEq for BufferData {
fn eq(&self, other: &BufferData) -> bool {
if self.len != other.len {
return false;
}
if self.capacity != other.capacity {
return false;
}

unsafe { memory::memcmp(self.ptr, other.ptr, self.len) == 0 }
self.data() == other.data()
}
}

Expand All @@ -96,16 +91,22 @@ impl Debug for BufferData {
self.ptr, self.len, self.capacity
)?;

unsafe {
f.debug_list()
.entries(std::slice::from_raw_parts(self.ptr, self.len).iter())
.finish()?;
}
f.debug_list().entries(self.data().iter()).finish()?;

write!(f, " }}")
}
}

impl BufferData {
fn data(&self) -> &[u8] {
if self.ptr.is_null() {
&[]
} else {
unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
}
}
}

impl Buffer {
/// Creates a buffer from an existing memory region (must already be byte-aligned), this
/// `Buffer` will free this piece of memory when dropped.
Expand Down Expand Up @@ -194,13 +195,13 @@ impl Buffer {

/// Returns the byte slice stored in this buffer
pub fn data(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) }
&self.data.data()[self.offset..]
}

/// Returns a slice of this buffer, starting from `offset`.

This comment was marked as resolved.

pub fn slice(&self, offset: usize) -> Self {
assert!(
self.offset + offset <= self.len(),
offset <= self.len(),
"the offset of the new Buffer cannot exceed the existing length"
);
Self {
Expand Down Expand Up @@ -511,12 +512,20 @@ impl MutableBuffer {

/// Returns the data stored in this buffer as a slice.
pub fn data(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) }
if self.data.is_null() {
&[]
} else {
unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) }
}
}

/// Returns the data stored in this buffer as a mutable slice.
pub fn data_mut(&mut self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.raw_data() as *mut u8, self.len()) }
if self.data.is_null() {
&mut []
} else {
unsafe { std::slice::from_raw_parts_mut(self.raw_data_mut(), self.len()) }
}
}

/// Returns a raw pointer for this buffer.

This comment was marked as resolved.

Expand All @@ -527,6 +536,10 @@ impl MutableBuffer {
self.data
}

pub fn raw_data_mut(&mut self) -> *mut u8 {
self.data
}

/// Freezes this buffer and return an immutable version of it.
pub fn freeze(self) -> Buffer {
let buffer_data = BufferData {
Expand All @@ -541,6 +554,18 @@ impl MutableBuffer {
offset: 0,
}
}

/// View buffer as typed slice.
pub fn typed_data_mut<T: ArrowNativeType + num::Num>(&mut self) -> &mut [T] {
assert_eq!(self.len() % mem::size_of::<T>(), 0);
assert!(memory::is_ptr_aligned::<T>(self.raw_data() as *const T));
unsafe {
from_raw_parts_mut(
self.raw_data() as *mut T,

This comment was marked as resolved.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fix has disappeared, I can't see it in the diff anymore 😕

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened ARROW-8479

self.len() / mem::size_of::<T>(),
)
}
}
}

impl Drop for MutableBuffer {
Expand Down Expand Up @@ -665,6 +690,7 @@ mod tests {
assert_eq!(empty_slice, buf4.data());
assert_eq!(0, buf4.len());
assert!(buf4.is_empty());
assert_eq!(buf2.slice(2).data(), &[10]);
}

#[test]
Expand Down
178 changes: 58 additions & 120 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::mem::size_of;
use std::mem::transmute;
use std::rc::Rc;
use std::result::Result::Ok;
use std::slice::from_raw_parts_mut;
use std::sync::Arc;
use std::vec::Vec;

Expand Down Expand Up @@ -151,120 +149,63 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {

// convert to arrays
let array = match (&self.data_type, T::get_physical_type()) {
(ArrowType::Boolean, PhysicalType::BOOLEAN) => unsafe {
BoolConverter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<BoolType>,
>(&mut self.record_reader))
},
(ArrowType::Int8, PhysicalType::INT32) => unsafe {
Int8Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::Int16, PhysicalType::INT32) => unsafe {
Int16Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::Int32, PhysicalType::INT32) => unsafe {
Int32Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::UInt8, PhysicalType::INT32) => unsafe {
UInt8Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::UInt16, PhysicalType::INT32) => unsafe {
UInt16Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::UInt32, PhysicalType::INT32) => unsafe {
UInt32Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::Int64, PhysicalType::INT64) => unsafe {
Int64Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int64Type>,
>(&mut self.record_reader))
},
(ArrowType::UInt64, PhysicalType::INT64) => unsafe {
UInt64Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int64Type>,
>(&mut self.record_reader))
},
(ArrowType::Float32, PhysicalType::FLOAT) => unsafe {
Float32Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<FloatType>,
>(&mut self.record_reader))
},
(ArrowType::Float64, PhysicalType::DOUBLE) => unsafe {
Float64Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<DoubleType>,
>(&mut self.record_reader))
},
(ArrowType::Timestamp(_, _), PhysicalType::INT64) => unsafe {
UInt64Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int64Type>,
>(&mut self.record_reader))
},
(ArrowType::Date32(_), PhysicalType::INT32) => unsafe {
UInt32Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::Date64(_), PhysicalType::INT64) => unsafe {
UInt64Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int64Type>,
>(&mut self.record_reader))
},
(ArrowType::Time32(_), PhysicalType::INT32) => unsafe {
UInt32Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::Time64(_), PhysicalType::INT64) => unsafe {
UInt64Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int64Type>,
>(&mut self.record_reader))
},
(ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => unsafe {
UInt32Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int32Type>,
>(&mut self.record_reader))
},
(ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => unsafe {
UInt64Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int64Type>,
>(&mut self.record_reader))
},
(ArrowType::Duration(_), PhysicalType::INT64) => unsafe {
UInt64Converter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<Int64Type>,
>(&mut self.record_reader))
},
(ArrowType::Boolean, PhysicalType::BOOLEAN) => {
BoolConverter::convert(self.record_reader.cast::<BoolType>())
}
(ArrowType::Int8, PhysicalType::INT32) => {
Int8Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::Int16, PhysicalType::INT32) => {
Int16Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::Int32, PhysicalType::INT32) => {
Int32Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::UInt8, PhysicalType::INT32) => {
UInt8Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::UInt16, PhysicalType::INT32) => {
UInt16Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::UInt32, PhysicalType::INT32) => {
UInt32Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::Int64, PhysicalType::INT64) => {
Int64Converter::convert(self.record_reader.cast::<Int64Type>())
}
(ArrowType::UInt64, PhysicalType::INT64) => {
UInt64Converter::convert(self.record_reader.cast::<Int64Type>())
}
(ArrowType::Float32, PhysicalType::FLOAT) => {
Float32Converter::convert(self.record_reader.cast::<FloatType>())
}
(ArrowType::Float64, PhysicalType::DOUBLE) => {
Float64Converter::convert(self.record_reader.cast::<DoubleType>())
}
(ArrowType::Timestamp(_, _), PhysicalType::INT64) => {
UInt64Converter::convert(self.record_reader.cast::<Int64Type>())
}
(ArrowType::Date32(_), PhysicalType::INT32) => {
UInt32Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::Date64(_), PhysicalType::INT64) => {
UInt64Converter::convert(self.record_reader.cast::<Int64Type>())
}
(ArrowType::Time32(_), PhysicalType::INT32) => {
UInt32Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::Time64(_), PhysicalType::INT64) => {
UInt64Converter::convert(self.record_reader.cast::<Int64Type>())
}
(ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => {
UInt32Converter::convert(self.record_reader.cast::<Int32Type>())
}
(ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => {
UInt64Converter::convert(self.record_reader.cast::<Int64Type>())
}
(ArrowType::Duration(_), PhysicalType::INT64) => {
UInt64Converter::convert(self.record_reader.cast::<Int64Type>())
}
(arrow_type, physical_type) => Err(general_err!(
"Reading {:?} type from parquet {:?} is not supported yet.",
arrow_type,
Expand Down Expand Up @@ -562,10 +503,7 @@ impl ArrayReader for StructArrayReader {
let mut def_level_data_buffer = MutableBuffer::new(buffer_size);
def_level_data_buffer.resize(buffer_size)?;

let def_level_data = unsafe {
let ptr = transmute::<*const u8, *mut i16>(def_level_data_buffer.raw_data());
from_raw_parts_mut(ptr, children_array_len)
};
let def_level_data = def_level_data_buffer.typed_data_mut();

def_level_data
.iter_mut()
Expand Down
Loading