diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
index a5989c1518d..95d2be5a43a 100644
--- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
@@ -179,7 +179,10 @@ public ArrowBuf retain(BufferAllocator target) {
       historicalLog.recordEvent("retain(%s)", target.getName());
     }
     final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
-    return otherLedger.newArrowBuf(offset, length, null);
+    ArrowBuf newArrowBuf = otherLedger.newArrowBuf(offset, length, null);
+    newArrowBuf.readerIndex(this.readerIndex);
+    newArrowBuf.writerIndex(this.writerIndex);
+    return newArrowBuf;
   }
   /**
@@ -214,6 +217,8 @@ public TransferResult transferOwnership(BufferAllocator target) {
     final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
     final ArrowBuf newBuf = otherLedger.newArrowBuf(offset, length, null);
+    newBuf.readerIndex(this.readerIndex);
+    newBuf.writerIndex(this.writerIndex);
     final boolean allocationFit = this.ledger.transferBalance(otherLedger);
     return new TransferResult(allocationFit, newBuf);
   }
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index aa6b70c5c74..3c96d57f4e6 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -22,16 +22,13 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import io.netty.buffer.ArrowBuf;
-import io.netty.buffer.ArrowBuf.TransferResult;
-import org.apache.arrow.memory.AllocationReservation;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.OutOfMemoryException;
-import org.apache.arrow.memory.RootAllocator;
 import org.junit.Ignore;
 import org.junit.Test;
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.ArrowBuf.TransferResult;
+
 public class TestBaseAllocator {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);
@@ -134,6 +131,7 @@ public void testAllocator_transferOwnership() throws Exception {
       final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
       rootAllocator.verify();
       TransferResult transferOwnership = arrowBuf1.transferOwnership(childAllocator2);
+      assertEquiv(arrowBuf1, transferOwnership.buffer);
       final boolean allocationFit = transferOwnership.allocationFit;
       rootAllocator.verify();
       assertTrue(allocationFit);
@@ -160,6 +158,7 @@ public void testAllocator_shareOwnership() throws Exception {
       rootAllocator.verify();
       assertNotNull(arrowBuf2);
       assertNotEquals(arrowBuf2, arrowBuf1);
+      assertEquiv(arrowBuf1, arrowBuf2);
       // release original buffer (thus transferring ownership to allocator 2.
      // (should leave allocator 1 in empty state)
       arrowBuf1.release();
@@ -172,6 +171,7 @@
       assertNotNull(arrowBuf3);
       assertNotEquals(arrowBuf3, arrowBuf1);
       assertNotEquals(arrowBuf3, arrowBuf2);
+      assertEquiv(arrowBuf1, arrowBuf3);
       rootAllocator.verify();
       arrowBuf2.release();
@@ -452,8 +452,10 @@ public void testAllocator_transferSliced() throws Exception {
       rootAllocator.verify();
       TransferResult result1 = arrowBuf2s.transferOwnership(childAllocator1);
+      assertEquiv(arrowBuf2s, result1.buffer);
       rootAllocator.verify();
       TransferResult result2 = arrowBuf1s.transferOwnership(childAllocator2);
+      assertEquiv(arrowBuf1s, result2.buffer);
       rootAllocator.verify();
       result1.buffer.release();
@@ -482,7 +484,9 @@ public void testAllocator_shareSliced() throws Exception {
       rootAllocator.verify();
       final ArrowBuf arrowBuf2s1 = arrowBuf2s.retain(childAllocator1);
+      assertEquiv(arrowBuf2s, arrowBuf2s1);
       final ArrowBuf arrowBuf1s2 = arrowBuf1s.retain(childAllocator2);
+      assertEquiv(arrowBuf1s, arrowBuf1s2);
       rootAllocator.verify();
       arrowBuf1s.release(); // releases arrowBuf1
@@ -512,11 +516,13 @@ public void testAllocator_transferShared() throws Exception {
       rootAllocator.verify();
       assertNotNull(arrowBuf2);
       assertNotEquals(arrowBuf2, arrowBuf1);
+      assertEquiv(arrowBuf1, arrowBuf2);
       TransferResult result = arrowBuf1.transferOwnership(childAllocator3);
       allocationFit = result.allocationFit;
       final ArrowBuf arrowBuf3 = result.buffer;
       assertTrue(allocationFit);
+      assertEquiv(arrowBuf1, arrowBuf3);
       rootAllocator.verify();
       // Since childAllocator3 now has childAllocator1's buffer, 1, can close
@@ -533,6 +539,7 @@
       allocationFit = result.allocationFit;
       final ArrowBuf arrowBuf4 = result2.buffer;
       assertTrue(allocationFit);
+      assertEquiv(arrowBuf3, arrowBuf4);
       rootAllocator.verify();
       arrowBuf3.release();
@@ -645,4 +652,9 @@ public void multiple() throws Exception {
     }
   }
+
+  public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) {
+    assertEquals(origBuf.readerIndex(), newBuf.readerIndex());
+    assertEquals(origBuf.writerIndex(), newBuf.writerIndex());
+  }
 }
diff --git a/java/pom.xml b/java/pom.xml
index 38aad73f0ac..7221a140d96 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -467,6 +467,6 @@
     <module>format</module>
     <module>memory</module>
     <module>vector</module>
-    <module>tools</module>
+    <module>tools</module>
diff --git a/java/tools/pom.xml b/java/tools/pom.xml
index 7c92e0482e5..84b0b5eb425 100644
--- a/java/tools/pom.xml
+++ b/java/tools/pom.xml
@@ -48,8 +48,26 @@
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.6</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
-
-
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
index 44cba68ffdd..db7a1c23f9c 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -18,85 +18,118 @@
  */
 package org.apache.arrow.tools;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.VectorUnloader;
