Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ public ArrowBuf retain(BufferAllocator target) {
historicalLog.recordEvent("retain(%s)", target.getName());
}
final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
return otherLedger.newArrowBuf(offset, length, null);
ArrowBuf newArrowBuf = otherLedger.newArrowBuf(offset, length, null);
newArrowBuf.readerIndex(this.readerIndex);
newArrowBuf.writerIndex(this.writerIndex);
return newArrowBuf;
}

/**
Expand Down Expand Up @@ -214,6 +217,8 @@ public TransferResult transferOwnership(BufferAllocator target) {

final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
final ArrowBuf newBuf = otherLedger.newArrowBuf(offset, length, null);
newBuf.readerIndex(this.readerIndex);
newBuf.writerIndex(this.writerIndex);
final boolean allocationFit = this.ledger.transferBalance(otherLedger);
return new TransferResult(allocationFit, newBuf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ArrowBuf;
import io.netty.buffer.ArrowBuf.TransferResult;

import org.apache.arrow.memory.AllocationReservation;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.RootAllocator;
import org.junit.Ignore;
import org.junit.Test;

import io.netty.buffer.ArrowBuf;
import io.netty.buffer.ArrowBuf.TransferResult;

public class TestBaseAllocator {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);

Expand Down Expand Up @@ -134,6 +131,7 @@ public void testAllocator_transferOwnership() throws Exception {
final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
rootAllocator.verify();
TransferResult transferOwnership = arrowBuf1.transferOwnership(childAllocator2);
assertEquiv(arrowBuf1, transferOwnership.buffer);
final boolean allocationFit = transferOwnership.allocationFit;
rootAllocator.verify();
assertTrue(allocationFit);
Expand All @@ -160,6 +158,7 @@ public void testAllocator_shareOwnership() throws Exception {
rootAllocator.verify();
assertNotNull(arrowBuf2);
assertNotEquals(arrowBuf2, arrowBuf1);
assertEquiv(arrowBuf1, arrowBuf2);

// release original buffer, thus transferring ownership to allocator 2 (should leave allocator 1 in empty state)
arrowBuf1.release();
Expand All @@ -172,6 +171,7 @@ public void testAllocator_shareOwnership() throws Exception {
assertNotNull(arrowBuf3);
assertNotEquals(arrowBuf3, arrowBuf1);
assertNotEquals(arrowBuf3, arrowBuf2);
assertEquiv(arrowBuf1, arrowBuf3);
rootAllocator.verify();

arrowBuf2.release();
Expand Down Expand Up @@ -452,8 +452,10 @@ public void testAllocator_transferSliced() throws Exception {
rootAllocator.verify();

TransferResult result1 = arrowBuf2s.transferOwnership(childAllocator1);
assertEquiv(arrowBuf2s, result1.buffer);
rootAllocator.verify();
TransferResult result2 = arrowBuf1s.transferOwnership(childAllocator2);
assertEquiv(arrowBuf1s, result2.buffer);
rootAllocator.verify();

result1.buffer.release();
Expand Down Expand Up @@ -482,7 +484,9 @@ public void testAllocator_shareSliced() throws Exception {
rootAllocator.verify();

final ArrowBuf arrowBuf2s1 = arrowBuf2s.retain(childAllocator1);
assertEquiv(arrowBuf2s, arrowBuf2s1);
final ArrowBuf arrowBuf1s2 = arrowBuf1s.retain(childAllocator2);
assertEquiv(arrowBuf1s, arrowBuf1s2);
rootAllocator.verify();

arrowBuf1s.release(); // releases arrowBuf1
Expand Down Expand Up @@ -512,11 +516,13 @@ public void testAllocator_transferShared() throws Exception {
rootAllocator.verify();
assertNotNull(arrowBuf2);
assertNotEquals(arrowBuf2, arrowBuf1);
assertEquiv(arrowBuf1, arrowBuf2);

TransferResult result = arrowBuf1.transferOwnership(childAllocator3);
allocationFit = result.allocationFit;
final ArrowBuf arrowBuf3 = result.buffer;
assertTrue(allocationFit);
assertEquiv(arrowBuf1, arrowBuf3);
rootAllocator.verify();

// Since childAllocator3 now has childAllocator1's buffer, 1, can close
Expand All @@ -533,6 +539,7 @@ public void testAllocator_transferShared() throws Exception {
allocationFit = result.allocationFit;
final ArrowBuf arrowBuf4 = result2.buffer;
assertTrue(allocationFit);
assertEquiv(arrowBuf3, arrowBuf4);
rootAllocator.verify();

arrowBuf3.release();
Expand Down Expand Up @@ -645,4 +652,9 @@ public void multiple() throws Exception {

}
}

/**
 * Asserts that two buffers report the same reader index and the same writer index.
 *
 * @param origBuf buffer supplying the expected indices
 * @param newBuf buffer whose indices are compared against {@code origBuf}
 */
