Skip to content
241 changes: 241 additions & 0 deletions arrow-avro/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2085,6 +2085,245 @@ mod test {
assert!(batch.column(0).as_any().is::<StringViewArray>());
}

fn make_reader_schema_with_default_fields(
path: &str,
default_fields: Vec<Value>,
) -> AvroSchema {
let mut root = load_writer_schema_json(path);
assert_eq!(root["type"], "record", "writer schema must be a record");
root.as_object_mut()
.expect("schema is a JSON object")
.insert("fields".to_string(), Value::Array(default_fields));
AvroSchema::new(root.to_string())
}

#[test]
fn test_schema_resolution_defaults_all_supported_types() {
let path = "test/data/skippable_types.avro";
let duration_default = "\u{0000}".repeat(12);
let reader_schema = make_reader_schema_with_default_fields(
path,
vec![
serde_json::json!({"name":"d_bool","type":"boolean","default":true}),
serde_json::json!({"name":"d_int","type":"int","default":42}),
serde_json::json!({"name":"d_long","type":"long","default":12345}),
serde_json::json!({"name":"d_float","type":"float","default":1.5}),
serde_json::json!({"name":"d_double","type":"double","default":2.25}),
serde_json::json!({"name":"d_bytes","type":"bytes","default":"XYZ"}),
serde_json::json!({"name":"d_string","type":"string","default":"hello"}),
serde_json::json!({"name":"d_date","type":{"type":"int","logicalType":"date"},"default":0}),
serde_json::json!({"name":"d_time_ms","type":{"type":"int","logicalType":"time-millis"},"default":1000}),
serde_json::json!({"name":"d_time_us","type":{"type":"long","logicalType":"time-micros"},"default":2000}),
serde_json::json!({"name":"d_ts_ms","type":{"type":"long","logicalType":"local-timestamp-millis"},"default":0}),
serde_json::json!({"name":"d_ts_us","type":{"type":"long","logicalType":"local-timestamp-micros"},"default":0}),
serde_json::json!({"name":"d_decimal","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"default":""}),
serde_json::json!({"name":"d_fixed","type":{"type":"fixed","name":"F4","size":4},"default":"ABCD"}),
serde_json::json!({"name":"d_enum","type":{"type":"enum","name":"E","symbols":["A","B","C"]},"default":"A"}),
serde_json::json!({"name":"d_duration","type":{"type":"fixed","name":"Dur","size":12,"logicalType":"duration"},"default":duration_default}),
serde_json::json!({"name":"d_uuid","type":{"type":"string","logicalType":"uuid"},"default":"00000000-0000-0000-0000-000000000000"}),
serde_json::json!({"name":"d_array","type":{"type":"array","items":"int"},"default":[1,2,3]}),
serde_json::json!({"name":"d_map","type":{"type":"map","values":"long"},"default":{"a":1,"b":2}}),
serde_json::json!({"name":"d_record","type":{
"type":"record","name":"DefaultRec","fields":[
{"name":"x","type":"int"},
{"name":"y","type":["null","string"],"default":null}
]
},"default":{"x":7}}),
serde_json::json!({"name":"d_nullable_null","type":["null","int"],"default":null}),
serde_json::json!({"name":"d_nullable_value","type":["int","null"],"default":123}),
],
);
let actual = read_alltypes_with_reader_schema(path, reader_schema);
let num_rows = actual.num_rows();
assert!(num_rows > 0, "skippable_types.avro should contain rows");
assert_eq!(
actual.num_columns(),
22,
"expected exactly our defaulted fields"
);
let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(22);
arrays.push(Arc::new(BooleanArray::from_iter(std::iter::repeat_n(
Some(true),
num_rows,
))));
arrays.push(Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(
42, num_rows,
))));
arrays.push(Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
12345, num_rows,
))));
arrays.push(Arc::new(Float32Array::from_iter_values(
std::iter::repeat_n(1.5f32, num_rows),
)));
arrays.push(Arc::new(Float64Array::from_iter_values(
std::iter::repeat_n(2.25f64, num_rows),
)));
arrays.push(Arc::new(BinaryArray::from_iter_values(
std::iter::repeat_n(b"XYZ".as_ref(), num_rows),
)));
arrays.push(Arc::new(StringArray::from_iter_values(
std::iter::repeat_n("hello", num_rows),
)));
arrays.push(Arc::new(Date32Array::from_iter_values(
std::iter::repeat_n(0, num_rows),
)));
arrays.push(Arc::new(Time32MillisecondArray::from_iter_values(
std::iter::repeat_n(1_000, num_rows),
)));
arrays.push(Arc::new(Time64MicrosecondArray::from_iter_values(
std::iter::repeat_n(2_000i64, num_rows),
)));
arrays.push(Arc::new(TimestampMillisecondArray::from_iter_values(
std::iter::repeat_n(0i64, num_rows),
)));
arrays.push(Arc::new(TimestampMicrosecondArray::from_iter_values(
std::iter::repeat_n(0i64, num_rows),
)));
#[cfg(feature = "small_decimals")]
let decimal = Decimal64Array::from_iter_values(std::iter::repeat_n(0i64, num_rows))
.with_precision_and_scale(10, 2)
.unwrap();
#[cfg(not(feature = "small_decimals"))]
let decimal = Decimal128Array::from_iter_values(std::iter::repeat_n(0i128, num_rows))
.with_precision_and_scale(10, 2)
.unwrap();
arrays.push(Arc::new(decimal));
let fixed_iter = std::iter::repeat_n(Some(*b"ABCD"), num_rows);
arrays.push(Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter_with_size(fixed_iter, 4).unwrap(),
));
let enum_keys = Int32Array::from_iter_values(std::iter::repeat_n(0, num_rows));
let enum_values = StringArray::from_iter_values(["A", "B", "C"]);
let enum_arr =
DictionaryArray::<Int32Type>::try_new(enum_keys, Arc::new(enum_values)).unwrap();
arrays.push(Arc::new(enum_arr));
let duration_values = std::iter::repeat_n(
Some(IntervalMonthDayNanoType::make_value(0, 0, 0)),
num_rows,
);
let duration_arr: IntervalMonthDayNanoArray = duration_values.collect();
arrays.push(Arc::new(duration_arr));
let uuid_bytes = [0u8; 16];
let uuid_iter = std::iter::repeat_n(Some(uuid_bytes), num_rows);
arrays.push(Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter_with_size(uuid_iter, 16).unwrap(),
));
let item_field = Arc::new(Field::new(
Field::LIST_FIELD_DEFAULT_NAME,
DataType::Int32,
false,
));
let mut list_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field);
for _ in 0..num_rows {
list_builder.values().append_value(1);
list_builder.values().append_value(2);
list_builder.values().append_value(3);
list_builder.append(true);
}
arrays.push(Arc::new(list_builder.finish()));
let values_field = Arc::new(Field::new("value", DataType::Int64, false));
let mut map_builder = MapBuilder::new(
Some(builder::MapFieldNames {
entry: "entries".to_string(),
key: "key".to_string(),
value: "value".to_string(),
}),
StringBuilder::new(),
Int64Builder::new(),
)
.with_values_field(values_field);
for _ in 0..num_rows {
let (keys, vals) = map_builder.entries();
keys.append_value("a");
vals.append_value(1);
keys.append_value("b");
vals.append_value(2);
map_builder.append(true).unwrap();
}
arrays.push(Arc::new(map_builder.finish()));
let rec_fields: Fields = Fields::from(vec![
Field::new("x", DataType::Int32, false),
Field::new("y", DataType::Utf8, true),
]);
let mut sb = StructBuilder::new(
rec_fields.clone(),
vec![
Box::new(Int32Builder::new()),
Box::new(StringBuilder::new()),
],
);
for _ in 0..num_rows {
sb.field_builder::<Int32Builder>(0).unwrap().append_value(7);
sb.field_builder::<StringBuilder>(1).unwrap().append_null();
sb.append(true);
}
arrays.push(Arc::new(sb.finish()));
arrays.push(Arc::new(Int32Array::from_iter(std::iter::repeat_n(
None::<i32>,
num_rows,
))));
arrays.push(Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(
123, num_rows,
))));
let expected = RecordBatch::try_new(actual.schema(), arrays).unwrap();
assert_eq!(
actual, expected,
"defaults should materialize correctly for all fields"
);
}

