Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class ParquetProperties {
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;

public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = false;
Comment thread
gszadovszky marked this conversation as resolved.
Outdated

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

private static final int MIN_SLAB_SIZE = 64;
Expand Down Expand Up @@ -87,10 +89,12 @@ public static WriterVersion fromString(String name) {
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
boolean pageWriteChecksumEnabled) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
Expand All @@ -105,6 +109,8 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.pageRowCountLimit = pageRowCountLimit;

Comment thread
Fokko marked this conversation as resolved.
Outdated
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
}

public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
Expand Down Expand Up @@ -201,6 +207,10 @@ public int getPageRowCountLimit() {
return pageRowCountLimit;
}

public boolean getPageWriteChecksumEnabled() {
return pageWriteChecksumEnabled;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -221,6 +231,7 @@ public static class Builder {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;

private Builder() {
}
Expand All @@ -236,6 +247,7 @@ private Builder(ParquetProperties toCopy) {
this.valuesWriterFactory = toCopy.valuesWriterFactory;
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
}

/**
Expand Down Expand Up @@ -330,11 +342,17 @@ public Builder withPageRowCountLimit(int rowCount) {
return this;
}

public Builder withPageWriteChecksumEnabled(boolean val) {
this.pageWriteChecksumEnabled = val;
return this;
}

public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
pageRowCountLimit, pageWriteChecksumEnabled);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,25 @@ public int getUncompressedSize() {
return uncompressedSize;
}

// Note: the following fields are only used for testing purposes and are NOT used in checksum
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// Note: the following fields are only used for testing purposes and are NOT used in checksum
// Note: the following field is only used for testing purposes and is NOT used in checksum

// verification. The crc value here is merely a copy of the actual crc field read in
// ParquetFileReader.Chunk.readAllPages()
private int crc;
private boolean isSetCrc = false;

// Visible for testing
public void setCrc(int crc) {
this.crc = crc;
this.isSetCrc = true;
}

// Visible for testing
public int getCrc() {
Comment thread
gszadovszky marked this conversation as resolved.
Outdated
return crc;
}

// Visible for testing
public boolean isSetCrc() {
return isSetCrc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@

import java.util.Map;

import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
import static org.apache.parquet.hadoop.ParquetInputFormat.*;
Comment thread
Fokko marked this conversation as resolved.
Outdated
import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;

public class HadoopReadOptions extends ParquetReadOptions {
Expand All @@ -45,6 +41,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax,
boolean useDictionaryFilter,
boolean useRecordFilter,
boolean useColumnIndexFilter,
boolean usePageChecksumVerification,
FilterCompat.Filter recordFilter,
MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
Expand All @@ -54,7 +51,8 @@ private HadoopReadOptions(boolean useSignedStringMinMax,
Configuration conf) {
super(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties
usePageChecksumVerification, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize,
properties
);
this.conf = conf;
}
Expand Down Expand Up @@ -86,6 +84,8 @@ public Builder(Configuration conf) {
useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED,
usePageChecksumVerification));
withCodecFactory(HadoopCodecs.newFactory(conf, 0));
withRecordFilter(getFilter(conf));
withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
Expand All @@ -98,9 +98,9 @@ public Builder(Configuration conf) {
@Override
public ParquetReadOptions build() {
return new HadoopReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties,
conf);
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter,
codecFactory, allocator, maxAllocationSize, properties, conf);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ public class ParquetReadOptions {
private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true;
private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
Comment thread
gszadovszky marked this conversation as resolved.

private final boolean useSignedStringMinMax;
private final boolean useStatsFilter;
private final boolean useDictionaryFilter;
private final boolean useRecordFilter;
private final boolean useColumnIndexFilter;
private final boolean usePageChecksumVerification;
private final FilterCompat.Filter recordFilter;
private final ParquetMetadataConverter.MetadataFilter metadataFilter;
private final CompressionCodecFactory codecFactory;
Expand All @@ -58,6 +60,7 @@ public class ParquetReadOptions {
boolean useDictionaryFilter,
boolean useRecordFilter,
boolean useColumnIndexFilter,
boolean usePageChecksumVerification,
FilterCompat.Filter recordFilter,
ParquetMetadataConverter.MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
Expand All @@ -69,6 +72,7 @@ public class ParquetReadOptions {
this.useDictionaryFilter = useDictionaryFilter;
this.useRecordFilter = useRecordFilter;
this.useColumnIndexFilter = useColumnIndexFilter;
this.usePageChecksumVerification = usePageChecksumVerification;
this.recordFilter = recordFilter;
this.metadataFilter = metadataFilter;
this.codecFactory = codecFactory;
Expand Down Expand Up @@ -97,6 +101,10 @@ public boolean useColumnIndexFilter() {
return useColumnIndexFilter;
}

public boolean usePageChecksumVerification() {
return usePageChecksumVerification;
}

public FilterCompat.Filter getRecordFilter() {
return recordFilter;
}
Expand Down Expand Up @@ -143,6 +151,7 @@ public static class Builder {
protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT;
protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT;
protected FilterCompat.Filter recordFilter = null;
protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
// the page size parameter isn't used when only using the codec factory to get decompressors
Expand Down Expand Up @@ -200,6 +209,16 @@ public Builder useColumnIndexFilter() {
return useColumnIndexFilter(true);
}


public Builder usePageChecksumVerification(boolean usePageChecksumVerification) {
this.usePageChecksumVerification = usePageChecksumVerification;
return this;
}

public Builder usePageChecksumVerification() {
return usePageChecksumVerification(true);
}

public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) {
this.recordFilter = rowGroupFilter;
return this;
Expand Down Expand Up @@ -235,6 +254,11 @@ public Builder withMaxAllocationInBytes(int allocationSizeInBytes) {
return this;
}

public Builder withPageChecksumVerification(boolean val) {
this.usePageChecksumVerification = val;
return this;
}

public Builder set(String key, String value) {
properties.put(key, value);
return this;
Expand All @@ -249,6 +273,7 @@ public Builder copy(ParquetReadOptions options) {
withMetadataFilter(options.metadataFilter);
withCodecFactory(options.codecFactory);
withAllocator(options.allocator);
withPageChecksumVerification(options.usePageChecksumVerification);
for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
set(keyValue.getKey(), keyValue.getValue());
}
Expand All @@ -257,8 +282,9 @@ public Builder copy(ParquetReadOptions options) {

public ParquetReadOptions build() {
return new ParquetReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties);
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter,
codecFactory, allocator, maxAllocationSize, properties);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1338,13 +1338,15 @@ public void writeDataPageHeader(
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding,
int crc,
Comment thread
gszadovszky marked this conversation as resolved.
Outdated
OutputStream to) throws IOException {
writePageHeader(newDataPageHeader(uncompressedSize,
compressedSize,
valueCount,
rlEncoding,
dlEncoding,
valuesEncoding), to);
valuesEncoding,
crc), to);
}

// Statistics are no longer saved in page headers
Expand All @@ -1364,14 +1366,30 @@ public void writeDataPageHeader(
to);
}

private PageHeader newDataPageHeader(
int uncompressedSize, int compressedSize,
int valueCount,
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding) {
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
pageHeader.setData_page_header(new DataPageHeader(
valueCount,
getEncoding(valuesEncoding),
getEncoding(dlEncoding),
getEncoding(rlEncoding)));
return pageHeader;
}

private PageHeader newDataPageHeader(
int uncompressedSize, int compressedSize,
int valueCount,
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding) {
org.apache.parquet.column.Encoding valuesEncoding,
int crc) {
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
// TODO: pageHeader.crc = ...;
pageHeader.setCrc(crc);
pageHeader.setData_page_header(new DataPageHeader(
valueCount,
getEncoding(valuesEncoding),
Expand All @@ -1397,20 +1415,38 @@ public void writeDataPageV2Header(
rlByteLength, dlByteLength), to);
}

public void writeDataPageV1Header(
int uncompressedSize,
int compressedSize,
int valueCount,
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding,
OutputStream to) throws IOException {
writePageHeader(newDataPageHeader(uncompressedSize,
compressedSize,
valueCount,
rlEncoding,
dlEncoding,
valuesEncoding), to);
}

public void writeDataPageV1Header(
int uncompressedSize,
int compressedSize,
int valueCount,
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding,
int crc,
OutputStream to) throws IOException {
writePageHeader(newDataPageHeader(uncompressedSize,
compressedSize,
valueCount,
rlEncoding,
dlEncoding,
valuesEncoding), to);
valuesEncoding,
crc), to);
}

public void writeDataPageV2Header(
Expand Down Expand Up @@ -1442,10 +1478,19 @@ private PageHeader newDataPageV2Header(
return pageHeader;
}

public void writeDictionaryPageHeader(
int uncompressedSize, int compressedSize, int valueCount,
org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
writePageHeader(pageHeader, to);
}

public void writeDictionaryPageHeader(
int uncompressedSize, int compressedSize, int valueCount,
org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
pageHeader.setCrc(crc);
pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
writePageHeader(pageHeader, to);
}
Expand Down
Loading