Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 97 additions & 35 deletions go/arrow/cdata/cdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ package cdata
import "C"

import (
"errors"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -178,6 +179,19 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) {
dt, ok := formatToSimpleType[f]
if ok {
ret.Type = dt

if schema.dictionary != nil {
valueField, err := importSchema(schema.dictionary)
if err != nil {
return ret, err
}

ret.Type = &arrow.DictionaryType{
IndexType: ret.Type,
ValueType: valueField.Type,
Ordered: schema.dictionary.flags&C.ARROW_FLAG_DICTIONARY_ORDERED != 0}
}

return
}

Expand Down Expand Up @@ -289,6 +303,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) {
} else {
ret.Type = dt
}

return
}

Expand Down Expand Up @@ -462,17 +477,26 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
}

bufs := []*memory.Buffer{nil, nil, nil}
var err error
if imp.arr.n_buffers == 3 {
// legacy format exported by older arrow c++ versions
bufs[1] = imp.importFixedSizeBuffer(1, 1)
bufs[2] = imp.importFixedSizeBuffer(2, int64(arrow.Int32SizeBytes))
if bufs[1], err = imp.importFixedSizeBuffer(1, 1); err != nil {
return err
}
if bufs[2], err = imp.importFixedSizeBuffer(2, int64(arrow.Int32SizeBytes)); err != nil {
return err
}
} else {
if err := imp.checkNumBuffers(2); err != nil {
return err
}

bufs[1] = imp.importFixedSizeBuffer(0, 1)
bufs[2] = imp.importFixedSizeBuffer(1, int64(arrow.Int32SizeBytes))
if bufs[1], err = imp.importFixedSizeBuffer(0, 1); err != nil {
return err
}
if bufs[2], err = imp.importFixedSizeBuffer(1, int64(arrow.Int32SizeBytes)); err != nil {
return err
}
}

children := make([]arrow.ArrayData, len(imp.children))
Expand All @@ -486,15 +510,20 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
}

var buf *memory.Buffer
var err error
if imp.arr.n_buffers == 2 {
// legacy format exported by older Arrow C++ versions
buf = imp.importFixedSizeBuffer(1, 1)
if buf, err = imp.importFixedSizeBuffer(1, 1); err != nil {
return err
}
} else {
if err := imp.checkNumBuffers(1); err != nil {
return err
}

buf = imp.importFixedSizeBuffer(0, 1)
if buf, err = imp.importFixedSizeBuffer(0, 1); err != nil {
return err
}
}

children := make([]arrow.ArrayData, len(imp.children))
Expand All @@ -509,21 +538,27 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
return nil
}

