diff --git a/java/.gitignore b/java/.gitignore index 59c2e7b2a0c..07e84864a34 100644 --- a/java/.gitignore +++ b/java/.gitignore @@ -21,8 +21,6 @@ arrow-git.properties cmake_install.cmake install_manifest.txt target/ -?/ -!/c/ # Generated properties file flight/flight-sql-jdbc-driver/src/main/resources/properties/flight.properties diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java b/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java index 4da3806575c..7132887ddee 100644 --- a/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java +++ b/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java @@ -22,13 +22,13 @@ import static org.apache.arrow.util.Preconditions.checkNotNull; import static org.apache.arrow.util.Preconditions.checkState; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.TypeLayout; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; @@ -44,12 +44,12 @@ final class ArrayImporter { private final FieldVector vector; private final DictionaryProvider dictionaryProvider; - private CDataReferenceManager referenceManager; + private ReferenceCountedArrowArray underlyingAllocation; private int recursionLevel; ArrayImporter(BufferAllocator allocator, FieldVector vector, DictionaryProvider dictionaryProvider) { - this.allocator = allocator; - this.vector = vector; + this.allocator = Preconditions.checkNotNull(allocator); + this.vector = Preconditions.checkNotNull(vector); this.dictionaryProvider = dictionaryProvider; } @@ -66,12 +66,11 @@ void importArray(ArrowArray src) { recursionLevel = 0; // This keeps the array alive as long as there are any buffers that need it - 
referenceManager = new CDataReferenceManager(ownedArray); + underlyingAllocation = new ReferenceCountedArrowArray(ownedArray); try { - referenceManager.increment(); doImport(snapshot); } finally { - referenceManager.release(); + underlyingAllocation.release(); } } @@ -81,9 +80,7 @@ private void importChild(ArrayImporter parent, ArrowArray src) { recursionLevel = parent.recursionLevel + 1; checkState(recursionLevel <= MAX_IMPORT_RECURSION_LEVEL, "Recursion level in ArrowArray struct exceeded"); // Child buffers will keep the entire parent import alive. - // Perhaps we can move the child structs on import, - // but that is another level of complication. - referenceManager = parent.referenceManager; + underlyingAllocation = parent.underlyingAllocation; doImport(snapshot); } @@ -118,36 +115,20 @@ private void doImport(ArrowArray.Snapshot snapshot) { // Import main data ArrowFieldNode fieldNode = new ArrowFieldNode(snapshot.length, snapshot.null_count); - List buffers = importBuffers(snapshot); - try { + long[] bufferPointers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers)); + + try (final BufferImportTypeVisitor visitor = new BufferImportTypeVisitor( + allocator, underlyingAllocation, fieldNode, bufferPointers)) { + final List buffers; + if (bufferPointers == null || bufferPointers.length == 0) { + buffers = Collections.emptyList(); + } else { + buffers = vector.getField().getType().accept(visitor); + } vector.loadFieldBuffers(fieldNode, buffers); - } catch (RuntimeException e) { + } catch (Exception e) { throw new IllegalArgumentException( "Could not load buffers for field " + vector.getField() + ". 
error message: " + e.getMessage(), e); } } - - private List importBuffers(ArrowArray.Snapshot snapshot) { - long[] buffers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers)); - if (buffers == null || buffers.length == 0) { - return new ArrayList<>(); - } - - int buffersCount = TypeLayout.getTypeBufferCount(vector.getField().getType()); - checkState(buffers.length == buffersCount, "Expected %s buffers for imported type %s, ArrowArray struct has %s", - buffersCount, vector.getField().getType().getTypeID(), buffers.length); - - List result = new ArrayList<>(buffersCount); - for (long bufferPtr : buffers) { - ArrowBuf buffer = null; - if (bufferPtr != NULL) { - // See ARROW-17720: [Java] C data interface: Add API to compute imported buffer size - int capacity = Integer.MAX_VALUE; - buffer = new ArrowBuf(referenceManager, null, capacity, bufferPtr); - buffer.writerIndex(capacity); - } - result.add(buffer); - } - return result; - } } diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java b/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java index 99fe0432c14..a538852f47c 100644 --- a/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java +++ b/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java @@ -28,6 +28,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.ReferenceManager; import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.VisibleForTesting; /** * C Data Interface ArrowArray. 
@@ -149,6 +150,11 @@ public void close() { } } + @VisibleForTesting + boolean isClosed() { + return data == null; + } + private ByteBuffer directBuffer() { return MemoryUtil.directBuffer(memoryAddress(), ArrowArray.SIZE_OF).order(ByteOrder.nativeOrder()); } diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java new file mode 100644 index 00000000000..c8b6d070862 --- /dev/null +++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.c; + +import static org.apache.arrow.c.NativeUtil.NULL; +import static org.apache.arrow.util.Preconditions.checkState; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.util.VisibleForTesting; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.DurationVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntervalDayVector; +import org.apache.arrow.vector.IntervalMonthDayNanoVector; +import org.apache.arrow.vector.IntervalYearVector; +import org.apache.arrow.vector.LargeVarBinaryVector; +import org.apache.arrow.vector.LargeVarCharVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.DenseUnionVector; +import org.apache.arrow.vector.complex.LargeListVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.UnionVector; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.util.DataSizeRoundingUtil; + +/** + * Import buffers from a C Data Interface struct. 
+ */ +class BufferImportTypeVisitor implements ArrowType.ArrowTypeVisitor>, AutoCloseable { + private final BufferAllocator allocator; + private final ReferenceCountedArrowArray underlyingAllocation; + private final ArrowFieldNode fieldNode; + private final long[] buffers; + private final List imported; + + BufferImportTypeVisitor(BufferAllocator allocator, ReferenceCountedArrowArray underlyingAllocation, + ArrowFieldNode fieldNode, long[] buffers) { + this.allocator = allocator; + this.underlyingAllocation = underlyingAllocation; + this.fieldNode = fieldNode; + this.buffers = buffers; + this.imported = new ArrayList<>(); + } + + @Override + public void close() throws Exception { + AutoCloseables.close(imported); + } + + @VisibleForTesting + long getBufferPtr(ArrowType type, int index) { + checkState( + buffers.length > index, + "Expected at least %s buffers for type %s, but found %s", index + 1, type, buffers.length); + if (buffers[index] == NULL) { + throw new IllegalStateException(String.format("Buffer %s for type %s cannot be null", index, type)); + } + return buffers[index]; + } + + private ArrowBuf importFixedBits(ArrowType type, int index, long bitsPerSlot) { + final long bufferPtr = getBufferPtr(type, index); + final long capacity = DataSizeRoundingUtil.divideBy8Ceil(bitsPerSlot * fieldNode.getLength()); + ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr); + this.imported.add(buf); + return buf; + } + + private ArrowBuf importFixedBytes(ArrowType type, int index, long bytesPerSlot) { + final long bufferPtr = getBufferPtr(type, index); + final long capacity = bytesPerSlot * fieldNode.getLength(); + ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr); + this.imported.add(buf); + return buf; + } + + private ArrowBuf importOffsets(ArrowType type, long bytesPerSlot) { + final long bufferPtr = getBufferPtr(type, 1); + final long capacity = bytesPerSlot * (fieldNode.getLength() + 
1);
+    ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
+    this.imported.add(buf);
+    return buf;
+  }
+
+  /**
+   * Import the buffer at index 2, which the variable-width visitors use as their data buffer.
+   *
+   * @param type the type being imported, used for error messages
+   * @param capacity the size in bytes to give the imported buffer
+   * @return the imported buffer, also recorded in {@code imported} so that {@link #close()} releases it
+   * @throws IllegalStateException if fewer than three buffers were supplied or the pointer is NULL
+   */
+  private ArrowBuf importData(ArrowType type, long capacity) {
+    final long bufferPtr = getBufferPtr(type, 2);
+    ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
+    this.imported.add(buf);
+    return buf;
+  }
+
+  /**
+   * Import the bitmap at buffer index 0 (one bit per slot) if the producer supplied one.
+   *
+   * @param type the type being imported, used for error messages
+   * @return the imported bitmap, or {@code null} when the pointer at index 0 is NULL
+   * @throws IllegalStateException if no buffers were supplied at all
+   */
+  private ArrowBuf maybeImportBitmap(ArrowType type) {
+    checkState(
+        buffers.length > 0,
+        "Expected at least %s buffers for type %s, but found %s", 1, type, buffers.length);
+    if (buffers[0] == NULL) {
+      return null;
+    }
+    return importFixedBits(type, 0, /*bitsPerSlot=*/1);
+  }
+
+  /** A null array must not carry any buffers; nothing is imported. */
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Null type) {
+    checkState(
+        buffers.length == 0,
+        "Expected %s buffers for type %s, but found %s", 0, type, buffers.length);
+    return Collections.emptyList();
+  }
+
+  /** Imports only the optional bitmap at index 0. */
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Struct type) {
+    return Collections.singletonList(maybeImportBitmap(type));
+  }
+
+  /** Imports the optional bitmap at index 0 and the offsets buffer at index 1. */
+  @Override
+  public List<ArrowBuf> visit(ArrowType.List type) {
+    return Arrays.asList(maybeImportBitmap(type), importOffsets(type, ListVector.OFFSET_WIDTH));
+  }
+
+  /** Imports the optional bitmap at index 0 and the offsets buffer at index 1. */
+  @Override
+  public List<ArrowBuf> visit(ArrowType.LargeList type) {
+    return Arrays.asList(maybeImportBitmap(type), importOffsets(type, LargeListVector.OFFSET_WIDTH));
+  }
+
+  /** Imports only the optional bitmap at index 0. */
+  @Override
+  public List<ArrowBuf> visit(ArrowType.FixedSizeList type) {
+    return Collections.singletonList(maybeImportBitmap(type));
+  }
+
+  /**
+   * Import the buffers of a union array; no bitmap is imported for either mode.
+   *
+   * <p>A sparse union carries a single type-id buffer at index 0. A dense union carries the type-id buffer at
+   * index 0 followed by the offsets buffer at index 1.
+   *
+   * @throws UnsupportedOperationException if the union mode is neither sparse nor dense
+   */
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Union type) {
+    switch (type.getMode()) {
+      case Sparse:
+        return Collections.singletonList(importFixedBytes(type, 0, UnionVector.TYPE_WIDTH));
+      case Dense:
+        // The offsets live in their own buffer (index 1); importing index 0 twice would wrap the
+        // type-id pointer with an offsets-sized capacity.
+        return Arrays.asList(importFixedBytes(type, 0, DenseUnionVector.TYPE_WIDTH),
+            importFixedBytes(type, 1, DenseUnionVector.OFFSET_WIDTH));
+      default:
+        throw new UnsupportedOperationException("Importing buffers for type: " + type);
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Map type) {
+    return
Arrays.asList(maybeImportBitmap(type), importOffsets(type, MapVector.OFFSET_WIDTH)); + } + + @Override + public List visit(ArrowType.Int type) { + return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth())); + } + + @Override + public List visit(ArrowType.FloatingPoint type) { + switch (type.getPrecision()) { + case HALF: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, /*bytesPerSlot=*/2)); + case SINGLE: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, Float4Vector.TYPE_WIDTH)); + case DOUBLE: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, Float8Vector.TYPE_WIDTH)); + default: + throw new UnsupportedOperationException("Importing buffers for type: " + type); + } + } + + @Override + public List visit(ArrowType.Utf8 type) { + try (ArrowBuf offsets = importOffsets(type, VarCharVector.OFFSET_WIDTH)) { + final int start = offsets.getInt(0); + final int end = offsets.getInt(fieldNode.getLength() * (long) VarCharVector.OFFSET_WIDTH); + checkState( + end >= start, + "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end); + final int len = end - start; + offsets.getReferenceManager().retain(); + return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len)); + } + } + + @Override + public List visit(ArrowType.LargeUtf8 type) { + try (ArrowBuf offsets = importOffsets(type, LargeVarCharVector.OFFSET_WIDTH)) { + final long start = offsets.getLong(0); + final long end = offsets.getLong(fieldNode.getLength() * (long) LargeVarCharVector.OFFSET_WIDTH); + checkState( + end >= start, + "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end); + final long len = end - start; + offsets.getReferenceManager().retain(); + return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len)); + } + } + + @Override + public List visit(ArrowType.Binary type) { + try (ArrowBuf offsets = importOffsets(type, 
VarBinaryVector.OFFSET_WIDTH)) { + final int start = offsets.getInt(0); + final int end = offsets.getInt(fieldNode.getLength() * (long) VarBinaryVector.OFFSET_WIDTH); + checkState( + end >= start, + "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end); + final int len = end - start; + offsets.getReferenceManager().retain(); + return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len)); + } + } + + @Override + public List visit(ArrowType.LargeBinary type) { + try (ArrowBuf offsets = importOffsets(type, LargeVarBinaryVector.OFFSET_WIDTH)) { + final long start = offsets.getLong(0); + // TODO: need better tests to cover the failure when I forget to multiply by offset width + final long end = offsets.getLong(fieldNode.getLength() * (long) LargeVarBinaryVector.OFFSET_WIDTH); + checkState( + end >= start, + "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end); + final long len = end - start; + offsets.getReferenceManager().retain(); + return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len)); + } + } + + @Override + public List visit(ArrowType.FixedSizeBinary type) { + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, type.getByteWidth())); + } + + @Override + public List visit(ArrowType.Bool type) { + return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, /*bitsPerSlot=*/1)); + } + + @Override + public List visit(ArrowType.Decimal type) { + return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth())); + } + + @Override + public List visit(ArrowType.Date type) { + switch (type.getUnit()) { + case DAY: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DateDayVector.TYPE_WIDTH)); + case MILLISECOND: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DateMilliVector.TYPE_WIDTH)); + default: + throw new UnsupportedOperationException("Importing buffers for type: " + type); 
+ } + } + + @Override + public List visit(ArrowType.Time type) { + switch (type.getUnit()) { + case SECOND: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeSecVector.TYPE_WIDTH)); + case MILLISECOND: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeMilliVector.TYPE_WIDTH)); + case MICROSECOND: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeMicroVector.TYPE_WIDTH)); + case NANOSECOND: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeNanoVector.TYPE_WIDTH)); + default: + throw new UnsupportedOperationException("Importing buffers for type: " + type); + } + } + + @Override + public List visit(ArrowType.Timestamp type) { + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeStampVector.TYPE_WIDTH)); + } + + @Override + public List visit(ArrowType.Interval type) { + switch (type.getUnit()) { + case YEAR_MONTH: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalYearVector.TYPE_WIDTH)); + case DAY_TIME: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalDayVector.TYPE_WIDTH)); + case MONTH_DAY_NANO: + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalMonthDayNanoVector.TYPE_WIDTH)); + default: + throw new UnsupportedOperationException("Importing buffers for type: " + type); + } + } + + @Override + public List visit(ArrowType.Duration type) { + return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DurationVector.TYPE_WIDTH)); + } +} diff --git a/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java b/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java deleted file mode 100644 index c5c2f977900..00000000000 --- a/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor 
license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.arrow.c; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.OwnershipTransferResult; -import org.apache.arrow.memory.ReferenceManager; -import org.apache.arrow.util.Preconditions; - -/** - * A ReferenceManager implementation that holds a - * {@link org.apache.arrow.c.BaseStruct}. - *

- * A reference count is maintained and once it reaches zero the struct is - * released (as per the C data interface specification) and closed. - */ -final class CDataReferenceManager implements ReferenceManager { - private final AtomicInteger bufRefCnt = new AtomicInteger(0); - - private final BaseStruct struct; - - CDataReferenceManager(BaseStruct struct) { - this.struct = struct; - } - - @Override - public int getRefCount() { - return bufRefCnt.get(); - } - - @Override - public boolean release() { - return release(1); - } - - /** - * Increment the reference count without any safety checks. - */ - void increment() { - bufRefCnt.incrementAndGet(); - } - - @Override - public boolean release(int decrement) { - Preconditions.checkState(decrement >= 1, "ref count decrement should be greater than or equal to 1"); - // decrement the ref count - final int refCnt = bufRefCnt.addAndGet(-decrement); - // the new ref count should be >= 0 - Preconditions.checkState(refCnt >= 0, "ref count has gone negative"); - if (refCnt == 0) { - // refcount of this reference manager has dropped to 0 - // release the underlying memory - struct.release(); - struct.close(); - } - return refCnt == 0; - } - - @Override - public void retain() { - retain(1); - } - - @Override - public void retain(int increment) { - Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment); - final int originalReferenceCount = bufRefCnt.getAndAdd(increment); - Preconditions.checkState(originalReferenceCount > 0, "retain called but memory was already released"); - } - - @Override - public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) { - retain(); - - ArrowBuf targetArrowBuf = this.deriveBuffer(srcBuffer, 0, srcBuffer.capacity()); - targetArrowBuf.readerIndex(srcBuffer.readerIndex()); - targetArrowBuf.writerIndex(srcBuffer.writerIndex()); - return targetArrowBuf; - } - - @Override - public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length) 
{ - final long derivedBufferAddress = sourceBuffer.memoryAddress() + index; - return new ArrowBuf(this, null, length, derivedBufferAddress); - } - - @Override - public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) { - throw new UnsupportedOperationException(); - } - - @Override - public BufferAllocator getAllocator() { - return null; - } - - @Override - public long getSize() { - return 0L; - } - - @Override - public long getAccountedSize() { - return 0L; - } -} diff --git a/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java b/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java new file mode 100644 index 00000000000..f09f14817b6 --- /dev/null +++ b/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.c; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.ForeignAllocation; + +/** + * The owner of an imported C Data Interface array. + * + *

<p>There is a fundamental mismatch here between memory allocation schemes: AllocationManager represents a single
+ * allocation (= a single address and length). But an ArrowArray combines multiple allocations behind a single
+ * deallocation callback. This class bridges the two by tracking a reference count, so that the single callback
+ * can be managed by multiple {@link ForeignAllocation} instances.
+ */
+final class ReferenceCountedArrowArray {
+  private final ArrowArray array;
+  private final AtomicInteger refCnt;
+
+  /**
+   * Wrap the given array with an initial reference count of one.
+   *
+   * @param array the array to release and close once the count drops to zero
+   */
+  ReferenceCountedArrowArray(ArrowArray array) {
+    this.array = array;
+    this.refCnt = new AtomicInteger(1);
+  }
+
+  /**
+   * Take an additional reference on the underlying array.
+   *
+   * @throws IllegalStateException if the array was already released; the count is left unchanged
+   */
+  void retain() {
+    // Only increment while the count is still positive. An unconditional increment would push a dead
+    // handle back to a positive count, and the next release() would release and close the array again.
+    final int previous = refCnt.getAndUpdate(count -> count > 0 ? count + 1 : count);
+    if (previous <= 0) {
+      throw new IllegalStateException("Tried to retain a released ArrowArray");
+    }
+  }
+
+  /**
+   * Drop one reference, releasing and closing the underlying array when the count reaches zero.
+   *
+   * @throws IllegalStateException if the count drops below zero
+   */
+  void release() {
+    int refcnt = refCnt.addAndGet(-1);
+    if (refcnt == 0) {
+      array.release();
+      array.close();
+    } else if (refcnt < 0) {
+      throw new IllegalStateException("Reference count went negative for imported ArrowArray");
+    }
+  }
+
+  /**
+   * Create an ArrowBuf wrapping a buffer from this ArrowArray associated with the given BufferAllocator.
+   *
+   *

This method is "unsafe" because there is no validation of the given capacity or address. If the returned + * buffer is not freed, a memory leak will occur. + */ + ArrowBuf unsafeAssociateAllocation(BufferAllocator trackingAllocator, long capacity, long memoryAddress) { + retain(); + return trackingAllocator.wrapForeignAllocation(new ForeignAllocation(capacity, memoryAddress) { + @Override + protected void release0() { + ReferenceCountedArrowArray.this.release(); + } + }); + } +} diff --git a/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java b/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java new file mode 100644 index 00000000000..2d31089ca70 --- /dev/null +++ b/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.c; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.ReferenceManager; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ArrowArrayUtilityTest { + BufferAllocator allocator; + ArrowArray arrowArray; + ReferenceCountedArrowArray dummyHandle; + + @BeforeEach + void beforeEach() { + allocator = new RootAllocator(); + arrowArray = ArrowArray.allocateNew(allocator); + dummyHandle = new ReferenceCountedArrowArray(arrowArray); + } + + @AfterEach + void afterEach() { + dummyHandle.release(); + allocator.close(); + } + + // ------------------------------------------------------------ + // BufferImportTypeVisitor + + @Test + void getBufferPtr() throws Exception { + // Note values are all dummy values here + try (BufferImportTypeVisitor visitor = + new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(0, 0), new long[]{0})) { + + // Too few buffers + assertThrows(IllegalStateException.class, () -> visitor.getBufferPtr(new ArrowType.Bool(), 1)); + + // Null where one isn't expected + assertThrows(IllegalStateException.class, () -> visitor.getBufferPtr(new ArrowType.Bool(), 0)); + } + } + + @Test + void cleanupAfterFailure() throws Exception { + // Note values are all dummy values here + long address = MemoryUtil.UNSAFE.allocateMemory(16); + try (BufferImportTypeVisitor visitor = + new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(0, 0), new long[] {address})) { + 
// This fails, but only after we've already imported a buffer. + assertThrows(IllegalStateException.class, () -> visitor.visit(new ArrowType.Int(32, true))); + } finally { + MemoryUtil.UNSAFE.freeMemory(address); + } + } + + @Test + void bufferAssociatedWithAllocator() throws Exception { + // Note values are all dummy values here + final long bufferSize = 16; + final long fieldLength = bufferSize / IntVector.TYPE_WIDTH; + long address = MemoryUtil.UNSAFE.allocateMemory(bufferSize); + long baseline = allocator.getAllocatedMemory(); + ArrowFieldNode fieldNode = new ArrowFieldNode(fieldLength, 0); + try (BufferImportTypeVisitor visitor = + new BufferImportTypeVisitor(allocator, dummyHandle, fieldNode, new long[] {0, address})) { + List buffers = visitor.visit(new ArrowType.Int(32, true)); + assertThat(buffers).hasSize(2); + assertThat(buffers.get(0)).isNull(); + assertThat(buffers.get(1)) + .isNotNull() + .extracting(ArrowBuf::getReferenceManager) + .extracting(ReferenceManager::getAllocator) + .isEqualTo(allocator); + assertThat(allocator.getAllocatedMemory()).isEqualTo(baseline + bufferSize); + } finally { + MemoryUtil.UNSAFE.freeMemory(address); + } + assertThat(allocator.getAllocatedMemory()).isEqualTo(baseline); + } + + // ------------------------------------------------------------ + // ReferenceCountedArrowArray + + @Test + void releaseRetain() { + ArrowArray array = ArrowArray.allocateNew(allocator); + ReferenceCountedArrowArray handle = new ReferenceCountedArrowArray(array); + assertThat(array.isClosed()).isFalse(); + handle.retain(); + assertThat(array.isClosed()).isFalse(); + handle.release(); + assertThat(array.isClosed()).isFalse(); + handle.release(); + assertThat(array.isClosed()).isTrue(); + + assertThrows(IllegalStateException.class, handle::release); + assertThrows(IllegalStateException.class, handle::retain); + } + + @Test + void associate() { + final long bufferSize = 16; + final long address = MemoryUtil.UNSAFE.allocateMemory(bufferSize); + try { 
+ ArrowArray array = ArrowArray.allocateNew(allocator); + ReferenceCountedArrowArray handle = new ReferenceCountedArrowArray(array); + assertThat(array.isClosed()).isFalse(); + ArrowBuf buf = handle.unsafeAssociateAllocation(allocator, bufferSize, address); + assertThat(array.isClosed()).isFalse(); + buf.close(); + assertThat(array.isClosed()).isFalse(); + handle.release(); + assertThat(array.isClosed()).isTrue(); + } finally { + MemoryUtil.UNSAFE.freeMemory(address); + } + } +} diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java index 8aa9f85c250..fc73df449bd 100644 --- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java +++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java @@ -99,6 +99,7 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.TransferPair; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -106,14 +107,17 @@ public class RoundtripTest { private static final String EMPTY_SCHEMA_PATH = ""; private RootAllocator allocator = null; + private BufferAllocator childAllocator = null; @BeforeEach public void setUp() { allocator = new RootAllocator(Long.MAX_VALUE); + childAllocator = allocator.newChildAllocator("child", 0, Long.MAX_VALUE); } @AfterEach public void tearDown() { + childAllocator.close(); allocator.close(); } @@ -130,7 +134,15 @@ FieldVector vectorRoundtrip(FieldVector vector) { } // Consumer imports vector - return Data.importVector(allocator, consumerArrowArray, consumerArrowSchema, null); + FieldVector imported = Data.importVector(childAllocator, consumerArrowArray, consumerArrowSchema, null); + if (!(imported instanceof NullVector)) { + assertEquals(childAllocator, imported.getAllocator()); + } + + // Check that transfers work + 
TransferPair pair = imported.getTransferPair(allocator); + pair.transfer(); + return (FieldVector) pair.getTo(); } } diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java index 5f8ab12446a..3071c02f30a 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -17,48 +17,38 @@ package org.apache.arrow.memory; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.arrow.util.Preconditions; /** - * The abstract base class of AllocationManager. + * An AllocationManager is the implementation of a physical memory allocation. * - *

Manages the relationship between one or more allocators and a particular UDLE. Ensures that - * one allocator owns the - * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its - * associated allocators. + *

Manages the relationship between the allocators and a particular memory allocation. Ensures that + * one allocator owns the memory that multiple allocators may be referencing. Manages a BufferLedger between + * each of its associated allocators. It does not track the reference count; that is the role of {@link BufferLedger} + * (aka {@link ReferenceManager}). * - *

The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's - * package which need access - * to these objects or methods. + *

This is a public abstract class, extended by concrete allocator implementations (e.g. Netty or Unsafe). * *

Threading: AllocationManager manages thread-safety internally. Operations within the context - * of a single BufferLedger - * are lockless in nature and can be leveraged by multiple threads. Operations that cross the - * context of two ledgers - * will acquire a lock on the AllocationManager instance. Important note, there is one - * AllocationManager per - * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a - * typical query. The - * contention of acquiring a lock on AllocationManager should be very low. + * of a single BufferLedger are lockless in nature and can be leveraged by multiple threads. Operations that cross the + * context of two ledgers will acquire a lock on the AllocationManager instance. Important note, there is one + * AllocationManager per physical buffer allocation. As such, there will be thousands of these in a + * typical query. The contention of acquiring a lock on AllocationManager should be very low. */ public abstract class AllocationManager { - - private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); - + // The RootAllocator we are associated with. An allocation can only ever be associated with a single RootAllocator. private final BufferAllocator root; - private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); - // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap - // see JIRA for details + // An allocation can be tracked by multiple allocators. (This is because an allocator is more like a ledger.) + // All such allocators track reference counts individually, via BufferLedger instances. When an individual + // reference count reaches zero, the allocator will be dissociated from this allocation. If that was via the + // owningLedger, then no more allocators should be tracking this allocation, and the allocation will be freed. 
+ // ARROW-1627: Trying to minimize memory overhead caused by previously used IdentityHashMap private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); - private final long amCreationTime = System.nanoTime(); - - // The ReferenceManager created at the time of creation of this AllocationManager - // is treated as the owning reference manager for the underlying chunk of memory - // managed by this allocation manager + // The primary BufferLedger (i.e. reference count) tracking this allocation. + // This is mostly a semantic constraint on the API user: if the reference count reaches 0 in the owningLedger, then + // there are not supposed to be any references through other allocators. In practice, this doesn't do anything + // as the implementation just forces ownership to be transferred to one of the other extant references. private volatile BufferLedger owningLedger; - private volatile long amDestructionTime = 0; protected AllocationManager(BufferAllocator accountingAllocator) { Preconditions.checkNotNull(accountingAllocator); @@ -81,7 +71,7 @@ void setOwningLedger(final BufferLedger ledger) { /** * Associate the existing underlying buffer with a new allocator. This will increase the - * reference count on the corresponding buffer ledger by 1 + * reference count on the corresponding buffer ledger by 1. * * @param allocator The target allocator to associate this buffer with. 
* @return The reference manager (new or existing) that associates the underlying @@ -99,6 +89,7 @@ private BufferLedger associate(final BufferAllocator allocator, final boolean re synchronized (this) { BufferLedger ledger = map.get(allocator); if (ledger != null) { + // We were already being tracked by the given allocator, just return it if (retain) { // bump the ref count for the ledger ledger.increment(); @@ -106,6 +97,7 @@ private BufferLedger associate(final BufferAllocator allocator, final boolean re return ledger; } + // We weren't previously being tracked by the given allocator; create a new ledger ledger = new BufferLedger(allocator, this); if (retain) { @@ -161,7 +153,6 @@ void release(final BufferLedger ledger) { // free the memory chunk associated with the allocation manager release0(); oldAllocator.getListener().onRelease(getSize()); - amDestructionTime = System.nanoTime(); owningLedger = null; } else { // since the refcount dropped to 0 for the owning reference manager and allocation diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java index e59349c6498..bb3816d9c41 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java @@ -235,4 +235,35 @@ BufferAllocator newChildAllocator( default RoundingPolicy getRoundingPolicy() { return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY; } + + /** + * EXPERIMENTAL: Wrap an allocation created outside this BufferAllocator. + * + *

This is useful to integrate allocations from native code into the same memory management framework as + * Java-allocated buffers, presenting users a consistent API. The created buffer will be tracked by this allocator + * and can be transferred like Java-allocated buffers. + * + *

The underlying allocation will be closed when all references to the buffer are released. If this method throws, + * the underlying allocation will also be closed. + * + * @param allocation The underlying allocation. + */ + default ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) { + try { + forceAllocate(allocation.getSize()); + final AllocationManager manager = new ForeignAllocationManager(this, allocation); + final BufferLedger ledger = manager.associate(this); + final ArrowBuf buf = + new ArrowBuf(ledger, /*bufferManager=*/null, allocation.getSize(), allocation.memoryAddress()); + buf.writerIndex(allocation.getSize()); + return buf; + } catch (Throwable t) { + try { + allocation.release0(); + } catch (Throwable e) { + t.addSuppressed(e); + } + throw t; + } + } } diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java new file mode 100644 index 00000000000..c1b47382a38 --- /dev/null +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.memory; + +/** + * EXPERIMENTAL: a memory allocation that does not come from a BufferAllocator, but rather an outside source (like JNI). + * + *

To use this, subclass this class and implement {@link #release0()} to free the allocation. + */ +public abstract class ForeignAllocation { + private final long memoryAddress; + private final long size; + + /** + * Create a new ForeignAllocation representing an imported buffer. + * + * @param size The buffer size. + * @param memoryAddress The buffer address. + */ + protected ForeignAllocation(long size, long memoryAddress) { + this.memoryAddress = memoryAddress; + this.size = size; + } + + /** + * Get the size of this allocation. + */ + public long getSize() { + return size; + } + + /** + * Get the address of this allocation. + */ + protected long memoryAddress() { + return memoryAddress; + } + + /** + * Free this allocation. Will only be called once. + */ + protected abstract void release0(); +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java new file mode 100644 index 00000000000..741b866f819 --- /dev/null +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.arrow.memory; + +/** + * An AllocationManager wrapping a ForeignAllocation. + */ +class ForeignAllocationManager extends AllocationManager { + private final ForeignAllocation allocation; + + protected ForeignAllocationManager(BufferAllocator accountingAllocator, ForeignAllocation allocation) { + super(accountingAllocator); + this.allocation = allocation; + } + + @Override + public long getSize() { + return allocation.getSize(); + } + + @Override + protected long memoryAddress() { + return allocation.memoryAddress(); + } + + @Override + protected void release0() { + allocation.release0(); + } +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java index 00ae274b744..7d4de18751b 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java @@ -18,8 +18,10 @@ package org.apache.arrow.memory; /** - * Reference Manager manages one or more ArrowBufs that share the - * reference count for the underlying memory chunk. + * ReferenceManager is the reference count for one or more allocations. + * + *

In order to integrate with the core {@link BufferAllocator} implementation, the allocation itself should + * be represented by an {@link AllocationManager}, though this is not required by the API. */ public interface ReferenceManager { @@ -70,6 +72,8 @@ public interface ReferenceManager { * the target allocator-reference manager combination + 1 in the case that the provided allocator * already had an association to this underlying memory. * + *

The underlying allocation ({@link AllocationManager}) will not be copied. + * * @param srcBuffer source ArrowBuf * @param targetAllocator The target allocator to create an association with. * @return A new ArrowBuf which shares the same underlying memory as this ArrowBuf. @@ -89,9 +93,10 @@ public interface ReferenceManager { ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length); /** - * Transfer the memory accounting ownership of this ArrowBuf to another allocator. - * This will generate a new ArrowBuf that carries an association with the underlying memory - * for the given ArrowBuf + * Duplicate the memory accounting ownership of the backing allocation of the given ArrowBuf in another allocator. + * This will generate a new ArrowBuf that carries an association with the same underlying memory + * ({@link AllocationManager}s) as the given ArrowBuf. + * * @param sourceBuffer source ArrowBuf * @param targetAllocator The target allocator to create an association with * @return {@link OwnershipTransferResult} with info on transfer result and new buffer diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java new file mode 100644 index 00000000000..5e40645e06b --- /dev/null +++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.arrow.memory.util.MemoryUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestForeignAllocation { + BufferAllocator allocator; + + @Before + public void before() { + allocator = new RootAllocator(); + } + + @After + public void after() { + allocator.close(); + } + + @Test + public void wrapForeignAllocation() { + final long bufferSize = 16; + UnsafeForeignAllocation allocation = new UnsafeForeignAllocation(bufferSize); + try { + assertEquals(0, allocator.getAllocatedMemory()); + ArrowBuf buf = allocator.wrapForeignAllocation(allocation); + assertEquals(bufferSize, buf.capacity()); + buf.close(); + assertTrue(allocation.released); + } finally { + allocation.release0(); + } + assertEquals(0, allocator.getAllocatedMemory()); + } + + private static class UnsafeForeignAllocation extends ForeignAllocation { + boolean released = false; + + public UnsafeForeignAllocation(long bufferSize) { + super(bufferSize, MemoryUtil.UNSAFE.allocateMemory(bufferSize)); + } + + @Override + protected void release0() { + if (!released) { + MemoryUtil.UNSAFE.freeMemory(memoryAddress()); + released = true; + } + } + } +}