diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 1fa9c1f44d..3a68e0133b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -944,7 +944,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
           currentParts = new ConsecutivePartList(startingPos);
           allParts.add(currentParts);
         }
-        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
+        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
       }
     }
     // actually read all the chunks
@@ -1066,7 +1066,7 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc
             allParts.add(currentParts);
           }
           ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
-              Math.toIntExact(range.getLength()));
+              range.getLength());
           currentParts.addChunk(chunkDescriptor);
           builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
         }
@@ -1691,7 +1691,7 @@ private static class ChunkDescriptor {
     private final ColumnDescriptor col;
     private final ColumnChunkMetaData metadata;
     private final long fileOffset;
-    private final int size;
+    private final long size;
 
     /**
      * @param col column this chunk is part of
@@ -1703,7 +1703,7 @@ private ChunkDescriptor(
         ColumnDescriptor col,
         ColumnChunkMetaData metadata,
         long fileOffset,
-        int size) {
+        long size) {
       super();
       this.col = col;
       this.metadata = metadata;
@@ -1735,7 +1735,7 @@ public boolean equals(Object obj) {
   private class ConsecutivePartList {
 
     private final long offset;
-    private int length;
+    private long length;
     private final List<ChunkDescriptor> chunks = new ArrayList<>();
 
     /**
@@ -1763,8 +1763,8 @@ public void addChunk(ChunkDescriptor descriptor) {
     public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
       f.seek(offset);
 
-      int fullAllocations = length / options.getMaxAllocationSize();
-      int lastAllocationSize = length % options.getMaxAllocationSize();
+      int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
+      int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());
 
       int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
       List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 696fec3140..b9953a57f3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -269,7 +269,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
       ParquetFileWriter.Mode mode,
       WriteSupport<T> writeSupport,
       CompressionCodecName compressionCodecName,
-      int rowGroupSize,
+      long rowGroupSize,
       boolean validating,
       Configuration conf,
       int maxPaddingSize,
@@ -355,7 +355,7 @@ public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
     private Configuration conf = new Configuration();
     private ParquetFileWriter.Mode mode;
     private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
-    private int rowGroupSize = DEFAULT_BLOCK_SIZE;
+    private long rowGroupSize = DEFAULT_BLOCK_SIZE;
     private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
     private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
     private ParquetProperties.Builder encodingPropsBuilder =
@@ -432,8 +432,20 @@ public SELF withEncryption (FileEncryptionProperties encryptionProperties) {
      *
      * @param rowGroupSize an integer size in bytes
      * @return this builder for method chaining.
+     * @deprecated Use {@link #withRowGroupSize(long)} instead
      */
+    @Deprecated
     public SELF withRowGroupSize(int rowGroupSize) {
+      return withRowGroupSize((long) rowGroupSize);
+    }
+
+    /**
+     * Set the Parquet format row group size used by the constructed writer.
+     *
+     * @param rowGroupSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public SELF withRowGroupSize(long rowGroupSize) {
       this.rowGroupSize = rowGroupSize;
       return self();
     }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
new file mode 100644
index 0000000000..90015f57e1
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Types.buildMessage;
+import static org.apache.parquet.schema.Types.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test exercises parquet-mr with potential int overflows (sizes greater than
+ * Integer.MAX_VALUE). It requires ~3GB of memory, so it is likely to fail in the CI
+ * environment; therefore these tests are flagged to be ignored.
+ */
+@Ignore
+public class TestLargeColumnChunk {
+  private static final MessageType SCHEMA = buildMessage().addFields(
+      required(INT64).named("id"),
+      required(BINARY).named("data"))
+      .named("schema");
+  private static final int DATA_SIZE = 256;
+  // Ensure that the size of the column chunk would overflow an int
+  private static final int ROW_COUNT = Integer.MAX_VALUE / DATA_SIZE + 1000;
+  private static final long RANDOM_SEED = 42;
+  private static final int ID_INDEX = SCHEMA.getFieldIndex("id");
+  private static final int DATA_INDEX = SCHEMA.getFieldIndex("data");
+
+  private static final long ID_OF_FILTERED_DATA = ROW_COUNT / 2;
+  private static Binary VALUE_IN_DATA;
+  private static Binary VALUE_NOT_IN_DATA;
+  private static Path file;
+
+  @ClassRule
+  public static TemporaryFolder folder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void createFile() throws IOException {
+    file = new Path(folder.newFile().getAbsolutePath());
+
+    GroupFactory factory = new SimpleGroupFactory(SCHEMA);
+    Random random = new Random(RANDOM_SEED);
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(SCHEMA, conf);
+    try (ParquetWriter<Group> writer = ExampleParquetWriter
+        .builder(HadoopOutputFile.fromPath(file, conf))
+        .withWriteMode(OVERWRITE)
+        .withConf(conf)
+        .withCompressionCodec(UNCOMPRESSED)
+        .withRowGroupSize(4L * 1024 * 1024 * 1024) // 4G to ensure all data goes to one row group
+        .withBloomFilterEnabled(true)
+        .build()) {
+      for (long id = 0; id < ROW_COUNT; ++id) {
+        Group group = factory.newGroup();
+        group.add(ID_INDEX, id);
+        Binary data = nextBinary(random);
+        group.add(DATA_INDEX, data);
+        writer.write(group);
+        if (id == ID_OF_FILTERED_DATA) {
+          VALUE_IN_DATA = data;
+        }
+      }
+    }
+    VALUE_NOT_IN_DATA = nextBinary(random);
+  }
+
+  private static Binary nextBinary(Random random) {
+    byte[] bytes = new byte[DATA_SIZE];
+    random.nextBytes(bytes);
+    return Binary.fromConstantByteArray(bytes);
+  }
+
+  @Test
+  public void validateAllData() throws IOException {
+    Random random = new Random(RANDOM_SEED);
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+      for (long id = 0; id < ROW_COUNT; ++id) {
+        Group group = reader.read();
+        assertEquals(id, group.getLong(ID_INDEX, 0));
+        assertEquals(nextBinary(random), group.getBinary(DATA_INDEX, 0));
+      }
+      assertNull("No more record should be read", reader.read());
+    }
+  }
+
+  @Test
+  public void validateFiltering() throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_IN_DATA)))
+        .build()) {
+      Group group = reader.read();
+      assertEquals(ID_OF_FILTERED_DATA, group.getLong(ID_INDEX, 0));
+      assertEquals(VALUE_IN_DATA, group.getBinary(DATA_INDEX, 0));
+      assertNull("No more record should be read", reader.read());
+    }
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_NOT_IN_DATA)))
+        .build()) {
+      assertNull("No record should be read", reader.read());
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 24f122935f..ecfd17f49f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@
     1.8
-    -Xmx512m
     INFO
@@ -558,7 +557,7 @@
       ci-test
       WARN
-      -Xmx512m -XX:MaxJavaStackTraceDepth=10
+      -XX:MaxJavaStackTraceDepth=10
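
A minimal usage sketch (not part of the patch) of the new long-based withRowGroupSize on the writer builder; the class name, schema, and output path are hypothetical, and the builder calls mirror the ones exercised in TestLargeColumnChunk above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class LargeRowGroupWriteSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical schema and output path, for illustration only.
        MessageType schema = MessageTypeParser.parseMessageType(
            "message schema { required int64 id; required binary data; }");
        Configuration conf = new Configuration();
        GroupWriteSupport.setSchema(schema, conf);
        Path file = new Path("/tmp/large-row-group.parquet");
        try (ParquetWriter<Group> writer = ExampleParquetWriter
            .builder(HadoopOutputFile.fromPath(file, conf))
            .withConf(conf)
            // New long overload from this patch; 4GB would overflow the old int parameter.
            .withRowGroupSize(4L * 1024 * 1024 * 1024)
            .build()) {
          // Write Group records as usual; a single row group/column chunk may now exceed 2GB.
        }
      }
    }

The deprecated int overload keeps source compatibility: it simply delegates to the long variant, so existing callers compile and behave unchanged.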