diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index 49910a3ee2b0..82a106ce49c1 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -20,7 +20,7 @@ use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};
 
 use crate::{error::Result, FlightData, FlightDescriptor, SchemaAsIpc};
 use arrow_array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, UnionArray};
-use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
+use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
 use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, UnionMode};
 use bytes::Bytes;
@@ -647,6 +647,7 @@ struct FlightIpcEncoder {
     options: IpcWriteOptions,
     data_gen: IpcDataGenerator,
     dictionary_tracker: DictionaryTracker,
+    compression_context: CompressionContext,
 }
 
 impl FlightIpcEncoder {
@@ -655,6 +656,7 @@ impl FlightIpcEncoder {
             options,
             data_gen: IpcDataGenerator::default(),
             dictionary_tracker: DictionaryTracker::new(error_on_replacement),
+            compression_context: CompressionContext::default(),
         }
     }
 
@@ -666,9 +668,12 @@ impl FlightIpcEncoder {
     /// Convert a `RecordBatch` to a Vec of `FlightData` representing
     /// dictionaries and a `FlightData` representing the batch
     fn encode_batch(&mut self, batch: &RecordBatch) -> Result<(Vec<FlightData>, FlightData)> {
-        let (encoded_dictionaries, encoded_batch) =
-            self.data_gen
-                .encoded_batch(batch, &mut self.dictionary_tracker, &self.options)?;
+        let (encoded_dictionaries, encoded_batch) = self.data_gen.encode(
+            batch,
+            &mut self.dictionary_tracker,
+            &self.options,
+            &mut self.compression_context,
+        )?;
 
         let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
         let flight_batch = encoded_batch.into();
@@ -1596,9 +1601,15 @@ mod tests {
     ) -> (Vec<FlightData>, FlightData) {
         let data_gen = IpcDataGenerator::default();
         let mut dictionary_tracker = DictionaryTracker::new(false);
+        let mut compression_context = CompressionContext::default();
 
         let (encoded_dictionaries, encoded_batch) = data_gen
-            .encoded_batch(batch, &mut dictionary_tracker, options)
+            .encode(
+                batch,
+                &mut dictionary_tracker,
+                options,
+                &mut compression_context,
+            )
             .expect("DictionaryTracker configured above to not error on replacement");
 
         let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs
index a304aedcfaee..6effb5f86aaf 100644
--- a/arrow-flight/src/utils.rs
+++ b/arrow-flight/src/utils.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_buffer::Buffer;
 use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::writer::CompressionContext;
 use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions};
 use arrow_schema::{ArrowError, Schema, SchemaRef};
 
@@ -91,10 +92,15 @@ pub fn batches_to_flight_data(
 
     let data_gen = writer::IpcDataGenerator::default();
     let mut dictionary_tracker = writer::DictionaryTracker::new(false);
+    let mut compression_context = CompressionContext::default();
 
     for batch in batches.iter() {
-        let (encoded_dictionaries, encoded_batch) =
-            data_gen.encoded_batch(batch, &mut dictionary_tracker, &options)?;
+        let (encoded_dictionaries, encoded_batch) = data_gen.encode(
+            batch,
+            &mut dictionary_tracker,
+            &options,
+            &mut compression_context,
+        )?;
 
         dictionaries.extend(encoded_dictionaries.into_iter().map(Into::into));
         flight_data.push(encoded_batch.into());
diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs
index bd41ab602ee5..4f4f29cc3d2a 100644
--- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -24,7 +24,10 @@ use arrow::{
     array::ArrayRef,
     buffer::Buffer,
     datatypes::SchemaRef,
-    ipc::{self, reader, writer},
+    ipc::{
+        self, reader,
+        writer::{self, CompressionContext},
+    },
     record_batch::RecordBatch,
 };
 use arrow_flight::{
@@ -90,6 +93,8 @@ async fn upload_data(
 
     let mut original_data_iter = original_data.iter().enumerate();
 
+    let mut compression_context = CompressionContext::default();
+
     if let Some((counter, first_batch)) = original_data_iter.next() {
         let metadata = counter.to_string().into_bytes();
         // Preload the first batch into the channel before starting the request
@@ -99,6 +104,7 @@ async fn upload_data(
             first_batch,
             &options,
             &mut dict_tracker,
+            &mut compression_context,
         )
         .await?;
 
@@ -121,6 +127,7 @@ async fn upload_data(
             batch,
             &options,
             &mut dict_tracker,
+            &mut compression_context,
         )
         .await?;
 
@@ -150,11 +157,12 @@ async fn send_batch(
     batch: &RecordBatch,
     options: &writer::IpcWriteOptions,
     dictionary_tracker: &mut writer::DictionaryTracker,
+    compression_context: &mut CompressionContext,
 ) -> Result {
     let data_gen = writer::IpcDataGenerator::default();
 
     let (encoded_dictionaries, encoded_batch) = data_gen
-        .encoded_batch(batch, dictionary_tracker, options)
+        .encode(batch, dictionary_tracker, options, compression_context)
         .expect("DictionaryTracker configured above to not error on replacement");
 
     let dictionary_flight_data: Vec<FlightData> =
diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs
index d608a4753723..9faced000366 100644
--- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs
+++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs
@@ -144,7 +144,12 @@ impl FlightService for FlightServiceImpl {
             .enumerate()
             .flat_map(|(counter, batch)| {
                 let (encoded_dictionaries, encoded_batch) = data_gen
-                    .encoded_batch(batch, &mut dictionary_tracker, &options)
+                    .encode(
+                        batch,
+                        &mut dictionary_tracker,
+                        &options,
+                        &mut Default::default(),
+                    )
                     .expect("DictionaryTracker configured above to not error on replacement");
 
                 let dictionary_flight_data = encoded_dictionaries.into_iter().map(Into::into);
diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs
index 47ea7785cbec..9bbc6e752c12 100644
--- a/arrow-ipc/src/compression.rs
+++ b/arrow-ipc/src/compression.rs
@@ -22,6 +22,41 @@ use arrow_schema::ArrowError;
 const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
 const LENGTH_OF_PREFIX_DATA: i64 = 8;
 
+/// Additional context that may be needed for compression.
+///
+/// In the case of zstd, this will contain the zstd context, which can be reused between subsequent
+/// compression calls to avoid the performance overhead of initialising a new context for every
+/// compression.
+pub struct CompressionContext {
+    #[cfg(feature = "zstd")]
+    compressor: zstd::bulk::Compressor<'static>,
+}
+
+// We allow derivable_impls because this impl becomes derivable when the zstd feature is not
+// enabled; with the zstd feature enabled we want to be explicit about the compression level.
+#[allow(clippy::derivable_impls)]
+impl Default for CompressionContext {
+    fn default() -> Self {
+        CompressionContext {
+            // safety: `new` only returns an error when given an invalid compression level
+            #[cfg(feature = "zstd")]
+            compressor: zstd::bulk::Compressor::new(zstd::DEFAULT_COMPRESSION_LEVEL)
+                .expect("can use default compression level"),
+        }
+    }
+}
+
+impl std::fmt::Debug for CompressionContext {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("CompressionContext");
+
+        #[cfg(feature = "zstd")]
+        ds.field("compressor", &"zstd::bulk::Compressor");
+
+        ds.finish()
+    }
+}
+
 /// Represents compressing a ipc stream using a particular compression algorithm
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum CompressionCodec {
@@ -58,6 +93,7 @@ impl CompressionCodec {
         &self,
         input: &[u8],
         output: &mut Vec<u8>,
+        context: &mut CompressionContext,
     ) -> Result<usize, ArrowError> {
         let uncompressed_data_len = input.len();
         let original_output_len = output.len();
@@ -67,7 +103,7 @@ impl CompressionCodec {
         } else {
             // write compressed data directly into the output buffer
             output.extend_from_slice(&uncompressed_data_len.to_le_bytes());
-            self.compress(input, output)?;
+            self.compress(input, output, context)?;
 
             let compression_len = output.len() - original_output_len;
             if compression_len > uncompressed_data_len {
@@ -115,10 +151,15 @@ impl CompressionCodec {
 
     /// Compress the data in input buffer and write to output buffer
     /// using the specified compression
-    fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
+    fn compress(
+        &self,
+        input: &[u8],
+        output: &mut Vec<u8>,
+        context: &mut CompressionContext,
+    ) -> Result<(), ArrowError> {
         match self {
             CompressionCodec::Lz4Frame => compress_lz4(input, output),
-            CompressionCodec::Zstd => compress_zstd(input, output),
+            CompressionCodec::Zstd => compress_zstd(input, output, context),
         }
     }
@@ -175,17 +216,23 @@ fn decompress_lz4(_input: &[u8], _decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
 }
 
 #[cfg(feature = "zstd")]
-fn compress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
-    use std::io::Write;
-    let mut encoder = zstd::Encoder::new(output, 0)?;
-    encoder.write_all(input)?;
-    encoder.finish()?;
+fn compress_zstd(
+    input: &[u8],
+    output: &mut Vec<u8>,
+    context: &mut CompressionContext,
+) -> Result<(), ArrowError> {
+    let result = context.compressor.compress(input)?;
+    output.extend_from_slice(&result);
     Ok(())
 }
 
 #[cfg(not(feature = "zstd"))]
 #[allow(clippy::ptr_arg)]
-fn compress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError> {
+fn compress_zstd(
+    _input: &[u8],
+    _output: &mut Vec<u8>,
+    _context: &mut CompressionContext,
+) -> Result<(), ArrowError> {
     Err(ArrowError::InvalidArgumentError(
         "zstd IPC compression requires the zstd feature".to_string(),
     ))
@@ -227,7 +274,9 @@ mod tests {
         let input_bytes = b"hello lz4";
         let codec = super::CompressionCodec::Lz4Frame;
         let mut output_bytes: Vec<u8> = Vec::new();
-        codec.compress(input_bytes, &mut output_bytes).unwrap();
+        codec
+            .compress(input_bytes, &mut output_bytes, &mut Default::default())
+            .unwrap();
         let result = codec
             .decompress(output_bytes.as_slice(), input_bytes.len())
             .unwrap();
@@ -240,7 +289,9 @@ mod tests {
         let input_bytes = b"hello zstd";
         let codec = super::CompressionCodec::Zstd;
         let mut output_bytes: Vec<u8> = Vec::new();
-        codec.compress(input_bytes, &mut output_bytes).unwrap();
+        codec
+            .compress(input_bytes, &mut output_bytes, &mut Default::default())
+            .unwrap();
         let result = codec
             .decompress(output_bytes.as_slice(), input_bytes.len())
             .unwrap();
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index dfb9f3f75d8f..7702c814e8d3 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -2702,7 +2702,12 @@ mod tests {
         let gen = IpcDataGenerator {};
         let mut dict_tracker = DictionaryTracker::new(false);
         let (_, encoded) = gen
-            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
+            .encode(
+                &batch,
+                &mut dict_tracker,
+                &Default::default(),
+                &mut Default::default(),
+            )
             .unwrap();
 
         let message = root_as_message(&encoded.ipc_message).unwrap();
@@ -2740,7 +2745,12 @@ mod tests {
         let gen = IpcDataGenerator {};
         let mut dict_tracker = DictionaryTracker::new(false);
         let (_, encoded) = gen
-            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
+            .encode(
+                &batch,
+                &mut dict_tracker,
+                &Default::default(),
+                &mut Default::default(),
+            )
             .unwrap();
 
         let message = root_as_message(&encoded.ipc_message).unwrap();
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 59a1a3c0a190..ed05998ad106 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -42,6 +42,7 @@ use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
 use arrow_schema::*;
 
 use crate::compression::CompressionCodec;
+pub use crate::compression::CompressionContext;
 use crate::convert::IpcSchemaEncoder;
 use crate::CONTINUATION_MARKER;
 
@@ -167,7 +168,7 @@ impl Default for IpcWriteOptions {
 /// # use std::sync::Arc;
 /// # use arrow_array::UInt64Array;
 /// # use arrow_array::RecordBatch;
-/// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
+/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
 ///
 /// // Create a record batch
 /// let batch = RecordBatch::try_from_iter(vec![
@@ -179,11 +180,13 @@ impl Default for IpcWriteOptions {
 /// let options = IpcWriteOptions::default();
 /// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
 ///
+/// let mut compression_context = CompressionContext::default();
+///
 /// // encode the batch into zero or more encoded dictionaries
 /// // and the data for the actual array.
 /// let data_gen = IpcDataGenerator::default();
 /// let (encoded_dictionaries, encoded_message) = data_gen
-///     .encoded_batch(&batch, &mut dictionary_tracker, &options)
+///     .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context)
 ///     .unwrap();
 /// # }
 /// ```
@@ -231,6 +234,7 @@ impl IpcDataGenerator {
         dictionary_tracker: &mut DictionaryTracker,
         write_options: &IpcWriteOptions,
         dict_id: &mut I,
+        compression_context: &mut CompressionContext,
     ) -> Result<(), ArrowError> {
         match column.data_type() {
             DataType::Struct(fields) => {
@@ -243,6 +247,7 @@ impl IpcDataGenerator {
                         dictionary_tracker,
                         write_options,
                         dict_id,
+                        compression_context,
                     )?;
                 }
             }
@@ -264,6 +269,7 @@ impl IpcDataGenerator {
                     dictionary_tracker,
                     write_options,
                     dict_id,
+                    compression_context,
                 )?;
             }
             DataType::List(field) => {
@@ -275,6 +281,7 @@ impl IpcDataGenerator {
                     dictionary_tracker,
                     write_options,
                     dict_id,
+                    compression_context,
                 )?;
             }
             DataType::LargeList(field) => {
@@ -286,6 +293,7 @@ impl IpcDataGenerator {
                     dictionary_tracker,
                     write_options,
                     dict_id,
+                    compression_context,
                 )?;
             }
             DataType::FixedSizeList(field, _) => {
@@ -300,6 +308,7 @@ impl IpcDataGenerator {
                     dictionary_tracker,
                     write_options,
                     dict_id,
+                    compression_context,
                 )?;
             }
             DataType::Map(field, _) => {
@@ -318,6 +327,7 @@ impl IpcDataGenerator {
                     dictionary_tracker,
                     write_options,
                     dict_id,
+                    compression_context,
                 )?;
 
                 // values
@@ -328,6 +338,7 @@ impl IpcDataGenerator {
                     dictionary_tracker,
                     write_options,
                     dict_id,
+                    compression_context,
                 )?;
             }
             DataType::Union(fields, _) => {
@@ -341,6 +352,7 @@ impl IpcDataGenerator {
                         dictionary_tracker,
                         write_options,
                         dict_id,
+                        compression_context,
                     )?;
                 }
             }
@@ -350,6 +362,7 @@ impl IpcDataGenerator {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn encode_dictionaries<I: Iterator<Item = i64>>(
         &self,
         field: &Field,
@@ -358,6 +371,7 @@ impl IpcDataGenerator {
         dictionary_tracker: &mut DictionaryTracker,
         write_options: &IpcWriteOptions,
         dict_id_seq: &mut I,
+        compression_context: &mut CompressionContext,
     ) -> Result<(), ArrowError> {
         match column.data_type() {
             DataType::Dictionary(_key_type, _value_type) => {
@@ -372,6 +386,7 @@ impl IpcDataGenerator {
                     dictionary_tracker,
                     write_options,
                     dict_id_seq,
+                    compression_context,
                 )?;
 
                 // It's important to only take the dict_id at this point, because the dict ID
@@ -393,6 +408,7 @@ impl IpcDataGenerator {
                             dict_values,
                             write_options,
                             false,
+                            compression_context,
                         )?);
                     }
                     DictionaryUpdate::Delta(data) => {
@@ -401,6 +417,7 @@ impl IpcDataGenerator {
                             &data,
                             write_options,
                             true,
+                            compression_context,
                         )?);
                     }
                 }
@@ -411,6 +428,7 @@ impl IpcDataGenerator {
                 dictionary_tracker,
                 write_options,
                 dict_id_seq,
+                compression_context,
             )?,
         }
 
@@ -420,11 +438,12 @@ impl IpcDataGenerator {
     /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
     /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once)
     /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
-    pub fn encoded_batch(
+    pub fn encode(
         &self,
         batch: &RecordBatch,
         dictionary_tracker: &mut DictionaryTracker,
         write_options: &IpcWriteOptions,
+        compression_context: &mut CompressionContext,
     ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
         let schema = batch.schema();
         let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
@@ -440,19 +459,40 @@ impl IpcDataGenerator {
                 dictionary_tracker,
                 write_options,
                 &mut dict_id,
+                compression_context,
             )?;
         }
 
-        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
+        let encoded_message =
+            self.record_batch_to_bytes(batch, write_options, compression_context)?;
+
         Ok((encoded_dictionaries, encoded_message))
     }
 
+    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
+    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once)
+    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
+    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
+    pub fn encoded_batch(
+        &self,
+        batch: &RecordBatch,
+        dictionary_tracker: &mut DictionaryTracker,
+        write_options: &IpcWriteOptions,
+    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
+        self.encode(
+            batch,
+            dictionary_tracker,
+            write_options,
+            &mut Default::default(),
+        )
+    }
+
     /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
     /// other for the batch's data
     fn record_batch_to_bytes(
         &self,
         batch: &RecordBatch,
         write_options: &IpcWriteOptions,
+        compression_context: &mut CompressionContext,
     ) -> Result<EncodedData, ArrowError> {
         let mut fbb = FlatBufferBuilder::new();
@@ -487,6 +527,7 @@ impl IpcDataGenerator {
                 array.len(),
                 array.null_count(),
                 compression_codec,
+                compression_context,
                 write_options,
             )?;
 
@@ -545,6 +586,7 @@ impl IpcDataGenerator {
         array_data: &ArrayData,
         write_options: &IpcWriteOptions,
         is_delta: bool,
+        compression_context: &mut CompressionContext,
     ) -> Result<EncodedData, ArrowError> {
         let mut fbb = FlatBufferBuilder::new();
@@ -575,6 +617,7 @@ impl IpcDataGenerator {
                 array_data.len(),
                 array_data.null_count(),
                 compression_codec,
+                compression_context,
                 write_options,
             )?;
 
@@ -1008,6 +1051,8 @@ pub struct FileWriter<W: Write> {
     custom_metadata: HashMap<String, String>,
 
     data_gen: IpcDataGenerator,
+
+    compression_context: CompressionContext,
 }
 
 impl<W: Write> FileWriter<BufWriter<W>> {
@@ -1069,6 +1114,7 @@ impl<W: Write> FileWriter<W> {
             dictionary_tracker,
             custom_metadata: HashMap::new(),
             data_gen,
+            compression_context: CompressionContext::default(),
         })
     }
 
@@ -1085,10 +1131,11 @@ impl<W: Write> FileWriter<W> {
             ));
         }
 
-        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
+        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
             batch,
             &mut self.dictionary_tracker,
             &self.write_options,
+            &mut self.compression_context,
         )?;
 
         for encoded_dictionary in encoded_dictionaries {
@@ -1293,6 +1340,8 @@ pub struct StreamWriter<W: Write> {
     dictionary_tracker: DictionaryTracker,
 
     data_gen: IpcDataGenerator,
+
+    compression_context: CompressionContext,
 }
 
 impl<W: Write> StreamWriter<BufWriter<W>> {
@@ -1343,6 +1392,7 @@ impl<W: Write> StreamWriter<W> {
             finished: false,
             dictionary_tracker,
             data_gen,
+            compression_context: CompressionContext::default(),
         })
     }
 
@@ -1356,7 +1406,12 @@ impl<W: Write> StreamWriter<W> {
 
         let (encoded_dictionaries, encoded_message) = self
             .data_gen
-            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
+            .encode(
+                batch,
+                &mut self.dictionary_tracker,
+                &self.write_options,
+                &mut self.compression_context,
+            )
             .expect("StreamWriter is configured to not error on dictionary replacement");
 
         for encoded_dictionary in encoded_dictionaries {
@@ -1667,6 +1722,7 @@ fn write_array_data(
     num_rows: usize,
     null_count: usize,
     compression_codec: Option<CompressionCodec>,
+    compression_context: &mut CompressionContext,
     write_options: &IpcWriteOptions,
 ) -> Result<i64, ArrowError> {
     let mut offset = offset;
@@ -1696,6 +1752,7 @@ fn write_array_data(
             arrow_data,
             offset,
             compression_codec,
+            compression_context,
             write_options.alignment,
         )?;
     }
@@ -1710,6 +1767,7 @@ fn write_array_data(
             arrow_data,
             offset,
             compression_codec,
+            compression_context,
             write_options.alignment,
         )?;
     }
@@ -1727,6 +1785,7 @@ fn write_array_data(
             arrow_data,
             offset,
             compression_codec,
+            compression_context,
             write_options.alignment,
         )?;
     }
@@ -1739,6 +1798,7 @@ fn write_array_data(
             arrow_data,
             offset,
             compression_codec,
+            compression_context,
             write_options.alignment,
         )?;
     }
@@ -1771,6 +1831,7 @@ fn write_array_data(
             arrow_data,
             offset,
             compression_codec,
+            compression_context,
             write_options.alignment,
         )?;
     } else if matches!(data_type, DataType::Boolean) {
@@ -1786,6 +1847,7 @@ fn write_array_data(
             arrow_data,
             offset,
             compression_codec,
+            compression_context,
             write_options.alignment,
         )?;
     } else if matches!(
@@ -1808,6 +1870,7 @@ fn write_array_data(
             arrow_data,
             offset,
             compression_codec,
+            compression_context,
             write_options.alignment,
         )?;
         offset = write_array_data(
@@ -1819,6 +1882,7 @@ fn write_array_data(
             sliced_child_data.len(),
             sliced_child_data.null_count(),
             compression_codec,
+            compression_context,
             write_options,
         )?;
         return Ok(offset);
@@ -1839,6 +1903,7 @@ fn write_array_data(
             child_data.len(),
             child_data.null_count(),
             compression_codec,
+            compression_context,
             write_options,
         )?;
         return Ok(offset);
@@ -1850,6 +1915,7 @@ fn write_array_data(
             arrow_data,
             offset,
             compression_codec,
+            compression_context,
             write_options.alignment,
         )?;
     }
@@ -1872,6 +1938,7 @@ fn write_array_data(
             data_ref.len(),
             data_ref.null_count(),
             compression_codec,
+            compression_context,
             write_options,
         )?;
     }
@@ -1889,6 +1956,7 @@ fn write_array_data(
             data_ref.len(),
             data_ref.null_count(),
             compression_codec,
+            compression_context,
             write_options,
         )?;
     }
@@ -1915,10 +1983,11 @@ fn write_buffer(
     arrow_data: &mut Vec<u8>, // output stream
     offset: i64,              // current output stream offset
     compression_codec: Option<CompressionCodec>,
+    compression_context: &mut CompressionContext,
     alignment: u8,
 ) -> Result<i64, ArrowError> {
     let len: i64 = match compression_codec {
-        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
+        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
         None => {
             arrow_data.extend_from_slice(buffer);
             buffer.len()
@@ -2250,7 +2319,7 @@ mod tests {
             false,
         )]));
 
-        let gen = IpcDataGenerator {};
+        let gen = IpcDataGenerator::default();
         let mut dict_tracker = DictionaryTracker::new(false);
         gen.schema_to_bytes_with_dictionary_tracker(
             &schema,
@@ -2260,8 +2329,13 @@ mod tests {
 
         let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
 
-        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
-            .unwrap();
+        gen.encode(
+            &batch,
+            &mut dict_tracker,
+            &Default::default(),
+            &mut Default::default(),
+        )
+        .unwrap();
 
         // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
         // so we expect the dict will be keyed to 0
@@ -2293,7 +2367,7 @@ mod tests {
             false,
         )]));
 
-        let gen = IpcDataGenerator {};
+        let gen = IpcDataGenerator::default();
         let mut dict_tracker = DictionaryTracker::new(false);
         gen.schema_to_bytes_with_dictionary_tracker(
             &schema,
@@ -2303,8 +2377,13 @@ mod tests {
 
         let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
 
-        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
-            .unwrap();
+        gen.encode(
+            &batch,
+            &mut dict_tracker,
+            &Default::default(),
+            &mut Default::default(),
+        )
+        .unwrap();
 
         assert!(dict_tracker.written.contains_key(&0));
     }
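
Migration note (not part of the patch): a minimal sketch of how a caller might move from the now-deprecated `IpcDataGenerator::encoded_batch` to the new `encode`, reusing a single `CompressionContext` across batches — which is the point of this change. The `encode_all` helper and its error plumbing are illustrative assumptions, not code from this diff; only the `encode` signature and the types it uses come from the patch.

```rust
use arrow_array::RecordBatch;
use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_schema::ArrowError;

/// Hypothetical helper: encode a sequence of batches belonging to one IPC stream.
fn encode_all(batches: &[RecordBatch]) -> Result<(), ArrowError> {
    let data_gen = IpcDataGenerator::default();
    // `false` = do not error on dictionary replacement, matching the call sites above
    let mut dictionary_tracker = DictionaryTracker::new(false);
    let options = IpcWriteOptions::default();

    // One context for the whole stream: with the `zstd` feature enabled this reuses
    // the zstd compressor instead of initialising a new encoder for every buffer.
    let mut compression_context = CompressionContext::default();

    for batch in batches {
        // Previously: data_gen.encoded_batch(batch, &mut dictionary_tracker, &options)?;
        let (encoded_dictionaries, encoded_message) = data_gen.encode(
            batch,
            &mut dictionary_tracker,
            &options,
            &mut compression_context,
        )?;

        // Hand the encoded dictionaries and message to the transport,
        // e.g. convert each into FlightData as arrow-flight does above.
        let _ = (encoded_dictionaries, encoded_message);
    }
    Ok(())
}
```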