func (imp *cimporter) importStringLike(offsetByteWidth int64) error {
if err := imp.checkNoChildren(); err != nil {
return err
func (imp *cimporter) importStringLike(offsetByteWidth int64) (err error) {
if err = imp.checkNoChildren(); err != nil {
return
}

if err := imp.checkNumBuffers(3); err != nil {
return err
if err = imp.checkNumBuffers(3); err != nil {
return
}

nulls, err := imp.importNullBitmap(0)
if err != nil {
return err
var (
nulls, offsets, values *memory.Buffer
)

if nulls, err = imp.importNullBitmap(0); err != nil {
return
}

if offsets, err = imp.importOffsetsBuffer(1, offsetByteWidth); err != nil {
return
}

offsets := imp.importOffsetsBuffer(1, offsetByteWidth)
var nvals int64
switch offsetByteWidth {
case 4:
Expand All @@ -533,29 +568,34 @@ func (imp *cimporter) importStringLike(offsetByteWidth int64) error {
typedOffsets := arrow.Int64Traits.CastFromBytes(offsets.Bytes())
nvals = typedOffsets[imp.arr.offset+imp.arr.length]
}
values := imp.importVariableValuesBuffer(2, 1, nvals)
if values, err = imp.importVariableValuesBuffer(2, 1, nvals); err != nil {
return
}
imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset))
return nil
return
}

func (imp *cimporter) importListLike() error {
if err := imp.checkNumChildren(1); err != nil {
func (imp *cimporter) importListLike() (err error) {
if err = imp.checkNumChildren(1); err != nil {
return err
}

if err := imp.checkNumBuffers(2); err != nil {
if err = imp.checkNumBuffers(2); err != nil {
return err
}

nulls, err := imp.importNullBitmap(0)
if err != nil {
return err
var nulls, offsets *memory.Buffer
if nulls, err = imp.importNullBitmap(0); err != nil {
return
}

offsetSize := imp.dt.Layout().Buffers[1].ByteWidth
offsets := imp.importOffsetsBuffer(1, int64(offsetSize))
if offsets, err = imp.importOffsetsBuffer(1, int64(offsetSize)); err != nil {
return
}

imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset))
return nil
return
}

func (imp *cimporter) importFixedSizePrimitive() error {
Expand All @@ -576,14 +616,30 @@ func (imp *cimporter) importFixedSizePrimitive() error {

fw := imp.dt.(arrow.FixedWidthDataType)
if bitutil.IsMultipleOf8(int64(fw.BitWidth())) {
values = imp.importFixedSizeBuffer(1, bitutil.BytesForBits(int64(fw.BitWidth())))
values, err = imp.importFixedSizeBuffer(1, bitutil.BytesForBits(int64(fw.BitWidth())))
} else {
if fw.BitWidth() != 1 {
return xerrors.New("invalid bitwidth")
}
values = imp.importBitsBuffer(1)
values, err = imp.importBitsBuffer(1)
}

if err != nil {
return err
}

var dict *array.Data
if dt, ok := imp.dt.(*arrow.DictionaryType); ok {
dictImp := &cimporter{dt: dt.ValueType}
if err := dictImp.doImport(imp.arr.dictionary); err != nil {
return err
}
defer dictImp.data.Release()

dict = dictImp.data.(*array.Data)
}
imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, values}, nil, int(imp.arr.null_count), int(imp.arr.offset))

imp.data = array.NewDataWithDictionary(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, values}, int(imp.arr.null_count), int(imp.arr.offset), dict)
return nil
}

Expand All @@ -610,15 +666,21 @@ func (imp *cimporter) checkNumBuffers(n int64) error {
return nil
}

func (imp *cimporter) importBuffer(bufferID int, sz int64) *memory.Buffer {
func (imp *cimporter) importBuffer(bufferID int, sz int64) (*memory.Buffer, error) {
// this is not a copy, we're just having a slice which points at the data
// it's still owned by the C.ArrowArray object and its backing C++ object.
if imp.cbuffers[bufferID] == nil {
if sz != 0 {
return nil, errors.New("invalid buffer")
}
return memory.NewBufferBytes([]byte{}), nil
}
const maxLen = 0x7fffffff
data := (*[maxLen]byte)(unsafe.Pointer(imp.cbuffers[bufferID]))[:sz:sz]
return memory.NewBufferBytes(data)
return memory.NewBufferBytes(data), nil
}

func (imp *cimporter) importBitsBuffer(bufferID int) *memory.Buffer {
func (imp *cimporter) importBitsBuffer(bufferID int) (*memory.Buffer, error) {
bufsize := bitutil.BytesForBits(int64(imp.arr.length) + int64(imp.arr.offset))
return imp.importBuffer(bufferID, bufsize)
}
Expand All @@ -632,20 +694,20 @@ func (imp *cimporter) importNullBitmap(bufferID int) (*memory.Buffer, error) {
return nil, nil
}

return imp.importBitsBuffer(bufferID), nil
return imp.importBitsBuffer(bufferID)
}

func (imp *cimporter) importFixedSizeBuffer(bufferID int, byteWidth int64) *memory.Buffer {
func (imp *cimporter) importFixedSizeBuffer(bufferID int, byteWidth int64) (*memory.Buffer, error) {
bufsize := byteWidth * int64(imp.arr.length+imp.arr.offset)
return imp.importBuffer(bufferID, bufsize)
}

func (imp *cimporter) importOffsetsBuffer(bufferID int, offsetsize int64) *memory.Buffer {
func (imp *cimporter) importOffsetsBuffer(bufferID int, offsetsize int64) (*memory.Buffer, error) {
bufsize := offsetsize * int64((imp.arr.length + imp.arr.offset + 1))
return imp.importBuffer(bufferID, bufsize)
}

func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth, nvals int64) *memory.Buffer {
func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth, nvals int64) (*memory.Buffer, error) {
bufsize := byteWidth * nvals
return imp.importBuffer(bufferID, int64(bufsize))
}
Expand Down
27 changes: 18 additions & 9 deletions go/arrow/cdata/cdata_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type schemaExporter struct {
metadata []byte
flags int64
children []schemaExporter
dict *schemaExporter
}

func (exp *schemaExporter) handleExtension(dt arrow.DataType) arrow.DataType {
Expand Down Expand Up @@ -228,6 +229,11 @@ func (exp *schemaExporter) exportFormat(dt arrow.DataType) string {
exp.flags |= C.ARROW_FLAG_MAP_KEYS_SORTED
}
return "+m"
case *arrow.DictionaryType:
if dt.Ordered {
exp.flags |= C.ARROW_FLAG_DICTIONARY_ORDERED
}
return exp.exportFormat(dt.IndexType)
}
panic("unsupported data type for export")
}
Expand All @@ -240,6 +246,9 @@ func (exp *schemaExporter) export(field arrow.Field) {
}

switch dt := field.Type.(type) {
case *arrow.DictionaryType:
exp.dict = new(schemaExporter)
exp.dict.export(arrow.Field{Type: dt.ValueType})
case *arrow.ListType:
exp.children = make([]schemaExporter, 1)
exp.children[0].export(dt.ElemField())
Expand Down Expand Up @@ -309,6 +318,10 @@ func allocateBufferPtrArr(n int) (out []*C.void) {

func (exp *schemaExporter) finish(out *CArrowSchema) {
out.dictionary = nil
if exp.dict != nil {
out.dictionary = (*CArrowSchema)(C.malloc(C.sizeof_struct_ArrowSchema))
exp.dict.finish(out.dictionary)
}
out.name = C.CString(exp.name)
out.format = C.CString(exp.format)
out.metadata = (*C.char)(C.CBytes(exp.metadata))
Expand Down Expand Up @@ -353,7 +366,7 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
buffers := allocateBufferPtrArr(len(arr.Data().Buffers()))
for i := range arr.Data().Buffers() {
buf := arr.Data().Buffers()[i]
if buf == nil {
if buf == nil || buf.Len() == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem mandatory either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for this is line 374 below. When buf.Len() == 0 there is no allocated data to take a pointer to via &buf.Bytes()[0], so the export panics with an index-out-of-range error (index 0 of a zero-length slice). Thus, if we create an empty array, it's better to just use a nil pointer.

The alternative here would be to use &buf.Buf()[0] instead, which points at the reserved bytes (creating a new buffer will, by default, automatically reserve 64 bytes if it wasn't expanded), but I thought it better not to force us to keep that memory around for a zero-length array.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds ok to me.

buffers[i] = nil
continue
}
Expand All @@ -368,7 +381,7 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
out.private_data = unsafe.Pointer(&h)
out.release = (*[0]byte)(C.goReleaseArray)
switch arr := arr.(type) {
case *array.List:
case array.ListLike:
out.n_children = 1
childPtrs := allocateArrowArrayPtrArr(1)
children := allocateArrowArrayArr(1)
Expand All @@ -382,13 +395,6 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
exportArray(arr.ListValues(), &children[0], nil)
childPtrs[0] = &children[0]
out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0]))
case *array.Map:
out.n_children = 1
childPtrs := allocateArrowArrayPtrArr(1)
children := allocateArrowArrayArr(1)
exportArray(arr.ListValues(), &children[0], nil)
childPtrs[0] = &children[0]
out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0]))
case *array.Struct:
out.n_children = C.int64_t(arr.NumField())
childPtrs := allocateArrowArrayPtrArr(arr.NumField())
Expand All @@ -398,6 +404,9 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
childPtrs[i] = &children[i]
}
out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0]))
case *array.Dictionary:
out.dictionary = (*CArrowArray)(C.malloc(C.sizeof_struct_ArrowArray))
exportArray(arr.Dictionary(), out.dictionary, nil)
default:
out.n_children = 0
out.children = nil
Expand Down
53 changes: 53 additions & 0 deletions go/arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,3 +694,56 @@ func TestExportRecordReaderStream(t *testing.T) {
}
assert.EqualValues(t, len(reclist), i)
}