-import org.apache.arrow.vector.complex.NullableMapVector;
 import org.apache.arrow.vector.file.ArrowBlock;
 import org.apache.arrow.vector.file.ArrowFooter;
 import org.apache.arrow.vector.file.ArrowReader;
 import org.apache.arrow.vector.file.ArrowWriter;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.util.Arrays;
-import java.util.List;
-
 public class FileRoundtrip {
   private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class);
-  public static final Options OPTIONS;
-
-  static {
-    OPTIONS = new Options();
-  }
   public static void main(String[] args) {
-    try {
-      String[] cargs = Arrays.copyOfRange(args, 1, args.length);
-
-      CommandLineParser parser = new PosixParser();
-      CommandLine cmd = parser.parse(OPTIONS, cargs, false);
-
-      String[] parsed_args = cmd.getArgs();
-
-      String inFileName = parsed_args[0];
-      String outFileName = parsed_args[1];
-
-      BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-
-      File inFile = new File(inFileName);
-      FileInputStream fileInputStream = new FileInputStream(inFile);
-      ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);
-
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
-      LOGGER.debug("Found schema: " + schema);
+    System.exit(new FileRoundtrip(System.out, System.err).run(args));
+  }
-      File outFile = new File(outFileName);
-      FileOutputStream fileOutputStream = new FileOutputStream(outFile);
-      ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
+  private final Options options;
+  private final PrintStream out;
+  private final PrintStream err;
-      // initialize vectors
+  FileRoundtrip(PrintStream out, PrintStream err) {
+    this.out = out;
+    this.err = err;
+    this.options = new Options();
+    this.options.addOption("i", "in", true, "input file");
+    this.options.addOption("o", "out", true, "output file");
-      List<ArrowBlock> recordBatches = footer.getRecordBatches();
-      for (ArrowBlock rbBlock : recordBatches) {
-        ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
+  }
-        NullableMapVector inParent = new NullableMapVector("parent", allocator, null);
-        NullableMapVector root = inParent.addOrGet("root", Types.MinorType.MAP, NullableMapVector.class);
-        VectorLoader vectorLoader = new VectorLoader(schema, root);
-        vectorLoader.load(inRecordBatch);
+  private File validateFile(String type, String fileName) {
+    if (fileName == null) {
+      throw new IllegalArgumentException("missing " + type + " file parameter");
+    }
+    File f = new File(fileName);
+    if (!f.exists() || f.isDirectory()) {
+      throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
+    }
+    return f;
+  }
-        NullableMapVector outParent = new NullableMapVector("parent", allocator, null);
-        VectorUnloader vectorUnloader = new VectorUnloader(outParent);
-        ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-        arrowWriter.writeRecordBatch(recordBatch);
+  int run(String[] args) {
+    try {
+      CommandLineParser parser = new PosixParser();
+      CommandLine cmd = parser.parse(options, args, false);
+
+      String inFileName = cmd.getOptionValue("in");
+      String outFileName = cmd.getOptionValue("out");
+
+      File inFile = validateFile("input", inFileName);
+      File outFile = validateFile("output", outFileName);
+      BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
+      try(
+          FileInputStream fileInputStream = new FileInputStream(inFile);
+          ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
+
+        ArrowFooter footer = arrowReader.readFooter();
+        Schema schema = footer.getSchema();
+        LOGGER.debug("Input file size: " + inFile.length());
+        LOGGER.debug("Found schema: " + schema);
+
+        try (
+            FileOutputStream fileOutputStream = new FileOutputStream(outFile);
+            ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
+            ) {
+
+          // initialize vectors
+
+          List<ArrowBlock> recordBatches = footer.getRecordBatches();
+          for (ArrowBlock rbBlock : recordBatches) {
+            try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
+                VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
+
+              VectorLoader vectorLoader = new VectorLoader(root);
+              vectorLoader.load(inRecordBatch);
+
+              VectorUnloader vectorUnloader = new VectorUnloader(root);
+              ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+              arrowWriter.writeRecordBatch(recordBatch);
+            }
+          }
+        }
+        LOGGER.debug("Output file size: " + outFile.length());
       }
-    } catch (Throwable th) {
-      System.err.println(th.getMessage());
+    } catch (ParseException e) {
+      return fatalError("Invalid parameters", e);
+    } catch (IOException e) {
+      return fatalError("Error accessing files", e);
     }
+    return 0;
   }
