@@ -96,6 +96,11 @@ public Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(List<
throw new HoodieMetadataException("Unsupported operation: getColumnsStats!");
}

+@Override
+public Map<Pair<String, String>, List<HoodieMetadataColumnStats>> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames) throws HoodieMetadataException {
+throw new HoodieMetadataException("Unsupported operation: getColumnsStats!");
+}
+
@Override
public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> recordKeys) {
throw new HoodieMetadataException("Unsupported operation: readRecordIndex!");
@@ -237,46 +237,21 @@ public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<St
@Override
public Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final List<Pair<String, String>> partitionNameFileNameList, final String columnName)
throws HoodieMetadataException {
+Map<Pair<String, String>, List<HoodieMetadataColumnStats>> partitionNameFileToColStats = getColumnStats(partitionNameFileNameList, Collections.singletonList(columnName));
+ValidationUtils.checkArgument(partitionNameFileToColStats.isEmpty() || partitionNameFileToColStats.values().stream().allMatch(stats -> stats.size() == 1));
+return partitionNameFileToColStats.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0)));
+}
+
+@Override
+public Map<Pair<String, String>, List<HoodieMetadataColumnStats>> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
+throws HoodieMetadataException {
if (!dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS)) {
LOG.error("Metadata column stats index is disabled!");
return Collections.emptyMap();
}

-Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new HashMap<>();
-Set<String> columnStatKeyset = new HashSet<>();
-final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
-for (Pair<String, String> partitionNameFileNamePair : partitionNameFileNameList) {
-final String columnStatsIndexKey = HoodieMetadataPayload.getColumnStatsIndexKey(
-new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
-new FileIndexID(partitionNameFileNamePair.getRight()),
-columnIndexID);
-columnStatKeyset.add(columnStatsIndexKey);
-columnStatKeyToFileNameMap.put(columnStatsIndexKey, partitionNameFileNamePair);
-}
-
-List<String> columnStatKeylist = new ArrayList<>(columnStatKeyset);
-HoodieTimer timer = HoodieTimer.start();
-Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
-getRecordsByKeys(columnStatKeylist, MetadataPartitionType.COLUMN_STATS.getPartitionPath());
-metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, timer.endTimer()));
-metrics.ifPresent(m -> m.setMetric(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_FILE_COUNT_STR, columnStatKeylist.size()));
-
-Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatMap = new HashMap<>();
-for (final Map.Entry<String, HoodieRecord<HoodieMetadataPayload>> entry : hoodieRecords.entrySet()) {
-final Option<HoodieMetadataColumnStats> columnStatMetadata =
-entry.getValue().getData().getColumnStatMetadata();
-if (columnStatMetadata.isPresent()) {
-if (!columnStatMetadata.get().getIsDeleted()) {
-ValidationUtils.checkState(columnStatKeyToFileNameMap.containsKey(entry.getKey()));
-final Pair<String, String> partitionFileNamePair = columnStatKeyToFileNameMap.get(entry.getKey());
-ValidationUtils.checkState(!fileToColumnStatMap.containsKey(partitionFileNamePair));
-fileToColumnStatMap.put(partitionFileNamePair, columnStatMetadata.get());
-}
-} else {
-LOG.error("Meta index column stats missing for: {}", entry.getKey());
-}
-}
-return fileToColumnStatMap;
+Map<String, Pair<String, String>> columnStatKeyToFileNameMap = computeColStatKeyToFileName(partitionNameFileNameList, columnNames);
+return computeFileToColumnStatsMap(columnStatKeyToFileNameMap);
}

/**
@@ -427,6 +402,55 @@ Map<String, List<StoragePathInfo>> fetchAllFilesInPartitionPaths(List<StoragePat
return partitionPathToFilesMap;
}

+/**
+* Computes a map from col-stats index key to the partition and file name pair.
+*
+* @param partitionNameFileNameList - List of partition and file name pairs for which column stats need to be retrieved.
+* @param columnNames - List of column names for which stats are needed.
+*/
+private Map<String, Pair<String, String>> computeColStatKeyToFileName(
+final List<Pair<String, String>> partitionNameFileNameList,
+final List<String> columnNames) {
+Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new HashMap<>();
+for (String columnName : columnNames) {
+final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
+for (Pair<String, String> partitionNameFileNamePair : partitionNameFileNameList) {
+final String columnStatsIndexKey = HoodieMetadataPayload.getColumnStatsIndexKey(
+new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
+new FileIndexID(partitionNameFileNamePair.getRight()),
+columnIndexID);
+columnStatKeyToFileNameMap.put(columnStatsIndexKey, partitionNameFileNamePair);
+}
+}
+return columnStatKeyToFileNameMap;
+}
+
+/**
+* Computes the map from partition and file name pair to the list of HoodieMetadataColumnStats records.
+*
+* @param columnStatKeyToFileNameMap - A map from col-stats index key to the partition and file name pair.
+*/
+private Map<Pair<String, String>, List<HoodieMetadataColumnStats>> computeFileToColumnStatsMap(Map<String, Pair<String, String>> columnStatKeyToFileNameMap) {
+List<String> columnStatKeylist = new ArrayList<>(columnStatKeyToFileNameMap.keySet());
+HoodieTimer timer = HoodieTimer.start();
+Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
+getRecordsByKeys(columnStatKeylist, MetadataPartitionType.COLUMN_STATS.getPartitionPath());
+metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, timer.endTimer()));
+Map<Pair<String, String>, List<HoodieMetadataColumnStats>> fileToColumnStatMap = new HashMap<>();
+for (final Map.Entry<String, HoodieRecord<HoodieMetadataPayload>> entry : hoodieRecords.entrySet()) {
+final Option<HoodieMetadataColumnStats> columnStatMetadata =
+entry.getValue().getData().getColumnStatMetadata();
+if (columnStatMetadata.isPresent() && !columnStatMetadata.get().getIsDeleted()) {
+ValidationUtils.checkState(columnStatKeyToFileNameMap.containsKey(entry.getKey()));
+final Pair<String, String> partitionFileNamePair = columnStatKeyToFileNameMap.get(entry.getKey());
+fileToColumnStatMap.computeIfAbsent(partitionFileNamePair, k -> new ArrayList<>()).add(columnStatMetadata.get());
+} else {
+LOG.error("Meta index column stats missing for {}", entry.getKey());
+}
+}
+return fileToColumnStatMap;
+}
+
/**
* Handle spurious deletes. Depending on config, throw an exception or log a warn msg.
*/
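For reviewers, the net effect of the refactor above: the lookup now fans out to one column-stats index key per (partition/file, column) pair, performs a single batched getRecordsByKeys read, and regroups the hits per file. A minimal, Hudi-independent sketch of that fan-out-and-regroup pattern (all names below are illustrative, not from the codebase):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: build one lookup key per (file, column), read once, regroup by file.
class BatchedColumnStatsLookupSketch {
  static Map<String, List<String>> lookupStats(List<String> files, List<String> columns,
                                               Map<String, String> index) { // stands in for the MDT col-stats partition
    Map<String, String> keyToFile = new HashMap<>();
    for (String column : columns) {
      for (String file : files) {
        keyToFile.put(column + "::" + file, file); // |files| x |columns| keys
      }
    }
    Map<String, List<String>> statsByFile = new HashMap<>();
    for (Map.Entry<String, String> e : keyToFile.entrySet()) {
      String stat = index.get(e.getKey()); // the real code batches this via getRecordsByKeys
      if (stat != null) {
        statsByFile.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(stat);
      }
    }
    return statsByFile;
  }
}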
@@ -303,6 +303,11 @@ public Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final
throw new HoodieMetadataException("Unsupported operation: getColumnsStats!");
}

+@Override
+public Map<Pair<String, String>, List<HoodieMetadataColumnStats>> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames) throws HoodieMetadataException {
+throw new HoodieMetadataException("Unsupported operation: getColumnsStats!");
+}
+
@Override
public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName, boolean shouldLoadInMemory) {
throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!");
@@ -254,6 +254,18 @@ Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, S
Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final List<Pair<String, String>> partitionNameFileNameList, final String columnName)
throws HoodieMetadataException;

+
+/**
+* Get column stats for files from the metadata table index.
+*
+* @param partitionNameFileNameList - List of partition and file name pairs for which column stats need to be retrieved.
+* @param columnNames - List of column names for which stats are needed.
+* @return Map of partition and file name pair to a list of column stats.
+* @throws HoodieMetadataException
+*/
+Map<Pair<String, String>, List<HoodieMetadataColumnStats>> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
+throws HoodieMetadataException;
+
/**
* Returns the location of record keys which are found in the record index.
Records that are not found are ignored and won't be part of the map object that is returned.
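A quick sketch of how a caller might exercise the new interface method; the partition, file, and column names are made up, and tableMetadata is assumed to be an already-initialized HoodieTableMetadata instance:

// Hypothetical caller of the new multi-column overload.
List<Pair<String, String>> files = Arrays.asList(
    Pair.of("2023/10/01", "abc123_0-10-20_0000001.parquet"));
Map<Pair<String, String>, List<HoodieMetadataColumnStats>> stats =
    tableMetadata.getColumnStats(files, Arrays.asList("begin_lat", "end_lat"));
stats.forEach((partitionAndFile, statsList) ->
    statsList.forEach(s -> System.out.println(s.getColumnName() + " -> " + s.getNullCount())));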
@@ -31,12 +31,14 @@
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -92,6 +94,7 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -121,6 +124,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -152,11 +156,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
@@ -3140,6 +3146,71 @@ public void testNonPartitionedColStats() throws Exception {
}
}

+@Test
+public void testColStatsMultipleColumns() throws Exception {
+initPath();
+Properties properties = new TypedProperties();
+properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
+HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+.withMetadataIndexColumnStats(true)
+.withColumnStatsIndexForColumns("begin_lat,end_lat,distance_in_meters,weight")
+.withProperties(properties)
+.build()).build();
+init(HoodieTableType.COPY_ON_WRITE, writeConfig);
+initMetaClient(writeConfig.getProps());
+HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+HoodieTestDataGenerator partitionedGenerator = new HoodieTestDataGenerator();
+try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+// Write 1 (Bulk insert)
+String newCommitTime = "0000001";
+List<HoodieRecord> records = partitionedGenerator.generateInserts(newCommitTime, 10);
+client.startCommitWithTime(newCommitTime);
+client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+HoodieTableMetadata tableMetadata = metadata(client, metaClient.getStorage());
+List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>();
+for (String metadataPartition : metadataPartitions) {
+partitionNameFileNameList.addAll(
+tableMetadata.getAllFilesInPartitions(singletonList(String.format("%s/%s", basePath, metadataPartition))).entrySet().stream()
+.flatMap(entry -> entry.getValue().stream().map(storagePathInfo -> Pair.of(metadataPartition, storagePathInfo.getPath().getName())))
+.collect(Collectors.toList())
+);
+}
+List<String> columns = Arrays.asList("begin_lat", "end_lat", "distance_in_meters", "weight", "seconds_since_epoch", "nation");
+Map<Pair<String, String>, List<HoodieMetadataColumnStats>> colStatsFromMetadata = tableMetadata.getColumnStats(partitionNameFileNameList, columns);
+assertEquals(partitionNameFileNameList.size(), colStatsFromMetadata.size());
+// Assert stats from parquet footer same as metadata.
+colStatsFromMetadata.forEach(((partitionAndFileName, stats) -> {
+StoragePath fullFilePath = new StoragePath(basePath, String.format("%s/%s", partitionAndFileName.getLeft(), partitionAndFileName.getRight()));
+Map<String, HoodieColumnRangeMetadata<Comparable>> parquetStatsMap =
+HoodieIOFactory.getIOFactory(metaClient.getStorage())
+.getFileFormatUtils(HoodieFileFormat.PARQUET)
+.readColumnStatsFromMetadata(metaClient.getStorage(), fullFilePath, columns)
+.stream()
+.collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity()));
+Map<String, HoodieMetadataColumnStats> columnStatsMap = stats.stream().collect(Collectors.toMap(HoodieMetadataColumnStats::getColumnName, Function.identity()));
+List<String> columnsWithoutStats = Arrays.asList("seconds_since_epoch", "nation");
+Assertions.assertEquals(parquetStatsMap.size() - columnsWithoutStats.size(), columnStatsMap.size());
+for (String column : columns) {
+if (columnsWithoutStats.contains(column)) {
+// Assert columnsWithoutStats are not present in MDT result.
+Assertions.assertFalse(columnStatsMap.containsKey(column));
+continue;
+}
+// Assert getColumnStats returns same data.
+Assertions.assertEquals(columnStatsMap.get(column), tableMetadata.getColumnStats(Collections.singletonList(partitionAndFileName), column).get(partitionAndFileName));
+Assertions.assertEquals(parquetStatsMap.get(column).getNullCount(), columnStatsMap.get(column).getNullCount());
+Assertions.assertEquals(parquetStatsMap.get(column).getValueCount(), columnStatsMap.get(column).getValueCount());
+Assertions.assertEquals(parquetStatsMap.get(column).getMaxValue(), unwrapAvroValueWrapper(columnStatsMap.get(column).getMaxValue()));
+Assertions.assertEquals(parquetStatsMap.get(column).getMinValue(), unwrapAvroValueWrapper(columnStatsMap.get(column).getMinValue()));
+}
+}));
+}
+}
+
/**
* Test various metrics published by metadata table.
*/
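To run just the new test locally, something like the following should work; the module path is an assumption about where TestHoodieBackedMetadata lives, so adjust it to your checkout:

mvn test -pl hudi-client/hudi-spark-client -am -Dtest=TestHoodieBackedMetadata#testColStatsMultipleColumns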