func TestEmptyListExport(t *testing.T) {
bldr := array.NewBuilder(memory.DefaultAllocator, arrow.LargeListOf(arrow.PrimitiveTypes.Int32))
defer bldr.Release()

arr := bldr.NewArray()
defer arr.Release()

var out CArrowArray
ExportArrowArray(arr, &out, nil)

assert.Zero(t, out.length)
assert.Zero(t, out.null_count)
assert.Zero(t, out.offset)
assert.EqualValues(t, 2, out.n_buffers)
assert.NotNil(t, out.buffers)
assert.EqualValues(t, 1, out.n_children)
assert.NotNil(t, out.children)
}

func TestEmptyDictExport(t *testing.T) {
bldr := array.NewBuilder(memory.DefaultAllocator, &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int8, ValueType: arrow.BinaryTypes.String, Ordered: true})
defer bldr.Release()

arr := bldr.NewArray()
defer arr.Release()

var out CArrowArray
var sc CArrowSchema
ExportArrowArray(arr, &out, &sc)

assert.EqualValues(t, 'c', *sc.format)
assert.NotZero(t, sc.flags&1)
assert.Zero(t, sc.n_children)
assert.NotNil(t, sc.dictionary)
assert.EqualValues(t, 'u', *sc.dictionary.format)

assert.Zero(t, out.length)
assert.Zero(t, out.null_count)
assert.Zero(t, out.offset)
assert.EqualValues(t, 2, out.n_buffers)
assert.Zero(t, out.n_children)
assert.Nil(t, out.children)
assert.NotNil(t, out.dictionary)

assert.Zero(t, out.dictionary.length)
assert.Zero(t, out.dictionary.null_count)
assert.Zero(t, out.dictionary.offset)
assert.EqualValues(t, 3, out.dictionary.n_buffers)
assert.Zero(t, out.dictionary.n_children)
assert.Nil(t, out.dictionary.children)
assert.Nil(t, out.dictionary.dictionary)
}
Loading