+
+  private int fatalError(String message, Throwable e) {
+    err.println(message);
+    LOGGER.error(message, e);
+    return 1;
+  }
+
 }
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java
new file mode 100644
index 00000000000..339725e5af1
--- /dev/null
+++ b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.arrow.tools;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.arrow.vector.file.ArrowBlock;
+import org.apache.arrow.vector.file.ArrowFooter;
+import org.apache.arrow.vector.file.ArrowReader;
+import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFileRoundtrip {
+  private static final int COUNT = 10;
+
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  private BufferAllocator allocator;
+
+  @Before
+  public void init() {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  private void writeData(int count, MapVector parent) {
+    ComplexWriter writer = new ComplexWriterImpl("root", parent);
+    MapWriter rootWriter = writer.rootAsMap();
+    IntWriter intWriter = rootWriter.integer("int");
+    BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
+    for (int i = 0; i < count; i++) {
+      intWriter.setPosition(i);
+      intWriter.writeInt(i);
+      bigIntWriter.setPosition(i);
+      bigIntWriter.writeBigInt(i);
+    }
+    writer.setValueCount(count);
+  }
+
+  @Test
+  public void test() throws Exception {
+    File testInFile = testFolder.newFile("testIn.arrow");
+    File testOutFile = testFolder.newFile("testOut.arrow");
+
+    writeInput(testInFile);
+
+    String[] args = { "-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()};
+    int result = new FileRoundtrip(System.out, System.err).run(args);
+    assertEquals(0, result);
+
+    validateOutput(testOutFile);
+  }
+
+  private void validateOutput(File testOutFile) throws Exception {
+    // read
+    try (
+        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+        FileInputStream fileInputStream = new FileInputStream(testOutFile);
+        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
+        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+        ) {
+      ArrowFooter footer = arrowReader.readFooter();
+      Schema schema = footer.getSchema();
+
+      // initialize vectors
+      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) {
+        VectorLoader vectorLoader = new VectorLoader(root);
+
+        List<ArrowBlock> recordBatches = footer.getRecordBatches();
+        for (ArrowBlock rbBlock : recordBatches) {
+          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+            vectorLoader.load(recordBatch);
+          }
+          validateContent(COUNT, root);
+        }
+      }
+    }
+  }
+
+  private void validateContent(int count, VectorSchemaRoot root) {
+    Assert.assertEquals(count, root.getRowCount());
+    for (int i = 0; i < count; i++) {
+      Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i));
+      Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i));
+    }
+  }
+
+  public void writeInput(File testInFile) throws FileNotFoundException, IOException {
+    int count = COUNT;
+    try (
+        BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+        MapVector parent = new MapVector("parent", vectorAllocator, null)) {
+      writeData(count, parent);
+      write(parent.getChild("root"), testInFile);
+    }
+  }
+
+  private void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
+    Schema schema = new Schema(parent.getField().getChildren());
+    int valueCount = parent.getAccessor().getValueCount();
+    List<FieldVector> fields = parent.getChildrenFromFields();
+    VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields);
+    try (
+        FileOutputStream fileOutputStream = new FileOutputStream(file);
+        ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
+        ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+        ) {
+      arrowWriter.writeRecordBatch(recordBatch);
+    }
+  }
+
+}
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index bafa3176020..48af7a2bafe 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -145,7 +145,7 @@ public List getChildrenFromFields() {
   @Override
   public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
     org.apache.arrow.vector.BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
-    // TODO: do something with the sizes in fieldNode?
+    bits.valueCount = fieldNode.getLength();
   }
   public List<ArrowBuf> getFieldBuffers() {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index b7040da9d82..4afd82315d9 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -27,7 +27,6 @@
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.schema.VectorLayout;
 import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
 import com.google.common.collect.Iterators;
@@ -37,22 +36,16 @@
  * Loads buffers into vectors
 */
 public class VectorLoader {
-  private final List<FieldVector> fieldVectors;
-  private final List<Field> fields;
+  private final VectorSchemaRoot root;
   /**
   * will create children in root based on schema
   * @param schema the expected schema
   * @param root the root to add vectors to based on schema
   */
-  public VectorLoader(Schema schema, FieldVector root) {
+  public VectorLoader(VectorSchemaRoot root) {
     super();
-    this.fields = schema.getFields();
-    root.initializeChildrenFromFields(fields);
-    this.fieldVectors = root.getChildrenFromFields();
-    if (this.fieldVectors.size() != fields.size()) {
-      throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + fields.size());
found " + fieldVectors.size() + " expected " + fields.size()); - } + this.root = root; } /** @@ -63,16 +56,19 @@ public VectorLoader(Schema schema, FieldVector root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); + List fields = root.getSchema().getFields(); for (int i = 0; i < fields.size(); ++i) { Field field = fields.get(i); - FieldVector fieldVector = fieldVectors.get(i); + FieldVector fieldVector = root.getVector(field.getName()); loadBuffers(fieldVector, field, buffers, nodes); } + root.setRowCount(recordBatch.getLength()); if (nodes.hasNext() || buffers.hasNext()) { throw new IllegalArgumentException("not all nodes and buffers where consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers)); } } + private void loadBuffers(FieldVector vector, Field field, Iterator buffers, Iterator nodes) { checkArgument(nodes.hasNext(), "no more field nodes for for field " + field + " and vector " + vector); @@ -85,7 +81,7 @@ private void loadBuffers(FieldVector vector, Field field, Iterator buf try { vector.loadFieldBuffers(fieldNode, ownBuffers); } catch (RuntimeException e) { - throw new IllegalArgumentException("Could not load buffers for field " + field); + throw new IllegalArgumentException("Could not load buffers for field " + field, e); } List children = field.getChildren(); if (children.size() > 0) { @@ -98,4 +94,5 @@ private void loadBuffers(FieldVector vector, Field field, Iterator buf } } } + } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java new file mode 100644 index 00000000000..1cbe18787ef --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.arrow.vector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public class VectorSchemaRoot implements AutoCloseable {
+
+  private final Schema schema;
+  private int rowCount;
+  private final List<FieldVector> fieldVectors;
+  private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>();
+
+  public VectorSchemaRoot(FieldVector parent) {
+    this.schema = new Schema(parent.getField().getChildren());
+    this.rowCount = parent.getAccessor().getValueCount();
+    this.fieldVectors = parent.getChildrenFromFields();
+    for (int i = 0; i < schema.getFields().size(); ++i) {
+      Field field = schema.getFields().get(i);
+      FieldVector vector = fieldVectors.get(i);
+      fieldVectorsMap.put(field.getName(), vector);
+    }
+  }
+
+  public VectorSchemaRoot(Schema schema, BufferAllocator allocator) {
+    super();
+    this.schema = schema;
+    List<FieldVector> fieldVectors = new ArrayList<>();
+    for (Field field : schema.getFields()) {
+      MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
+      FieldVector vector = minorType.getNewVector(field.getName(), allocator, null);
+      vector.initializeChildrenFromFields(field.getChildren());
+      fieldVectors.add(vector);
+      fieldVectorsMap.put(field.getName(), vector);
+    }
+    this.fieldVectors = Collections.unmodifiableList(fieldVectors);
+    if (this.fieldVectors.size() != schema.getFields().size()) {
+      throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + schema.getFields().size());
+    }
+  }
+
+  public List<FieldVector> getFieldVectors() {
+    return fieldVectors;
+  }
+
+  public FieldVector getVector(String name) {
+    return fieldVectorsMap.get(name);
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public void setRowCount(int rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  @Override
+  public void close() {
+    RuntimeException ex = null;
+    for (FieldVector fieldVector : fieldVectors) {
+      try {
+        fieldVector.close();
+      } catch (RuntimeException e) {
+        ex = chain(ex, e);
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  private RuntimeException chain(RuntimeException root, RuntimeException e) {
+    if (root == null) {
+      root = e;
+    } else {
+      root.addSuppressed(e);
+    }
+    return root;
+  }
+
+  private void printRow(StringBuilder sb, List<Object> row) {
+    boolean first = true;
+    for (Object v : row) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append("\t");
+      }
+      sb.append(v);
+    }
+    sb.append("\n");
+  }
+
+  public String contentToTSVString() {
+    StringBuilder sb = new StringBuilder();
+    List<Object> row = new ArrayList<>(schema.getFields().size());
+    for (Field field : schema.getFields()) {
+      row.add(field.getName());
+    }
+    printRow(sb, row);
+    for (int i = 0; i < rowCount; i++) {
+      row.clear();
+      for (FieldVector v : fieldVectors) {
+        row.add(v.getAccessor().getObject(i));
+      }
+      printRow(sb, row);
+    }
+    return sb.toString();
+  }
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index 3375a7d5c31..e2462180ffa 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -34,11 +34,15 @@ public class VectorUnloader {
   private final int valueCount;
   private final List<FieldVector> vectors;
-  public VectorUnloader(FieldVector parent) {
+  public VectorUnloader(Schema schema, int valueCount, List<FieldVector> vectors) {
     super();
-    this.schema = new Schema(parent.getField().getChildren());
-    this.valueCount = parent.getAccessor().getValueCount();
-    this.vectors = parent.getChildrenFromFields();
+    this.schema = schema;
+    this.valueCount = valueCount;
+    this.vectors = vectors;
+  }
+
+  public VectorUnloader(VectorSchemaRoot root) {
+    this(root.getSchema(), root.getRowCount(), root.getFieldVectors());
   }
   public Schema getSchema() {
@@ -77,4 +81,5 @@ private void appendNodes(FieldVector vector, List nodes, List
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+    List<FieldVector> fields = root.getChildrenFromFields();
+    return new VectorUnloader(schema, valueCount, fields);
+  }
+
   @AfterClass
   public static void afterClass() {
     allocator.close();
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
index 0f28d53295c..e97bc14d169 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -17,6 +17,8 @@
  */
 package org.apache.arrow.vector.file;
+import static org.apache.arrow.vector.TestVectorUnloadLoad.newVectorUnloader;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -29,12 +31,12 @@
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector.Accessor;
 import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.NullableMapVector;
 import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
-import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
-import org.apache.arrow.vector.complex.reader.BaseReader.MapReader;
+import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
@@ -43,7 +45,6 @@
 import org.apache.arrow.vector.holders.NullableTimeStampHolder;
 import org.apache.arrow.vector.schema.ArrowBuffer;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.joda.time.DateTimeZone;
 import org.junit.After;
@@ -94,8 +95,9 @@ public void testWriteComplex() throws IOException {
         BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
         NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {
       writeComplexData(count, parent);
-      validateComplexContent(count, parent);
-      write(parent.getChild("root"), file);
+      FieldVector root = parent.getChild("root");
+      validateComplexContent(count, new VectorSchemaRoot(root));
+      write(root, file);
     }
   }
@@ -174,33 +176,31 @@ public void testWriteRead() throws IOException {
       // initialize vectors
-      NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
-
-      VectorLoader vectorLoader = new VectorLoader(schema, root);
-
-      List<ArrowBlock> recordBatches = footer.getRecordBatches();
-      for (ArrowBlock rbBlock : recordBatches) {
-        Assert.assertEquals(0, rbBlock.getOffset() % 8);
-        Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
-        try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-          List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-          for (ArrowBuffer arrowBuffer : buffersLayout) {
-            Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
+        VectorLoader vectorLoader = new VectorLoader(root);
+
+        List<ArrowBlock> recordBatches = footer.getRecordBatches();
+        for (ArrowBlock rbBlock : recordBatches) {
+          Assert.assertEquals(0, rbBlock.getOffset() % 8);
+          Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
+          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+            for (ArrowBuffer arrowBuffer : buffersLayout) {
+              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+            }
+            vectorLoader.load(recordBatch);
           }
-          vectorLoader.load(recordBatch);
-        }
-        validateContent(count, parent);
+          validateContent(count, root);
+        }
       }
     }
   }
-  private void validateContent(int count, MapVector parent) {
-    MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
+  private void validateContent(int count, VectorSchemaRoot root) {
     for (int i = 0; i < count; i++) {
-      rootReader.setPosition(i);
-      Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
-      Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
+      Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i));
+      Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i));
     }
   }
@@ -231,15 +231,15 @@ public void testWriteReadComplex() throws IOException {
       // initialize vectors
-      NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
-      VectorLoader vectorLoader = new VectorLoader(schema, root);
-
-      List<ArrowBlock> recordBatches = footer.getRecordBatches();
-      for (ArrowBlock rbBlock : recordBatches) {
-        try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-          vectorLoader.load(recordBatch);
+      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
+        VectorLoader vectorLoader = new VectorLoader(root);
+        List<ArrowBlock> recordBatches = footer.getRecordBatches();
+        for (ArrowBlock rbBlock : recordBatches) {
+          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+            vectorLoader.load(recordBatch);
+          }
+          validateComplexContent(count, root);
         }
-        validateComplexContent(count, parent);
       }
     }
   }
