Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 80 additions & 2 deletions arrow/extensions/variant.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,83 @@ func NewDefaultVariantType() *VariantType {
return vt
}

// createShreddedField recursively converts dt into the shredded-variant
// layout: list elements and struct fields each become a struct holding a
// binary "value" (residual) plus a "typed_value" of the recursively
// shredded child type, while any other type passes through unchanged.
func createShreddedField(dt arrow.DataType) arrow.DataType {
	shreddedPair := func(typed arrow.DataType) arrow.DataType {
		return arrow.StructOf(
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
			arrow.Field{Name: "typed_value", Type: typed, Nullable: true})
	}

	switch t := dt.(type) {
	case arrow.ListLikeType:
		// list elements are shredded as non-nullable value/typed_value structs
		return arrow.ListOfNonNullable(shreddedPair(createShreddedField(t.Elem())))
	case *arrow.StructType:
		out := make([]arrow.Field, t.NumFields())
		for i := 0; i < t.NumFields(); i++ {
			src := t.Field(i)
			out[i] = arrow.Field{
				Name:     src.Name,
				Type:     shreddedPair(createShreddedField(src.Type)),
				Nullable: false,
				Metadata: src.Metadata,
			}
		}
		return arrow.StructOf(out...)
	default:
		// leaf type: used directly as the typed_value type
		return dt
	}
}

// NewShreddedVariantType creates a new VariantType extension type using the provided
// type to define a shredded schema by setting the `typed_value` field accordingly and
// properly constructing the shredded fields for structs, lists and so on.
//
// For example:
//
//	NewShreddedVariantType(arrow.StructOf(
//	   arrow.Field{Name: "latitude", Type: arrow.PrimitiveTypes.Float64},
//	   arrow.Field{Name: "longitude", Type: arrow.PrimitiveTypes.Float32}))
//
// Will create a variant type with the following structure:
//
//	arrow.StructOf(
//	  arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
//	  arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
//	  arrow.Field{Name: "typed_value", Type: arrow.StructOf(
//	     arrow.Field{Name: "latitude", Type: arrow.StructOf(
//	        arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
//	        arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float64, Nullable: true}),
//	        Nullable: false},
//	     arrow.Field{Name: "longitude", Type: arrow.StructOf(
//	        arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
//	        arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float32, Nullable: true}),
//	        Nullable: false},
//	  ), Nullable: true})
//
// This is intended to be a convenient way to create a shredded variant type from a definition
// of the fields to shred. If the provided data type is nil, it will create a default
// variant type.
func NewShreddedVariantType(dt arrow.DataType) *VariantType {
	if dt == nil {
		return NewDefaultVariantType()
	}

	storage := arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
		arrow.Field{
			Name:     "typed_value",
			Type:     createShreddedField(dt),
			Nullable: true,
		})

	// NOTE(review): the error is deliberately discarded, presumably because
	// the storage layout built above is expected to always satisfy the
	// variant storage rules; if dt contains an unsupported type the result
	// would be nil — confirm against NewVariantType's validation.
	vt, _ := NewVariantType(storage)
	return vt
}

// NewVariantType creates a new variant type based on the provided storage type.
//
// The rules for a variant storage type are:
Expand Down Expand Up @@ -1480,16 +1557,17 @@ type shreddedObjBuilder struct {
}

// AppendMissing appends one "missing" entry to this shredded object builder:
// the object's struct slot is appended as null, and every shredded field
// still receives exactly one slot (null residual value plus a missing
// typed_value) so that all child builder lengths stay in sync.
func (b *shreddedObjBuilder) AppendMissing() {
	// NOTE(review): AppendValues([]bool{false}) appends a single null struct
	// slot; the explicit per-field loop below supplies the matching child
	// slots — presumably AppendValues does not auto-append to children the
	// way AppendNull would; confirm against array.StructBuilder semantics.
	b.structBldr.AppendValues([]bool{false})
	for _, fieldBldr := range b.fieldBuilders {
		fieldBldr.structBldr.Append(true)
		fieldBldr.valueBldr.AppendNull()
		fieldBldr.typedBldr.AppendMissing()
	}
}

