diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 9374e226b87f..82b8ba166f14 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -1104,12 +1104,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { rep_levels_byte_len + def_levels_byte_len + values_data.buf.len(); // Data Page v2 compresses values only. - match self.compressor { + let is_compressed = match self.compressor { Some(ref mut cmpr) => { + let buffer_len = buffer.len(); cmpr.compress(&values_data.buf, &mut buffer)?; + if uncompressed_size <= buffer.len() - buffer_len { + buffer.truncate(buffer_len); + buffer.extend_from_slice(&values_data.buf); + false + } else { + true + } } - None => buffer.extend_from_slice(&values_data.buf), - } + None => { + buffer.extend_from_slice(&values_data.buf); + false + } + }; let data_page = Page::DataPageV2 { buf: buffer.into(), @@ -1119,7 +1130,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { num_rows: self.page_metrics.num_buffered_rows, def_levels_byte_len: def_levels_byte_len as u32, rep_levels_byte_len: rep_levels_byte_len as u32, - is_compressed: self.compressor.is_some(), + is_compressed, statistics: page_statistics, }; @@ -4236,4 +4247,33 @@ mod tests { .unwrap(); ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path) } + + #[test] + fn test_page_v2_snappy_compression_fallback() { + // Test that PageV2 sets is_compressed to false when Snappy compression increases data size + let page_writer = TestPageWriter {}; + + // Create WriterProperties with PageV2 and Snappy compression + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_2_0) + // Disable dictionary to ensure data is written directly + .set_dictionary_enabled(false) + .set_compression(Compression::SNAPPY) + .build(); + + let mut column_writer = + get_test_column_writer::(Box::new(page_writer), 0, 0, Arc::new(props)); + + // Create small, simple data that Snappy compression will likely increase in size + // due to compression overhead for very small data + let values = vec![ByteArray::from("a")]; + + column_writer.write_batch(&values, None, None).unwrap(); + + let result = column_writer.close().unwrap(); + assert_eq!( + result.metadata.uncompressed_size(), + result.metadata.compressed_size() + ); + } }