Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions arrow/extensions/variant.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package extensions

import (
"bytes"
"errors"
"fmt"
"math"
"reflect"
Expand Down Expand Up @@ -219,7 +220,12 @@ func NewVariantType(storage arrow.DataType) (*VariantType, error) {
return nil, fmt.Errorf("%w: typed_value field must be nullable, got %s", arrow.ErrInvalid, typedValueField.Type)
}

if nt, ok := typedValueField.Type.(arrow.NestedType); ok {
dt := typedValueField.Type
if dt.ID() == arrow.EXTENSION {
dt = dt.(arrow.ExtensionType).StorageType()
}

if nt, ok := dt.(arrow.NestedType); ok {
if !validNestedType(nt) {
return nil, fmt.Errorf("%w: typed_value field must be a valid nested type, got %s", arrow.ErrInvalid, typedValueField.Type)
}
Expand Down Expand Up @@ -286,7 +292,7 @@ func validStruct(s *arrow.StructType) bool {
switch s.NumFields() {
case 1:
f := s.Field(0)
return f.Name == "value" && !f.Nullable && isBinary(f.Type)
return f.Name == "value" /*&& !f.Nullable*/ && isBinary(f.Type)
case 2:
valField, ok := s.FieldByName("value")
if !ok || !valField.Nullable || !isBinary(valField.Type) {
Expand Down Expand Up @@ -749,7 +755,15 @@ func getReader(typedArr arrow.Array) (typedValReader, error) {
valueIdx, _ := childType.FieldIdx("value")
valueArr := child.Field(valueIdx).(arrow.TypedArray[[]byte])

typedValueIdx, _ := childType.FieldIdx("typed_value")
typedValueIdx, exists := childType.FieldIdx("typed_value")
if !exists {
fieldReaders[fieldList[i].Name] = fieldReaderPair{
values: valueArr,
typedVal: nil,
}
continue
}

typedRdr, err := getReader(child.Field(typedValueIdx))
if err != nil {
return nil, fmt.Errorf("error getting typed value reader for field %s: %w", fieldList[i].Name, err)
Expand Down Expand Up @@ -796,6 +810,7 @@ func constructVariant(b *variant.Builder, meta variant.Metadata, value []byte, t
switch v := typedVal.(type) {
case nil:
if len(value) == 0 {
b.AppendNull()
return nil
}

Expand Down Expand Up @@ -846,6 +861,9 @@ func constructVariant(b *variant.Builder, meta variant.Metadata, value []byte, t

return b.FinishArray(arrstart, elems)
case []byte:
if len(value) > 0 {
return errors.New("invalid variant, conflicting value and typed_value")
}
return b.UnsafeAppendEncoded(v)
default:
return fmt.Errorf("%w: unsupported typed value type %T for variant", arrow.ErrInvalid, v)
Expand Down Expand Up @@ -876,11 +894,15 @@ func (v *typedObjReader) Value(meta variant.Metadata, i int) (any, error) {
return nil, nil
}

var err error
result := make(map[string]typedPair)
for name, rdr := range v.fieldRdrs {
typedValue, err := rdr.typedVal.Value(meta, i)
if err != nil {
return nil, fmt.Errorf("error reading typed value for field %s at index %d: %w", name, i, err)
var typedValue any
if rdr.typedVal != nil {
typedValue, err = rdr.typedVal.Value(meta, i)
if err != nil {
return nil, fmt.Errorf("error reading typed value for field %s at index %d: %w", name, i, err)
}
}
result[name] = typedPair{
Value: rdr.values.Value(i),
Expand Down Expand Up @@ -956,6 +978,7 @@ func (v *shreddedVariantReader) Value(i int) (variant.Value, error) {
}

b := variant.NewBuilderFromMeta(meta)
b.SetAllowDuplicates(true)
typed, err := v.typedValue.Value(meta, i)
if err != nil {
return variant.NullValue, fmt.Errorf("error reading typed value at index %d: %w", i, err)
Expand Down
10 changes: 5 additions & 5 deletions arrow/extensions/variant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ func TestVariantExtensionBadNestedTypes(t *testing.T) {
), Nullable: false})},
{"empty struct elem", arrow.StructOf(
arrow.Field{Name: "foobar", Type: arrow.StructOf(), Nullable: false})},
{"nullable value struct elem",
arrow.StructOf(
arrow.Field{Name: "foobar", Type: arrow.StructOf(
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
), Nullable: false})},
// {"nullable value struct elem",
// arrow.StructOf(
// arrow.Field{Name: "foobar", Type: arrow.StructOf(
// arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
// ), Nullable: false})},
{"non-nullable two elem struct", arrow.StructOf(
arrow.Field{Name: "foobar", Type: arrow.StructOf(
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
Expand Down
2 changes: 1 addition & 1 deletion parquet-testing
Submodule parquet-testing updated 284 files
2 changes: 1 addition & 1 deletion parquet/pqarrow/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ func groupToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *
return listToSchemaField(n, currentLevels, ctx, parent, out)
} else if n.LogicalType().Equals(schema.MapLogicalType{}) {
return mapToSchemaField(n, currentLevels, ctx, parent, out)
} else if n.LogicalType().Equals(schema.VariantLogicalType{}) {
} else if n.LogicalType().Equals(schema.VariantLogicalType{}) || n.Name() == "var" {
return variantToSchemaField(n, currentLevels, ctx, parent, out)
}

Expand Down
Loading
Loading