Support Parquet BYTE_STREAM_SPLIT for INT32, INT64, and FIXED_LEN_BYTE_ARRAY primitive types (#6159)

* add todos to help trace flow

* add support for byte_stream_split encoding for INT32 and INT64 data

* byte_stream_split encoding for fixed_len_byte_array

* revert changes to Decoder and add VariableWidthByteStreamSplitDecoder

* remove set_type_width as it is now unused

* begin implementing roundtrip test

* move test

* clean up some documentation

* add test of byte_stream_split with flba

* add check for and test of mismatched sizes

* remove type_length from Encoder and add VariableWidthByteStreamSplitEncoder

* fix clippy error

* change type of argument to new()

* formatting

* add another test

* add variable to split/join streams for FLBA

* more informative error message

* avoid buffer copies in decoder per suggestion from review

* add roundtrip test

* optimized version...but clippy complains

* clippy was right...replace loop with copy_from_slice

* fix test

* optimize split_streams_variable for long type widths
etseidl authored Aug 6, 2024
1 parent 7f2d9ac commit 2a4f269
Showing 13 changed files with 743 additions and 71 deletions.
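For context, BYTE_STREAM_SPLIT stores the i-th byte of every value contiguously: K-byte values become K byte streams, which often compress better for numeric data. The sketch below only illustrates that layout; the helper name `split_streams` and the sample values are invented for the example and are not code from this commit (the crate's own split/join routines are in the encoder and decoder files changed below).

```rust
// Illustration only: split fixed-width values into byte streams the way
// BYTE_STREAM_SPLIT lays them out. `split_streams` is a hypothetical helper.
fn split_streams(values: &[u8], data_width: usize) -> Vec<u8> {
    let num_values = values.len() / data_width;
    let mut out = vec![0u8; values.len()];
    for (i, value) in values.chunks_exact(data_width).enumerate() {
        for (j, &byte) in value.iter().enumerate() {
            // byte j of value i is stored at position i of stream j
            out[j * num_values + i] = byte;
        }
    }
    out
}

fn main() {
    // two 4-byte (INT32-sized) values: [1, 2, 3, 4] and [11, 12, 13, 14]
    let values = [1u8, 2, 3, 4, 11, 12, 13, 14];
    // streams: [firsts | seconds | thirds | fourths] of each value
    assert_eq!(split_streams(&values, 4), vec![1, 11, 2, 12, 3, 13, 4, 14]);
}
```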
24 changes: 12 additions & 12 deletions parquet/benches/encoding.rs
@@ -30,30 +30,30 @@ fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encodi
std::any::type_name::<T::T>(),
encoding
);
let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
Arc::new(
Type::primitive_type_builder("", T::get_physical_type())
.build()
.unwrap(),
),
0,
0,
ColumnPath::new(vec![]),
));
c.bench_function(&format!("encoding: {}", name), |b| {
b.iter(|| {
let mut encoder = get_encoder::<T>(encoding).unwrap();
let mut encoder = get_encoder::<T>(encoding, &column_desc_ptr).unwrap();
encoder.put(values).unwrap();
encoder.flush_buffer().unwrap();
});
});

let mut encoder = get_encoder::<T>(encoding).unwrap();
let mut encoder = get_encoder::<T>(encoding, &column_desc_ptr).unwrap();
encoder.put(values).unwrap();
let encoded = encoder.flush_buffer().unwrap();
println!("{} encoded as {} bytes", name, encoded.len(),);

let mut buffer = vec![T::T::default(); values.len()];
let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
Arc::new(
Type::primitive_type_builder("", T::get_physical_type())
.build()
.unwrap(),
),
0,
0,
ColumnPath::new(vec![]),
));
c.bench_function(&format!("decoding: {}", name), |b| {
b.iter(|| {
let mut decoder: Box<dyn Decoder<T>> =
45 changes: 45 additions & 0 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -341,6 +341,10 @@ impl ColumnValueDecoder for ValueDecoder {
Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
decoder: DeltaByteArrayDecoder::new(data)?,
},
Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
buf: data,
offset: 0,
},
_ => {
return Err(general_err!(
"unsupported encoding for fixed length byte array: {}",
@@ -400,6 +404,20 @@ impl ColumnValueDecoder for ValueDecoder {
Ok(())
})
}
Decoder::ByteStreamSplit { buf, offset } => {
// we have n=`byte_length` streams of length `buf.len/byte_length`
// to read value i, we need the i'th byte from each of the streams
// so `offset` should be the value offset, not the byte offset
let total_values = buf.len() / self.byte_length;
let to_read = num_values.min(total_values - *offset);
out.buffer.reserve(to_read * self.byte_length);

// now read the n streams and reassemble values into the output buffer
read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length);

*offset += to_read;
Ok(to_read)
}
}
}

@@ -412,6 +430,32 @@ impl ColumnValueDecoder for ValueDecoder {
}
Decoder::Dict { decoder } => decoder.skip(num_values),
Decoder::Delta { decoder } => decoder.skip(num_values),
Decoder::ByteStreamSplit { offset, buf } => {
let total_values = buf.len() / self.byte_length;
let to_read = num_values.min(total_values - *offset);
*offset += to_read;
Ok(to_read)
}
}
}
}

// `src` is an array laid out like a NxM matrix where N == `data_width` and
// M == total_values_in_src. Each "row" of the matrix is a stream of bytes, with stream `i`
// containing the `ith` byte for each value. Each "column" is a single value.
// This will reassemble `num_values` values by reading columns of the matrix starting at
// `offset`. Values will be appended to `dst`.
fn read_byte_stream_split(
dst: &mut Vec<u8>,
src: &mut Bytes,
offset: usize,
num_values: usize,
data_width: usize,
) {
let stride = src.len() / data_width;
for i in 0..num_values {
for j in 0..data_width {
dst.push(src[offset + j * stride + i]);
}
}
}
@@ -420,6 +464,7 @@ enum Decoder {
Plain { buf: Bytes, offset: usize },
Dict { decoder: DictIndexDecoder },
Delta { decoder: DeltaByteArrayDecoder },
ByteStreamSplit { buf: Bytes, offset: usize },
}

#[cfg(test)]
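The index arithmetic in `read_byte_stream_split` above is easier to follow on a concrete example. The snippet below restates the same loop over a plain byte slice; it is a standalone sketch, and the name `reassemble` and the sample bytes are illustrative rather than part of the diff.

```rust
// Same reassembly loop as read_byte_stream_split, but over a plain &[u8].
fn reassemble(dst: &mut Vec<u8>, src: &[u8], offset: usize, num_values: usize, data_width: usize) {
    let stride = src.len() / data_width; // length of each byte stream
    for i in 0..num_values {
        for j in 0..data_width {
            // byte j of value (offset + i) sits at index offset + i within stream j
            dst.push(src[offset + j * stride + i]);
        }
    }
}

fn main() {
    // three 2-byte values a, b, c split into two streams of length 3:
    // stream 0 = [a0, b0, c0], stream 1 = [a1, b1, c1]
    let src = [0xA0u8, 0xB0, 0xC0, 0xA1, 0xB1, 0xC1];
    let mut dst = Vec::new();
    // skip the first value (offset = 1) and read the remaining two
    reassemble(&mut dst, &src, 1, 2, 2);
    assert_eq!(dst, vec![0xB0, 0xB1, 0xC0, 0xC1]);
}
```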
3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/test_util.rs
@@ -46,7 +46,8 @@ pub fn utf8_column() -> ColumnDescPtr {

/// Encode `data` with the provided `encoding`
pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> Bytes {
let mut encoder = get_encoder::<ByteArrayType>(encoding).unwrap();
let desc = utf8_column();
let mut encoder = get_encoder::<ByteArrayType>(encoding, &desc).unwrap();

encoder.put(data).unwrap();
encoder.flush_buffer().unwrap()
82 changes: 82 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -1059,6 +1059,7 @@ mod tests {
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
Encoding::BYTE_STREAM_SPLIT,
],
);
run_single_column_reader_tests::<Int64Type, _, Int64Type>(
@@ -1070,6 +1071,7 @@
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
Encoding::BYTE_STREAM_SPLIT,
],
);
run_single_column_reader_tests::<FloatType, _, FloatType>(
@@ -1641,6 +1643,86 @@ mod tests {
assert_eq!(row_count, 300);
}

#[test]
fn test_read_extended_byte_stream_split() {
let path = format!(
"{}/byte_stream_split_extended.gzip.parquet",
arrow::util::test_util::parquet_test_data(),
);
let file = File::open(path).unwrap();
let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

let mut row_count = 0;
for batch in record_reader {
let batch = batch.unwrap();
row_count += batch.num_rows();

// 0,1 are f16
let f16_col = batch.column(0).as_primitive::<Float16Type>();
let f16_bss = batch.column(1).as_primitive::<Float16Type>();
assert_eq!(f16_col.len(), f16_bss.len());
f16_col
.iter()
.zip(f16_bss.iter())
.for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

// 2,3 are f32
let f32_col = batch.column(2).as_primitive::<Float32Type>();
let f32_bss = batch.column(3).as_primitive::<Float32Type>();
assert_eq!(f32_col.len(), f32_bss.len());
f32_col
.iter()
.zip(f32_bss.iter())
.for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

// 4,5 are f64
let f64_col = batch.column(4).as_primitive::<Float64Type>();
let f64_bss = batch.column(5).as_primitive::<Float64Type>();
assert_eq!(f64_col.len(), f64_bss.len());
f64_col
.iter()
.zip(f64_bss.iter())
.for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

// 6,7 are i32
let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
assert_eq!(i32_col.len(), i32_bss.len());
i32_col
.iter()
.zip(i32_bss.iter())
.for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

// 8,9 are i64
let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
assert_eq!(i64_col.len(), i64_bss.len());
i64_col
.iter()
.zip(i64_bss.iter())
.for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

// 10,11 are FLBA(5)
let flba_col = batch.column(10).as_fixed_size_binary();
let flba_bss = batch.column(11).as_fixed_size_binary();
assert_eq!(flba_col.len(), flba_bss.len());
flba_col
.iter()
.zip(flba_bss.iter())
.for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

// 12,13 are FLBA(4) (decimal(7,3))
let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
assert_eq!(dec_col.len(), dec_bss.len());
dec_col
.iter()
.zip(dec_bss.iter())
.for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
}
assert_eq!(row_count, 200);
}

#[test]
fn test_read_incorrect_map_schema_file() {
let testdata = arrow::util::test_util::parquet_test_data();
6 changes: 5 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -1791,7 +1791,11 @@ mod tests {
| DataType::UInt64
| DataType::UInt32
| DataType::UInt16
| DataType::UInt8 => vec![Encoding::PLAIN, Encoding::DELTA_BINARY_PACKED],
| DataType::UInt8 => vec![
Encoding::PLAIN,
Encoding::DELTA_BINARY_PACKED,
Encoding::BYTE_STREAM_SPLIT,
],
DataType::Float32 | DataType::Float64 => {
vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
}
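With this change, INT32 and INT64 columns can round-trip through BYTE_STREAM_SPLIT as well. Below is a minimal usage sketch, assuming the parquet crate's ArrowWriter and WriterProperties APIs; the column name "a" and the output path are invented for the example, and dictionary encoding is disabled on the column so the requested encoding is actually applied.

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::basic::Encoding;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // one non-null Int32 column named "a" (example schema, not from this commit)
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
    )?;

    // request BYTE_STREAM_SPLIT for "a" and turn off dictionary encoding for it
    let props = WriterProperties::builder()
        .set_column_encoding(ColumnPath::from("a"), Encoding::BYTE_STREAM_SPLIT)
        .set_column_dictionary_enabled(ColumnPath::from("a"), false)
        .build();

    let file = std::fs::File::create("bss_int32.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```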
1 change: 1 addition & 0 deletions parquet/src/column/writer/encoder.rs
@@ -191,6 +191,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
props
.encoding(descr.path())
.unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
descr,
)?;

let statistics_enabled = props.statistics_enabled(descr.path());