Skip to content

Commit

Permalink
Add serialization of ScalarValue::Struct (#3536)
Browse files Browse the repository at this point in the history
* Add serialization of `ScalarValue::Struct`

* Remove explicit is_null encoding

* Restore submodules
  • Loading branch information
alamb committed Sep 28, 2022
1 parent 41b59cf commit b4c0601
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 7 deletions.
8 changes: 8 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,13 @@ message IntervalMonthDayNanoValue {
int64 nanos = 3;
}

message StructValue {
// Note that a null struct value must have one or more fields, so we
// encode a null StructValue as one witth an empty field_values
// list.
repeated ScalarValue field_values = 2;
repeated Field fields = 3;
}

message ScalarValue{
oneof value {
Expand Down Expand Up @@ -773,6 +780,7 @@ message ScalarValue{
bytes large_binary_value = 29;
int64 time64_value = 30;
IntervalMonthDayNanoValue interval_month_day_nano = 31;
StructValue struct_value = 32;
}
}

Expand Down
22 changes: 22 additions & 0 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,28 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
)),
Value::StructValue(v) => {
// all structs must have at least 1 field, so we treat
// an empty values list as NULL
let values = if v.field_values.is_empty() {
None
} else {
Some(
v.field_values
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<ScalarValue>, _>>()?,
)
};

let fields = v
.fields
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<Field>, _>>()?;

Self::Struct(values, Box::new(fields))
}
})
}
}
Expand Down
22 changes: 22 additions & 0 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ mod roundtrip_tests {
#[test]
fn scalar_values_error_serialization() {
let should_fail_on_seralize: Vec<ScalarValue> = vec![
// Should fail due to empty values
ScalarValue::Struct(
Some(vec![]),
Box::new(vec![Field::new("item", DataType::Int16, true)]),
),
// Should fail due to inconsistent types
ScalarValue::new_list(
Some(vec![
Expand Down Expand Up @@ -514,6 +519,23 @@ mod roundtrip_tests {
ScalarValue::Binary(None),
ScalarValue::LargeBinary(Some(b"bar".to_vec())),
ScalarValue::LargeBinary(None),
ScalarValue::Struct(
Some(vec![
ScalarValue::Int32(Some(23)),
ScalarValue::Boolean(Some(false)),
]),
Box::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Boolean, false),
]),
),
ScalarValue::Struct(
None,
Box::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("a", DataType::Boolean, false),
]),
),
];

for test_case in should_pass.into_iter() {
Expand Down
32 changes: 25 additions & 7 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ impl Error {
}
}

fn invalid_scalar_value(value: &ScalarValue) -> Self {
Self::InvalidScalarValue(value.to_owned())
}

fn invalid_scalar_type(data_type: &DataType) -> Self {
Self::InvalidScalarType(data_type.to_owned())
}
Expand Down Expand Up @@ -1214,9 +1210,31 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
protobuf::ScalarValue { value: Some(value) }
}

datafusion::scalar::ScalarValue::Struct(_, _) => {
// not yet implemented (TODO file ticket)
return Err(Error::invalid_scalar_value(val));
datafusion::scalar::ScalarValue::Struct(values, fields) => {
// encode null as empty field values list
let field_values = if let Some(values) = values {
if values.is_empty() {
return Err(Error::InvalidScalarValue(val.clone()));
}
values
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<protobuf::ScalarValue>, _>>()?
} else {
vec![]
};

let fields = fields
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<protobuf::Field>, _>>()?;

protobuf::ScalarValue {
value: Some(Value::StructValue(protobuf::StructValue {
field_values,
fields,
})),
}
}

datafusion::scalar::ScalarValue::Dictionary(index_type, val) => {
Expand Down

0 comments on commit b4c0601

Please sign in to comment.