public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) {
  // Reader position is checked first, exactly as before.
  final int expectedReader = origBuf.readerIndex();
  final int actualReader = newBuf.readerIndex();
  assertEquals(expectedReader, actualReader);

  final int expectedWriter = origBuf.writerIndex();
  final int actualWriter = newBuf.writerIndex();
  assertEquals(expectedWriter, actualWriter);
}
}
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,6 @@
<module>format</module>
<module>memory</module>
<module>vector</module>
<module>tools</module>
<module>tools</module>
</modules>
</project>
22 changes: 20 additions & 2 deletions java/tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,26 @@
</dependencies>

<build>
<!--
Runs the maven-assembly-plugin "single" goal during the "package" phase,
using the predefined "jar-with-dependencies" descriptor.
-->
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>



</project>
137 changes: 85 additions & 52 deletions java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,85 +18,118 @@
*/
package org.apache.arrow.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.complex.NullableMapVector;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.file.ArrowReader;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.Arrays;
import java.util.List;

public class FileRoundtrip {
private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class);
public static final Options OPTIONS;

static {
OPTIONS = new Options();
}

public static void main(String[] args) {
try {
String[] cargs = Arrays.copyOfRange(args, 1, args.length);

CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(OPTIONS, cargs, false);

String[] parsed_args = cmd.getArgs();

String inFileName = parsed_args[0];
String outFileName = parsed_args[1];

BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);

File inFile = new File(inFileName);
FileInputStream fileInputStream = new FileInputStream(inFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);

ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
LOGGER.debug("Found schema: " + schema);
System.exit(new FileRoundtrip(System.out, System.err).run(args));
}

File outFile = new File(outFileName);
FileOutputStream fileOutputStream = new FileOutputStream(outFile);
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
private final Options options;
private final PrintStream out;
private final PrintStream err;

// initialize vectors
FileRoundtrip(PrintStream out, PrintStream err) {
this.out = out;
this.err = err;
this.options = new Options();
this.options.addOption("i", "in", true, "input file");
this.options.addOption("o", "out", true, "output file");

List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
}

NullableMapVector inParent = new NullableMapVector("parent", allocator, null);
NullableMapVector root = inParent.addOrGet("root", Types.MinorType.MAP, NullableMapVector.class);
VectorLoader vectorLoader = new VectorLoader(schema, root);
vectorLoader.load(inRecordBatch);
private File validateFile(String type, String fileName) {
if (fileName == null) {
throw new IllegalArgumentException("missing " + type + " file parameter");
}
File f = new File(fileName);
if (!f.exists() || f.isDirectory()) {
throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
}
return f;
}

NullableMapVector outParent = new NullableMapVector("parent", allocator, null);
VectorUnloader vectorUnloader = new VectorUnloader(outParent);
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
arrowWriter.writeRecordBatch(recordBatch);
int run(String[] args) {
try {
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args, false);

String inFileName = cmd.getOptionValue("in");
String outFileName = cmd.getOptionValue("out");

File inFile = validateFile("input", inFileName);
File outFile = validateFile("output", outFileName);
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
try(
FileInputStream fileInputStream = new FileInputStream(inFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {

ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
LOGGER.debug("Input file size: " + inFile.length());
LOGGER.debug("Found schema: " + schema);

try (
FileOutputStream fileOutputStream = new FileOutputStream(outFile);
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
) {

// initialize vectors

List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {

VectorLoader vectorLoader = new VectorLoader(root);
vectorLoader.load(inRecordBatch);

VectorUnloader vectorUnloader = new VectorUnloader(root);
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
arrowWriter.writeRecordBatch(recordBatch);
}
}
}
LOGGER.debug("Output file size: " + outFile.length());
}
} catch (Throwable th) {
System.err.println(th.getMessage());
} catch (ParseException e) {
return fatalError("Invalid parameters", e);
} catch (IOException e) {
return fatalError("Error accessing files", e);
}
return 0;
}

private int fatalError(String message, Throwable e) {
err.println(message);
LOGGER.error(message, e);
return 1;
}

}
Loading