diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index b2084d6e368..bbddc9c4d20 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -37,6 +37,7 @@ package cdata import "C" import ( + "errors" "fmt" "io" "reflect" @@ -178,6 +179,19 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) { dt, ok := formatToSimpleType[f] if ok { ret.Type = dt + + if schema.dictionary != nil { + valueField, err := importSchema(schema.dictionary) + if err != nil { + return ret, err + } + + ret.Type = &arrow.DictionaryType{ + IndexType: ret.Type, + ValueType: valueField.Type, + Ordered: schema.dictionary.flags&C.ARROW_FLAG_DICTIONARY_ORDERED != 0} + } + return } @@ -289,6 +303,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) { } else { ret.Type = dt } + return } @@ -462,17 +477,26 @@ func (imp *cimporter) doImport(src *CArrowArray) error { } bufs := []*memory.Buffer{nil, nil, nil} + var err error if imp.arr.n_buffers == 3 { // legacy format exported by older arrow c++ versions - bufs[1] = imp.importFixedSizeBuffer(1, 1) - bufs[2] = imp.importFixedSizeBuffer(2, int64(arrow.Int32SizeBytes)) + if bufs[1], err = imp.importFixedSizeBuffer(1, 1); err != nil { + return err + } + if bufs[2], err = imp.importFixedSizeBuffer(2, int64(arrow.Int32SizeBytes)); err != nil { + return err + } } else { if err := imp.checkNumBuffers(2); err != nil { return err } - bufs[1] = imp.importFixedSizeBuffer(0, 1) - bufs[2] = imp.importFixedSizeBuffer(1, int64(arrow.Int32SizeBytes)) + if bufs[1], err = imp.importFixedSizeBuffer(0, 1); err != nil { + return err + } + if bufs[2], err = imp.importFixedSizeBuffer(1, int64(arrow.Int32SizeBytes)); err != nil { + return err + } } children := make([]arrow.ArrayData, len(imp.children)) @@ -486,15 +510,20 @@ func (imp *cimporter) doImport(src *CArrowArray) error { } var buf *memory.Buffer + var err error if imp.arr.n_buffers == 2 { // legacy format exported by older Arrow C++ versions - buf = imp.importFixedSizeBuffer(1, 1) + if buf, err = imp.importFixedSizeBuffer(1, 1); err != nil { + return err + } } else { if err := imp.checkNumBuffers(1); err != nil { return err } - buf = imp.importFixedSizeBuffer(0, 1) + if buf, err = imp.importFixedSizeBuffer(0, 1); err != nil { + return err + } } children := make([]arrow.ArrayData, len(imp.children)) @@ -509,21 +538,27 @@ func (imp *cimporter) doImport(src *CArrowArray) error { return nil } -func (imp *cimporter) importStringLike(offsetByteWidth int64) error { - if err := imp.checkNoChildren(); err != nil { - return err +func (imp *cimporter) importStringLike(offsetByteWidth int64) (err error) { + if err = imp.checkNoChildren(); err != nil { + return } - if err := imp.checkNumBuffers(3); err != nil { - return err + if err = imp.checkNumBuffers(3); err != nil { + return } - nulls, err := imp.importNullBitmap(0) - if err != nil { - return err + var ( + nulls, offsets, values *memory.Buffer + ) + + if nulls, err = imp.importNullBitmap(0); err != nil { + return + } + + if offsets, err = imp.importOffsetsBuffer(1, offsetByteWidth); err != nil { + return } - offsets := imp.importOffsetsBuffer(1, offsetByteWidth) var nvals int64 switch offsetByteWidth { case 4: @@ -533,29 +568,34 @@ func (imp *cimporter) importStringLike(offsetByteWidth int64) error { typedOffsets := arrow.Int64Traits.CastFromBytes(offsets.Bytes()) nvals = typedOffsets[imp.arr.offset+imp.arr.length] } - values := imp.importVariableValuesBuffer(2, 1, nvals) + if values, err = imp.importVariableValuesBuffer(2, 1, nvals); err != nil { + return + } imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset)) - return nil + return } -func (imp *cimporter) importListLike() error { - if err := imp.checkNumChildren(1); err != nil { +func (imp *cimporter) importListLike() (err error) { + if err = imp.checkNumChildren(1); err != nil { return err } - if err := imp.checkNumBuffers(2); err != nil { + if err = imp.checkNumBuffers(2); err != nil { return err } - nulls, err := imp.importNullBitmap(0) - if err != nil { - return err + var nulls, offsets *memory.Buffer + if nulls, err = imp.importNullBitmap(0); err != nil { + return } offsetSize := imp.dt.Layout().Buffers[1].ByteWidth - offsets := imp.importOffsetsBuffer(1, int64(offsetSize)) + if offsets, err = imp.importOffsetsBuffer(1, int64(offsetSize)); err != nil { + return + } + imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset)) - return nil + return } func (imp *cimporter) importFixedSizePrimitive() error { @@ -576,14 +616,30 @@ func (imp *cimporter) importFixedSizePrimitive() error { fw := imp.dt.(arrow.FixedWidthDataType) if bitutil.IsMultipleOf8(int64(fw.BitWidth())) { - values = imp.importFixedSizeBuffer(1, bitutil.BytesForBits(int64(fw.BitWidth()))) + values, err = imp.importFixedSizeBuffer(1, bitutil.BytesForBits(int64(fw.BitWidth()))) } else { if fw.BitWidth() != 1 { return xerrors.New("invalid bitwidth") } - values = imp.importBitsBuffer(1) + values, err = imp.importBitsBuffer(1) + } + + if err != nil { + return err + } + + var dict *array.Data + if dt, ok := imp.dt.(*arrow.DictionaryType); ok { + dictImp := &cimporter{dt: dt.ValueType} + if err := dictImp.doImport(imp.arr.dictionary); err != nil { + return err + } + defer dictImp.data.Release() + + dict = dictImp.data.(*array.Data) } - imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, values}, nil, int(imp.arr.null_count), int(imp.arr.offset)) + + imp.data = array.NewDataWithDictionary(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, values}, int(imp.arr.null_count), int(imp.arr.offset), dict) return nil } @@ -610,15 +666,21 @@ func (imp *cimporter) checkNumBuffers(n int64) error { return nil } -func (imp *cimporter) importBuffer(bufferID int, sz int64) *memory.Buffer { +func (imp *cimporter) importBuffer(bufferID int, sz int64) (*memory.Buffer, error) { // this is not a copy, we're just having a slice which points at the data // it's still owned by the C.ArrowArray object and its backing C++ object. + if imp.cbuffers[bufferID] == nil { + if sz != 0 { + return nil, errors.New("invalid buffer") + } + return memory.NewBufferBytes([]byte{}), nil + } const maxLen = 0x7fffffff data := (*[maxLen]byte)(unsafe.Pointer(imp.cbuffers[bufferID]))[:sz:sz] - return memory.NewBufferBytes(data) + return memory.NewBufferBytes(data), nil } -func (imp *cimporter) importBitsBuffer(bufferID int) *memory.Buffer { +func (imp *cimporter) importBitsBuffer(bufferID int) (*memory.Buffer, error) { bufsize := bitutil.BytesForBits(int64(imp.arr.length) + int64(imp.arr.offset)) return imp.importBuffer(bufferID, bufsize) } @@ -632,20 +694,20 @@ func (imp *cimporter) importNullBitmap(bufferID int) (*memory.Buffer, error) { return nil, nil } - return imp.importBitsBuffer(bufferID), nil + return imp.importBitsBuffer(bufferID) } -func (imp *cimporter) importFixedSizeBuffer(bufferID int, byteWidth int64) *memory.Buffer { +func (imp *cimporter) importFixedSizeBuffer(bufferID int, byteWidth int64) (*memory.Buffer, error) { bufsize := byteWidth * int64(imp.arr.length+imp.arr.offset) return imp.importBuffer(bufferID, bufsize) } -func (imp *cimporter) importOffsetsBuffer(bufferID int, offsetsize int64) *memory.Buffer { +func (imp *cimporter) importOffsetsBuffer(bufferID int, offsetsize int64) (*memory.Buffer, error) { bufsize := offsetsize * int64((imp.arr.length + imp.arr.offset + 1)) return imp.importBuffer(bufferID, bufsize) } -func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth, nvals int64) *memory.Buffer { +func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth, nvals int64) (*memory.Buffer, error) { bufsize := byteWidth * nvals return imp.importBuffer(bufferID, int64(bufsize)) } diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index 9de6e604c01..555ab42adc4 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -76,6 +76,7 @@ type schemaExporter struct { metadata []byte flags int64 children []schemaExporter + dict *schemaExporter } func (exp *schemaExporter) handleExtension(dt arrow.DataType) arrow.DataType { @@ -228,6 +229,11 @@ func (exp *schemaExporter) exportFormat(dt arrow.DataType) string { exp.flags |= C.ARROW_FLAG_MAP_KEYS_SORTED } return "+m" + case *arrow.DictionaryType: + if dt.Ordered { + exp.flags |= C.ARROW_FLAG_DICTIONARY_ORDERED + } + return exp.exportFormat(dt.IndexType) } panic("unsupported data type for export") } @@ -240,6 +246,9 @@ func (exp *schemaExporter) export(field arrow.Field) { } switch dt := field.Type.(type) { + case *arrow.DictionaryType: + exp.dict = new(schemaExporter) + exp.dict.export(arrow.Field{Type: dt.ValueType}) case *arrow.ListType: exp.children = make([]schemaExporter, 1) exp.children[0].export(dt.ElemField()) @@ -309,6 +318,10 @@ func allocateBufferPtrArr(n int) (out []*C.void) { func (exp *schemaExporter) finish(out *CArrowSchema) { out.dictionary = nil + if exp.dict != nil { + out.dictionary = (*CArrowSchema)(C.malloc(C.sizeof_struct_ArrowSchema)) + exp.dict.finish(out.dictionary) + } out.name = C.CString(exp.name) out.format = C.CString(exp.format) out.metadata = (*C.char)(C.CBytes(exp.metadata)) @@ -353,7 +366,7 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { buffers := allocateBufferPtrArr(len(arr.Data().Buffers())) for i := range arr.Data().Buffers() { buf := arr.Data().Buffers()[i] - if buf == nil { + if buf == nil || buf.Len() == 0 { buffers[i] = nil continue } @@ -368,7 +381,7 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { out.private_data = unsafe.Pointer(&h) out.release = (*[0]byte)(C.goReleaseArray) switch arr := arr.(type) { - case *array.List: + case array.ListLike: out.n_children = 1 childPtrs := allocateArrowArrayPtrArr(1) children := allocateArrowArrayArr(1) @@ -382,13 +395,6 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { exportArray(arr.ListValues(), &children[0], nil) childPtrs[0] = &children[0] out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0])) - case *array.Map: - out.n_children = 1 - childPtrs := allocateArrowArrayPtrArr(1) - children := allocateArrowArrayArr(1) - exportArray(arr.ListValues(), &children[0], nil) - childPtrs[0] = &children[0] - out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0])) case *array.Struct: out.n_children = C.int64_t(arr.NumField()) childPtrs := allocateArrowArrayPtrArr(arr.NumField()) @@ -398,6 +404,9 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { childPtrs[i] = &children[i] } out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0])) + case *array.Dictionary: + out.dictionary = (*CArrowArray)(C.malloc(C.sizeof_struct_ArrowArray)) + exportArray(arr.Dictionary(), out.dictionary, nil) default: out.n_children = 0 out.children = nil diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index 35a3db68ceb..84d212f2506 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -694,3 +694,56 @@ func TestExportRecordReaderStream(t *testing.T) { } assert.EqualValues(t, len(reclist), i) } + +func TestEmptyListExport(t *testing.T) { + bldr := array.NewBuilder(memory.DefaultAllocator, arrow.LargeListOf(arrow.PrimitiveTypes.Int32)) + defer bldr.Release() + + arr := bldr.NewArray() + defer arr.Release() + + var out CArrowArray + ExportArrowArray(arr, &out, nil) + + assert.Zero(t, out.length) + assert.Zero(t, out.null_count) + assert.Zero(t, out.offset) + assert.EqualValues(t, 2, out.n_buffers) + assert.NotNil(t, out.buffers) + assert.EqualValues(t, 1, out.n_children) + assert.NotNil(t, out.children) +} + +func TestEmptyDictExport(t *testing.T) { + bldr := array.NewBuilder(memory.DefaultAllocator, &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int8, ValueType: arrow.BinaryTypes.String, Ordered: true}) + defer bldr.Release() + + arr := bldr.NewArray() + defer arr.Release() + + var out CArrowArray + var sc CArrowSchema + ExportArrowArray(arr, &out, &sc) + + assert.EqualValues(t, 'c', *sc.format) + assert.NotZero(t, sc.flags&1) + assert.Zero(t, sc.n_children) + assert.NotNil(t, sc.dictionary) + assert.EqualValues(t, 'u', *sc.dictionary.format) + + assert.Zero(t, out.length) + assert.Zero(t, out.null_count) + assert.Zero(t, out.offset) + assert.EqualValues(t, 2, out.n_buffers) + assert.Zero(t, out.n_children) + assert.Nil(t, out.children) + assert.NotNil(t, out.dictionary) + + assert.Zero(t, out.dictionary.length) + assert.Zero(t, out.dictionary.null_count) + assert.Zero(t, out.dictionary.offset) + assert.EqualValues(t, 3, out.dictionary.n_buffers) + assert.Zero(t, out.dictionary.n_children) + assert.Nil(t, out.dictionary.children) + assert.Nil(t, out.dictionary.dictionary) +} diff --git a/go/arrow/cdata/exports.go b/go/arrow/cdata/exports.go index a8a39b4fbe8..daa8c8384f3 100644 --- a/go/arrow/cdata/exports.go +++ b/go/arrow/cdata/exports.go @@ -51,6 +51,11 @@ func releaseExportedSchema(schema *CArrowSchema) { return } + if schema.dictionary != nil { + C.ArrowSchemaRelease(schema.dictionary) + C.free(unsafe.Pointer(schema.dictionary)) + } + var children []*CArrowSchema s := (*reflect.SliceHeader)(unsafe.Pointer(&children)) s.Data = uintptr(unsafe.Pointer(schema.children)) @@ -76,6 +81,11 @@ func releaseExportedArray(arr *CArrowArray) { C.free(unsafe.Pointer(arr.buffers)) } + if arr.dictionary != nil { + C.ArrowArrayRelease(arr.dictionary) + C.free(unsafe.Pointer(arr.dictionary)) + } + if arr.n_children > 0 { var children []*CArrowArray s := (*reflect.SliceHeader)(unsafe.Pointer(&children)) diff --git a/go/arrow/cdata/test/test_cimport.go b/go/arrow/cdata/test/test_cimport.go index f6cadaa8d1e..6383b50701e 100644 --- a/go/arrow/cdata/test/test_cimport.go +++ b/go/arrow/cdata/test/test_cimport.go @@ -163,4 +163,16 @@ func importThenExportRecord(schemaIn, arrIn uintptr, schemaOut, arrOut uintptr) cdata.ExportArrowRecordBatch(rec, cdata.ArrayFromPtr(arrOut), cdata.SchemaFromPtr(schemaOut)) } +//export roundtripArray +func roundtripArray(arrIn, schema, arrOut uintptr) { + _, arr, err := cdata.ImportCArray(cdata.ArrayFromPtr(arrIn), cdata.SchemaFromPtr(schema)) + if err != nil { + panic(err) + } + defer arr.Release() + + outArr := cdata.ArrayFromPtr(arrOut) + cdata.ExportArrowArray(arr, outArr, nil) +} + func main() {} diff --git a/go/arrow/cdata/test/test_export_to_cgo.py b/go/arrow/cdata/test/test_export_to_cgo.py index f1cb733755b..e794a84bd43 100644 --- a/go/arrow/cdata/test/test_export_to_cgo.py +++ b/go/arrow/cdata/test/test_export_to_cgo.py @@ -43,6 +43,7 @@ def load_cgotest(): void importThenExportSchema(uintptr_t input, uintptr_t output); void importThenExportRecord(uintptr_t schemaIn, uintptr_t arrIn, uintptr_t schemaOut, uintptr_t arrOut); + void roundtripArray(uintptr_t arrIn, uintptr_t schema, uintptr_t arrOut); """) return ffi.dlopen(f'./cgotest.{libext}') @@ -161,6 +162,68 @@ def test_batch_roundtrip(self): del c_schema del c_batch + # commented out types can be uncommented after + # GH-14875 is addressed + _test_pyarrow_types = [ + pa.null(), + pa.bool_(), + pa.int32(), + pa.time32("s"), + pa.time64("us"), + pa.date32(), + pa.timestamp("us"), + pa.timestamp("us", tz="UTC"), + pa.timestamp("us", tz="Europe/Paris"), + pa.duration("s"), + pa.duration("ms"), + pa.duration("us"), + pa.duration("ns"), + pa.float16(), + pa.float32(), + pa.float64(), + pa.decimal128(19, 4), + # pa.string(), + # pa.binary(), + # pa.binary(10), + # pa.large_string(), + # pa.large_binary(), + pa.list_(pa.int32()), + pa.list_(pa.int32(), 2), + pa.large_list(pa.uint16()), + pa.struct([ + pa.field("a", pa.int32()), + pa.field("b", pa.int8()), + # pa.field("c", pa.string()), + ]), + pa.struct([ + pa.field("a", pa.int32(), nullable=False), + pa.field("b", pa.int8(), nullable=False), + # pa.field("c", pa.string()), + ]), + pa.dictionary(pa.int8(), pa.int64()), + # pa.dictionary(pa.int8(), pa.string()), + # pa.map_(pa.string(), pa.int32()), + pa.map_(pa.int64(), pa.int32()), + ] + + def test_empty_roundtrip(self): + for typ in self._test_pyarrow_types: + with self.subTest(typ=typ): + with self.assert_pyarrow_memory_released(): + a = pa.array([], typ) + a._export_to_c(self.ptr_array) + typ._export_to_c(self.ptr_schema) + + c_arr = ffi.new("struct ArrowArray*") + ptr_arr = int(ffi.cast("uintptr_t", c_arr)) + + cgotest.roundtripArray(self.ptr_array, self.ptr_schema, ptr_arr) + b = pa.Array._import_from_c(ptr_arr, typ) + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type + del a + del b if __name__ == '__main__': unittest.main(verbosity=2)