diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs
index 70ce35a98952..6b2219e9c8d0 100644
--- a/arrow-flight/src/decode.rs
+++ b/arrow-flight/src/decode.rs
@@ -276,6 +276,7 @@ impl FlightDataDecoder {
                 self.state = Some(FlightStreamState {
                     schema: Arc::clone(&schema),
+                    schema_message: data.clone(),
                     dictionaries_by_field,
                 });
                 Ok(Some(DecodedFlightData::new_schema(data, schema)))
@@ -296,10 +297,15 @@ impl FlightDataDecoder {
                     )
                 })?;
 
+                let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
+                    .unwrap()
+                    .header_as_schema()
+                    .unwrap();
+
                 arrow_ipc::reader::read_dictionary(
                     &buffer,
                     dictionary_batch,
-                    &state.schema,
+                    ipc_schema,
                     &mut state.dictionaries_by_field,
                     &message.version(),
                 )
@@ -319,8 +325,14 @@ impl FlightDataDecoder {
                     ));
                 };
 
+                let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
+                    .unwrap()
+                    .header_as_schema()
+                    .unwrap();
+
                 let batch = flight_data_to_arrow_batch(
                     &data,
+                    ipc_schema,
                     Arc::clone(&state.schema),
                     &state.dictionaries_by_field,
                 )
@@ -376,6 +388,7 @@ impl futures::Stream for FlightDataDecoder {
 #[derive(Debug)]
 struct FlightStreamState {
     schema: SchemaRef,
+    schema_message: FlightData,
     dictionaries_by_field: HashMap<i64, ArrayRef>,
 }
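Note on the `decode.rs` change: the decoder keeps the raw schema `FlightData` (not a parsed `ipc::Schema`) because the parsed flatbuffer table borrows from the message bytes, so the owned bytes must live in the state struct. The pattern it re-runs per batch can be isolated as below — a sketch, not part of the diff, with an illustrative helper name:

```rust
use arrow_flight::FlightData;
use arrow_ipc::root_as_message;

/// Hypothetical helper: parse the flatbuffer IPC schema out of a retained
/// schema message. Returns `None` if the header is not a Schema message.
fn ipc_schema_from_flight_data(data: &FlightData) -> Option<arrow_ipc::Schema<'_>> {
    // The schema travels in `data_header`; schema messages carry no body.
    let message = root_as_message(&data.data_header[..]).ok()?;
    message.header_as_schema()
}
```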
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index 57ac9f3173fe..883e380f6082 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -535,15 +535,13 @@ fn prepare_field_for_flight(
         )
         .with_metadata(field.metadata().clone())
     } else {
-        #[allow(deprecated)]
-        let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
-
+        dictionary_tracker.next_dict_id();
         #[allow(deprecated)]
         Field::new_dict(
             field.name(),
             field.data_type().clone(),
             field.is_nullable(),
-            dict_id,
+            0,
             field.dict_is_ordered().unwrap_or_default(),
         )
         .with_metadata(field.metadata().clone())
@@ -585,14 +583,13 @@ fn prepare_schema_for_flight(
         )
         .with_metadata(field.metadata().clone())
     } else {
-        #[allow(deprecated)]
-        let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
+        dictionary_tracker.next_dict_id();
         #[allow(deprecated)]
         Field::new_dict(
             field.name(),
             field.data_type().clone(),
             field.is_nullable(),
-            dict_id,
+            0,
             field.dict_is_ordered().unwrap_or_default(),
         )
         .with_metadata(field.metadata().clone())
@@ -654,16 +651,10 @@ struct FlightIpcEncoder {
 
 impl FlightIpcEncoder {
     fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
-        #[allow(deprecated)]
-        let preserve_dict_id = options.preserve_dict_id();
         Self {
             options,
             data_gen: IpcDataGenerator::default(),
-            #[allow(deprecated)]
-            dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
-                error_on_replacement,
-                preserve_dict_id,
-            ),
+            dictionary_tracker: DictionaryTracker::new(error_on_replacement),
         }
     }
@@ -1547,9 +1538,8 @@ mod tests {
     async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) {
         let expected_schema = batches.first().unwrap().schema();
 
-        #[allow(deprecated)]
         let encoder = FlightDataEncoderBuilder::default()
-            .with_options(IpcWriteOptions::default().with_preserve_dict_id(false))
+            .with_options(IpcWriteOptions::default())
             .with_dictionary_handling(DictionaryHandling::Resend)
             .build(futures::stream::iter(batches.clone().into_iter().map(Ok)));
@@ -1575,8 +1565,7 @@ mod tests {
             HashMap::from([("some_key".to_owned(), "some_value".to_owned())]),
         );
 
-        #[allow(deprecated)]
-        let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
+        let mut dictionary_tracker = DictionaryTracker::new(false);
 
         let got = prepare_schema_for_flight(&schema, &mut dictionary_tracker, false);
         assert!(got.metadata().contains_key("some_key"));
@@ -1606,9 +1595,7 @@ mod tests {
         options: &IpcWriteOptions,
     ) -> (Vec<FlightData>, FlightData) {
         let data_gen = IpcDataGenerator::default();
-        #[allow(deprecated)]
-        let mut dictionary_tracker =
-            DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
+        let mut dictionary_tracker = DictionaryTracker::new(false);
 
         let (encoded_dictionaries, encoded_batch) = data_gen
             .encoded_batch(batch, &mut dictionary_tracker, options)
diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs
index c0af71aaf4dc..8043d5b4a72b 100644
--- a/arrow-flight/src/lib.rs
+++ b/arrow-flight/src/lib.rs
@@ -149,9 +149,7 @@ pub struct IpcMessage(pub Bytes);
 
 fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
     let data_gen = writer::IpcDataGenerator::default();
-    #[allow(deprecated)]
-    let mut dict_tracker =
-        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
+    let mut dict_tracker = writer::DictionaryTracker::new(false);
     data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
 }
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index 6791b68b757d..130f48795730 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -707,6 +707,7 @@ pub enum ArrowFlightData {
 pub fn arrow_data_from_flight_data(
     flight_data: FlightData,
     arrow_schema_ref: &SchemaRef,
+    ipc_schema: arrow_ipc::Schema,
 ) -> Result<ArrowFlightData, ArrowError> {
     let ipc_message = root_as_message(&flight_data.data_header[..])
         .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
@@ -723,6 +724,7 @@ pub fn arrow_data_from_flight_data(
             let record_batch = read_record_batch(
                 &Buffer::from(flight_data.data_body),
                 ipc_record_batch,
+                ipc_schema,
                 arrow_schema_ref.clone(),
                 &dictionaries_by_field,
                 None,
diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs
index 428dde73ca6c..f004739245d9 100644
--- a/arrow-flight/src/utils.rs
+++ b/arrow-flight/src/utils.rs
@@ -44,7 +44,8 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result<Vec<RecordBatch>, ArrowError> {
 
 pub fn flight_data_to_arrow_batch(
     data: &FlightData,
+    ipc_schema: arrow_ipc::Schema,
     schema: SchemaRef,
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
 ) -> Result<RecordBatch, ArrowError> {
@@ -71,6 +73,7 @@ pub fn flight_data_to_arrow_batch(
     reader::read_record_batch(
         &Buffer::from(data.data_body.as_ref()),
         batch,
+        ipc_schema,
         schema,
         dictionaries_by_id,
         None,
@@ -90,9 +93,7 @@ pub fn batches_to_flight_data(
     let mut flight_data = vec![];
 
     let data_gen = writer::IpcDataGenerator::default();
-    #[allow(deprecated)]
-    let mut dictionary_tracker =
-        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
+    let mut dictionary_tracker = writer::DictionaryTracker::new(false);
 
     for batch in batches.iter() {
         let (encoded_dictionaries, encoded_batch) =
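With the `utils.rs` change above, `flight_data_to_arrow_batch` takes the flatbuffer IPC schema alongside the Arrow schema. A hedged usage sketch (names illustrative, error handling condensed):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::FlightData;
use arrow_schema::{ArrowError, Schema};

/// Decode one record-batch message, re-reading the IPC schema from the
/// stream's first (schema) message.
fn decode_one(
    schema_data: &FlightData,
    batch_data: &FlightData,
    schema: Arc<Schema>,
    dictionaries_by_id: &HashMap<i64, ArrayRef>,
) -> Result<RecordBatch, ArrowError> {
    let message = arrow_ipc::root_as_message(&schema_data.data_header[..])
        .map_err(|e| ArrowError::ParseError(format!("Unable to parse message: {e:?}")))?;
    let ipc_schema = message
        .header_as_schema()
        .ok_or_else(|| ArrowError::ParseError("Expected a schema message".to_string()))?;
    flight_data_to_arrow_batch(batch_data, ipc_schema, schema, dictionaries_by_id)
}
```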
diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs
index 406419028d00..0981d71d50cd 100644
--- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -34,7 +34,6 @@ use arrow_flight::{
 use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
 use tonic::{Request, Streaming};
 
-use arrow::datatypes::Schema;
 use std::sync::Arc;
 
 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -72,9 +71,7 @@ async fn upload_data(
     let (mut upload_tx, upload_rx) = mpsc::channel(10);
 
     let options = arrow::ipc::writer::IpcWriteOptions::default();
-    #[allow(deprecated)]
-    let mut dict_tracker =
-        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
+    let mut dict_tracker = writer::DictionaryTracker::new(false);
     let data_gen = writer::IpcDataGenerator::default();
     let data = IpcMessage(
         data_gen
@@ -217,33 +214,40 @@ async fn consume_flight_location(
     let resp = client.do_get(ticket).await?;
     let mut resp = resp.into_inner();
 
-    let flight_schema = receive_schema_flight_data(&mut resp)
+    let data = resp
+        .next()
         .await
-        .unwrap_or_else(|| panic!("Failed to receive flight schema"));
-    let actual_schema = Arc::new(flight_schema);
+        .ok_or_else(|| Error::from("No data received from Flight server"))??;
+    let message =
+        arrow::ipc::root_as_message(&data.data_header[..]).expect("Error parsing message");
+
+    // message header is a Schema, so read it
+    let ipc_schema: ipc::Schema = message
+        .header_as_schema()
+        .expect("Unable to read IPC message as schema");
+    let schema = Arc::new(ipc::convert::fb_to_schema(ipc_schema));
 
     let mut dictionaries_by_id = HashMap::new();
 
     for (counter, expected_batch) in expected_data.iter().enumerate() {
-        let data =
-            receive_batch_flight_data(&mut resp, actual_schema.clone(), &mut dictionaries_by_id)
-                .await
-                .unwrap_or_else(|| {
-                    panic!(
-                        "Got fewer batches than expected, received so far: {} expected: {}",
-                        counter,
-                        expected_data.len(),
-                    )
-                });
+        let data = receive_batch_flight_data(&mut resp, ipc_schema, &mut dictionaries_by_id)
+            .await
+            .unwrap_or_else(|| {
+                panic!(
+                    "Got fewer batches than expected, received so far: {} expected: {}",
+                    counter,
+                    expected_data.len(),
+                )
+            });
 
         let metadata = counter.to_string().into_bytes();
         assert_eq!(metadata, data.app_metadata);
 
         let actual_batch =
-            flight_data_to_arrow_batch(&data, actual_schema.clone(), &dictionaries_by_id)
+            flight_data_to_arrow_batch(&data, ipc_schema, schema.clone(), &dictionaries_by_id)
                 .expect("Unable to convert flight data to Arrow batch");
 
-        assert_eq!(actual_schema, actual_batch.schema());
+        assert_eq!(schema, actual_batch.schema());
         assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
         assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
         let schema = expected_batch.schema();
@@ -267,23 +271,9 @@ async fn consume_flight_location(
     Ok(())
 }
 
-async fn receive_schema_flight_data(resp: &mut Streaming<FlightData>) -> Option<Schema> {
-    let data = resp.next().await?.ok()?;
-    let message =
-        arrow::ipc::root_as_message(&data.data_header[..]).expect("Error parsing message");
-
-    // message header is a Schema, so read it
-    let ipc_schema: ipc::Schema = message
-        .header_as_schema()
-        .expect("Unable to read IPC message as schema");
-    let schema = ipc::convert::fb_to_schema(ipc_schema);
-
-    Some(schema)
-}
-
 async fn receive_batch_flight_data(
     resp: &mut Streaming<FlightData>,
-    schema: SchemaRef,
+    ipc_schema: arrow::ipc::Schema<'_>,
     dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
 ) -> Option<FlightData> {
     let mut data = resp.next().await?.ok()?;
@@ -296,7 +286,7 @@ async fn receive_batch_flight_data(
             message
                 .header_as_dictionary_batch()
                 .expect("Error parsing dictionary"),
-            &schema,
+            ipc_schema,
             dictionaries_by_id,
             &message.version(),
         )
diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs
index 92989a20393e..27ec48532492 100644
--- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs
+++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs
@@ -119,9 +119,7 @@ impl FlightService for FlightServiceImpl {
             .ok_or_else(|| Status::not_found(format!("Could not find flight. {key}")))?;
 
         let options = arrow::ipc::writer::IpcWriteOptions::default();
-        #[allow(deprecated)]
-        let mut dictionary_tracker =
-            writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
+        let mut dictionary_tracker = writer::DictionaryTracker::new(false);
         let data_gen = writer::IpcDataGenerator::default();
         let data = IpcMessage(
             data_gen
@@ -268,6 +266,7 @@ impl FlightService for FlightServiceImpl {
         if let Err(e) = save_uploaded_chunks(
             uploaded_chunks,
             schema_ref,
+            flight_data,
             input_stream,
             response_tx,
             schema,
@@ -319,6 +318,7 @@ async fn record_batch_from_message(
     message: ipc::Message<'_>,
     data_body: &Buffer,
     schema_ref: SchemaRef,
+    ipc_schema: ipc::Schema<'_>,
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
 ) -> Result<RecordBatch, Status> {
     let ipc_batch = message
@@ -328,6 +328,7 @@ async fn record_batch_from_message(
     let arrow_batch_result = reader::read_record_batch(
         data_body,
         ipc_batch,
+        ipc_schema,
         schema_ref,
         dictionaries_by_id,
         None,
@@ -341,7 +342,7 @@ async fn record_batch_from_message(
 async fn dictionary_from_message(
     message: ipc::Message<'_>,
     data_body: &Buffer,
-    schema_ref: SchemaRef,
+    ipc_schema: ipc::Schema<'_>,
     dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
 ) -> Result<(), Status> {
     let ipc_batch = message
@@ -351,7 +352,7 @@ async fn dictionary_from_message(
     let dictionary_batch_result = reader::read_dictionary(
         data_body,
         ipc_batch,
-        &schema_ref,
+        ipc_schema,
         dictionaries_by_id,
         &message.version(),
     );
@@ -362,6 +363,7 @@ async fn dictionary_from_message(
 async fn save_uploaded_chunks(
     uploaded_chunks: Arc<Mutex<HashMap<String, Vec<RecordBatch>>>>,
     schema_ref: Arc<Schema>,
+    schema_flight_data: FlightData,
     mut input_stream: Streaming<FlightData>,
     mut response_tx: mpsc::Sender<Result<PutResult, Status>>,
     schema: Schema,
@@ -372,6 +374,11 @@ async fn save_uploaded_chunks(
 
     let mut dictionaries_by_id = HashMap::new();
 
+    let ipc_schema = arrow::ipc::root_as_message(&schema_flight_data.data_header[..])
+        .map_err(|e| Status::invalid_argument(format!("Could not parse message: {e:?}")))?
+        .header_as_schema()
+        .ok_or_else(|| Status::invalid_argument("Could not parse message header as schema"))?;
+
     while let Some(Ok(data)) = input_stream.next().await {
         let message = arrow::ipc::root_as_message(&data.data_header[..])
             .map_err(|e| Status::internal(format!("Could not parse message: {e:?}")))?;
@@ -389,6 +396,7 @@ async fn save_uploaded_chunks(
                     message,
                     &Buffer::from(data.data_body.as_ref()),
                     schema_ref.clone(),
+                    ipc_schema,
                     &dictionaries_by_id,
                 )
                 .await?;
@@ -399,7 +407,7 @@ async fn save_uploaded_chunks(
                 dictionary_from_message(
                     message,
                     &Buffer::from(data.data_body.as_ref()),
-                    schema_ref.clone(),
+                    ipc_schema,
                     &mut dictionaries_by_id,
                 )
                 .await?;
diff --git a/arrow-ipc/benches/ipc_reader.rs b/arrow-ipc/benches/ipc_reader.rs
index ab77449eeb7d..5e41d908833a 100644
--- a/arrow-ipc/benches/ipc_reader.rs
+++ b/arrow-ipc/benches/ipc_reader.rs
@@ -18,7 +18,6 @@
 use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
 use arrow_array::{builder::StringBuilder, RecordBatch};
 use arrow_buffer::Buffer;
-use arrow_ipc::convert::fb_to_schema;
 use arrow_ipc::reader::{read_footer_length, FileDecoder, FileReader, StreamReader};
 use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
 use arrow_ipc::{root_as_footer, Block, CompressionType};
@@ -215,9 +214,11 @@ impl IPCBufferDecoder {
         let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
         let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
 
-        let schema = fb_to_schema(footer.schema().unwrap());
-
-        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
+        let mut decoder = FileDecoder::new(
+            buffer[trailer_start - footer_len..trailer_start].to_vec(),
+            Default::default(),
+            footer.version(),
+        );
 
         // Read dictionaries
         for block in footer.dictionaries().iter().flatten() {
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index 0be74bf6d9ea..ec96467e3132 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -19,6 +19,7 @@
 
 use arrow_buffer::Buffer;
 use arrow_schema::*;
+use core::panic;
 use flatbuffers::{
     FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
     VerifierOptions, WIPOffset,
@@ -127,12 +128,6 @@ impl<'a> IpcSchemaEncoder<'a> {
     }
 }
 
-/// Serialize a schema in IPC format
-#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")]
-pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
-    IpcSchemaEncoder::new().schema_to_fb(schema)
-}
-
 /// Push a key-value metadata into a FlatBufferBuilder and return [WIPOffset]
 pub fn metadata_to_fb<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
@@ -173,7 +168,7 @@ impl From<crate::Field<'_>> for Field {
             field.name().unwrap(),
             get_data_type(field, true),
             field.nullable(),
-            dictionary.id(),
+            0,
             dictionary.isOrdered(),
         )
     } else {
@@ -530,24 +525,13 @@ pub(crate) fn build_field<'a>(
         match dictionary_tracker {
             Some(tracker) => Some(get_fb_dictionary(
                 index_type,
-                #[allow(deprecated)]
-                tracker.set_dict_id(field),
-                field
-                    .dict_is_ordered()
-                    .expect("All Dictionary types have `dict_is_ordered`"),
-                fbb,
-            )),
-            None => Some(get_fb_dictionary(
-                index_type,
-                #[allow(deprecated)]
-                field
-                    .dict_id()
-                    .expect("Dictionary type must have a dictionary id"),
+                tracker.next_dict_id(),
                 field
                     .dict_is_ordered()
                     .expect("All Dictionary types have `dict_is_ordered`"),
                 fbb,
             )),
+            None => panic!("IPC must no longer be used without dictionary tracker"),
         }
     } else {
         None
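The `build_field` change above makes a dictionary tracker mandatory and replaces `set_dict_id` with `next_dict_id`, whose assignment rule (its implementation appears in the `writer.rs` portion of this diff) is purely sequential in depth-first field order. A standalone mirror of that rule, for illustration only:

```rust
/// Mirrors `DictionaryTracker::next_dict_id` from this diff: IDs are handed
/// out as 0, 1, 2, ... regardless of any `dict_id` stored in the schema.
struct IdSeq {
    dict_ids: Vec<i64>,
}

impl IdSeq {
    fn next_dict_id(&mut self) -> i64 {
        let next = self.dict_ids.last().copied().map(|i| i + 1).unwrap_or_default();
        self.dict_ids.push(next);
        next
    }
}

fn main() {
    let mut seq = IdSeq { dict_ids: Vec::new() };
    assert_eq!(seq.next_dict_id(), 0);
    assert_eq!(seq.next_dict_id(), 1);
    assert_eq!(seq.next_dict_id(), 2);
}
```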
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 919407dcda7a..a3d4b15bfe0f 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -39,6 +39,9 @@ use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, ScalarBuffer};
 use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
 use arrow_schema::*;
 
+use crate::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
+use crate::Field as IpcField;
+
 use crate::compression::CompressionCodec;
 use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
 use DataType::*;
@@ -80,6 +83,7 @@ impl RecordBatchDecoder<'_> {
     /// - cast the 64-bit array to the appropriate data type
     fn create_array(
         &mut self,
+        ipc_field: IpcField,
         field: &Field,
         variadic_counts: &mut VecDeque<i64>,
     ) -> Result<ArrayRef, ArrowError> {
@@ -115,13 +119,21 @@ impl RecordBatchDecoder<'_> {
             List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
                 let list_node = self.next_node(field)?;
                 let list_buffers = [self.next_buffer()?, self.next_buffer()?];
-                let values = self.create_array(list_field, variadic_counts)?;
+                let values = self.create_array(
+                    ipc_field.children().unwrap().get(0),
+                    list_field,
+                    variadic_counts,
+                )?;
                 self.create_list_array(list_node, data_type, &list_buffers, values)
             }
             FixedSizeList(ref list_field, _) => {
                 let list_node = self.next_node(field)?;
                 let list_buffers = [self.next_buffer()?];
-                let values = self.create_array(list_field, variadic_counts)?;
+                let values = self.create_array(
+                    ipc_field.children().unwrap().get(0),
+                    list_field,
+                    variadic_counts,
+                )?;
                 self.create_list_array(list_node, data_type, &list_buffers, values)
             }
             Struct(struct_fields) => {
@@ -132,16 +144,22 @@ impl RecordBatchDecoder<'_> {
                 let mut struct_arrays = vec![];
                 // TODO investigate whether just knowing the number of buffers could
                 // still work
-                for struct_field in struct_fields {
-                    let child = self.create_array(struct_field, variadic_counts)?;
+                for (idx, struct_field) in struct_fields.iter().enumerate() {
+                    let child = self.create_array(
+                        ipc_field.children().unwrap().get(idx),
+                        struct_field,
+                        variadic_counts,
+                    )?;
                     struct_arrays.push(child);
                 }
                 self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
             }
             RunEndEncoded(run_ends_field, values_field) => {
                 let run_node = self.next_node(field)?;
-                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
-                let values = self.create_array(values_field, variadic_counts)?;
+                let children = ipc_field.children().unwrap();
+                let run_ends =
+                    self.create_array(children.get(0), run_ends_field, variadic_counts)?;
+                let values = self.create_array(children.get(1), values_field, variadic_counts)?;
 
                 let run_array_length = run_node.length() as usize;
                 let builder = ArrayData::builder(data_type.clone())
@@ -156,11 +174,7 @@ impl RecordBatchDecoder<'_> {
                 let index_node = self.next_node(field)?;
                 let index_buffers = [self.next_buffer()?, self.next_buffer()?];
 
-                #[allow(deprecated)]
-                let dict_id = field.dict_id().ok_or_else(|| {
-                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
-                })?;
-
+                let dict_id = ipc_field.dictionary().unwrap().id();
                 let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                     ArrowError::ParseError(format!(
                         "Cannot find a dictionary batch with dict id: {dict_id}"
@@ -198,8 +212,11 @@ impl RecordBatchDecoder<'_> {
 
                 let mut children = Vec::with_capacity(fields.len());
 
-                for (_id, field) in fields.iter() {
-                    let child = self.create_array(field, variadic_counts)?;
+                let ipc_children = ipc_field.children().unwrap();
+                for i in 0..ipc_children.len() {
+                    let ipc_field = ipc_children.get(i);
+                    let field: Field = ipc_field.into();
+                    let child = self.create_array(ipc_field, &field, variadic_counts)?;
                     children.push(child);
                 }
@@ -371,6 +388,8 @@ struct RecordBatchDecoder<'a> {
     batch: crate::RecordBatch<'a>,
     /// The output schema
     schema: SchemaRef,
+    /// The schema as it is encoded in the IPC source
+    ipc_schema: crate::Schema<'a>,
     /// Decoded dictionaries indexed by dictionary id
     dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
     /// Optional compression codec
@@ -400,6 +419,7 @@ impl<'a> RecordBatchDecoder<'a> {
     fn try_new(
         buf: &'a Buffer,
         batch: crate::RecordBatch<'a>,
+        ipc_schema: crate::Schema<'a>,
         schema: SchemaRef,
         dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
         metadata: &'a MetadataVersion,
@@ -418,6 +438,7 @@ impl<'a> RecordBatchDecoder<'a> {
 
         Ok(Self {
             batch,
+            ipc_schema,
             schema,
             dictionaries_by_id,
             compression,
@@ -477,23 +498,28 @@ impl<'a> RecordBatchDecoder<'a> {
         let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
 
-        let schema = Arc::clone(&self.schema);
         if let Some(projection) = self.projection {
             let mut arrays = vec![];
             // project fields
-            for (idx, field) in schema.fields().iter().enumerate() {
+            let ipc_fields = self.ipc_schema.fields().unwrap();
+            let ipc_fields_len = ipc_fields.len();
+            for idx in 0..ipc_fields_len {
                 // Create array for projected field
                 if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
-                    let child = self.create_array(field, &mut variadic_counts)?;
+                    let child = self.create_array(
+                        ipc_fields.get(idx),
+                        self.schema.clone().field(idx),
+                        &mut variadic_counts,
+                    )?;
                     arrays.push((proj_idx, child));
                 } else {
-                    self.skip_field(field, &mut variadic_counts)?;
+                    self.skip_field(self.schema.clone().field(idx), &mut variadic_counts)?;
                 }
             }
 
             arrays.sort_by_key(|t| t.0);
 
-            let schema = Arc::new(schema.project(projection)?);
+            let schema = Arc::new(self.schema.project(projection)?);
             let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
 
             if self.skip_validation.get() {
@@ -512,8 +538,14 @@ impl<'a> RecordBatchDecoder<'a> {
         } else {
             let mut children = vec![];
             // keep track of index as lists require more than one node
-            for field in schema.fields() {
-                let child = self.create_array(field, &mut variadic_counts)?;
+            let ipc_fields = self.ipc_schema.fields().unwrap();
+            let ipc_fields_len = ipc_fields.len();
+            for idx in 0..ipc_fields_len {
+                let child = self.create_array(
+                    ipc_fields.get(idx),
+                    self.schema.clone().field(idx),
+                    &mut variadic_counts,
+                )?;
                 children.push(child);
             }
 
@@ -521,14 +553,14 @@ impl<'a> RecordBatchDecoder<'a> {
                 // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
                 unsafe {
                     Ok(RecordBatch::new_unchecked(
-                        schema,
+                        self.schema,
                         children,
                         self.batch.length() as usize,
                     ))
                 }
             } else {
                 assert!(variadic_counts.is_empty());
                 RecordBatch::try_new_with_options(self.schema, children, &options)
             }
         }
     }
@@ -638,12 +670,13 @@ impl<'a> RecordBatchDecoder<'a> {
 pub fn read_record_batch(
     buf: &Buffer,
     batch: crate::RecordBatch,
+    ipc_schema: crate::Schema,
     schema: SchemaRef,
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
     projection: Option<&[usize]>,
     metadata: &MetadataVersion,
 ) -> Result<RecordBatch, ArrowError> {
-    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
+    RecordBatchDecoder::try_new(buf, batch, ipc_schema, schema, dictionaries_by_id, metadata)?
         .with_projection(projection)
         .with_require_alignment(false)
         .read_record_batch()
@@ -654,14 +687,14 @@ pub fn read_record_batch(
 pub fn read_dictionary(
     buf: &Buffer,
     batch: crate::DictionaryBatch,
-    schema: &Schema,
+    ipc_schema: crate::Schema,
     dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
     metadata: &MetadataVersion,
 ) -> Result<(), ArrowError> {
     read_dictionary_impl(
         buf,
         batch,
-        schema,
+        ipc_schema,
         dictionaries_by_id,
         metadata,
         false,
@@ -669,10 +702,41 @@ pub fn read_dictionary(
     )
 }
 
+fn first_field_with_dict_id_from_schema(schema: crate::Schema, id: i64) -> Option<Field> {
+    let c_fields = schema.fields().unwrap();
+    let len = c_fields.len();
+    for i in 0..len {
+        let c_field: crate::Field = c_fields.get(i);
+        if let Some(field) = first_field_with_dict_id_from_field(c_field, id) {
+            return Some(field);
+        }
+    }
+
+    None
+}
+
+fn first_field_with_dict_id_from_field(field: crate::Field, id: i64) -> Option<Field> {
+    if let Some(dictionary) = field.dictionary() {
+        if dictionary.id() == id {
+            return Some(field.into());
+        }
+    }
+
+    if let Some(children) = field.children() {
+        for child in children.iter() {
+            if let Some(field) = first_field_with_dict_id_from_field(child, id) {
+                return Some(field);
+            }
+        }
+    }
+
+    None
+}
+
 fn read_dictionary_impl(
     buf: &Buffer,
     batch: crate::DictionaryBatch,
-    schema: &Schema,
+    ipc_schema: crate::Schema,
     dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
     metadata: &MetadataVersion,
     require_alignment: bool,
@@ -685,9 +749,7 @@ fn read_dictionary_impl(
     }
 
     let id = batch.id();
-    #[allow(deprecated)]
-    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
-    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
+    let first_field = first_field_with_dict_id_from_schema(ipc_schema, id).ok_or_else(|| {
         ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
     })?;
 
@@ -699,10 +761,22 @@ fn read_dictionary_impl(
             // Make a fake schema for the dictionary batch.
             let value = value_type.as_ref().clone();
             let schema = Schema::new(vec![Field::new("", value, true)]);
+            let gen = IpcDataGenerator::default();
+            let mut dict_tracker = DictionaryTracker::new(false);
+            let data = gen.schema_to_bytes_with_dictionary_tracker(
+                &schema,
+                &mut dict_tracker,
+                &IpcWriteOptions::default(),
+            );
+            let message = crate::root_as_message(&data.ipc_message).map_err(|err| {
+                ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
+            })?;
+            let ipc_schema = message.header_as_schema().unwrap();
             // Read a single column
             let record_batch = RecordBatchDecoder::try_new(
                 buf,
                 batch.data().unwrap(),
+                ipc_schema,
                 Arc::new(schema),
                 dictionaries_by_id,
                 metadata,
@@ -812,7 +886,7 @@ pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
 /// let back = fb_to_schema(footer.schema().unwrap());
 /// assert_eq!(&back, schema.as_ref());
 ///
-/// let mut decoder = FileDecoder::new(schema, footer.version());
+/// let mut decoder = FileDecoder::new(buffer[trailer_start - footer_len..trailer_start].to_vec(), Default::default(), footer.version());
 ///
 /// // Read dictionaries
 /// for block in footer.dictionaries().iter().flatten() {
@@ -835,6 +909,8 @@ pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
 #[derive(Debug)]
 pub struct FileDecoder {
     schema: SchemaRef,
+    footer_buffer: Vec<u8>,
+    verifier_options: VerifierOptions,
     dictionaries: HashMap<i64, ArrayRef>,
     version: MetadataVersion,
     projection: Option<Vec<usize>>,
@@ -844,9 +920,19 @@ pub struct FileDecoder {
 
 impl FileDecoder {
     /// Create a new [`FileDecoder`] with the given schema and version
-    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
+    pub fn new(
+        footer_buffer: Vec<u8>,
+        verifier_options: VerifierOptions,
+        version: MetadataVersion,
+    ) -> Self {
+        let footer =
+            crate::root_as_footer_with_opts(&verifier_options, &footer_buffer[..]).unwrap();
+        let ipc_schema = footer.schema().unwrap();
+        let schema = crate::convert::fb_to_schema(ipc_schema);
         Self {
-            schema,
+            schema: Arc::new(schema),
+            footer_buffer,
+            verifier_options,
             version,
             dictionaries: Default::default(),
             projection: None,
@@ -911,10 +997,16 @@ impl FileDecoder {
         match message.header_type() {
             crate::MessageHeader::DictionaryBatch => {
                 let batch = message.header_as_dictionary_batch().unwrap();
+                let footer = crate::root_as_footer_with_opts(
+                    &self.verifier_options,
+                    &self.footer_buffer[..],
+                )
+                .unwrap();
+                let ipc_schema = footer.schema().unwrap();
                 read_dictionary_impl(
                     &buf.slice(block.metaDataLength() as _),
                     batch,
-                    &self.schema,
+                    ipc_schema,
                     &mut self.dictionaries,
                     &message.version(),
                     self.require_alignment,
@@ -942,10 +1034,17 @@ impl FileDecoder {
                 let batch = message.header_as_record_batch().ok_or_else(|| {
                     ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                 })?;
+                let footer = crate::root_as_footer_with_opts(
+                    &self.verifier_options,
+                    &self.footer_buffer[..],
+                )
+                .unwrap();
+                let ipc_schema = footer.schema().unwrap();
                 // read the block that makes up the record batch into a buffer
                 RecordBatchDecoder::try_new(
                     &buf.slice(block.metaDataLength() as _),
                     batch,
+                    ipc_schema,
                     self.schema.clone(),
                     &self.dictionaries,
                     &message.version(),
@@ -1070,8 +1169,6 @@ impl FileReaderBuilder {
             ));
         }
 
-        let schema = crate::convert::fb_to_schema(ipc_schema);
-
         let mut custom_metadata = HashMap::new();
         if let Some(fb_custom_metadata) = footer.custom_metadata() {
             for kv in fb_custom_metadata.into_iter() {
@@ -1082,7 +1179,7 @@ impl FileReaderBuilder {
             }
         }
 
-        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
+        let mut decoder = FileDecoder::new(footer_data.clone(), verifier_options, footer.version());
         if let Some(projection) = self.projection {
             decoder = decoder.with_projection(projection)
         }
@@ -1334,6 +1431,10 @@ pub struct StreamReader<R: Read> {
     /// The schema that is read from the stream's first message
     schema: SchemaRef,
 
+    /// The buffer that the IPC schema flatbuffer is stored in. This is needed to satisfy the
+    /// lifetime requirements of the flatbuffer reader.
+    schema_message_buffer: Vec<u8>,
+
     /// Optional dictionaries for each schema field.
     ///
     /// Dictionaries may be appended to in the streaming format.
@@ -1427,6 +1528,7 @@ impl<R: Read> StreamReader<R> {
         Ok(Self {
             reader,
             schema: Arc::new(schema),
+            schema_message_buffer: meta_buffer.to_vec(),
             finished: false,
             dictionaries_by_id,
             projection,
@@ -1510,10 +1612,20 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                 self.reader.read_exact(&mut buf)?;
 
+                let message =
+                    crate::root_as_message(&self.schema_message_buffer).map_err(|err| {
+                        ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
+                    })?;
+                // message header is a Schema, so read it
+                let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
+                    ArrowError::ParseError("Unable to read IPC message as schema".to_string())
+                })?;
+
                 RecordBatchDecoder::try_new(
                     &buf.into(),
                     batch,
-                    self.schema(),
+                    ipc_schema,
+                    self.schema.clone(),
                     &self.dictionaries_by_id,
                     &message.version(),
                 )?
@@ -1533,10 +1645,19 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                 self.reader.read_exact(&mut buf)?;
 
+                let message =
+                    crate::root_as_message(&self.schema_message_buffer).map_err(|err| {
+                        ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
+                    })?;
+                // message header is a Schema, so read it
+                let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
+                    ArrowError::ParseError("Unable to read IPC message as schema".to_string())
+                })?;
+
                 read_dictionary_impl(
                     &buf.into(),
                     batch,
-                    &self.schema,
+                    ipc_schema,
                     &mut self.dictionaries_by_id,
                     &message.version(),
                     false,
@@ -1892,11 +2013,13 @@ mod tests {
         let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
             .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
 
-        let schema = fb_to_schema(footer.schema().unwrap());
-
         let mut decoder = unsafe {
-            FileDecoder::new(Arc::new(schema), footer.version())
-                .with_skip_validation(skip_validation)
+            FileDecoder::new(
+                buffer[trailer_start - footer_len..trailer_start].to_vec(),
+                Default::default(),
+                footer.version(),
+            )
+            .with_skip_validation(skip_validation)
         };
         // Read dictionaries
         for block in footer.dictionaries().iter().flatten() {
@@ -2007,8 +2130,7 @@ mod tests {
         let mut writer = crate::writer::FileWriter::try_new_with_options(
             &mut buf,
             batch.schema_ref(),
-            #[allow(deprecated)]
-            IpcWriteOptions::default().with_preserve_dict_id(false),
+            IpcWriteOptions::default(),
         )
         .unwrap();
         writer.write(&batch).unwrap();
@@ -2440,8 +2562,16 @@ mod tests {
             .unwrap();
 
         let gen = IpcDataGenerator {};
-        #[allow(deprecated)]
-        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
+        let mut dict_tracker = DictionaryTracker::new(false);
+
+        let encoded_schema = gen.schema_to_bytes_with_dictionary_tracker(
+            &batch.schema(),
+            &mut dict_tracker,
+            &IpcWriteOptions::default(),
+        );
+        let schema_message = root_as_message(&encoded_schema.ipc_message).unwrap();
+        let ipc_schema = schema_message.header_as_schema().unwrap();
+
         let (_, encoded) = gen
             .encoded_batch(&batch, &mut dict_tracker, &Default::default())
             .unwrap();
@@ -2459,6 +2589,7 @@ mod tests {
         let roundtrip = RecordBatchDecoder::try_new(
             &b,
             ipc_batch,
+            ipc_schema,
             batch.schema(),
             &Default::default(),
             &message.version(),
@@ -2479,8 +2610,16 @@ mod tests {
             .unwrap();
 
         let gen = IpcDataGenerator {};
-        #[allow(deprecated)]
-        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
+        let mut dict_tracker = DictionaryTracker::new(false);
+
+        let encoded_schema = gen.schema_to_bytes_with_dictionary_tracker(
+            &batch.schema(),
+            &mut dict_tracker,
+            &IpcWriteOptions::default(),
+        );
+        let schema_message = root_as_message(&encoded_schema.ipc_message).unwrap();
+        let ipc_schema = schema_message.header_as_schema().unwrap();
+
         let (_, encoded) = gen
             .encoded_batch(&batch, &mut dict_tracker, &Default::default())
             .unwrap();
@@ -2498,6 +2637,7 @@ mod tests {
         let result = RecordBatchDecoder::try_new(
             &b,
             ipc_batch,
+            ipc_schema,
             batch.schema(),
             &Default::default(),
             &message.version(),
@@ -2691,8 +2831,7 @@ mod tests {
         let mut writer = crate::writer::StreamWriter::try_new_with_options(
             &mut buf,
             batch.schema().as_ref(),
-            #[allow(deprecated)]
-            crate::writer::IpcWriteOptions::default().with_preserve_dict_id(false),
+            crate::writer::IpcWriteOptions::default(),
         )
         .expect("Failed to create StreamWriter");
         writer.write(&batch).expect("Failed to write RecordBatch");
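`FileDecoder::new` now takes the raw footer bytes plus flatbuffers `VerifierOptions` instead of a `SchemaRef`, and derives the Arrow schema itself. Condensing the updated doc example and bench code into one helper (a sketch under the API as changed in this diff):

```rust
use arrow_ipc::reader::{read_footer_length, FileDecoder};
use arrow_ipc::root_as_footer;

/// Build a `FileDecoder` from a fully-buffered IPC file. The decoder owns a
/// copy of the footer bytes and re-parses the flatbuffer schema on demand.
fn decoder_for(buffer: &[u8]) -> FileDecoder {
    let trailer_start = buffer.len() - 10;
    let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
    let footer_buf = &buffer[trailer_start - footer_len..trailer_start];
    let footer = root_as_footer(footer_buf).unwrap();
    FileDecoder::new(footer_buf.to_vec(), Default::default(), footer.version())
}
```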
diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs
index e89467814242..915d834bef66 100644
--- a/arrow-ipc/src/reader/stream.rs
+++ b/arrow-ipc/src/reader/stream.rs
@@ -35,6 +35,8 @@ use crate::{MessageHeader, CONTINUATION_MARKER};
 pub struct StreamDecoder {
     /// The schema of this decoder, if read
     schema: Option<SchemaRef>,
+    /// The ipc schema and its buffer
+    ipc_schema_message: Option<Arc<MessageBuffer>>,
     /// Lookup table for dictionaries by ID
     dictionaries: HashMap<i64, ArrayRef>,
     /// The decoder state
@@ -189,8 +191,8 @@ impl StreamDecoder {
                 }
             }
             DecoderState::Body { message } => {
-                let message = message.as_ref();
-                let body_length = message.bodyLength() as usize;
+                let message_ref = message.as_ref();
+                let body_length = message_ref.bodyLength() as usize;
 
                 let body = if self.buf.is_empty() && buffer.len() >= body_length {
                     let body = buffer.slice_with_length(0, body_length);
@@ -207,8 +209,8 @@ impl StreamDecoder {
                     std::mem::take(&mut self.buf).into()
                 };
 
-                let version = message.version();
-                match message.header_type() {
+                let version = message_ref.version();
+                match message_ref.header_type() {
                     MessageHeader::Schema => {
                         if self.schema.is_some() {
                             return Err(ArrowError::IpcError(
@@ -216,19 +218,34 @@ impl StreamDecoder {
                             ));
                         }
 
-                        let ipc_schema = message.header_as_schema().unwrap();
+                        // Get a reference to the schema from the message
+                        let ipc_schema = message_ref.header_as_schema().unwrap();
                         let schema = crate::convert::fb_to_schema(ipc_schema);
+
+                        // Store the schema and reset state
+                        self.ipc_schema_message = Some(Arc::new(message.clone()));
                         self.state = DecoderState::default();
                         self.schema = Some(Arc::new(schema));
                     }
                     MessageHeader::RecordBatch => {
-                        let batch = message.header_as_record_batch().unwrap();
+                        let batch = message_ref.header_as_record_batch().unwrap();
+                        let ipc_schema_message =
+                            self.ipc_schema_message.clone().ok_or_else(|| {
+                                ArrowError::IpcError("Missing IPC schema".to_string())
+                            })?;
+                        let ipc_schema = ipc_schema_message
+                            .as_ref()
+                            .as_ref()
+                            .header_as_schema()
+                            .unwrap();
+
                         let schema = self.schema.clone().ok_or_else(|| {
                             ArrowError::IpcError("Missing schema".to_string())
                         })?;
                         let batch = RecordBatchDecoder::try_new(
                             &body,
                             batch,
+                            ipc_schema,
                             schema,
                             &self.dictionaries,
                             &version,
@@ -239,14 +256,21 @@ impl StreamDecoder {
                         return Ok(Some(batch));
                     }
                     MessageHeader::DictionaryBatch => {
-                        let dictionary = message.header_as_dictionary_batch().unwrap();
-                        let schema = self.schema.as_deref().ok_or_else(|| {
-                            ArrowError::IpcError("Missing schema".to_string())
-                        })?;
+                        let dictionary = message_ref.header_as_dictionary_batch().unwrap();
+                        let ipc_schema_message =
+                            self.ipc_schema_message.clone().ok_or_else(|| {
+                                ArrowError::IpcError("Missing IPC schema".to_string())
+                            })?;
+                        let ipc_schema = ipc_schema_message
+                            .as_ref()
+                            .as_ref()
+                            .header_as_schema()
+                            .unwrap();
+
                         read_dictionary_impl(
                             &body,
                             dictionary,
-                            schema,
+                            ipc_schema,
                             &mut self.dictionaries,
                             &version,
                             self.require_alignment,
@@ -395,8 +419,7 @@ mod tests {
         let mut writer = StreamWriter::try_new_with_options(
            &mut buffer,
             &schema,
-            #[allow(deprecated)]
-            IpcWriteOptions::default().with_preserve_dict_id(false),
+            IpcWriteOptions::default(),
        )
         .expect("Failed to create StreamWriter");
         writer.write(&batch).expect("Failed to write RecordBatch");
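For `StreamDecoder`, the retained schema message is consulted on every record and dictionary batch, but the public API is unchanged. A minimal driver loop, assuming the existing `StreamDecoder::new`/`decode` interface:

```rust
use arrow_buffer::Buffer;
use arrow_ipc::reader::StreamDecoder;
use arrow_schema::ArrowError;

/// Feed raw stream-format bytes through the decoder. Once the schema
/// message has been seen, `ipc_schema_message` is held internally so later
/// batches can be resolved against the flatbuffer schema.
fn drain(bytes: Vec<u8>) -> Result<Vec<arrow_array::RecordBatch>, ArrowError> {
    let mut decoder = StreamDecoder::new();
    let mut buffer = Buffer::from(bytes);
    let mut batches = Vec::new();
    while let Some(batch) = decoder.decode(&mut buffer)? {
        batches.push(batch);
    }
    Ok(batches)
}
```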
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index bd255fd2d540..2294ba03252b 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -65,15 +65,6 @@ pub struct IpcWriteOptions {
     /// Compression, if desired. Will result in a runtime error
     /// if the corresponding feature is not enabled
     batch_compression_type: Option<crate::CompressionType>,
-    /// Flag indicating whether the writer should preserve the dictionary IDs defined in the
-    /// schema or generate unique dictionary IDs internally during encoding.
-    ///
-    /// Defaults to `false`
-    #[deprecated(
-        since = "54.0.0",
-        note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it."
-    )]
-    preserve_dict_id: bool,
 }
 
 impl IpcWriteOptions {
@@ -122,7 +113,6 @@ impl IpcWriteOptions {
                 write_legacy_ipc_format,
                 metadata_version,
                 batch_compression_type: None,
-                preserve_dict_id: false,
             }),
             crate::MetadataVersion::V5 => {
                 if write_legacy_ipc_format {
@@ -130,13 +120,11 @@ impl IpcWriteOptions {
                         "Legacy IPC format only supported on metadata version 4".to_string(),
                     ))
                 } else {
-                    #[allow(deprecated)]
                     Ok(Self {
                         alignment,
                         write_legacy_ipc_format,
                         metadata_version,
                         batch_compression_type: None,
-                        preserve_dict_id: false,
                     })
                 }
             }
@@ -145,45 +133,15 @@ impl IpcWriteOptions {
             ))),
         }
     }
-
-    /// Return whether the writer is configured to preserve the dictionary IDs
-    /// defined in the schema
-    #[deprecated(
-        since = "54.0.0",
-        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
-    )]
-    pub fn preserve_dict_id(&self) -> bool {
-        #[allow(deprecated)]
-        self.preserve_dict_id
-    }
-
-    /// Set whether the IPC writer should preserve the dictionary IDs in the schema
-    /// or auto-assign unique dictionary IDs during encoding (defaults to true)
-    ///
-    /// If this option is true, the application must handle assigning ids
-    /// to the dictionary batches in order to encode them correctly
-    ///
-    /// The default will change to `false` in future releases
-    #[deprecated(
-        since = "54.0.0",
-        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
-    )]
-    #[allow(deprecated)]
-    pub fn with_preserve_dict_id(mut self, preserve_dict_id: bool) -> Self {
-        self.preserve_dict_id = preserve_dict_id;
-        self
-    }
 }
 
 impl Default for IpcWriteOptions {
     fn default() -> Self {
-        #[allow(deprecated)]
         Self {
             alignment: 64,
             write_legacy_ipc_format: false,
             metadata_version: crate::MetadataVersion::V5,
             batch_compression_type: None,
-            preserve_dict_id: false,
         }
     }
 }
@@ -224,10 +182,7 @@ pub struct IpcDataGenerator {}
 
 impl IpcDataGenerator {
     /// Converts a schema to an IPC message along with `dictionary_tracker`
-    /// and returns it encoded inside [EncodedData] as a flatbuffer
-    ///
-    /// Preferred method over [IpcDataGenerator::schema_to_bytes] since it's
-    /// deprecated since Arrow v54.0.0
+    /// and returns it encoded inside [EncodedData] as a flatbuffer.
     pub fn schema_to_bytes_with_dictionary_tracker(
         &self,
         schema: &Schema,
@@ -258,36 +213,6 @@ impl IpcDataGenerator {
         }
     }
 
-    #[deprecated(
-        since = "54.0.0",
-        note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function will be replaced by the signature of `schema_to_bytes_with_dictionary_tracker` in the next release."
-    )]
-    /// Converts a schema to an IPC message and returns it encoded inside [EncodedData] as a flatbuffer
-    pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
-        let mut fbb = FlatBufferBuilder::new();
-        let schema = {
-            #[allow(deprecated)]
-            // This will be replaced with the IpcSchemaConverter in the next release.
-            let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
-            fb.as_union_value()
-        };
-
-        let mut message = crate::MessageBuilder::new(&mut fbb);
-        message.add_version(write_options.metadata_version);
-        message.add_header_type(crate::MessageHeader::Schema);
-        message.add_bodyLength(0);
-        message.add_header(schema);
-        // TODO: custom metadata
-        let data = message.finish();
-        fbb.finish(data, None);
-
-        let data = fbb.finished_data();
-        EncodedData {
-            ipc_message: data.to_vec(),
-            arrow_data: vec![],
-        }
-    }
-
     fn _encode_dictionaries<I: Iterator<Item = i64>>(
         &self,
         column: &ArrayRef,
@@ -441,13 +366,9 @@ impl IpcDataGenerator {
                     // It's important to only take the dict_id at this point, because the dict ID
                     // sequence is assigned depth-first, so we need to first encode children and have
                     // them take their assigned dict IDs before we take the dict ID for this field.
-                    #[allow(deprecated)]
-                    let dict_id = dict_id_seq
-                        .next()
-                        .or_else(|| field.dict_id())
-                        .ok_or_else(|| {
-                            ArrowError::IpcError(format!("no dict id for field {}", field.name()))
-                        })?;
+                    let dict_id = dict_id_seq.next().ok_or_else(|| {
+                        ArrowError::IpcError(format!("no dict id for field {}", field.name()))
+                    })?;
 
                     let emit = dictionary_tracker.insert(dict_id, column)?;
@@ -789,11 +710,6 @@ pub struct DictionaryTracker {
     written: HashMap<i64, ArrayData>,
     dict_ids: Vec<i64>,
     error_on_replacement: bool,
-    #[deprecated(
-        since = "54.0.0",
-        note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it."
-    )]
-    preserve_dict_id: bool,
 }
 
 impl DictionaryTracker {
@@ -813,52 +729,17 @@ impl DictionaryTracker {
             written: HashMap::new(),
             dict_ids: Vec::new(),
             error_on_replacement,
-            preserve_dict_id: false,
-        }
-    }
-
-    /// Create a new [`DictionaryTracker`].
-    ///
-    /// If `error_on_replacement`
-    /// is true, an error will be generated if an update to an
-    /// existing dictionary is attempted.
-    #[deprecated(
-        since = "54.0.0",
-        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
-    )]
-    pub fn new_with_preserve_dict_id(error_on_replacement: bool, preserve_dict_id: bool) -> Self {
-        #[allow(deprecated)]
-        Self {
-            written: HashMap::new(),
-            dict_ids: Vec::new(),
-            error_on_replacement,
-            preserve_dict_id,
         }
     }
 
-    /// Set the dictionary ID for `field`.
-    ///
-    /// If `preserve_dict_id` is true, this will return the `dict_id` in `field` (or panic if `field` does
-    /// not have a `dict_id` defined).
-    ///
-    /// If `preserve_dict_id` is false, this will return the value of the last `dict_id` assigned incremented by 1
-    /// or 0 in the case where no dictionary IDs have yet been assigned
-    #[deprecated(
-        since = "54.0.0",
-        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
-    )]
-    pub fn set_dict_id(&mut self, field: &Field) -> i64 {
-        #[allow(deprecated)]
-        let next = if self.preserve_dict_id {
-            #[allow(deprecated)]
-            field.dict_id().expect("no dict_id in field")
-        } else {
-            self.dict_ids
-                .last()
-                .copied()
-                .map(|i| i + 1)
-                .unwrap_or_default()
-        };
+    /// Record and return the next dictionary ID.
+    pub fn next_dict_id(&mut self) -> i64 {
+        let next = self
+            .dict_ids
+            .last()
+            .copied()
+            .map(|i| i + 1)
+            .unwrap_or_default();
 
         self.dict_ids.push(next);
         next
@@ -995,11 +876,7 @@ impl<W: Write> FileWriter<W> {
         writer.write_all(&super::ARROW_MAGIC)?;
         writer.write_all(&PADDING[..pad_len])?;
         // write the schema, set the written bytes to the schema + header
-        #[allow(deprecated)]
-        let preserve_dict_id = write_options.preserve_dict_id;
-        #[allow(deprecated)]
-        let mut dictionary_tracker =
-            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
+        let mut dictionary_tracker = DictionaryTracker::new(true);
         let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
             schema,
             &mut dictionary_tracker,
@@ -1074,11 +951,7 @@ impl<W: Write> FileWriter<W> {
         let mut fbb = FlatBufferBuilder::new();
         let dictionaries = fbb.create_vector(&self.dictionary_blocks);
         let record_batches = fbb.create_vector(&self.record_blocks);
-        #[allow(deprecated)]
-        let preserve_dict_id = self.write_options.preserve_dict_id;
-        #[allow(deprecated)]
-        let mut dictionary_tracker =
-            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
+        let mut dictionary_tracker = DictionaryTracker::new(true);
         let schema = IpcSchemaEncoder::new()
             .with_dictionary_tracker(&mut dictionary_tracker)
             .schema_to_fb_offset(&mut fbb, &self.schema);
@@ -1229,11 +1102,7 @@ impl<W: Write> StreamWriter<W> {
         write_options: IpcWriteOptions,
     ) -> Result<Self, ArrowError> {
         let data_gen = IpcDataGenerator::default();
-        #[allow(deprecated)]
-        let preserve_dict_id = write_options.preserve_dict_id;
-        #[allow(deprecated)]
-        let mut dictionary_tracker =
-            DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id);
+        let mut dictionary_tracker = DictionaryTracker::new(false);
 
         // write the schema, set the written bytes to the schema
         let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
@@ -1868,7 +1737,6 @@ mod tests {
     use arrow_array::types::*;
     use arrow_buffer::ScalarBuffer;
 
-    use crate::convert::fb_to_schema;
     use crate::reader::*;
     use crate::root_as_footer;
     use crate::MetadataVersion;
@@ -2141,7 +2009,7 @@ mod tests {
 
         // Dict field with id 2
         #[allow(deprecated)]
-        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false);
+        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
 
         let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
         let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
@@ -2155,17 +2023,22 @@ mod tests {
             false,
         )]));
 
+        let gen = IpcDataGenerator {};
+        let mut dict_tracker = DictionaryTracker::new(false);
+        gen.schema_to_bytes_with_dictionary_tracker(
+            &schema,
+            &mut dict_tracker,
+            &IpcWriteOptions::default(),
+        );
+
         let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
 
-        let gen = IpcDataGenerator {};
-        #[allow(deprecated)]
-        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
         gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
             .unwrap();
 
         // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
         // so we expect the dict will be keyed to 0
-        assert!(dict_tracker.written.contains_key(&2));
+        assert!(dict_tracker.written.contains_key(&0));
     }
 
     #[test]
@@ -2193,15 +2066,20 @@ mod tests {
             false,
         )]));
 
+        let gen = IpcDataGenerator {};
+        let mut dict_tracker = DictionaryTracker::new(false);
+        gen.schema_to_bytes_with_dictionary_tracker(
+            &schema,
+            &mut dict_tracker,
+            &IpcWriteOptions::default(),
+        );
+
         let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
 
-        let gen = IpcDataGenerator {};
-        #[allow(deprecated)]
-        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
         gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
             .unwrap();
 
-        assert!(dict_tracker.written.contains_key(&2));
+        assert!(dict_tracker.written.contains_key(&0));
     }
 
     fn write_union_file(options: IpcWriteOptions) {
@@ -2977,12 +2855,14 @@ mod tests {
         let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
 
-        let schema = fb_to_schema(footer.schema().unwrap());
-
         // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
         // for `read_record_batch` later on to read the data in a zero-copy manner.
-        let decoder =
-            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
+        let decoder = FileDecoder::new(
+            buffer[trailer_start - footer_len..trailer_start].to_vec(),
+            Default::default(),
+            footer.version(),
+        )
+        .with_require_alignment(true);
 
         let batches = footer.recordBatches().unwrap();
@@ -3030,12 +2910,14 @@ mod tests {
         let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
         let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
 
-        let schema = fb_to_schema(footer.schema().unwrap());
-
         // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
         // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
-        let decoder =
-            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
+        let decoder = FileDecoder::new(
+            buffer[trailer_start - footer_len..trailer_start].to_vec(),
+            Default::default(),
+            footer.version(),
+        )
+        .with_require_alignment(true);
 
         let batches = footer.recordBatches().unwrap();
diff --git a/arrow/examples/zero_copy_ipc.rs b/arrow/examples/zero_copy_ipc.rs
index 15fc477c59cf..529d7a26b900 100644
--- a/arrow/examples/zero_copy_ipc.rs
+++ b/arrow/examples/zero_copy_ipc.rs
@@ -24,12 +24,10 @@ use arrow::array::{record_batch, RecordBatch};
 use arrow::error::Result;
 use arrow_buffer::Buffer;
 use arrow_cast::pretty::pretty_format_batches;
-use arrow_ipc::convert::fb_to_schema;
 use arrow_ipc::reader::{read_footer_length, FileDecoder};
 use arrow_ipc::writer::FileWriter;
 use arrow_ipc::{root_as_footer, Block};
 use std::path::PathBuf;
-use std::sync::Arc;
 
 /// This example shows how to read data from an Arrow IPC file without copying
 /// using `mmap` and the [`FileDecoder`] API
@@ -99,11 +97,10 @@ impl IPCBufferDecoder {
     fn new(buffer: Buffer) -> Self {
         let trailer_start = buffer.len() - 10;
         let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
-        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
-
-        let schema = fb_to_schema(footer.schema().unwrap());
-
-        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
+        let footer_buf = &buffer[trailer_start - footer_len..trailer_start];
+        let footer = root_as_footer(footer_buf).unwrap();
+        let mut decoder =
+            FileDecoder::new(footer_buf.to_vec(), Default::default(), footer.version());
 
         // Read dictionaries
         for block in footer.dictionaries().iter().flatten() {
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 64a4e0e11544..b9688fd017f9 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -180,9 +180,7 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
 /// Encodes the Arrow schema into the IPC format, and base64 encodes it
 pub fn encode_arrow_schema(schema: &Schema) -> String {
     let options = writer::IpcWriteOptions::default();
-    #[allow(deprecated)]
-    let mut dictionary_tracker =
-        writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
+    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
     let data_gen = writer::IpcDataGenerator::default();
     let mut serialized_schema =
         data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
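Taken together, every call site now builds its tracker with `DictionaryTracker::new(error_on_replacement)` and encodes the schema through `schema_to_bytes_with_dictionary_tracker`, as in this sketch (it assumes only the APIs used throughout the diff):

```rust
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_schema::{DataType, Field, Schema};

/// Encode a schema to IPC message bytes. With `preserve_dict_id` gone, the
/// tracker is the single source of dictionary IDs for the encoded schema.
fn schema_ipc_bytes(schema: &Schema) -> Vec<u8> {
    let options = IpcWriteOptions::default();
    let data_gen = IpcDataGenerator::default();
    let mut tracker = DictionaryTracker::new(false);
    data_gen
        .schema_to_bytes_with_dictionary_tracker(schema, &mut tracker, &options)
        .ipc_message
}

fn main() {
    let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
    assert!(!schema_ipc_bytes(&schema).is_empty());
}
```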