func (b *shreddedObjBuilder) tryTyped(v variant.Value) (residual []byte) {
if v.Type() != variant.Object {
b.structBldr.AppendNull()
b.AppendMissing()
return v.Bytes()
}

Expand Down
81 changes: 81 additions & 0 deletions arrow/extensions/variant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,3 +1574,84 @@ func TestVariantBuilderUnmarshalJSON(t *testing.T) {
assert.Equal(t, int8(5), innerVal2.Value())
})
}

// TestNewSimpleShreddedVariantType checks that a nil shred type falls back to
// the default variant type and that a primitive shred type is placed directly
// in the typed_value field of the storage struct.
func TestNewSimpleShreddedVariantType(t *testing.T) {
	assert.True(t, arrow.TypeEqual(extensions.NewDefaultVariantType(),
		extensions.NewShreddedVariantType(nil)))

	got := extensions.NewShreddedVariantType(arrow.PrimitiveTypes.Float32)
	want := arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
		arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float32, Nullable: true})

	assert.Truef(t, arrow.TypeEqual(got.Storage, want), "expected %s, got %s", want, got.Storage)
}

// TestNewShreddedVariantType verifies the storage layout produced for a flat
// struct shred schema: each field becomes a value/typed_value pair.
func TestNewShreddedVariantType(t *testing.T) {
	// shredded builds the expected value/typed_value wrapper for one field.
	shredded := func(typed arrow.DataType) arrow.DataType {
		return arrow.StructOf(
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
			arrow.Field{Name: "typed_value", Type: typed, Nullable: true})
	}

	vt := extensions.NewShreddedVariantType(arrow.StructOf(
		arrow.Field{Name: "event_type", Type: arrow.BinaryTypes.String},
		arrow.Field{Name: "event_ts", Type: arrow.FixedWidthTypes.Timestamp_us}))
	assert.NotNil(t, vt)

	want := arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
		arrow.Field{Name: "typed_value", Type: arrow.StructOf(
			arrow.Field{Name: "event_type", Type: shredded(arrow.BinaryTypes.String)},
			arrow.Field{Name: "event_ts", Type: shredded(arrow.FixedWidthTypes.Timestamp_us)},
		), Nullable: true})

	assert.Truef(t, arrow.TypeEqual(vt.Storage, want), "expected %s, got %s", want, vt.Storage)
}

