ParquetProperties.java
@@ -50,6 +50,9 @@ public class ParquetProperties {
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;

public static final boolean DEFAULT_PAGE_VERIFY_CHECKSUM_ENABLED = false;
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = false;

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

private static final int MIN_SLAB_SIZE = 64;
@@ -87,10 +90,12 @@ public static WriterVersion fromString(String name) {
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
boolean pageWriteChecksumEnabled) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -105,6 +110,8 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.pageRowCountLimit = pageRowCountLimit;

this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
}

public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -201,6 +208,10 @@ public int getPageRowCountLimit() {
return pageRowCountLimit;
}

public boolean getPageWriteChecksumEnabled() {
return pageWriteChecksumEnabled;
}

public static Builder builder() {
return new Builder();
}
@@ -221,6 +232,7 @@ public static class Builder {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;

private Builder() {
}
@@ -236,6 +248,7 @@ private Builder(ParquetProperties toCopy) {
this.valuesWriterFactory = toCopy.valuesWriterFactory;
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
}

/**
@@ -330,11 +343,17 @@ public Builder withPageRowCountLimit(int rowCount) {
return this;
}

public Builder withPageWriteChecksumEnabled(boolean val) {
this.pageWriteChecksumEnabled = val;
return this;
}

public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
pageRowCountLimit, pageWriteChecksumEnabled);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
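A minimal write-side sketch of the new surface. It uses only the builder methods visible in the ParquetProperties diff above (builder(), withPageWriteChecksumEnabled(), build(), getPageWriteChecksumEnabled()); the wrapper class is illustrative only:

import org.apache.parquet.column.ParquetProperties;

public class WriteChecksumExample {
  public static void main(String[] args) {
    // Opt in to page-level CRC checksums on the write path; the new
    // DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED constant keeps this off by default.
    ParquetProperties props = ParquetProperties.builder()
        .withPageWriteChecksumEnabled(true)
        .build();
    System.out.println(props.getPageWriteChecksumEnabled()); // prints: true
  }
}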
DataPageV1.java
@@ -33,6 +33,10 @@ public class DataPageV1 extends DataPage {
private final Encoding valuesEncoding;
private final int indexRowCount;

// We need an additional flag, since we cannot use a default value for the crc32
private final int crc32;
private final boolean isSetCrc32;

/**
* @param bytes the bytes for this page
* @param valueCount count of values in this page
@@ -41,15 +45,53 @@ public class DataPageV1 extends DataPage {
* @param rlEncoding the repetition level encoding for this page
* @param dlEncoding the definition level encoding for this page
* @param valuesEncoding the values encoding for this page
* @param crc32 the crc32 for this page
*/
public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
private DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize,
Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding,
Encoding valuesEncoding, int crc32, boolean isSetCrc32) {
super(Math.toIntExact(bytes.size()), uncompressedSize, valueCount);
this.bytes = bytes;
this.statistics = statistics;
this.rlEncoding = rlEncoding;
this.dlEncoding = dlEncoding;
this.valuesEncoding = valuesEncoding;
this.indexRowCount = -1;
this.crc32 = crc32;
this.isSetCrc32 = isSetCrc32;
}

/**
* @param bytes the bytes for this page
* @param valueCount count of values in this page
* @param uncompressedSize the uncompressed size of the page
* @param statistics of the page's values (max, min, num_null)
* @param rlEncoding the repetition level encoding for this page
* @param dlEncoding the definition level encoding for this page
* @param valuesEncoding the values encoding for this page
*/
public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize,
Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding,
Encoding valuesEncoding) {
this(bytes, valueCount, uncompressedSize, statistics, rlEncoding, dlEncoding, valuesEncoding,
0, false);
}

/**
* @param bytes the bytes for this page
* @param valueCount count of values in this page
* @param uncompressedSize the uncompressed size of the page
* @param statistics of the page's values (max, min, num_null)
* @param rlEncoding the repetition level encoding for this page
* @param dlEncoding the definition level encoding for this page
* @param valuesEncoding the values encoding for this page
* @param crc32 the crc32 for this page
*/
public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize,
Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding,
Encoding valuesEncoding, int crc32) {
this(bytes, valueCount, uncompressedSize, statistics, rlEncoding, dlEncoding, valuesEncoding,
crc32, true);
}

/**
Expand All @@ -62,16 +104,57 @@ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statis
* @param rlEncoding the repetition level encoding for this page
* @param dlEncoding the definition level encoding for this page
* @param valuesEncoding the values encoding for this page
* @param crc32 the crc32 for this page
*/
public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex, int rowCount,
Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
private DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex,
int rowCount, Statistics<?> statistics, Encoding rlEncoding,
Encoding dlEncoding, Encoding valuesEncoding, int crc32, boolean isSetCrc32) {
super(Math.toIntExact(bytes.size()), uncompressedSize, valueCount, firstRowIndex);
this.bytes = bytes;
this.statistics = statistics;
this.rlEncoding = rlEncoding;
this.dlEncoding = dlEncoding;
this.valuesEncoding = valuesEncoding;
this.indexRowCount = rowCount;
this.crc32 = crc32;
this.isSetCrc32 = isSetCrc32;
}

/**
* @param bytes the bytes for this page
* @param valueCount count of values in this page
* @param uncompressedSize the uncompressed size of the page
* @param firstRowIndex the index of the first row in this page
* @param rowCount the number of rows in this page
* @param statistics of the page's values (max, min, num_null)
* @param rlEncoding the repetition level encoding for this page
* @param dlEncoding the definition level encoding for this page
* @param valuesEncoding the values encoding for this page
*/
public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex,
int rowCount, Statistics<?> statistics, Encoding rlEncoding,
Encoding dlEncoding, Encoding valuesEncoding) {
this(bytes, valueCount, uncompressedSize, firstRowIndex, rowCount, statistics, rlEncoding,
dlEncoding, valuesEncoding, 0, false);
}

/**
* @param bytes the bytes for this page
* @param valueCount count of values in this page
* @param uncompressedSize the uncompressed size of the page
* @param firstRowIndex the index of the first row in this page
* @param rowCount the number of rows in this page
* @param statistics of the page's values (max, min, num_null)
* @param rlEncoding the repetition level encoding for this page
* @param dlEncoding the definition level encoding for this page
* @param valuesEncoding the values encoding for this page
* @param crc32 the crc32 for this page
*/
public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex,
int rowCount, Statistics<?> statistics, Encoding rlEncoding,
Encoding dlEncoding, Encoding valuesEncoding, int crc32) {
this(bytes, valueCount, uncompressedSize, firstRowIndex, rowCount, statistics, rlEncoding,
dlEncoding, valuesEncoding, crc32, true);
}

/**
@@ -110,9 +193,25 @@ public Encoding getValueEncoding() {
return valuesEncoding;
}

/**
* @return the crc32 for this page
*/
public int getCrc32() {
return crc32;
}

/**
* @return whether the crc32 field for this page is set
*/
public boolean isSetCrc32() {
return isSetCrc32;
}

@Override
public String toString() {
return "Page [bytes.size=" + bytes.size() + ", valueCount=" + getValueCount() + ", uncompressedSize=" + getUncompressedSize() + "]";
return "Page [bytes.size=" + bytes.size() + ", valueCount=" + getValueCount() +
", uncompressedSize=" + getUncompressedSize() + (isSetCrc32() ? ", crc=" + getCrc32() : "" )
+ "]";
}

@Override
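A hedged construction sketch for the crc-carrying DataPageV1 constructor above. pageWithCrc is a hypothetical helper, not part of this change, and it assumes the checksum is computed with java.util.zip.CRC32 over the page bytes exactly as handed to the page; whether that should be the compressed or uncompressed form is not decided in this file:

import java.io.IOException;
import java.util.zip.CRC32;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.statistics.Statistics;

public class PageCrcExample {
  // Hypothetical helper: checksums the page bytes and routes the result through
  // the new constructor overload, so isSetCrc32() reports true on the page.
  static DataPageV1 pageWithCrc(BytesInput bytes, int valueCount, int uncompressedSize,
      Statistics<?> statistics, Encoding rl, Encoding dl, Encoding values) throws IOException {
    CRC32 crc = new CRC32();
    crc.update(bytes.toByteArray());
    // java.util.zip.CRC32 exposes a long; the page stores the low 32 bits as an int.
    return new DataPageV1(bytes, valueCount, uncompressedSize, statistics, rl, dl, values,
        (int) crc.getValue());
  }
}

A verifying reader would branch on isSetCrc32() first, and only then compare getCrc32() against a freshly computed value.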
HadoopReadOptions.java
@@ -21,18 +21,15 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.util.HadoopCodecs;

import java.util.Map;

import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
import static org.apache.parquet.hadoop.ParquetInputFormat.*;
import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;

public class HadoopReadOptions extends ParquetReadOptions {
@@ -45,6 +42,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax,
boolean useDictionaryFilter,
boolean useRecordFilter,
boolean useColumnIndexFilter,
boolean usePageChecksumVerification,
FilterCompat.Filter recordFilter,
MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
@@ -54,7 +52,8 @@ private HadoopReadOptions(boolean useSignedStringMinMax,
Configuration conf) {
super(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties
usePageChecksumVerification, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize,
properties
);
this.conf = conf;
}
@@ -86,6 +85,8 @@ public Builder(Configuration conf) {
useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED,
ParquetProperties.DEFAULT_PAGE_VERIFY_CHECKSUM_ENABLED));
withCodecFactory(HadoopCodecs.newFactory(conf, 0));
withRecordFilter(getFilter(conf));
withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
@@ -98,9 +99,9 @@ public Builder(Configuration conf) {
@Override
public ParquetReadOptions build() {
return new HadoopReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties,
conf);
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter,
codecFactory, allocator, maxAllocationSize, properties, conf);
}
}
}
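On the Hadoop path, verification is driven by the configuration key read in the Builder above. A sketch assuming the usual HadoopReadOptions.builder(Configuration) factory and the PAGE_VERIFY_CHECKSUM_ENABLED constant on ParquetInputFormat (pulled in here by the wildcard static import):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetInputFormat;

public class ReadVerifyExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Left unset, the builder falls back to
    // ParquetProperties.DEFAULT_PAGE_VERIFY_CHECKSUM_ENABLED (false).
    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    System.out.println(options.usePageChecksumVerification()); // prints: true
  }
}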
ParquetReadOptions.java
@@ -40,12 +40,14 @@ public class ParquetReadOptions {
private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true;
private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;

private final boolean useSignedStringMinMax;
private final boolean useStatsFilter;
private final boolean useDictionaryFilter;
private final boolean useRecordFilter;
private final boolean useColumnIndexFilter;
private final boolean usePageChecksumVerification;
private final FilterCompat.Filter recordFilter;
private final ParquetMetadataConverter.MetadataFilter metadataFilter;
private final CompressionCodecFactory codecFactory;
@@ -58,6 +60,7 @@ public class ParquetReadOptions {
boolean useDictionaryFilter,
boolean useRecordFilter,
boolean useColumnIndexFilter,
boolean usePageChecksumVerification,
FilterCompat.Filter recordFilter,
ParquetMetadataConverter.MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
@@ -69,6 +72,7 @@ public class ParquetReadOptions {
this.useDictionaryFilter = useDictionaryFilter;
this.useRecordFilter = useRecordFilter;
this.useColumnIndexFilter = useColumnIndexFilter;
this.usePageChecksumVerification = usePageChecksumVerification;
this.recordFilter = recordFilter;
this.metadataFilter = metadataFilter;
this.codecFactory = codecFactory;
@@ -97,6 +101,10 @@ public boolean useColumnIndexFilter() {
return useColumnIndexFilter;
}

public boolean usePageChecksumVerification() {
return usePageChecksumVerification;
}

public FilterCompat.Filter getRecordFilter() {
return recordFilter;
}
@@ -143,6 +151,7 @@ public static class Builder {
protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT;
protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT;
protected FilterCompat.Filter recordFilter = null;
protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
// the page size parameter isn't used when only using the codec factory to get decompressors
@@ -200,6 +209,16 @@ public Builder useColumnIndexFilter() {
return useColumnIndexFilter(true);
}


public Builder usePageChecksumVerification(boolean usePageChecksumVerification) {
this.usePageChecksumVerification = usePageChecksumVerification;
return this;
}

public Builder usePageChecksumVerification() {
return usePageChecksumVerification(true);
}

public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) {
this.recordFilter = rowGroupFilter;
return this;
@@ -235,6 +254,11 @@ public Builder withMaxAllocationInBytes(int allocationSizeInBytes) {
return this;
}

public Builder withPageChecksumVerification(boolean val) {
this.usePageChecksumVerification = val;
return this;
}

public Builder set(String key, String value) {
properties.put(key, value);
return this;
@@ -249,6 +273,7 @@ public Builder copy(ParquetReadOptions options) {
withMetadataFilter(options.metadataFilter);
withCodecFactory(options.codecFactory);
withAllocator(options.allocator);
withPageChecksumVerification(options.usePageChecksumVerification);
for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
set(keyValue.getKey(), keyValue.getValue());
}
@@ -257,8 +282,9 @@ public Builder copy(ParquetReadOptions options) {

public ParquetReadOptions build() {
return new ParquetReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties);
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter,
codecFactory, allocator, maxAllocationSize, properties);
}
}
}
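Outside Hadoop, the same flag is set directly on ParquetReadOptions. Note the Builder now carries two equivalent boolean setters, usePageChecksumVerification(boolean) and withPageChecksumVerification(boolean); the sketch below assumes the class's existing static builder() entry point:

import org.apache.parquet.ParquetReadOptions;

public class DirectReadOptionsExample {
  public static void main(String[] args) {
    // The no-arg overload is shorthand for usePageChecksumVerification(true).
    ParquetReadOptions options = ParquetReadOptions.builder()
        .usePageChecksumVerification()
        .build();
    System.out.println(options.usePageChecksumVerification()); // prints: true
  }
}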