diff --git a/common/src/main/java/org/apache/arrow/c/ArrowImporter.java b/common/src/main/java/org/apache/arrow/c/ArrowImporter.java index d0d1754e88..e5cf966c84 100644 --- a/common/src/main/java/org/apache/arrow/c/ArrowImporter.java +++ b/common/src/main/java/org/apache/arrow/c/ArrowImporter.java @@ -52,7 +52,7 @@ public FieldVector importVector( ArrowArray array, ArrowSchema schema, CDataDictionaryProvider provider) { Field field = importField(schema, provider); FieldVector vector = field.createVector(allocator); - CometArrayImporter importer = new CometArrayImporter(allocator, vector, provider); + ArrayImporter importer = new ArrayImporter(allocator, vector, provider); importer.importArray(array); return vector; } diff --git a/common/src/main/java/org/apache/arrow/c/CometArrayImporter.java b/common/src/main/java/org/apache/arrow/c/CometArrayImporter.java deleted file mode 100644 index 119055b5f0..0000000000 --- a/common/src/main/java/org/apache/arrow/c/CometArrayImporter.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.arrow.c; - -import java.util.Collections; -import java.util.List; - -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.util.Preconditions; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.dictionary.Dictionary; -import org.apache.arrow.vector.dictionary.DictionaryProvider; -import org.apache.arrow.vector.ipc.message.ArrowFieldNode; -import org.apache.arrow.vector.types.pojo.DictionaryEncoding; - -import static org.apache.arrow.c.NativeUtil.NULL; -import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt; -import static org.apache.arrow.util.Preconditions.checkNotNull; -import static org.apache.arrow.util.Preconditions.checkState; - -/** - * Importer for {@link ArrowArray}. We copy it from Arrow `ArrayImporter` because we need to use - * `CometBufferImportTypeVisitor` instead of Arrow `BufferImportTypeVisitor`. - */ -final class CometArrayImporter { - private static final int MAX_IMPORT_RECURSION_LEVEL = 64; - - private final BufferAllocator allocator; - private final FieldVector vector; - private final DictionaryProvider dictionaryProvider; - - private ReferenceCountedArrowArray underlyingAllocation; - private int recursionLevel; - - CometArrayImporter( - BufferAllocator allocator, FieldVector vector, DictionaryProvider dictionaryProvider) { - this.allocator = Preconditions.checkNotNull(allocator); - this.vector = Preconditions.checkNotNull(vector); - this.dictionaryProvider = dictionaryProvider; - } - - void importArray(ArrowArray src) { - ArrowArray.Snapshot snapshot = src.snapshot(); - checkState(snapshot.release != NULL, "Cannot import released ArrowArray"); - - // Move imported array - ArrowArray ownedArray = ArrowArray.allocateNew(allocator); - ownedArray.save(snapshot); - src.markReleased(); - src.close(); - - recursionLevel = 0; - - // This keeps the array alive as long as there are any buffers that need it - underlyingAllocation = new ReferenceCountedArrowArray(ownedArray); - try { - doImport(snapshot); - } finally { - underlyingAllocation.release(); - } - } - - private void importChild(CometArrayImporter parent, ArrowArray src) { - ArrowArray.Snapshot snapshot = src.snapshot(); - checkState(snapshot.release != NULL, "Cannot import released ArrowArray"); - recursionLevel = parent.recursionLevel + 1; - checkState( - recursionLevel <= MAX_IMPORT_RECURSION_LEVEL, - "Recursion level in ArrowArray struct exceeded"); - // Child buffers will keep the entire parent import alive. - underlyingAllocation = parent.underlyingAllocation; - doImport(snapshot); - } - - private void doImport(ArrowArray.Snapshot snapshot) { - // First import children (required for reconstituting parent array data) - long[] children = - NativeUtil.toJavaArray(snapshot.children, checkedCastToInt(snapshot.n_children)); - if (children != null && children.length > 0) { - List childVectors = vector.getChildrenFromFields(); - checkState( - children.length == childVectors.size(), - "ArrowArray struct has %s children (expected %s)", - children.length, - childVectors.size()); - for (int i = 0; i < children.length; i++) { - checkState(children[i] != NULL, "ArrowArray struct has NULL child at position %s", i); - CometArrayImporter childImporter = - new CometArrayImporter(allocator, childVectors.get(i), dictionaryProvider); - childImporter.importChild(this, ArrowArray.wrap(children[i])); - } - } - - // Handle import of a dictionary encoded vector - if (snapshot.dictionary != NULL) { - DictionaryEncoding encoding = vector.getField().getDictionary(); - checkNotNull(encoding, "Missing encoding on import of ArrowArray with dictionary"); - - Dictionary dictionary = dictionaryProvider.lookup(encoding.getId()); - checkNotNull(dictionary, "Dictionary lookup failed on import of ArrowArray with dictionary"); - - // reset the dictionary vector to the initial state - dictionary.getVector().clear(); - - CometArrayImporter dictionaryImporter = - new CometArrayImporter(allocator, dictionary.getVector(), dictionaryProvider); - dictionaryImporter.importChild(this, ArrowArray.wrap(snapshot.dictionary)); - } - - // Import main data - ArrowFieldNode fieldNode = new ArrowFieldNode(snapshot.length, snapshot.null_count); - long[] bufferPointers = - NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers)); - - try (final CometBufferImportTypeVisitor visitor = - new CometBufferImportTypeVisitor( - allocator, underlyingAllocation, fieldNode, snapshot, bufferPointers)) { - final List buffers; - if (bufferPointers == null || bufferPointers.length == 0) { - buffers = Collections.emptyList(); - } else { - buffers = vector.getField().getType().accept(visitor); - } - vector.loadFieldBuffers(fieldNode, buffers); - } catch (Exception e) { - throw new IllegalArgumentException( - "Could not load buffers for field " - + vector.getField() - + ". error message: " - + e.getMessage(), - e); - } - } -} diff --git a/common/src/main/java/org/apache/arrow/c/CometBufferImportTypeVisitor.java b/common/src/main/java/org/apache/arrow/c/CometBufferImportTypeVisitor.java deleted file mode 100644 index b80e6b7f20..0000000000 --- a/common/src/main/java/org/apache/arrow/c/CometBufferImportTypeVisitor.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.arrow.c; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.util.AutoCloseables; -import org.apache.arrow.util.VisibleForTesting; -import org.apache.arrow.vector.DateDayVector; -import org.apache.arrow.vector.DateMilliVector; -import org.apache.arrow.vector.DurationVector; -import org.apache.arrow.vector.Float4Vector; -import org.apache.arrow.vector.Float8Vector; -import org.apache.arrow.vector.IntervalDayVector; -import org.apache.arrow.vector.IntervalMonthDayNanoVector; -import org.apache.arrow.vector.IntervalYearVector; -import org.apache.arrow.vector.LargeVarBinaryVector; -import org.apache.arrow.vector.LargeVarCharVector; -import org.apache.arrow.vector.TimeMicroVector; -import org.apache.arrow.vector.TimeMilliVector; -import org.apache.arrow.vector.TimeNanoVector; -import org.apache.arrow.vector.TimeSecVector; -import org.apache.arrow.vector.TimeStampVector; -import org.apache.arrow.vector.VarBinaryVector; -import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.complex.DenseUnionVector; -import org.apache.arrow.vector.complex.LargeListVector; -import org.apache.arrow.vector.complex.ListVector; -import org.apache.arrow.vector.complex.MapVector; -import org.apache.arrow.vector.complex.UnionVector; -import org.apache.arrow.vector.ipc.message.ArrowFieldNode; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.util.DataSizeRoundingUtil; - -import static org.apache.arrow.c.NativeUtil.NULL; -import static org.apache.arrow.util.Preconditions.checkState; - -/** - * Import buffers from a C Data Interface struct. We copy it from Arrow `BufferImportTypeVisitor` - * and fix the issue: https://github.com/apache/arrow/issues/42156. - */ -class CometBufferImportTypeVisitor - implements ArrowType.ArrowTypeVisitor>, AutoCloseable { - private final BufferAllocator allocator; - private final ReferenceCountedArrowArray underlyingAllocation; - private final ArrowFieldNode fieldNode; - private final ArrowArray.Snapshot snapshot; - private final long[] buffers; - private final List imported; - - CometBufferImportTypeVisitor( - BufferAllocator allocator, - ReferenceCountedArrowArray underlyingAllocation, - ArrowFieldNode fieldNode, - ArrowArray.Snapshot snapshot, - long[] buffers) { - this.allocator = allocator; - this.underlyingAllocation = underlyingAllocation; - this.fieldNode = fieldNode; - this.snapshot = snapshot; - this.buffers = buffers; - this.imported = new ArrayList<>(); - } - - @Override - public void close() throws Exception { - AutoCloseables.close(imported); - } - - @VisibleForTesting - ArrowBuf importBuffer(ArrowType type, int index, long capacity) { - return importBuffer(type, index, 0, capacity); - } - - @VisibleForTesting - ArrowBuf importBuffer(ArrowType type, int index, long offset, long capacity) { - checkState( - buffers.length > index, - "Expected at least %s buffers for type %s, but found %s", - index + 1, - type, - buffers.length); - long bufferPtr = buffers[index] + offset; - - if (bufferPtr == NULL) { - // C array may be NULL but only accept that if expected capacity is zero too - if (capacity != 0) { - throw new IllegalStateException( - String.format("Buffer %s for type %s cannot be null", index, type)); - } else { - // no data in the C array, return an empty buffer - return allocator.getEmpty(); - } - } - - ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr); - imported.add(buf); - return buf; - } - - private ArrowBuf importFixedBits(ArrowType type, int index, long bitsPerSlot) { - final long capacity = DataSizeRoundingUtil.divideBy8Ceil(bitsPerSlot * fieldNode.getLength()); - return importBuffer(type, index, capacity); - } - - private ArrowBuf importFixedBytes(ArrowType type, int index, long bytesPerSlot) { - final long capacity = bytesPerSlot * fieldNode.getLength(); - return importBuffer(type, index, capacity); - } - - private ArrowBuf importOffsets(ArrowType type, long bytesPerSlot) { - final long capacity = bytesPerSlot * (fieldNode.getLength() + 1); - final long offset = snapshot.offset * bytesPerSlot; - return importBuffer(type, 1, offset, capacity); - } - - private ArrowBuf importData(ArrowType type, long capacity) { - return importBuffer(type, 2, capacity); - } - - private ArrowBuf maybeImportBitmap(ArrowType type) { - checkState( - buffers.length > 0, - "Expected at least %s buffers for type %s, but found %s", - 1, - type, - buffers.length); - if (buffers[0] == NULL) { - return null; - } - return importFixedBits(type, 0, /* bitsPerSlot= */ 1); - } - - @Override - public List visit(ArrowType.Null type) { - checkState( - buffers.length == 0, - "Expected %s buffers for type %s, but found %s", - 0, - type, - buffers.length); - return Collections.emptyList(); - } - - @Override - public List visit(ArrowType.Struct type) { - return Collections.singletonList(maybeImportBitmap(type)); - } - - @Override - public List visit(ArrowType.List type) { - return Arrays.asList(maybeImportBitmap(type), importOffsets(type, ListVector.OFFSET_WIDTH)); - } - - @Override - public List visit(ArrowType.LargeList type) { - return Arrays.asList( - maybeImportBitmap(type), importOffsets(type, LargeListVector.OFFSET_WIDTH)); - } - - @Override - public List visit(ArrowType.FixedSizeList type) { - return Collections.singletonList(maybeImportBitmap(type)); - } - - @Override - public List visit(ArrowType.Union type) { - switch (type.getMode()) { - case Sparse: - return Collections.singletonList(importFixedBytes(type, 0, UnionVector.TYPE_WIDTH)); - case Dense: - return Arrays.asList( - importFixedBytes(type, 0, DenseUnionVector.TYPE_WIDTH), - importFixedBytes(type, 1, DenseUnionVector.OFFSET_WIDTH)); - default: - throw new UnsupportedOperationException("Importing buffers for union type: " + type); - } - } - - @Override - public List visit(ArrowType.Map type) { - return Arrays.asList(maybeImportBitmap(type), importOffsets(type, MapVector.OFFSET_WIDTH)); - } - - @Override - public List visit(ArrowType.Int type) { - return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth())); - } - - @Override - public List visit(ArrowType.FloatingPoint type) { - switch (type.getPrecision()) { - case HALF: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, /* bytesPerSlot= */ 2)); - case SINGLE: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, Float4Vector.TYPE_WIDTH)); - case DOUBLE: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, Float8Vector.TYPE_WIDTH)); - default: - throw new UnsupportedOperationException("Importing buffers for type: " + type); - } - } - - @Override - public List visit(ArrowType.Utf8 type) { - try (ArrowBuf offsets = importOffsets(type, VarCharVector.OFFSET_WIDTH)) { - final int start = offsets.getInt(0); - final int end = offsets.getInt(fieldNode.getLength() * (long) VarCharVector.OFFSET_WIDTH); - checkState( - end >= start, - "Offset buffer for type %s is malformed: start: %s, end: %s", - type, - start, - end); - // HACK: For the issue https://github.com/apache/datafusion-comet/issues/540 - // As Arrow Java doesn't support `offset` in C Data interface, we cannot correctly import - // a slice of string from arrow-rs to Java Arrow and then export it to arrow-rs again. - // So we add this hack to always take full length of data buffer by assuming the first offset - // is always 0 which is true for Arrow Java and arrow-rs. - final int len = end; - offsets.getReferenceManager().retain(); - return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len)); - } - } - - @Override - public List visit(ArrowType.LargeUtf8 type) { - try (ArrowBuf offsets = importOffsets(type, LargeVarCharVector.OFFSET_WIDTH)) { - final long start = offsets.getLong(0); - final long end = - offsets.getLong(fieldNode.getLength() * (long) LargeVarCharVector.OFFSET_WIDTH); - checkState( - end >= start, - "Offset buffer for type %s is malformed: start: %s, end: %s", - type, - start, - end); - // HACK: For the issue https://github.com/apache/datafusion-comet/issues/540 - // As Arrow Java doesn't support `offset` in C Data interface, we cannot correctly import - // a slice of string from arrow-rs to Java Arrow and then export it to arrow-rs again. - // So we add this hack to always take full length of data buffer by assuming the first offset - // is always 0 which is true for Arrow Java and arrow-rs. - final long len = end; - offsets.getReferenceManager().retain(); - return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len)); - } - } - - @Override - public List visit(ArrowType.Binary type) { - try (ArrowBuf offsets = importOffsets(type, VarBinaryVector.OFFSET_WIDTH)) { - final int start = offsets.getInt(0); - final int end = offsets.getInt(fieldNode.getLength() * (long) VarBinaryVector.OFFSET_WIDTH); - checkState( - end >= start, - "Offset buffer for type %s is malformed: start: %s, end: %s", - type, - start, - end); - // HACK: For the issue https://github.com/apache/datafusion-comet/issues/540 - // As Arrow Java doesn't support `offset` in C Data interface, we cannot correctly import - // a slice of string from arrow-rs to Java Arrow and then export it to arrow-rs again. - // So we add this hack to always take full length of data buffer by assuming the first offset - // is always 0 which is true for Arrow Java and arrow-rs. - final int len = end; - offsets.getReferenceManager().retain(); - return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len)); - } - } - - @Override - public List visit(ArrowType.LargeBinary type) { - try (ArrowBuf offsets = importOffsets(type, LargeVarBinaryVector.OFFSET_WIDTH)) { - final long start = offsets.getLong(0); - // TODO: need better tests to cover the failure when I forget to multiply by offset width - final long end = - offsets.getLong(fieldNode.getLength() * (long) LargeVarBinaryVector.OFFSET_WIDTH); - checkState( - end >= start, - "Offset buffer for type %s is malformed: start: %s, end: %s", - type, - start, - end); - // HACK: For the issue https://github.com/apache/datafusion-comet/issues/540 - // As Arrow Java doesn't support `offset` in C Data interface, we cannot correctly import - // a slice of string from arrow-rs to Java Arrow and then export it to arrow-rs again. - // So we add this hack to always take full length of data buffer by assuming the first offset - // is always 0 which is true for Arrow Java and arrow-rs. - final long len = end; - offsets.getReferenceManager().retain(); - return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len)); - } - } - - @Override - public List visit(ArrowType.FixedSizeBinary type) { - return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, type.getByteWidth())); - } - - @Override - public List visit(ArrowType.Bool type) { - return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, /* bitsPerSlot= */ 1)); - } - - @Override - public List visit(ArrowType.Decimal type) { - return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth())); - } - - @Override - public List visit(ArrowType.Date type) { - switch (type.getUnit()) { - case DAY: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, DateDayVector.TYPE_WIDTH)); - case MILLISECOND: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, DateMilliVector.TYPE_WIDTH)); - default: - throw new UnsupportedOperationException("Importing buffers for type: " + type); - } - } - - @Override - public List visit(ArrowType.Time type) { - switch (type.getUnit()) { - case SECOND: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, TimeSecVector.TYPE_WIDTH)); - case MILLISECOND: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, TimeMilliVector.TYPE_WIDTH)); - case MICROSECOND: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, TimeMicroVector.TYPE_WIDTH)); - case NANOSECOND: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, TimeNanoVector.TYPE_WIDTH)); - default: - throw new UnsupportedOperationException("Importing buffers for type: " + type); - } - } - - @Override - public List visit(ArrowType.Timestamp type) { - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, TimeStampVector.TYPE_WIDTH)); - } - - @Override - public List visit(ArrowType.Interval type) { - switch (type.getUnit()) { - case YEAR_MONTH: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, IntervalYearVector.TYPE_WIDTH)); - case DAY_TIME: - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, IntervalDayVector.TYPE_WIDTH)); - case MONTH_DAY_NANO: - return Arrays.asList( - maybeImportBitmap(type), - importFixedBytes(type, 1, IntervalMonthDayNanoVector.TYPE_WIDTH)); - default: - throw new UnsupportedOperationException("Importing buffers for type: " + type); - } - } - - @Override - public List visit(ArrowType.Duration type) { - return Arrays.asList( - maybeImportBitmap(type), importFixedBytes(type, 1, DurationVector.TYPE_WIDTH)); - } -} diff --git a/pom.xml b/pom.xml index 0764186478..f1514c9433 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ under the License. 1.13.1 provided 3.3.4 - 16.0.0 + 18.3.0 1.9.13 2.43.0 0.8.11