parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -944,7 +944,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
currentParts = new ConsecutivePartList(startingPos);
allParts.add(currentParts);
}
- currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
+ currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
}
}
// actually read all the chunks
@@ -1066,7 +1066,7 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc
allParts.add(currentParts);
}
ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
- Math.toIntExact(range.getLength()));
+ range.getLength());
currentParts.addChunk(chunkDescriptor);
builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
}
@@ -1691,7 +1691,7 @@ private static class ChunkDescriptor {
private final ColumnDescriptor col;
private final ColumnChunkMetaData metadata;
private final long fileOffset;
- private final int size;
+ private final long size;

/**
* @param col column this chunk is part of
@@ -1703,7 +1703,7 @@ private ChunkDescriptor(
ColumnDescriptor col,
ColumnChunkMetaData metadata,
long fileOffset,
- int size) {
+ long size) {
super();
this.col = col;
this.metadata = metadata;
@@ -1735,7 +1735,7 @@ public boolean equals(Object obj) {
private class ConsecutivePartList {

private final long offset;
- private int length;
+ private long length;
private final List<ChunkDescriptor> chunks = new ArrayList<>();

/**
@@ -1763,8 +1763,8 @@ public void addChunk(ChunkDescriptor descriptor) {
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
f.seek(offset);

- int fullAllocations = length / options.getMaxAllocationSize();
- int lastAllocationSize = length % options.getMaxAllocationSize();
+ int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
+ int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());

int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
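Taken together, these hunks widen ChunkDescriptor.size and ConsecutivePartList.length from int to long, and move the narrowing down to the buffer-allocation split, where Math.toIntExact makes any remaining overflow fail fast instead of wrapping. A minimal standalone sketch of both behaviors (the 8 MiB max allocation size is an assumption for illustration, not a value from this patch):

public class ChunkSizeOverflowSketch {
  public static void main(String[] args) {
    long totalSize = Integer.MAX_VALUE + 1000L; // a column chunk slightly over 2 GiB

    // Old behavior: the (int) cast silently wraps to a negative value
    int truncated = (int) totalSize;
    System.out.println(truncated); // -2147482649

    // New behavior: keep the size as a long and split it into int-sized allocations;
    // quotient and remainder are both small enough that Math.toIntExact never throws here
    int maxAllocationSize = 8 * 1024 * 1024; // assumed 8 MiB max allocation
    int fullAllocations = Math.toIntExact(totalSize / maxAllocationSize);
    int lastAllocationSize = Math.toIntExact(totalSize % maxAllocationSize);
    System.out.println(fullAllocations + " x " + maxAllocationSize + " + " + lastAllocationSize);
    // prints: 256 x 8388608 + 999
  }
}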
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -269,7 +269,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
ParquetFileWriter.Mode mode,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
- int rowGroupSize,
+ long rowGroupSize,
boolean validating,
Configuration conf,
int maxPaddingSize,
@@ -355,7 +355,7 @@ public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
private Configuration conf = new Configuration();
private ParquetFileWriter.Mode mode;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
- private int rowGroupSize = DEFAULT_BLOCK_SIZE;
+ private long rowGroupSize = DEFAULT_BLOCK_SIZE;
private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
private ParquetProperties.Builder encodingPropsBuilder =
@@ -432,8 +432,20 @@ public SELF withEncryption (FileEncryptionProperties encryptionProperties) {
*
* @param rowGroupSize an integer size in bytes
* @return this builder for method chaining.
+ * @deprecated Use {@link #withRowGroupSize(long)} instead
*/
+ @Deprecated
public SELF withRowGroupSize(int rowGroupSize) {
+ return withRowGroupSize((long) rowGroupSize);
+ }
+
+ /**
+ * Set the Parquet format row group size used by the constructed writer.
+ *
+ * @param rowGroupSize an integer size in bytes
+ * @return this builder for method chaining.
+ */
+ public SELF withRowGroupSize(long rowGroupSize) {
this.rowGroupSize = rowGroupSize;
return self();
}
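The new long overload also sidesteps a silent overflow at call sites: a row-group size above 2 GiB cannot be expressed as an int, and plain int literal arithmetic wraps before the builder is even invoked (which is why the test below writes 4L * 1024 * 1024 * 1024). A hypothetical illustration, not code from this patch:

public class RowGroupSizeSketch {
  public static void main(String[] args) {
    // int arithmetic overflows before the builder ever sees the value:
    // 4 * 1024 * 1024 * 1024 == 2^32, which wraps to 0 as an int
    int badSize = 4 * 1024 * 1024 * 1024;
    System.out.println(badSize); // 0

    // the long overload receives the intended 4 GiB
    long goodSize = 4L * 1024 * 1024 * 1024;
    System.out.println(goodSize); // 4294967296

    // builder.withRowGroupSize(goodSize); // hypothetical builder instance
  }
}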
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.hadoop;

import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Types.buildMessage;
import static org.apache.parquet.schema.Types.required;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
* Tests that parquet-mr works correctly when sizes overflow an int, i.e. when a column chunk grows beyond
* Integer.MAX_VALUE bytes. The test requires ~3GB of memory and is therefore likely to fail in the CI
* environment, so these tests are flagged to be ignored.
*/
@Ignore
public class TestLargeColumnChunk {
private static final MessageType SCHEMA = buildMessage().addFields(
required(INT64).named("id"),
required(BINARY).named("data"))
.named("schema");
private static final int DATA_SIZE = 256;
// Ensure that the size of the column chunk would overflow an int
private static final int ROW_COUNT = Integer.MAX_VALUE / DATA_SIZE + 1000;
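// With these values, the "data" chunk holds (Integer.MAX_VALUE / 256 + 1000) * 256
// = 2,147,739,392 bytes of raw values, just past Integer.MAX_VALUE (2,147,483,647)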
private static final long RANDOM_SEED = 42;
private static final int ID_INDEX = SCHEMA.getFieldIndex("id");
private static final int DATA_INDEX = SCHEMA.getFieldIndex("data");

private static final long ID_OF_FILTERED_DATA = ROW_COUNT / 2;
private static Binary VALUE_IN_DATA;
private static Binary VALUE_NOT_IN_DATA;
private static Path file;

@ClassRule
public static TemporaryFolder folder = new TemporaryFolder();

@BeforeClass
public static void createFile() throws IOException {
file = new Path(folder.newFile().getAbsolutePath());

GroupFactory factory = new SimpleGroupFactory(SCHEMA);
Random random = new Random(RANDOM_SEED);
Configuration conf = new Configuration();
GroupWriteSupport.setSchema(SCHEMA, conf);
try (ParquetWriter<Group> writer = ExampleParquetWriter
.builder(HadoopOutputFile.fromPath(file, conf))
.withWriteMode(OVERWRITE)
.withConf(conf)
.withCompressionCodec(UNCOMPRESSED)
.withRowGroupSize(4L * 1024 * 1024 * 1024) // 4G to ensure all data goes to one row group
.withBloomFilterEnabled(true)
.build()) {
for (long id = 0; id < ROW_COUNT; ++id) {
Group group = factory.newGroup();
group.add(ID_INDEX, id);
Binary data = nextBinary(random);
group.add(DATA_INDEX, data);
writer.write(group);
if (id == ID_OF_FILTERED_DATA) {
VALUE_IN_DATA = data;
}
}
}
VALUE_NOT_IN_DATA = nextBinary(random);
}

private static Binary nextBinary(Random random) {
byte[] bytes = new byte[DATA_SIZE];
random.nextBytes(bytes);
return Binary.fromConstantByteArray(bytes);
}

@Test
public void validateAllData() throws IOException {
Random random = new Random(RANDOM_SEED);
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
for (long id = 0; id < ROW_COUNT; ++id) {
Group group = reader.read();
assertEquals(id, group.getLong(ID_INDEX, 0));
assertEquals(nextBinary(random), group.getBinary(DATA_INDEX, 0));
}
assertNull("No more record should be read", reader.read());
}
}

@Test
public void validateFiltering() throws IOException {
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_IN_DATA)))
.build()) {
Group group = reader.read();
assertEquals(ID_OF_FILTERED_DATA, group.getLong(ID_INDEX, 0));
assertEquals(VALUE_IN_DATA, group.getBinary(DATA_INDEX, 0));
assertNull("No more record should be read", reader.read());
}
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_NOT_IN_DATA)))
.build()) {
assertNull("No record should be read", reader.read());
}
}
}
pom.xml (3 changes: 1 addition & 2 deletions)
@@ -109,7 +109,6 @@
<commons-text.version>1.8</commons-text.version>

<!-- properties for the profiles -->
<surefire.argLine>-Xmx512m</surefire.argLine>
<surefire.logLevel>INFO</surefire.logLevel>
</properties>

@@ -558,7 +557,7 @@
<id>ci-test</id>
<properties>
<surefire.logLevel>WARN</surefire.logLevel>
<surefire.argLine>-Xmx512m -XX:MaxJavaStackTraceDepth=10</surefire.argLine>
<surefire.argLine>-XX:MaxJavaStackTraceDepth=10</surefire.argLine>
</properties>
</profile>
