diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 546650faf568..b5750cc6a3ac 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -482,8 +482,8 @@
 use crate::codec::AvroFieldBuilder;
 use crate::reader::header::read_header;
 use crate::schema::{
-    AvroSchema, CONFLUENT_MAGIC, Fingerprint, FingerprintAlgorithm, SINGLE_OBJECT_MAGIC, Schema,
-    SchemaStore,
+    AvroSchema, CONFLUENT_MAGIC, Fingerprint, FingerprintAlgorithm, SCHEMA_METADATA_KEY,
+    SINGLE_OBJECT_MAGIC, Schema, SchemaStore,
 };
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, SchemaRef};
@@ -890,6 +890,17 @@ impl Decoder {
 ///   See `Self::with_strict_mode`.
 /// * **`reader_schema`**: Optional reader schema (projection / evolution) used when decoding
 ///   values (default: `None`). See `Self::with_reader_schema`.
+/// * **`projection`**: Optional projection of **top‑level record fields** by index (default: `None`).
+///
+///   If set, the effective reader schema is **pruned** to include only the projected fields, in the
+///   specified order:
+///
+///   * If a reader schema is provided, that schema is pruned.
+///   * Otherwise, a reader schema is derived from the writer schema and then pruned.
+///   * For a streaming `Decoder` with multiple writer schemas and no reader schema, a projected reader
+///     schema is derived **per writer schema** in the `SchemaStore`.
+///
+///   See `Self::with_projection`.
 /// * **`writer_schema_store`**: Required for building a `Decoder` for single‑object or
 ///   Confluent framing. Maps fingerprints to Avro schemas. See `Self::with_writer_schema_store`.
 /// * **`active_fingerprint`**: Optional starting fingerprint for streaming decode when the
@@ -931,6 +942,7 @@ pub struct ReaderBuilder {
     strict_mode: bool,
     utf8_view: bool,
     reader_schema: Option<AvroSchema>,
+    projection: Option<Vec<usize>>,
     writer_schema_store: Option<SchemaStore>,
     active_fingerprint: Option<Fingerprint>,
 }
@@ -942,6 +954,7 @@ impl Default for ReaderBuilder {
             strict_mode: false,
             utf8_view: false,
             reader_schema: None,
+            projection: None,
             writer_schema_store: None,
             active_fingerprint: None,
         }
@@ -955,6 +968,7 @@ impl ReaderBuilder {
     /// * `strict_mode = false`
     /// * `utf8_view = false`
     /// * `reader_schema = None`
+    /// * `projection = None`
    /// * `writer_schema_store = None`
    /// * `active_fingerprint = None`
    pub fn new() -> Self {
@@ -1017,8 +1031,33 @@ impl ReaderBuilder {
                 .ok_or_else(|| {
                     ArrowError::ParseError("No Avro schema present in file header".into())
                 })?;
+            let projected_reader_schema = self
+                .projection
+                .as_deref()
+                .map(|projection| {
+                    let base_schema = if let Some(reader_schema) = reader_schema {
+                        reader_schema.clone()
+                    } else {
+                        let raw = hdr.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
+                            ArrowError::ParseError(
+                                "No Avro schema present in file header".to_string(),
+                            )
+                        })?;
+                        let json_string = std::str::from_utf8(raw)
+                            .map_err(|e| {
+                                ArrowError::ParseError(format!(
+                                    "Invalid UTF-8 in Avro schema header: {e}"
+                                ))
+                            })?
+                            .to_string();
+                        AvroSchema::new(json_string)
+                    };
+                    base_schema.project(projection)
+                })
+                .transpose()?;
+            let effective_reader_schema = projected_reader_schema.as_ref().or(reader_schema);
             let record_decoder =
-                self.make_record_decoder_from_schemas(&writer_schema, reader_schema)?;
+                self.make_record_decoder_from_schemas(&writer_schema, effective_reader_schema)?;
             return Ok(self.make_decoder_with_parts(
                 record_decoder,
                 None,
@@ -1041,6 +1080,11 @@
             .ok_or_else(|| {
                 ArrowError::ParseError("Could not determine initial schema fingerprint".into())
             })?;
+        let projection = self.projection.as_deref();
+        let projected_reader_schema = match (projection, reader_schema) {
+            (Some(projection), Some(reader_schema)) => Some(reader_schema.project(projection)?),
+            _ => None,
+        };
         let mut cache = IndexMap::with_capacity(fingerprints.len().saturating_sub(1));
         let mut active_decoder: Option<RecordDecoder> = None;
         for fingerprint in store.fingerprints() {
@@ -1053,8 +1097,23 @@
                 }
             };
             let writer_schema = avro_schema.schema()?;
-            let record_decoder =
-                self.make_record_decoder_from_schemas(&writer_schema, reader_schema)?;
+            let record_decoder = match projection {
+                None => self.make_record_decoder_from_schemas(&writer_schema, reader_schema)?,
+                Some(projection) => {
+                    if let Some(ref pruned_reader_schema) = projected_reader_schema {
+                        self.make_record_decoder_from_schemas(
+                            &writer_schema,
+                            Some(pruned_reader_schema),
+                        )?
+                    } else {
+                        let derived_reader_schema = avro_schema.project(projection)?;
+                        self.make_record_decoder_from_schemas(
+                            &writer_schema,
+                            Some(&derived_reader_schema),
+                        )?
+                    }
+                }
+            };
             if fingerprint == start_fingerprint {
                 active_decoder = Some(record_decoder);
             } else {
@@ -1119,6 +1178,68 @@
         self
     }
 
+    /// Sets an explicit top-level field projection by index.
+    ///
+    /// The provided `projection` is a list of indices into the **top-level record** fields.
+    /// The output schema will contain only these fields, in the specified order.
+    ///
+    /// Internally, this is implemented by pruning the effective Avro *reader schema*:
+    ///
+    /// * If a reader schema is provided via `Self::with_reader_schema`, that schema is pruned.
+    /// * Otherwise, a reader schema is derived from the writer schema and then pruned.
+    /// * For a streaming `Decoder` with multiple writer schemas and no reader schema, a projected
+    ///   reader schema is derived **per writer schema** in the `SchemaStore`.
+    ///
+    /// # Example
+    ///
+    /// Read only specific columns from an Avro OCF file:
+    ///
+    /// ```
+    /// use std::io::Cursor;
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, StringArray, Float64Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::AvroWriter;
+    /// use arrow_avro::reader::ReaderBuilder;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// // Original schema has three fields: id, name, value
+    /// let schema = Schema::new(vec![
+    ///     Field::new("id", DataType::Int32, false),
+    ///     Field::new("name", DataType::Utf8, false),
+    ///     Field::new("value", DataType::Float64, false),
+    /// ]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![
+    ///         Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+    ///         Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
+    ///         Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
+    ///     ],
+    /// )?;
+    ///
+    /// // Write Avro OCF
+    /// let mut writer = AvroWriter::new(Vec::new(), schema)?;
+    /// writer.write(&batch)?;
+    /// writer.finish()?;
+    /// let bytes = writer.into_inner();
+    ///
+    /// // Read only fields at indices 2 and 0 (value, id) — in that order
+    /// let mut reader = ReaderBuilder::new()
+    ///     .with_projection(vec![2, 0])
+    ///     .build(Cursor::new(bytes))?;
+    ///
+    /// let out = reader.next().unwrap()?;
+    /// assert_eq!(out.num_columns(), 2);
+    /// assert_eq!(out.schema().field(0).name(), "value");
+    /// assert_eq!(out.schema().field(1).name(), "id");
+    /// # Ok(()) }
+    /// ```
+    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
     /// Sets the `SchemaStore` used to resolve writer schemas by fingerprint.
     ///
     /// This is required when building a `Decoder` for **single‑object encoding** or the
@@ -1651,7 +1772,42 @@ mod test {
     }
 
     #[test]
-    fn writer_string_reader_nullable_with_alias() -> Result<(), Box<dyn std::error::Error>> {
+    fn ocf_projection_no_reader_schema_reorder() -> Result<(), Box<dyn std::error::Error>> {
+        // Writer: { id: int, name: string, is_active: boolean }
+        let writer_schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+            Field::new("is_active", DataType::Boolean, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+                Arc::new(BooleanArray::from(vec![true, false])) as ArrayRef,
+            ],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        // Project and reorder: [is_active, id]
+        let mut reader = ReaderBuilder::new()
+            .with_projection(vec![2, 0])
+            .build(Cursor::new(bytes))?;
+        let out = reader.next().unwrap()?;
+        assert_eq!(out.num_columns(), 2);
+        assert_eq!(out.schema().field(0).name(), "is_active");
+        assert_eq!(out.schema().field(1).name(), "id");
+        let is_active = out.column(0).as_boolean();
+        assert!(is_active.value(0));
+        assert!(!is_active.value(1));
+        let id = out.column(1).as_primitive::<Int32Type>();
+        assert_eq!(id.value(0), 1);
+        assert_eq!(id.value(1), 2);
+        Ok(())
+    }
+
+    #[test]
+    fn ocf_projection_with_reader_schema_alias_and_default()
+    -> Result<(), Box<dyn std::error::Error>> {
         // Writer: { id: long, name: string }
         let writer_schema = Schema::new(vec![
             Field::new("id", DataType::Int64, false),
             Field::new("name", DataType::Utf8, false),
@@ -1665,6 +1821,9 @@ mod test {
             ],
         )?;
         let bytes = write_ocf(&writer_schema, &[batch]);
+        // Reader adds alias + default field:
+        // - rename `name` -> `full_name` via aliases
+        // - add `is_active` with default true
         let reader_json = r#"
         {
           "type": "record",
           "name": "topLevelRecord",
@@ -1675,15 +1834,180 @@ mod test {
          "fields": [
            { "name": "id", "type": "long" },
            { "name": "full_name", "type": ["null","string"], "aliases": ["name"], "default": null },
            { "name": "is_active", "type": "boolean", "default": true }
          ]
        }"#;
+        // Project only [full_name, is_active] (indices relative to the reader schema)
         let mut reader = ReaderBuilder::new()
             .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+            .with_projection(vec![1, 2])
             .build(Cursor::new(bytes))?;
         let out = reader.next().unwrap()?;
-        // Evolved aliased field should be non-null and match original writer values
-        let full_name = out.column(1).as_string::<i32>();
+        assert_eq!(out.num_columns(), 2);
+        assert_eq!(out.schema().field(0).name(), "full_name");
+        assert_eq!(out.schema().field(1).name(), "is_active");
+        let full_name = out.column(0).as_string::<i32>();
         assert_eq!(full_name.value(0), "a");
         assert_eq!(full_name.value(1), "b");
+        let is_active = out.column(1).as_boolean();
+        assert!(is_active.value(0));
+        assert!(is_active.value(1));
         Ok(())
     }
+
+    #[test]
+    fn projection_errors_out_of_bounds_and_duplicate() -> Result<(), Box<dyn std::error::Error>> {
+        let writer_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![1])) as ArrayRef,
+                Arc::new(Int32Array::from(vec![2])) as ArrayRef,
+            ],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        let err = ReaderBuilder::new()
+            .with_projection(vec![2])
+            .build(Cursor::new(bytes.clone()))
+            .unwrap_err();
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("out of bounds"));
+        let err = ReaderBuilder::new()
+            .with_projection(vec![0, 0])
+            .build(Cursor::new(bytes))
+            .unwrap_err();
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("Duplicate projection index"));
+        Ok(())
+    }
+
+    #[test]
+    #[cfg(feature = "snappy")]
+    fn test_alltypes_plain_with_projection_and_reader_schema() {
+        use std::fs::File;
+        use std::io::BufReader;
+        let path = arrow_test_data("avro/alltypes_plain.avro");
+        // Build a reader schema that selects [double_col, id, tinyint_col] in that order
+        let reader_schema = make_reader_schema_with_selected_fields_in_order(
+            &path,
+            &["double_col", "id", "tinyint_col"],
+        );
+        let file = File::open(&path).expect("open avro/alltypes_plain.avro");
+        let reader = ReaderBuilder::new()
+            .with_batch_size(1024)
+            .with_reader_schema(reader_schema)
+            .with_projection(vec![1, 2]) // Select indices 1 and 2 from reader schema: [id, tinyint_col]
+            .build(BufReader::new(file))
+            .expect("build reader with projection and reader schema");
+        let schema = reader.schema();
+        // Verify the projected schema has exactly 2 fields in the correct order
+        assert_eq!(schema.fields().len(), 2);
+        assert_eq!(schema.field(0).name(), "id");
+        assert_eq!(schema.field(1).name(), "tinyint_col");
+        let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().unwrap();
+        assert_eq!(batches.len(), 1);
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 2);
+        // Build expected batch with exact values from alltypes_plain.avro:
+        // - id values: [4, 5, 6, 7, 2, 3, 0, 1]
+        // - tinyint_col values: [0, 1, 0, 1, 0, 1, 0, 1] (i.e., row_index % 2)
+        let expected = RecordBatch::try_from_iter_with_nullable([
+            (
+                "id",
+                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as ArrayRef,
+                true,
+            ),
+            (
+                "tinyint_col",
+                Arc::new(Int32Array::from(vec![0, 1, 0, 1, 0, 1, 0, 1])) as ArrayRef,
+                true,
+            ),
+        ])
+        .unwrap();
+        assert_eq!(
+            batch, &expected,
+            "Projected batch mismatch for alltypes_plain.avro with reader schema and projection [1, 2]"
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "snappy")]
+    fn test_alltypes_plain_with_projection() {
+        use std::fs::File;
+        use std::io::BufReader;
+        let path = arrow_test_data("avro/alltypes_plain.avro");
+        let file = File::open(&path).expect("open avro/alltypes_plain.avro");
+        let reader = ReaderBuilder::new()
+            .with_batch_size(1024)
+            .with_projection(vec![2, 0, 5])
+            .build(BufReader::new(file))
+            .expect("build reader with projection");
+        let schema = reader.schema();
+        assert_eq!(schema.fields().len(), 3);
+        assert_eq!(schema.field(0).name(), "tinyint_col");
+        assert_eq!(schema.field(1).name(), "id");
+        assert_eq!(schema.field(2).name(), "bigint_col");
+        let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().unwrap();
+        assert_eq!(batches.len(), 1);
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 3);
+        let expected = RecordBatch::try_from_iter_with_nullable([
+            (
+                "tinyint_col",
+                Arc::new(Int32Array::from(vec![0, 1, 0, 1, 0, 1, 0, 1])) as ArrayRef,
+                true,
+            ),
+            (
+                "id",
+                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as ArrayRef,
+                true,
+            ),
+            (
+                "bigint_col",
+                Arc::new(Int64Array::from(vec![0, 10, 0, 10, 0, 10, 0, 10])) as ArrayRef,
+                true,
+            ),
+        ])
+        .unwrap();
+        assert_eq!(
+            batch, &expected,
+            "Projected batch mismatch for alltypes_plain.avro with projection [2, 0, 5]"
+        );
+    }
+    #[test]
+    fn writer_string_reader_nullable_with_alias() -> Result<(), Box<dyn std::error::Error>> {
+        let writer_schema = Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+            ],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        let reader_json = r#"
+        {
+          "type": "record",
+          "name": "topLevelRecord",
+          "fields": [
+            { "name": "id", "type": "long" },
+            { "name": "full_name", "type": ["null","string"], "aliases": ["name"], "default": null },
+            { "name": "is_active", "type": "boolean", "default": true }
+          ]
+        }"#;
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+            .build(Cursor::new(bytes))?;
+        let out = reader.next().unwrap()?;
+        let full_name = out.column(1).as_string::<i32>();
+        assert_eq!(full_name.value(0), "a");
+        assert_eq!(full_name.value(1), "b");
         Ok(())
     }
 