@@ -255,23 +255,23 @@ public void printVectors(List vectors) {
     }
   }
-  private void validateComplexContent(int count, NullableMapVector parent) {
-    printVectors(parent.getChildrenFromFields());
-
-    MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
+  private void validateComplexContent(int count, VectorSchemaRoot root) {
+    Assert.assertEquals(count, root.getRowCount());
+    printVectors(root.getFieldVectors());
     for (int i = 0; i < count; i++) {
-      rootReader.setPosition(i);
-      Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
-      Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
-      Assert.assertEquals(i % 3, rootReader.reader("list").size());
+      Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i));
+      Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i));
+      Assert.assertEquals(i % 3, ((List)root.getVector("list").getAccessor().getObject(i)).size());
       NullableTimeStampHolder h = new NullableTimeStampHolder();
-      rootReader.reader("map").reader("timestamp").read(h);
+      FieldReader mapReader = root.getVector("map").getReader();
+      mapReader.setPosition(i);
+      mapReader.reader("timestamp").read(h);
       Assert.assertEquals(i, h.value);
     }
   }
   private void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
-    VectorUnloader vectorUnloader = new VectorUnloader(parent);
+    VectorUnloader vectorUnloader = newVectorUnloader(parent);
     Schema schema = vectorUnloader.getSchema();
     LOGGER.debug("writing schema: " + schema);
     try (
@@ -294,7 +294,7 @@ public void testWriteReadMultipleRBs() throws IOException {
         MapVector parent = new MapVector("parent", originalVectorAllocator, null);
         FileOutputStream fileOutputStream = new FileOutputStream(file);) {
       writeData(count, parent);
-      VectorUnloader vectorUnloader = new VectorUnloader(parent.getChild("root"));
+      VectorUnloader vectorUnloader = newVectorUnloader(parent.getChild("root"));
       Schema schema = vectorUnloader.getSchema();
       Assert.assertEquals(2, schema.getFields().size());
       try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);) {
@@ -320,20 +320,21 @@ public void testWriteReadMultipleRBs() throws IOException {
       ArrowFooter footer = arrowReader.readFooter();
       Schema schema = footer.getSchema();
       LOGGER.debug("reading schema: " + schema);
-      NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
-      VectorLoader vectorLoader = new VectorLoader(schema, root);
-      List<ArrowBlock> recordBatches = footer.getRecordBatches();
-      Assert.assertEquals(2, recordBatches.size());
-      for (ArrowBlock rbBlock : recordBatches) {
-        Assert.assertEquals(0, rbBlock.getOffset() % 8);
-        Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
-        try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-          List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-          for (ArrowBuffer arrowBuffer : buffersLayout) {
-            Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
+        VectorLoader vectorLoader = new VectorLoader(root);
+        List<ArrowBlock> recordBatches = footer.getRecordBatches();
+        Assert.assertEquals(2, recordBatches.size());
+        for (ArrowBlock rbBlock : recordBatches) {
+          Assert.assertEquals(0, rbBlock.getOffset() % 8);
+          Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
+          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+            for (ArrowBuffer arrowBuffer : buffersLayout) {
+              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+            }
+            vectorLoader.load(recordBatch);
+            validateContent(count, root);
           }
-          vectorLoader.load(recordBatch);
-          validateContent(count, parent);
         }
       }
     }
@@ -351,7 +352,7 @@ public void testWriteReadUnion() throws IOException {
       printVectors(parent.getChildrenFromFields());
-      validateUnionData(count, parent);
+      validateUnionData(count, new VectorSchemaRoot(parent.getChild("root")));
       write(parent.getChild("root"), file);
     }
@@ -361,44 +362,42 @@
         FileInputStream fileInputStream = new FileInputStream(file);
         ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
         BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)
        ) {
       ArrowFooter footer = arrowReader.readFooter();
       Schema schema = footer.getSchema();
       LOGGER.debug("reading schema: " + schema);
       // initialize vectors
-
-      NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
-      VectorLoader vectorLoader = new VectorLoader(schema, root);
-
-      List<ArrowBlock> recordBatches = footer.getRecordBatches();
-      for (ArrowBlock rbBlock : recordBatches) {
-        try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-          vectorLoader.load(recordBatch);
+      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
+        VectorLoader vectorLoader = new VectorLoader(root);
+        List<ArrowBlock> recordBatches = footer.getRecordBatches();
+        for (ArrowBlock rbBlock : recordBatches) {
+          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+            vectorLoader.load(recordBatch);
+          }
+          validateUnionData(count, root);
         }
-        validateUnionData(count, parent);
       }
     }
   }
-  public void validateUnionData(int count, MapVector parent) {
-    MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
+  public void validateUnionData(int count, VectorSchemaRoot root) {
+    FieldReader unionReader = root.getVector("union").getReader();
     for (int i = 0; i < count; i++) {
-      rootReader.setPosition(i);
+      unionReader.setPosition(i);
       switch (i % 4) {
      case 0:
-        Assert.assertEquals(i, rootReader.reader("union").readInteger().intValue());
+        Assert.assertEquals(i, unionReader.readInteger().intValue());
        break;
      case 1:
-        Assert.assertEquals(i, rootReader.reader("union").readLong().longValue());
+        Assert.assertEquals(i, unionReader.readLong().longValue());
        break;
      case 2:
-        Assert.assertEquals(i % 3, rootReader.reader("union").size());
+        Assert.assertEquals(i % 3, unionReader.size());
        break;
      case 3:
        NullableTimeStampHolder h = new NullableTimeStampHolder();
-        rootReader.reader("union").reader("timestamp").read(h);
+        unionReader.reader("timestamp").read(h);
        Assert.assertEquals(i, h.value);
        break;
      }