// TestShreddedVariantNested verifies the storage layout for a shred schema
// containing a list of structs: the list element and every nested field are
// recursively wrapped in value/typed_value pairs.
func TestShreddedVariantNested(t *testing.T) {
	// shredded builds the expected value/typed_value wrapper for one node.
	shredded := func(typed arrow.DataType) arrow.DataType {
		return arrow.StructOf(
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
			arrow.Field{Name: "typed_value", Type: typed, Nullable: true})
	}

	vt := extensions.NewShreddedVariantType(arrow.StructOf(
		arrow.Field{Name: "strval", Type: arrow.BinaryTypes.String},
		arrow.Field{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
		arrow.Field{Name: "location", Type: arrow.ListOf(arrow.StructOf(
			arrow.Field{Name: "latitude", Type: arrow.PrimitiveTypes.Float64},
			arrow.Field{Name: "longitude", Type: arrow.PrimitiveTypes.Float32},
		))}))

	// list elements shred into non-nullable value/typed_value structs whose
	// typed_value recursively shreds the element struct's fields.
	wantLocation := arrow.ListOfNonNullable(shredded(arrow.StructOf(
		arrow.Field{Name: "latitude", Type: shredded(arrow.PrimitiveTypes.Float64)},
		arrow.Field{Name: "longitude", Type: shredded(arrow.PrimitiveTypes.Float32)},
	)))

	want := arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
		arrow.Field{Name: "typed_value", Type: arrow.StructOf(
			arrow.Field{Name: "strval", Type: shredded(arrow.BinaryTypes.String)},
			arrow.Field{Name: "bool", Type: shredded(arrow.FixedWidthTypes.Boolean)},
			arrow.Field{Name: "location", Type: shredded(wantLocation)},
		), Nullable: true})

	assert.Truef(t, arrow.TypeEqual(vt.Storage, want), "expected %s, got %s", want, vt.Storage)
}
1 change: 1 addition & 0 deletions parquet/file/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ func (rr *recordReader) ReadRecordData(numRecords int64) (int64, error) {
// no repetition levels, skip delimiting logic. each level
// represents null or not null entry
recordsRead = utils.Min(rr.levelsWritten-rr.levelsPos, numRecords)
valuesToRead = recordsRead
// this is advanced by delimitRecords which we skipped
rr.levelsPos += recordsRead
} else {
Expand Down
104 changes: 104 additions & 0 deletions parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2314,3 +2314,107 @@ func TestEmptyListDeltaBinaryPacked(t *testing.T) {
assert.True(t, schema.Equal(tbl.Schema()))
assert.EqualValues(t, 1, tbl.NumRows())
}

// TestReadWriteNonShreddedVariant round-trips a column of unshredded variant
// values through a parquet file and checks the read-back array is equal.
func TestReadWriteNonShreddedVariant(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	bldr := extensions.NewVariantBuilder(mem, extensions.NewDefaultVariantType())
	defer bldr.Release()

	// exercise scalars, strings, arrays, objects, nesting, and null
	jsonData := `[
		42,
		"text",
		[1, 2, 3],
		{"name": "Alice"},
		[{"id": 1, "name": "Item 1"}, {"id": 2, "name": "Item 2"}],
		{"items": [1, "two", true], "metadata": {"created": "2025-01-01"}},
		null
	]`

	err := bldr.UnmarshalJSON([]byte(jsonData))
	require.NoError(t, err)

	arr := bldr.NewArray()
	defer arr.Release()

	rec := array.NewRecord(arrow.NewSchema([]arrow.Field{
		{Name: "variant", Type: arr.DataType(), Nullable: true},
	}, nil), []arrow.Array{arr}, -1)

	var buf bytes.Buffer
	wr, err := pqarrow.NewFileWriter(rec.Schema(), &buf, nil,
		pqarrow.DefaultWriterProps())
	require.NoError(t, err)

	require.NoError(t, wr.Write(rec))
	rec.Release()
	// Close finalizes the parquet footer; its error must not be ignored or a
	// truncated file would silently pass the write phase.
	require.NoError(t, wr.Close())

	rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)
	reader, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	require.NoError(t, err)
	defer rdr.Close()

	tbl, err := reader.ReadTable(context.Background())
	require.NoError(t, err)
	defer tbl.Release()

	assert.True(t, array.Equal(arr, tbl.Column(0).Data().Chunk(0)))
}

