Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ private List<ColumnMetaData> updateColumnMetadataOffset(List<ColumnMetaData> col
for (ColumnMetaData column : columns) {
ColumnMetaData columnMetaData = new ColumnMetaData(column.type, column.encodings, column.path_in_schema, column.codec, column.num_values, column.total_uncompressed_size, column.total_compressed_size, currentOffset);
columnMetaData.setStatistics(column.getStatistics());
columnMetaData.setEncoding_stats(column.getEncoding_stats());
builder.add(columnMetaData);
currentOffset += column.getTotal_compressed_size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.format.ColumnMetaData;
import org.apache.parquet.format.PageEncodingStats;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.openjdk.jol.info.ClassLayout;
Expand All @@ -36,9 +38,11 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -73,7 +77,9 @@ public class PrimitiveColumnWriter
private int currentPageRowCount;

// column meta data stats
private final Set<Encoding> encodings;
private final Set<Encoding> encodings = new HashSet<>();
private final Map<org.apache.parquet.format.Encoding, Integer> dataPagesWithEncoding = new HashMap<>();
private final Map<org.apache.parquet.format.Encoding, Integer> dictionaryPagesWithEncoding = new HashMap<>();
private long totalCompressedSize;
private long totalUnCompressedSize;
private long totalRows;
Expand All @@ -96,7 +102,6 @@ public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWr
this.definitionLevelEncoder = requireNonNull(definitionLevelEncoder, "definitionLevelEncoder is null");
this.repetitionLevelEncoder = requireNonNull(repetitionLevelEncoder, "repetitionLevelEncoder is null");
this.primitiveValueWriter = requireNonNull(primitiveValueWriter, "primitiveValueWriter is null");
this.encodings = new HashSet<>();
this.compressionCodec = requireNonNull(compressionCodecName, "compressionCodecName is null");
this.compressor = getCompressor(compressionCodecName);
this.pageSizeThreshold = pageSizeThreshold;
Expand Down Expand Up @@ -178,6 +183,14 @@ private ColumnMetaData getColumnMetaData()
totalCompressedSize,
-1);
columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
dataPagesWithEncoding.entrySet().stream()
.map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE_V2, encodingAndCount.getKey(), encodingAndCount.getValue()))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should V2 here change in #9497?
If so, let's add a comment about it in that PR... (also, this would be yet another reason to simplify and not try to have a toggle there)

cc @losipiuk

.forEach(pageEncodingStats::add);
dictionaryPagesWithEncoding.entrySet().stream()
.map(encodingAndCount -> new PageEncodingStats(PageType.DICTIONARY_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
.forEach(pageEncodingStats::add);
columnMetaData.setEncoding_stats(pageEncodingStats.build());
return columnMetaData;
}

Expand Down Expand Up @@ -236,6 +249,8 @@ private void flushCurrentPageToBuffer()

List<ParquetDataOutput> dataOutputs = outputDataStreams.build();

dataPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);

// update total stats
totalCompressedSize += pageHeader.size() + compressedSize;
totalUnCompressedSize += pageHeader.size() + uncompressedSize;
Expand Down Expand Up @@ -283,6 +298,7 @@ private List<ParquetDataOutput> getDataStreams()
dictPage.add(pageData);
totalCompressedSize += pageHeader.size() + compressedSize;
totalUnCompressedSize += pageHeader.size() + uncompressedSize;
dictionaryPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(dictionaryPage.getEncoding()), 1, Integer::sum);

primitiveValueWriter.resetDictionary();
}
Expand Down Expand Up @@ -325,6 +341,8 @@ public void reset()
totalUnCompressedSize = 0;
totalRows = 0;
encodings.clear();
dataPagesWithEncoding.clear();
dictionaryPagesWithEncoding.clear();
this.columnStatistics = Statistics.createStats(columnDescriptor.getPrimitiveType());

getDataStreamsCalled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4625,6 +4625,30 @@ public void testParquetLongDecimalPredicatePushdown()
assertNoDataRead("SELECT * FROM test_parquet_long_decimal_predicate_pushdown WHERE decimal_t != DECIMAL '12345678900000000.345'");
}

/**
 * Runs the shared dictionary predicate pushdown scenario using the session
 * returned by {@code getSession()}.
 */
@Test
public void testParquetDictionaryPredicatePushdown()
{
    Session defaultSession = getSession();
    testParquetDictionaryPredicatePushdown(defaultSession);
}

/**
 * Runs the shared dictionary predicate pushdown scenario with the
 * {@code experimental_parquet_optimized_writer_enabled} session property of the
 * {@code hive} catalog set to {@code true}.
 */
@Test
public void testParquetDictionaryPredicatePushdownWithOptimizedWriter()
{
    // Derive a session from the default one, overriding only the writer toggle.
    Session optimizedWriterSession = Session.builder(getSession())
            .setCatalogSessionProperty("hive", "experimental_parquet_optimized_writer_enabled", "true")
            .build();
    testParquetDictionaryPredicatePushdown(optimizedWriterSession);
}

/**
 * Creates a single-column Parquet table, inserts eight rows and asserts that a
 * query filtering on a value that was never inserted reads no data.
 *
 * <p>The filter value {@code 3} is absent from the inserted rows but lies between
 * the smallest (1) and largest (5) inserted values.
 * NOTE(review): presumably this means min/max statistics alone cannot prune the
 * data and the dictionary must be consulted -- confirm against the reader.
 *
 * <p>The table is dropped when the scenario finishes, whether or not the
 * assertions pass, so that no state is left behind for other tests.
 *
 * @param session session used for the DDL and the insert
 */
private void testParquetDictionaryPredicatePushdown(Session session)
{
    String tableName = "test_parquet_dictionary_pushdown";
    // Clear any table left over from an earlier, interrupted run.
    assertUpdate(session, "DROP TABLE IF EXISTS " + tableName);
    try {
        assertUpdate(session, "CREATE TABLE " + tableName + " (n BIGINT) WITH (format = 'PARQUET')");
        assertUpdate(session, "INSERT INTO " + tableName + " VALUES 1, 1, 2, 2, 4, 4, 5, 5", 8);
        assertNoDataRead("SELECT * FROM " + tableName + " WHERE n = 3");
    }
    finally {
        // IF EXISTS keeps a failed CREATE from being masked by a cleanup failure.
        assertUpdate(session, "DROP TABLE IF EXISTS " + tableName);
    }
}

private void assertNoDataRead(@Language("SQL") String sql)
{
assertQueryStats(
Expand Down