diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java index d098c4ff7c2b..2afbd1960f5e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -18,19 +18,27 @@ package org.apache.hudi.common.model; +import javax.annotation.Nullable; import java.io.Serializable; -import java.util.Arrays; import java.util.Comparator; import java.util.Objects; import java.util.function.BiFunction; +import java.util.stream.Stream; /** - * Hoodie Range metadata. + * Hoodie metadata for the column range of data stored in columnar format (like Parquet) + * + * NOTE: {@link Comparable} is used as raw-type so that we can handle polymorphism, where + * caller apriori is not aware of the type {@link HoodieColumnRangeMetadata} is + * associated with */ -public class HoodieColumnRangeMetadata implements Serializable { +@SuppressWarnings("rawtype") +public class HoodieColumnRangeMetadata implements Serializable { private final String filePath; private final String columnName; + @Nullable private final T minValue; + @Nullable private final T maxValue; private final long nullCount; private final long valueCount; @@ -38,21 +46,30 @@ public class HoodieColumnRangeMetadata implements Serializable { private final long totalUncompressedSize; public static final BiFunction, HoodieColumnRangeMetadata, HoodieColumnRangeMetadata> COLUMN_RANGE_MERGE_FUNCTION = - (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<>( + (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata( newColumnRange.getFilePath(), newColumnRange.getColumnName(), - (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) - .stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null), - (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) - .stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null), + (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) + .filter(Objects::nonNull) + .min(Comparator.naturalOrder()) + .orElse(null), + (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) + .filter(Objects::nonNull) + .max(Comparator.naturalOrder()).orElse(null), oldColumnRange.getNullCount() + newColumnRange.getNullCount(), oldColumnRange.getValueCount() + newColumnRange.getValueCount(), oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(), oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize() ); - public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, - final long nullCount, long valueCount, long totalSize, long totalUncompressedSize) { + private HoodieColumnRangeMetadata(String filePath, + String columnName, + @Nullable T minValue, + @Nullable T maxValue, + long nullCount, + long valueCount, + long totalSize, + long totalUncompressedSize) { this.filePath = filePath; this.columnName = columnName; this.minValue = minValue; @@ -71,10 +88,12 @@ public String getColumnName() { return this.columnName; } + @Nullable public T getMinValue() { return this.minValue; } + @Nullable public T getMaxValue() { return this.maxValue; } @@ -133,6 +152,23 @@ public String toString() { + '}'; } + public static > HoodieColumnRangeMetadata create(String filePath, + String columnName, + @Nullable T minValue, + @Nullable T maxValue, + long nullCount, + long valueCount, + long totalSize, + long totalUncompressedSize) { + return new HoodieColumnRangeMetadata<>(filePath, columnName, minValue, maxValue, nullCount, valueCount, totalSize, totalUncompressedSize); + } + + @SuppressWarnings("rawtype") + public static HoodieColumnRangeMetadata stub(String filePath, + String columnName) { + return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1); + } + /** * Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index. */ @@ -144,6 +180,6 @@ public static final class Stats { public static final String TOTAL_SIZE = "total_size"; public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size"; - private Stats() { } + private Stats() {} } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index e74f4f77703d..c0f7aabde6c2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -58,6 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -288,18 +289,27 @@ public Boolean apply(String recordKey) { /** * Parse min/max statistics stored in parquet footers for all columns. */ + @SuppressWarnings("rawtype") public List> readRangeFromParquetMetadata( @Nonnull Configuration conf, @Nonnull Path parquetFilePath, @Nonnull List cols ) { ParquetMetadata metadata = readMetadata(conf, parquetFilePath); + + // NOTE: This collector has to have fully specialized generic type params since + // Java 1.8 struggles to infer them + Collector, ?, Map>>> groupingByCollector = + Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName); + // Collect stats from all individual Parquet blocks - Map>> columnToStatsListMap = metadata.getBlocks().stream().sequential() - .flatMap(blockMetaData -> blockMetaData.getColumns().stream() - .filter(f -> cols.contains(f.getPath().toDotString())) + Map>> columnToStatsListMap = + (Map>>) metadata.getBlocks().stream().sequential() + .flatMap(blockMetaData -> + blockMetaData.getColumns().stream() + .filter(f -> cols.contains(f.getPath().toDotString())) .map(columnChunkMetaData -> - new HoodieColumnRangeMetadata( + HoodieColumnRangeMetadata.create( parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(), convertToNativeJavaType( @@ -312,7 +322,8 @@ public List> readRangeFromParquetMetadata( columnChunkMetaData.getValueCount(), columnChunkMetaData.getTotalSize(), columnChunkMetaData.getTotalUncompressedSize())) - ).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName)); + ) + .collect(groupingByCollector); // Combine those into file-level statistics // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer @@ -360,7 +371,7 @@ private > HoodieColumnRangeMetadata combineRanges( maxValue = one.getMaxValue(); } - return new HoodieColumnRangeMetadata( + return HoodieColumnRangeMetadata.create( one.getFilePath(), one.getColumnName(), minValue, maxValue, one.getNullCount() + another.getNullCount(), @@ -369,7 +380,11 @@ private > HoodieColumnRangeMetadata combineRanges( one.getTotalUncompressedSize() + another.getTotalUncompressedSize()); } - private static Comparable convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) { + private static Comparable convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) { + if (val == null) { + return null; + } + if (primitiveType.getOriginalType() == OriginalType.DECIMAL) { return extractDecimal(val, primitiveType.getDecimalMetadata()); } else if (primitiveType.getOriginalType() == OriginalType.DATE) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 4390e8766c6a..4d6c602c0d17 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -960,8 +960,7 @@ private static Stream getColumnStats(final String partitionPath, f } else { // TODO we should delete records instead of stubbing them columnRangeMetadataList = - columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata(fileName, - entry, null, null, 0, 0, 0, 0)) + columnsToIndex.stream().map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry)) .collect(Collectors.toList()); } return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted); @@ -1012,11 +1011,11 @@ public static void accumulateColumnRanges(Schema.Field field, String filePath, Map> columnRangeMap, Map> columnToStats) { Map columnStats = columnToStats.get(field.name()); - HoodieColumnRangeMetadata columnRangeMetadata = new HoodieColumnRangeMetadata<>( + HoodieColumnRangeMetadata columnRangeMetadata = HoodieColumnRangeMetadata.create( filePath, field.name(), - String.valueOf(columnStats.get(MIN)), - String.valueOf(columnStats.get(MAX)), + (Comparable) String.valueOf(columnStats.get(MIN)), + (Comparable) String.valueOf(columnStats.get(MAX)), Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()), Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()), Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()), diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 832d942c86af..af0c10099e3b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -780,7 +780,7 @@ public List> getSortedColumnStatsList( return allColumnNameList.stream() .flatMap(columnName -> tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream() - .map(stats -> new HoodieColumnRangeMetadata<>( + .map(stats -> HoodieColumnRangeMetadata.create( stats.getFileName(), columnName, stats.getMinValue(), @@ -799,7 +799,7 @@ public List> getSortedColumnStatsList( metaClient.getHadoopConf(), new Path(new Path(metaClient.getBasePath(), partitionPath), filename), allColumnNameList).stream()) - .map(rangeMetadata -> new HoodieColumnRangeMetadata( + .map(rangeMetadata -> HoodieColumnRangeMetadata.create( rangeMetadata.getFilePath(), rangeMetadata.getColumnName(), // Note: here we ignore the type in the validation,