
Support Parquet BYTE_STREAM_SPLIT for INT32, INT64, and FIXED_LEN_BYTE_ARRAY primitive types #6159

Merged Aug 6, 2024 · 24 commits (diff shown from 19 commits)

Commits:
- 71ae965 add todos to help trace flow (etseidl, Jul 27, 2024)
- dddf8a0 add support for byte_stream_split encoding for INT32 and INT64 data (etseidl, Jul 29, 2024)
- 1c1af32 byte_stream_split encoding for fixed_len_byte_array (etseidl, Jul 30, 2024)
- 75ea319 revert changes to Decoder and add VariableWidthByteStreamSplitDecoder (etseidl, Jul 30, 2024)
- 7ce40ae remove set_type_width as it is now unused (etseidl, Jul 30, 2024)
- e7829c3 begin implementing roundtrip test (etseidl, Jul 30, 2024)
- c0eb828 move test (etseidl, Jul 30, 2024)
- fec8001 clean up some documentation (etseidl, Aug 1, 2024)
- b9d4baf add test of byte_stream_split with flba (etseidl, Aug 1, 2024)
- f8ee320 add check for and test of mismatched sizes (etseidl, Aug 1, 2024)
- 29c5119 remove type_length from Encoder and add VaribleWidthByteStreamSplitEn… (etseidl, Aug 1, 2024)
- 3e598be Merge remote-tracking branch 'origin/master' into bss (etseidl, Aug 1, 2024)
- ef14c7d fix clippy error (etseidl, Aug 1, 2024)
- 6513ffb change type of argument to new() (etseidl, Aug 1, 2024)
- 3a650a7 formatting (etseidl, Aug 1, 2024)
- c63a1ce add another test (etseidl, Aug 1, 2024)
- 09f467d add variable to split/join streams for FLBA (etseidl, Aug 2, 2024)
- 3fd6bc5 more informative error message (etseidl, Aug 2, 2024)
- c6bb2ef avoid buffer copies in decoder per suggestion from review (etseidl, Aug 2, 2024)
- 3f6d944 add roundtrip test (etseidl, Aug 5, 2024)
- 97d159b optimized version...but clippy complains (etseidl, Aug 5, 2024)
- 340eab4 clippy was right...replace loop with copy_from_slice (etseidl, Aug 5, 2024)
- b2d90ce fix test (etseidl, Aug 5, 2024)
- 104b72e optimize split_streams_variable for long type widths (etseidl, Aug 5, 2024)
24 changes: 12 additions & 12 deletions parquet/benches/encoding.rs
@@ -30,30 +30,30 @@ fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encoding
         std::any::type_name::<T::T>(),
         encoding
     );
+    let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
+        Arc::new(
+            Type::primitive_type_builder("", T::get_physical_type())
+                .build()
+                .unwrap(),
+        ),
+        0,
+        0,
+        ColumnPath::new(vec![]),
+    ));
     c.bench_function(&format!("encoding: {}", name), |b| {
         b.iter(|| {
-            let mut encoder = get_encoder::<T>(encoding).unwrap();
+            let mut encoder = get_encoder::<T>(encoding, &column_desc_ptr).unwrap();
             encoder.put(values).unwrap();
             encoder.flush_buffer().unwrap();
         });
     });
 
-    let mut encoder = get_encoder::<T>(encoding).unwrap();
+    let mut encoder = get_encoder::<T>(encoding, &column_desc_ptr).unwrap();
     encoder.put(values).unwrap();
     let encoded = encoder.flush_buffer().unwrap();
     println!("{} encoded as {} bytes", name, encoded.len(),);
 
     let mut buffer = vec![T::T::default(); values.len()];
-    let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
-        Arc::new(
-            Type::primitive_type_builder("", T::get_physical_type())
-                .build()
-                .unwrap(),
-        ),
-        0,
-        0,
-        ColumnPath::new(vec![]),
-    ));
     c.bench_function(&format!("decoding: {}", name), |b| {
         b.iter(|| {
             let mut decoder: Box<dyn Decoder<T>> =
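The signature change above means every call site must now supply a `ColumnDescPtr`, since the BYTE_STREAM_SPLIT codec for FIXED_LEN_BYTE_ARRAY needs the column's type length from the schema. As a hedged sketch of building such a descriptor (the `with_length` builder mirrors the benchmark's builder usage, the zero def/rep levels and empty path are placeholders, and `get_encoder` itself is a crate-internal API exposed for the benches):

```rust
use std::sync::Arc;

use parquet::basic::Type as PhysicalType;
use parquet::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};

// Sketch: a descriptor for a FIXED_LEN_BYTE_ARRAY(4) leaf column. The
// def/rep levels and empty column path are placeholders, as in the bench above.
fn flba4_descriptor() -> ColumnDescPtr {
    let ty = Type::primitive_type_builder("c", PhysicalType::FIXED_LEN_BYTE_ARRAY)
        .with_length(4) // the type width the BYTE_STREAM_SPLIT codec will use
        .build()
        .unwrap();
    ColumnDescPtr::new(ColumnDescriptor::new(
        Arc::new(ty),
        0,
        0,
        ColumnPath::new(vec![]),
    ))
}

// The descriptor is then threaded into the encoder, as in the bench:
// let mut encoder = get_encoder::<FixedLenByteArrayType>(encoding, &flba4_descriptor())?;
```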
45 changes: 45 additions & 0 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -341,6 +341,10 @@ impl ColumnValueDecoder for ValueDecoder {
             Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
                 decoder: DeltaByteArrayDecoder::new(data)?,
             },
+            Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
+                buf: data,
+                offset: 0,
+            },
             _ => {
                 return Err(general_err!(
                     "unsupported encoding for fixed length byte array: {}",
@@ -400,6 +404,20 @@ impl ColumnValueDecoder for ValueDecoder {
                     Ok(())
                 })
             }
+            Decoder::ByteStreamSplit { buf, offset } => {
+                // we have n=`byte_length` streams of length `buf.len/byte_length`
+                // to read value i, we need the i'th byte from each of the streams
+                // so `offset` should be the value offset, not the byte offset
+                let total_values = buf.len() / self.byte_length;
+                let to_read = num_values.min(total_values - *offset);
+                out.buffer.reserve(to_read * self.byte_length);
+
+                // now read the n streams and reassemble values into the output buffer
+                read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length);
+
+                *offset += to_read;
+                Ok(to_read)
+            }
         }
     }
 
@@ -412,6 +430,32 @@ impl ColumnValueDecoder for ValueDecoder {
             }
             Decoder::Dict { decoder } => decoder.skip(num_values),
             Decoder::Delta { decoder } => decoder.skip(num_values),
+            Decoder::ByteStreamSplit { offset, buf } => {
+                let total_values = buf.len() / self.byte_length;
+                let to_read = num_values.min(total_values - *offset);
+                *offset += to_read;
+                Ok(to_read)
+            }
         }
     }
 }
 
+// `src` is an array laid out like a NxM matrix where N == `data_width` and
+// M == total_values_in_src. Each "row" of the matrix is a stream of bytes, with stream `i`
+// containing the `ith` byte for each value. Each "column" is a single value.
+// This will reassemble `num_values` values by reading columns of the matrix starting at
+// `offset`. Values will be appended to `dst`.
+fn read_byte_stream_split(
+    dst: &mut Vec<u8>,
+    src: &mut Bytes,
+    offset: usize,
+    num_values: usize,
+    data_width: usize,
+) {
+    let stride = src.len() / data_width;
+    for i in 0..num_values {
+        for j in 0..data_width {
+            dst.push(src[offset + j * stride + i]);
+        }
+    }
+}
+
@@ -420,6 +464,7 @@ enum Decoder {
     Plain { buf: Bytes, offset: usize },
     Dict { decoder: DictIndexDecoder },
     Delta { decoder: DeltaByteArrayDecoder },
+    ByteStreamSplit { buf: Bytes, offset: usize },
 }
 
 #[cfg(test)]
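To make the stream-matrix layout concrete, here is a self-contained sketch (not code from this PR) that splits three 4-byte values into byte streams and reassembles them with the same `offset + j * stride + i` indexing used by `read_byte_stream_split` above:

```rust
// Standalone illustration of the BYTE_STREAM_SPLIT layout. Three 4-byte
// values are split into 4 streams (one per byte position), stored back to
// back, then reassembled by reading "columns" of the stream matrix.
fn main() {
    let values: [[u8; 4]; 3] = [
        [0xAA, 0xBB, 0xCC, 0xDD],
        [0x11, 0x22, 0x33, 0x44],
        [0x01, 0x02, 0x03, 0x04],
    ];
    let data_width = 4;
    let num_values = values.len();

    // Split: stream j holds byte j of every value.
    let mut src = vec![0u8; num_values * data_width];
    for (i, v) in values.iter().enumerate() {
        for j in 0..data_width {
            src[j * num_values + i] = v[j];
        }
    }
    assert_eq!(&src[0..3], &[0xAA, 0x11, 0x01]); // stream 0 = first bytes

    // Join: read column i of the matrix to rebuild value i (offset = 0 here).
    let stride = src.len() / data_width;
    let mut dst = Vec::with_capacity(num_values * data_width);
    for i in 0..num_values {
        for j in 0..data_width {
            dst.push(src[j * stride + i]);
        }
    }
    assert_eq!(dst, values.concat());
}
```

Grouping equal byte positions together is what makes this encoding compress well: for values of similar magnitude the high-order streams are nearly constant.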
3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/test_util.rs
@@ -46,7 +46,8 @@ pub fn utf8_column() -> ColumnDescPtr {
 
 /// Encode `data` with the provided `encoding`
 pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> Bytes {
-    let mut encoder = get_encoder::<ByteArrayType>(encoding).unwrap();
+    let desc = utf8_column();
+    let mut encoder = get_encoder::<ByteArrayType>(encoding, &desc).unwrap();
 
     encoder.put(data).unwrap();
     encoder.flush_buffer().unwrap()
82 changes: 82 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -1059,6 +1059,7 @@ mod tests {
             Encoding::PLAIN,
             Encoding::RLE_DICTIONARY,
             Encoding::DELTA_BINARY_PACKED,
+            Encoding::BYTE_STREAM_SPLIT,
         ],
     );
     run_single_column_reader_tests::<Int64Type, _, Int64Type>(
@@ -1070,6 +1071,7 @@
             Encoding::PLAIN,
             Encoding::RLE_DICTIONARY,
             Encoding::DELTA_BINARY_PACKED,
+            Encoding::BYTE_STREAM_SPLIT,
         ],
     );
     run_single_column_reader_tests::<FloatType, _, FloatType>(
@@ -1641,6 +1643,86 @@
         assert_eq!(row_count, 300);
     }
 
+    #[test]
+    fn test_read_extended_byte_stream_split() {
+        let path = format!(
+            "{}/byte_stream_split_extended.gzip.parquet",
+            arrow::util::test_util::parquet_test_data(),
+        );
+        let file = File::open(path).unwrap();
+        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
+
+        let mut row_count = 0;
+        for batch in record_reader {
+            let batch = batch.unwrap();
+            row_count += batch.num_rows();
+
+            // 0,1 are f16
+            let f16_col = batch.column(0).as_primitive::<Float16Type>();
+            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
+            assert_eq!(f16_col.len(), f16_bss.len());
+            f16_col
+                .iter()
+                .zip(f16_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 2,3 are f32
+            let f32_col = batch.column(2).as_primitive::<Float32Type>();
+            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
+            assert_eq!(f32_col.len(), f32_bss.len());
+            f32_col
+                .iter()
+                .zip(f32_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 4,5 are f64
+            let f64_col = batch.column(4).as_primitive::<Float64Type>();
+            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
+            assert_eq!(f64_col.len(), f64_bss.len());
+            f64_col
+                .iter()
+                .zip(f64_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 6,7 are i32
+            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
+            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
+            assert_eq!(i32_col.len(), i32_bss.len());
+            i32_col
+                .iter()
+                .zip(i32_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 8,9 are i64
+            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
+            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
+            assert_eq!(i64_col.len(), i64_bss.len());
+            i64_col
+                .iter()
+                .zip(i64_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 10,11 are FLBA(5)
+            let flba_col = batch.column(10).as_fixed_size_binary();
+            let flba_bss = batch.column(11).as_fixed_size_binary();
+            assert_eq!(flba_col.len(), flba_bss.len());
+            flba_col
+                .iter()
+                .zip(flba_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 12,13 are FLBA(4) (decimal(7,3))
+            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
+            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
+            assert_eq!(dec_col.len(), dec_bss.len());
+            dec_col
+                .iter()
+                .zip(dec_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+        }
+        assert_eq!(row_count, 200);
+    }
+
     #[test]
     fn test_read_incorrect_map_schema_file() {
         let testdata = arrow::util::test_util::parquet_test_data();

Review thread on `test_read_extended_byte_stream_split` (anchored at the `let path = format!(` line above):

Reviewer (Contributor): 👍 I see here we did have the test 👍

etseidl (Contributor, Author), Jul 30, 2024: Yes, that tests one path, but this bypasses the BSS decoder in encodings::decoding::byte_stream_split_decoder. parquet-read exercises that path, so I hope to recreate that path (goes through serialized file reader) in an additional test.

Reviewer (Contributor): I see this test implements the suggestion from https://github.com/apache/parquet-testing/blob/master/data/README.md#additional-types:

    To check conformance of a BYTE_STREAM_SPLIT decoder, read each BYTE_STREAM_SPLIT-encoded column and compare the decoded values against the values from the corresponding PLAIN-encoded column. The values should be equal.

However, when I double checked the values with what pyarrow (Python) says, they didn't seem to match 🤔

I printed out the f16 column:

f16_col: PrimitiveArray<Float16>
[
  10.3046875,
  8.9609375,
  10.75,
  10.9375,
  8.046875,
  8.6953125,
  10.125,
  9.6875,
  9.984375,
  9.1484375,
  ...108 elements...,
  11.6015625,
  9.7578125,
  8.9765625,
  10.1796875,
  10.21875,
  11.359375,
  10.8359375,
  10.359375,
  11.4609375,
  8.8125,
]
f32_col: PrimitiveArray<Float32>
[
  8.827992,
  9.48172,
  11.511229,
  10.637534,
  9.301069,
  8.986282,
  10.032783,
  8.78344,
  9.328859,
  10.31201,
  ...52 elements...,
  7.6898966,
  10.054354,
  9.528224,
  10.459386,
  10.701954,
  10.138242,
  10.760133,
  10.229212,
  10.530065,
  9.295327,
]

Here is what Python told me:

Python 3.11.9 (main, Apr  2 2024, 08:25:04) [Clang 15.0.0 (clang-1500.3.9.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow.parquet as pq
>>> table = pq.read_table('byte_stream_split_extended.gzip.parquet')
>>> table
pyarrow.Table
float16_plain: halffloat
float16_byte_stream_split: halffloat
float_plain: float
float_byte_stream_split: float
double_plain: double
double_byte_stream_split: double
int32_plain: int32
int32_byte_stream_split: int32
int64_plain: int64
int64_byte_stream_split: int64
flba5_plain: fixed_size_binary[5]
flba5_byte_stream_split: fixed_size_binary[5]
decimal_plain: decimal128(7, 3)
decimal_byte_stream_split: decimal128(7, 3)
----
float16_plain: [[18727,18555,18784,18808,18438,...,18573,18770,18637,18687,18667]]
float16_byte_stream_split: [[18727,18555,18784,18808,18438,...,18573,18770,18637,18687,18667]]
float_plain: [[10.337575,11.407482,10.090585,10.643939,7.9498277,...,10.138242,10.760133,10.229212,10.530065,9.295327]]
float_byte_stream_split: [[10.337575,11.407482,10.090585,10.643939,7.9498277,...,10.138242,10.760133,10.229212,10.530065,9.295327]]
double_plain: [[9.82038858616854,10.196776096656958,10.820528475417419,9.606258827775427,10.521167255732113,...,9.576393393539162,9.47941158714459,10.812601287753644,10.241659719820838,8.225037940357872]]
double_byte_stream_split: [[9.82038858616854,10.196776096656958,10.820528475417419,9.606258827775427,10.521167255732113,...,9.576393393539162,9.47941158714459,10.812601287753644,10.241659719820838,8.225037940357872]]
int32_plain: [[24191,41157,7403,79368,64983,...,3584,93802,95977,73925,10300]]
int32_byte_stream_split: [[24191,41157,7403,79368,64983,...,3584,93802,95977,73925,10300]]
int64_plain: [[293650000000,41079000000,51248000000,246066000000,572141000000,...,294755000000,343501000000,663621000000,976709000000,836245000000]]
int64_byte_stream_split: [[293650000000,41079000000,51248000000,246066000000,572141000000,...,294755000000,343501000000,663621000000,976709000000,836245000000]]

etseidl (Contributor, Author): The pyarrow output looks like my parquet-read output (with the exception of the f16 columns). I'm not sure what happened with the f32_col above, but I did find those values further down in the output. Weird batching?

Reviewer (Contributor): The fact that the existing, non-BSS columns (not changed by this PR) come back the same gives me confidence that the code is doing the right thing. I just found it strange that Python seemed to give me a different result.
6 changes: 5 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -1796,7 +1796,11 @@ mod tests {
             | DataType::UInt64
             | DataType::UInt32
             | DataType::UInt16
-            | DataType::UInt8 => vec![Encoding::PLAIN, Encoding::DELTA_BINARY_PACKED],
+            | DataType::UInt8 => vec![
+                Encoding::PLAIN,
+                Encoding::DELTA_BINARY_PACKED,
+                Encoding::BYTE_STREAM_SPLIT,
+            ],
             DataType::Float32 | DataType::Float64 => {
                 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
             }
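For reference, a minimal sketch of what these test changes enable from the user side: requesting BYTE_STREAM_SPLIT for an Int32 column through the standard `WriterProperties` knobs. The file path and column name are illustrative, and dictionary encoding is disabled globally here so the requested encoding is actually used for the data pages:

```rust
use std::{fs::File, sync::Arc};

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::ArrowWriter;
use parquet::basic::Encoding;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_from_iter([("x", col)])?;

    // With this PR, BYTE_STREAM_SPLIT is accepted for INT32/INT64 (and
    // FIXED_LEN_BYTE_ARRAY) columns, not just FLOAT/DOUBLE.
    let props = WriterProperties::builder()
        .set_column_encoding(ColumnPath::from("x"), Encoding::BYTE_STREAM_SPLIT)
        .set_dictionary_enabled(false)
        .build();

    let file = File::create("/tmp/bss_example.parquet")?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```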
1 change: 1 addition & 0 deletions parquet/src/column/writer/encoder.rs
@@ -191,6 +191,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             props
                 .encoding(descr.path())
                 .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
+            descr,
         )?;
 
         let statistics_enabled = props.statistics_enabled(descr.path());