Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,58 @@

package org.apache.hudi.common.model;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

/**
* Hoodie Range metadata.
* Hoodie metadata for the column range of data stored in columnar format (like Parquet)
*
* NOTE: {@link Comparable} is used as raw-type so that we can handle polymorphism, where
* caller apriori is not aware of the type {@link HoodieColumnRangeMetadata} is
* associated with
*/
public class HoodieColumnRangeMetadata<T> implements Serializable {
@SuppressWarnings("rawtype")
public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializable {
private final String filePath;
private final String columnName;
@Nullable
private final T minValue;
@Nullable
private final T maxValue;
private final long nullCount;
private final long valueCount;
private final long totalSize;
private final long totalUncompressedSize;

public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
(oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<>(
(oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<Comparable>(
newColumnRange.getFilePath(),
newColumnRange.getColumnName(),
(Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
.stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null),
(Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
.stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null),
(Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! We should replace Arrays.asList by Stream if there are other such places.

.filter(Objects::nonNull)
.min(Comparator.naturalOrder())
.orElse(null),
(Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
.filter(Objects::nonNull)
.max(Comparator.naturalOrder()).orElse(null),
oldColumnRange.getNullCount() + newColumnRange.getNullCount(),
oldColumnRange.getValueCount() + newColumnRange.getValueCount(),
oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(),
oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize()
);

public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue,
final long nullCount, long valueCount, long totalSize, long totalUncompressedSize) {
private HoodieColumnRangeMetadata(String filePath,
String columnName,
@Nullable T minValue,
@Nullable T maxValue,
long nullCount,
long valueCount,
long totalSize,
long totalUncompressedSize) {
this.filePath = filePath;
this.columnName = columnName;
this.minValue = minValue;
Expand All @@ -71,10 +88,12 @@ public String getColumnName() {
return this.columnName;
}

@Nullable
public T getMinValue() {
return this.minValue;
}

@Nullable
public T getMaxValue() {
return this.maxValue;
}
Expand Down Expand Up @@ -133,6 +152,23 @@ public String toString() {
+ '}';
}

public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> create(String filePath,
String columnName,
@Nullable T minValue,
@Nullable T maxValue,
long nullCount,
long valueCount,
long totalSize,
long totalUncompressedSize) {
return new HoodieColumnRangeMetadata<>(filePath, columnName, minValue, maxValue, nullCount, valueCount, totalSize, totalUncompressedSize);
}

@SuppressWarnings("rawtype")
public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
String columnName) {
return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
}

/**
* Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index.
*/
Expand All @@ -144,6 +180,6 @@ public static final class Stats {
public static final String TOTAL_SIZE = "total_size";
public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size";

private Stats() { }
private Stats() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -288,18 +289,27 @@ public Boolean apply(String recordKey) {
/**
* Parse min/max statistics stored in parquet footers for all columns.
*/
@SuppressWarnings("rawtype")
public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
@Nonnull Configuration conf,
@Nonnull Path parquetFilePath,
@Nonnull List<String> cols
) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);

// NOTE: This collector has to have fully specialized generic type params since
// Java 1.8 struggles to infer them
Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);

// Collect stats from all individual Parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().sequential()
.flatMap(blockMetaData -> blockMetaData.getColumns().stream()
.filter(f -> cols.contains(f.getPath().toDotString()))
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
(Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential()
.flatMap(blockMetaData ->
blockMetaData.getColumns().stream()
.filter(f -> cols.contains(f.getPath().toDotString()))
.map(columnChunkMetaData ->
new HoodieColumnRangeMetadata<Comparable>(
HoodieColumnRangeMetadata.<Comparable>create(
parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
convertToNativeJavaType(
Expand All @@ -312,7 +322,8 @@ public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
columnChunkMetaData.getValueCount(),
columnChunkMetaData.getTotalSize(),
columnChunkMetaData.getTotalUncompressedSize()))
).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
)
.collect(groupingByCollector);

// Combine those into file-level statistics
// NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
Expand Down Expand Up @@ -360,7 +371,7 @@ private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
maxValue = one.getMaxValue();
}

return new HoodieColumnRangeMetadata<T>(
return HoodieColumnRangeMetadata.create(
one.getFilePath(),
one.getColumnName(), minValue, maxValue,
one.getNullCount() + another.getNullCount(),
Expand All @@ -369,7 +380,11 @@ private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
}

private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) {
private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable<?> val) {
if (val == null) {
return null;
}

if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
return extractDecimal(val, primitiveType.getDecimalMetadata());
} else if (primitiveType.getOriginalType() == OriginalType.DATE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -960,8 +960,7 @@ private static Stream<HoodieRecord> getColumnStats(final String partitionPath, f
} else {
// TODO we should delete records instead of stubbing them
columnRangeMetadataList =
columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata<Comparable>(fileName,
entry, null, null, 0, 0, 0, 0))
columnsToIndex.stream().map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
.collect(Collectors.toList());
}
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
Expand Down Expand Up @@ -1012,11 +1011,11 @@ public static void accumulateColumnRanges(Schema.Field field, String filePath,
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
Map<String, Map<String, Object>> columnToStats) {
Map<String, Object> columnStats = columnToStats.get(field.name());
HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = HoodieColumnRangeMetadata.create(
filePath,
field.name(),
String.valueOf(columnStats.get(MIN)),
String.valueOf(columnStats.get(MAX)),
(Comparable) String.valueOf(columnStats.get(MIN)),
(Comparable) String.valueOf(columnStats.get(MAX)),
Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
return allColumnNameList.stream()
.flatMap(columnName ->
tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
.map(stats -> new HoodieColumnRangeMetadata<>(
.map(stats -> HoodieColumnRangeMetadata.create(
stats.getFileName(),
columnName,
stats.getMinValue(),
Expand All @@ -799,7 +799,7 @@ public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
metaClient.getHadoopConf(),
new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
allColumnNameList).stream())
.map(rangeMetadata -> new HoodieColumnRangeMetadata<String>(
.map(rangeMetadata -> HoodieColumnRangeMetadata.create(
rangeMetadata.getFilePath(),
rangeMetadata.getColumnName(),
// Note: here we ignore the type in the validation,
Expand Down