@@ -2222,6 +2546,53 @@ mod test {
         assert_eq!(decoder.pending_schema.as_ref().unwrap().0, fp_long);
     }
 
+    #[test]
+    fn test_decoder_projection_multiple_writer_schemas_no_reader_schema()
+    -> Result<(), Box<dyn std::error::Error>> {
+        // Two writer schemas with different shapes
+        let writer_v1 = AvroSchema::new(
+            r#"{"type":"record","name":"E","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]}"#
+                .to_string(),
+        );
+        let writer_v2 = AvroSchema::new(
+            r#"{"type":"record","name":"E","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"},{"name":"c","type":"int"}]}"#
+                .to_string(),
+        );
+        let mut store = SchemaStore::new();
+        let fp1 = store.register(writer_v1)?;
+        let fp2 = store.register(writer_v2)?;
+        let mut decoder = ReaderBuilder::new()
+            .with_writer_schema_store(store)
+            .with_active_fingerprint(fp1)
+            .with_batch_size(8)
+            .with_projection(vec![1])
+            .build_decoder()?;
+        // Message for v1: {a:1, b:"x"}
+        let mut msg1 = make_prefix(fp1);
+        msg1.extend_from_slice(&encode_zigzag(1)); // a = 1
+        msg1.push((1u8) << 1);
+        msg1.extend_from_slice(b"x");
+        // Message for v2: {a:2, b:"y", c:7}
+        let mut msg2 = make_prefix(fp2);
+        msg2.extend_from_slice(&encode_zigzag(2)); // a = 2
+        msg2.push((1u8) << 1);
+        msg2.extend_from_slice(b"y");
+        msg2.extend_from_slice(&encode_zigzag(7)); // c = 7
+        decoder.decode(&msg1)?;
+        let batch1 = decoder.flush()?.expect("batch1");
+        assert_eq!(batch1.num_columns(), 1);
+        assert_eq!(batch1.schema().field(0).name(), "b");
+        let b1 = batch1.column(0).as_string::<i32>();
+        assert_eq!(b1.value(0), "x");
+        decoder.decode(&msg2)?;
+        let batch2 = decoder.flush()?.expect("batch2");
+        assert_eq!(batch2.num_columns(), 1);
+        assert_eq!(batch2.schema().field(0).name(), "b");
+        let b2 = batch2.column(0).as_string::<i32>();
+        assert_eq!(b2.value(0), "y");
+        Ok(())
+    }
+
     #[test]
     fn test_two_messages_same_schema() {
         let writer_schema = make_value_schema(PrimitiveType::Int);
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 819ea1f16e9b..0ecb5c1d191c 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -395,6 +395,66 @@ impl AvroSchema {
         Self::generate_fingerprint(&self.schema()?, hash_type)
     }
 
+    pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, ArrowError> {
+        let mut value: Value = serde_json::from_str(&self.json_string)
+            .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema JSON: {e}")))?;
+        let obj = value.as_object_mut().ok_or_else(|| {
+            ArrowError::AvroError(
+                "Projected schema must be a JSON object Avro record schema".to_string(),
+            )
+        })?;
+        match obj.get("type").and_then(|v| v.as_str()) {
+            Some("record") => {}
+            Some(other) => {
+                return Err(ArrowError::AvroError(format!(
+                    "Projected schema must be an Avro record, found type '{other}'"
+                )));
+            }
+            None => {
+                return Err(ArrowError::AvroError(
+                    "Projected schema missing required 'type' field".to_string(),
+                ));
+            }
+        }
+        let fields_val = obj.get_mut("fields").ok_or_else(|| {
+            ArrowError::AvroError("Avro record schema missing required 'fields'".to_string())
+        })?;
+        let projected_fields = {
+            let mut original_fields = match fields_val {
+                Value::Array(arr) => std::mem::take(arr),
+                _ => {
+                    return Err(ArrowError::AvroError(
+                        "Avro record schema 'fields' must be an array".to_string(),
+                    ));
+                }
+            };
+            let len = original_fields.len();
+            let mut seen: HashSet<usize> = HashSet::with_capacity(projection.len());
+            let mut out: Vec<Value> = Vec::with_capacity(projection.len());
+            for &i in projection {
+                if i >= len {
+                    return Err(ArrowError::AvroError(format!(
+                        "Projection index {i} out of bounds for record with {len} fields"
+                    )));
+                }
+                if !seen.insert(i) {
+                    return Err(ArrowError::AvroError(format!(
+                        "Duplicate projection index {i}"
+                    )));
+                }
+                out.push(std::mem::replace(&mut original_fields[i], Value::Null));
+            }
+            out
+        };
+        *fields_val = Value::Array(projected_fields);
+        let json_string = serde_json::to_string(&value).map_err(|e| {
+            ArrowError::AvroError(format!(
+                "Failed to serialize projected Avro schema JSON: {e}"
+            ))
+        })?;
+        Ok(Self::new(json_string))
+    }
+
     pub(crate) fn generate_fingerprint(
         schema: &Schema,
         hash_type: FingerprintAlgorithm,
@@ -3137,4 +3197,546 @@ mod tests {
         assert_eq!(union_arr2[1], Value::String("int".into()));
         assert_eq!(union_arr2[2], Value::String("string".into()));
     }
+
+    #[test]
+    fn test_project_empty_projection() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert!(
+            fields.is_empty(),
+            "Empty projection should yield empty fields"
+        );
+    }
+
+    #[test]
+    fn test_project_single_field() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"},
+                {"name": "c", "type": "long"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[1]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 1);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("b"));
+    }
+
+    #[test]
+    fn test_project_multiple_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"},
+                {"name": "c", "type": "long"},
+                {"name": "d", "type": "boolean"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0, 2, 3]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 3);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
+        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("c"));
+        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("d"));
+    }
+
+    #[test]
+    fn test_project_all_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0, 1]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 2);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
+        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("b"));
+    }
+
+    #[test]
+    fn test_project_reorder_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"},
+                {"name": "c", "type": "long"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Project in reverse order
+        let projected = schema.project(&[2, 0, 1]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 3);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("c"));
+        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("a"));
+        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("b"));
+    }
+
+    #[test]
+    fn test_project_preserves_record_metadata() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "MyRecord",
+            "namespace": "com.example",
+            "doc": "A test record",
+            "aliases": ["OldRecord"],
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("MyRecord"));
+        assert_eq!(
+            v.get("namespace").and_then(|n| n.as_str()),
+            Some("com.example")
+        );
+        assert_eq!(v.get("doc").and_then(|n| n.as_str()), Some("A test record"));
+        assert!(v.get("aliases").is_some());
+    }
+
+    #[test]
+    fn test_project_preserves_field_metadata() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int", "doc": "Field A", "default": 0},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(
+            fields[0].get("doc").and_then(|d| d.as_str()),
+            Some("Field A")
+        );
+        assert_eq!(fields[0].get("default").and_then(|d| d.as_i64()), Some(0));
+    }
+
+    #[test]
+    fn test_project_with_nested_record() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Outer",
+            "fields": [
+                {"name": "id", "type": "int"},
+                {"name": "inner", "type": {
+                    "type": "record",
+                    "name": "Inner",
+                    "fields": [
+                        {"name": "x", "type": "int"},
+                        {"name": "y", "type": "string"}
+                    ]
+                }},
+                {"name": "value", "type": "double"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[1]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 1);
+        assert_eq!(
+            fields[0].get("name").and_then(|n| n.as_str()),
+            Some("inner")
+        );
+        // Verify nested record structure is preserved
+        let inner_type = fields[0].get("type").unwrap();
+        assert_eq!(
+            inner_type.get("type").and_then(|t| t.as_str()),
+            Some("record")
+        );
+        assert_eq!(
+            inner_type.get("name").and_then(|n| n.as_str()),
+            Some("Inner")
+        );
+    }
+
+    #[test]
+    fn test_project_with_complex_field_types() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "arr", "type": {"type": "array", "items": "int"}},
+                {"name": "map", "type": {"type": "map", "values": "string"}},
+                {"name": "union", "type": ["null", "int"]}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0, 2]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 2);
+        // Verify array type is preserved
+        let arr_type = fields[0].get("type").unwrap();
+        assert_eq!(arr_type.get("type").and_then(|t| t.as_str()), Some("array"));
+        // Verify union type is preserved
+        let union_type = fields[1].get("type").unwrap();
+        assert!(union_type.is_array());
+    }
+
+    #[test]
+    fn test_project_error_invalid_json() {
+        let schema = AvroSchema::new("not valid json".to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Invalid Avro schema JSON"),
+            "Expected parse error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_not_object() {
+        // Primitive type schema (not a JSON object)
+        let schema = AvroSchema::new(r#""string""#.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be a JSON object"),
+            "Expected object error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_array_schema() {
+        // An array (list) is valid JSON but not a record
+        let schema = AvroSchema::new(r#"["null", "int"]"#.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be a JSON object"),
+            "Expected object error for array schema, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_type_not_record() {
+        let schema_json = r#"{
+            "type": "enum",
+            "name": "Color",
+            "symbols": ["RED", "GREEN", "BLUE"]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be an Avro record") && msg.contains("'enum'"),
+            "Expected type mismatch error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_type_array() {
+        let schema_json = r#"{
+            "type": "array",
+            "items": "int"
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be an Avro record") && msg.contains("'array'"),
+            "Expected type mismatch error for array type, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_type_fixed() {
+        let schema_json = r#"{
+            "type": "fixed",
+            "name": "MD5",
+            "size": 16
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be an Avro record") && msg.contains("'fixed'"),
+            "Expected type mismatch error for fixed type, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_type_map() {
+        let schema_json = r#"{
+            "type": "map",
+            "values": "string"
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be an Avro record") && msg.contains("'map'"),
+            "Expected type mismatch error for map type, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_missing_type_field() {
+        let schema_json = r#"{
+            "name": "Test",
+            "fields": [{"name": "a", "type": "int"}]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("missing required 'type' field"),
+            "Expected missing type error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_missing_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test"
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("missing required 'fields'"),
+            "Expected missing fields error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_fields_not_array() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": "not an array"
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("'fields' must be an array"),
+            "Expected fields array error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_index_out_of_bounds() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[5]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("out of bounds") && msg.contains("5") && msg.contains("2"),
+            "Expected out of bounds error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_index_out_of_bounds_edge() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Index 1 is just out of bounds for a 1-element array
+        let err = schema.project(&[1]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("out of bounds") && msg.contains("1"),
+            "Expected out of bounds error for edge case, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_duplicate_index() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"},
+                {"name": "c", "type": "long"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0, 1, 0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Duplicate projection index") && msg.contains("0"),
+            "Expected duplicate index error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_duplicate_index_consecutive() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[1, 1]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Duplicate projection index") && msg.contains("1"),
+            "Expected duplicate index error for consecutive duplicates, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_with_empty_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "EmptyRecord",
+            "fields": []
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Projecting empty from empty should succeed
+        let projected = schema.project(&[]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert!(fields.is_empty());
+    }
+
+    #[test]
+    fn test_project_empty_fields_index_out_of_bounds() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "EmptyRecord",
+            "fields": []
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("out of bounds") && msg.contains("0 fields"),
+            "Expected out of bounds error for empty record, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_result_is_valid_avro_schema() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "namespace": "com.example",
+            "fields": [
+                {"name": "id", "type": "long"},
+                {"name": "name", "type": "string"},
+                {"name": "active", "type": "boolean"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0, 2]).unwrap();
+        // Verify the projected schema can be parsed as a valid Avro schema
+        let parsed = projected.schema();
+        assert!(parsed.is_ok(), "Projected schema should be valid Avro");
+        match parsed.unwrap() {
+            Schema::Complex(ComplexType::Record(r)) => {
+                assert_eq!(r.name, "Test");
+                assert_eq!(r.namespace, Some("com.example"));
+                assert_eq!(r.fields.len(), 2);
+                assert_eq!(r.fields[0].name, "id");
+                assert_eq!(r.fields[1].name, "active");
+            }
+            _ => panic!("Expected Record schema"),
+        }
+    }
+
+    #[test]
+    fn test_project_non_contiguous_indices() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "f0", "type": "int"},
+                {"name": "f1", "type": "int"},
+                {"name": "f2", "type": "int"},
+                {"name": "f3", "type": "int"},
+                {"name": "f4", "type": "int"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Select every other field
+        let projected = schema.project(&[0, 2, 4]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 3);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f0"));
+        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("f2"));
+        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("f4"));
+    }
+
+    #[test]
+    fn test_project_single_field_from_many() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "BigRecord",
+            "fields": [
+                {"name": "f0", "type": "int"},
+                {"name": "f1", "type": "int"},
+                {"name": "f2", "type": "int"},
+                {"name": "f3", "type": "int"},
+                {"name": "f4", "type": "int"},
+                {"name": "f5", "type": "int"},
+                {"name": "f6", "type": "int"},
+                {"name": "f7", "type": "int"},
+                {"name": "f8", "type": "int"},
+                {"name": "f9", "type": "int"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Select only the last field
+        let projected = schema.project(&[9]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 1);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f9"));
+    }
 }