#[test]
fn test_schema_resolution_default_enum_invalid_symbol_errors() {
let path = "test/data/skippable_types.avro";
let bad_schema = make_reader_schema_with_default_fields(
path,
vec![serde_json::json!({
"name":"bad_enum",
"type":{"type":"enum","name":"E","symbols":["A","B","C"]},
"default":"Z"
})],
);
let file = File::open(path).unwrap();
let res = ReaderBuilder::new()
.with_reader_schema(bad_schema)
.build(BufReader::new(file));
let err = res.expect_err("expected enum default validation to fail");
let msg = err.to_string();
let lower_msg = msg.to_lowercase();
assert!(
lower_msg.contains("enum")
&& (lower_msg.contains("symbol") || lower_msg.contains("default")),
"unexpected error: {msg}"
);
}

#[test]
fn test_schema_resolution_default_fixed_size_mismatch_errors() {
let path = "test/data/skippable_types.avro";
let bad_schema = make_reader_schema_with_default_fields(
path,
vec![serde_json::json!({
"name":"bad_fixed",
"type":{"type":"fixed","name":"F","size":4},
"default":"ABC"
})],
);
let file = File::open(path).unwrap();
let res = ReaderBuilder::new()
.with_reader_schema(bad_schema)
.build(BufReader::new(file));
let err = res.expect_err("expected fixed default validation to fail");
let msg = err.to_string();
let lower_msg = msg.to_lowercase();
assert!(
lower_msg.contains("fixed")
&& (lower_msg.contains("size")
|| lower_msg.contains("length")
|| lower_msg.contains("does not match")),
"unexpected error: {msg}"
);
}

#[test]
fn test_alltypes_skip_writer_fields_keep_double_only() {
let file = arrow_test_data("avro/alltypes_plain.avro");
Expand Down Expand Up @@ -2538,6 +2777,7 @@ mod test {
let values_i128: Vec<i128> = (1..=24).map(|n| (n as i128) * pow10).collect();
let build_expected = |dt: &DataType, values: &[i128]| -> ArrayRef {
match *dt {
#[cfg(feature = "small_decimals")]
DataType::Decimal32(p, s) => {
let it = values.iter().map(|&v| v as i32);
Arc::new(
Expand All @@ -2546,6 +2786,7 @@ mod test {
.unwrap(),
)
}
#[cfg(feature = "small_decimals")]
DataType::Decimal64(p, s) => {
let it = values.iter().map(|&v| v as i64);
Arc::new(
Expand Down
Loading
Loading