Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions java/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ arrow-git.properties
cmake_install.cmake
install_manifest.txt
target/
?/
!/c/

# Generated properties file
flight/flight-sql-jdbc-driver/src/main/resources/properties/flight.properties
57 changes: 19 additions & 38 deletions java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import static org.apache.arrow.util.Preconditions.checkNotNull;
import static org.apache.arrow.util.Preconditions.checkState;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
Expand All @@ -44,12 +44,12 @@ final class ArrayImporter {
private final FieldVector vector;
private final DictionaryProvider dictionaryProvider;

private CDataReferenceManager referenceManager;
private ReferenceCountedArrowArray underlyingAllocation;
private int recursionLevel;

ArrayImporter(BufferAllocator allocator, FieldVector vector, DictionaryProvider dictionaryProvider) {
this.allocator = allocator;
this.vector = vector;
this.allocator = Preconditions.checkNotNull(allocator);
this.vector = Preconditions.checkNotNull(vector);
this.dictionaryProvider = dictionaryProvider;
}

Expand All @@ -66,12 +66,11 @@ void importArray(ArrowArray src) {
recursionLevel = 0;

// This keeps the array alive as long as there are any buffers that need it
referenceManager = new CDataReferenceManager(ownedArray);
underlyingAllocation = new ReferenceCountedArrowArray(ownedArray);
try {
referenceManager.increment();
doImport(snapshot);
} finally {
referenceManager.release();
underlyingAllocation.release();
}
}

Expand All @@ -81,9 +80,7 @@ private void importChild(ArrayImporter parent, ArrowArray src) {
recursionLevel = parent.recursionLevel + 1;
checkState(recursionLevel <= MAX_IMPORT_RECURSION_LEVEL, "Recursion level in ArrowArray struct exceeded");
// Child buffers will keep the entire parent import alive.
// Perhaps we can move the child structs on import,
// but that is another level of complication.
referenceManager = parent.referenceManager;
underlyingAllocation = parent.underlyingAllocation;
doImport(snapshot);
}

Expand Down Expand Up @@ -118,36 +115,20 @@ private void doImport(ArrowArray.Snapshot snapshot) {

// Import main data
ArrowFieldNode fieldNode = new ArrowFieldNode(snapshot.length, snapshot.null_count);
List<ArrowBuf> buffers = importBuffers(snapshot);
try {
long[] bufferPointers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));

try (final BufferImportTypeVisitor visitor = new BufferImportTypeVisitor(
allocator, underlyingAllocation, fieldNode, bufferPointers)) {
final List<ArrowBuf> buffers;
if (bufferPointers == null || bufferPointers.length == 0) {
buffers = Collections.emptyList();
} else {
buffers = vector.getField().getType().accept(visitor);
}
vector.loadFieldBuffers(fieldNode, buffers);
} catch (RuntimeException e) {
} catch (Exception e) {
throw new IllegalArgumentException(
"Could not load buffers for field " + vector.getField() + ". error message: " + e.getMessage(), e);
}
}

private List<ArrowBuf> importBuffers(ArrowArray.Snapshot snapshot) {
long[] buffers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));
if (buffers == null || buffers.length == 0) {
return new ArrayList<>();
}

int buffersCount = TypeLayout.getTypeBufferCount(vector.getField().getType());
checkState(buffers.length == buffersCount, "Expected %s buffers for imported type %s, ArrowArray struct has %s",
buffersCount, vector.getField().getType().getTypeID(), buffers.length);

List<ArrowBuf> result = new ArrayList<>(buffersCount);
for (long bufferPtr : buffers) {
ArrowBuf buffer = null;
if (bufferPtr != NULL) {
// See ARROW-17720: [Java] C data interface: Add API to compute imported buffer size
int capacity = Integer.MAX_VALUE;
buffer = new ArrowBuf(referenceManager, null, capacity, bufferPtr);
buffer.writerIndex(capacity);
}
result.add(buffer);
}
return result;
}
}
6 changes: 6 additions & 0 deletions java/c/src/main/java/org/apache/arrow/c/ArrowArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReferenceManager;
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.VisibleForTesting;

/**
* C Data Interface ArrowArray.
Expand Down Expand Up @@ -149,6 +150,11 @@ public void close() {
}
}

@VisibleForTesting
boolean isClosed() {
return data == null;
}

private ByteBuffer directBuffer() {
return MemoryUtil.directBuffer(memoryAddress(), ArrowArray.SIZE_OF).order(ByteOrder.nativeOrder());
}
Expand Down
Loading