// TestReadWriteShreddedVariant round-trips a shredded variant column through
// a parquet file, mixing values that match the shred schema with ones that
// fall back to the residual value (int, string, null).
func TestReadWriteShreddedVariant(t *testing.T) {
	vt := extensions.NewShreddedVariantType(arrow.StructOf(
		arrow.Field{Name: "event_type", Type: arrow.BinaryTypes.String},
		arrow.Field{Name: "event_ts", Type: arrow.FixedWidthTypes.Timestamp_us}))

	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	bldr := vt.NewBuilder(mem)
	defer bldr.Release()

	jsonData := `[
		{"event_type": "noop", "event_ts": "1970-01-21 00:29:54.114937Z"},
		42,
		{"event_type": "text", "event_ts": "1970-01-21 00:29:54.954163Z"},
		{"event_type": "list", "event_ts": "1970-01-21 00:29:54.240241Z"},
		"text",
		{"event_type": "object", "event_ts": "1970-01-21 00:29:54.146402Z"},
		null
	]`

	err := bldr.UnmarshalJSON([]byte(jsonData))
	require.NoError(t, err)

	arr := bldr.NewArray()
	defer arr.Release()

	rec := array.NewRecord(arrow.NewSchema([]arrow.Field{
		{Name: "variant", Type: arr.DataType(), Nullable: true},
	}, nil), []arrow.Array{arr}, -1)

	var buf bytes.Buffer
	wr, err := pqarrow.NewFileWriter(rec.Schema(), &buf,
		parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)),
		pqarrow.DefaultWriterProps())
	require.NoError(t, err)

	require.NoError(t, wr.Write(rec))
	rec.Release()
	// Close finalizes the parquet footer; its error must not be ignored or a
	// truncated file would silently pass the write phase.
	require.NoError(t, wr.Close())

	rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)
	reader, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	require.NoError(t, err)
	defer rdr.Close()

	tbl, err := reader.ReadTable(context.Background())
	require.NoError(t, err)
	defer tbl.Release()

	assert.Truef(t, array.Equal(arr, tbl.Column(0).Data().Chunk(0)),
		"expected: %s\ngot: %s", arr, tbl.Column(0).Data().Chunk(0))
}
40 changes: 39 additions & 1 deletion parquet/pqarrow/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,37 @@ func (fr *FileReader) Schema() (*arrow.Schema, error) {
return FromParquet(fr.rdr.MetaData().Schema, &fr.Props, fr.rdr.MetaData().KeyValueMetadata())
}

// extensionReader wraps a column reader that operates on an extension type's
// storage type and rewraps its output arrays in the extension type.
type extensionReader struct {
	colReaderImpl

	// fieldWithExt carries the extension type; the embedded reader was
	// built against its storage type.
	fieldWithExt arrow.Field
}

// Field returns the field with the extension type rather than the storage
// type the underlying reader was constructed with.
func (er *extensionReader) Field() *arrow.Field {
	return &er.fieldWithExt
}

// BuildArray builds the storage-type chunks via the wrapped reader and wraps
// each chunk in the extension type, returning a chunked array whose data
// type is the extension type itself.
func (er *extensionReader) BuildArray(boundedLen int64) (*arrow.Chunked, error) {
	if er.colReaderImpl == nil {
		return nil, errors.New("extension reader has no underlying column reader implementation")
	}

	chkd, err := er.colReaderImpl.BuildArray(boundedLen)
	if err != nil {
		return nil, err
	}
	defer chkd.Release()

	// getReader only constructs an extensionReader for EXTENSION fields, so
	// this assertion holds by construction.
	extType := er.fieldWithExt.Type.(arrow.ExtensionType)

	newChunks := make([]arrow.Array, len(chkd.Chunks()))
	for i, c := range chkd.Chunks() {
		newChunks[i] = array.NewExtensionArrayWithStorage(extType, c)
	}

	out := arrow.NewChunked(extType, newChunks)
	// NewChunked retains every chunk, so release our references from
	// NewExtensionArrayWithStorage; otherwise each chunk leaks one refcount.
	for _, c := range newChunks {
		c.Release()
	}
	return out, nil
}

type colReaderImpl interface {
LoadBatch(nrecs int64) error
BuildArray(boundedLen int64) (*arrow.Chunked, error)
Expand Down Expand Up @@ -517,7 +548,14 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi

switch arrowField.Type.ID() {
case arrow.EXTENSION:
return nil, xerrors.New("extension type not implemented")
storageField := arrowField
storageField.Type = arrowField.Type.(arrow.ExtensionType).StorageType()
storageReader, err := fr.getReader(ctx, field, storageField)
if err != nil {
return nil, err
}

return &ColumnReader{&extensionReader{colReaderImpl: storageReader, fieldWithExt: arrowField}}, nil
case arrow.STRUCT:

childReaders := make([]*ColumnReader, len(field.Children))
Expand Down
Loading
Loading