diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 87dd56b593809..fd4e93365e3d7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1557,11 +1557,11 @@ public boolean isMetadataColumnStatsIndexEnabled() { return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled(); } - public String getColumnsEnabledForColumnStatsIndex() { + public List getColumnsEnabledForColumnStatsIndex() { return getMetadataConfig().getColumnsEnabledForColumnStatsIndex(); } - public String getColumnsEnabledForBloomFilterIndex() { + public List getColumnsEnabledForBloomFilterIndex() { return getMetadataConfig().getColumnsEnabledForBloomFilterIndex(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 01c16a57baf57..7fc46e8b9bbc4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -54,7 +54,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.SizeEstimator; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieAppendException; import org.apache.hudi.exception.HoodieException; @@ -66,6 +65,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -73,7 +73,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata; @@ -348,15 +347,16 @@ private void processAppendResult(AppendResult result, List record if (config.isMetadataColumnStatsIndexEnabled()) { final List fieldsToIndex; - if (StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) { - // If column stats index is enabled but columns not configured then we assume that all columns should be indexed + // If column stats index is enabled but columns not configured then we assume that + // all columns should be indexed + if (config.getColumnsEnabledForColumnStatsIndex().isEmpty()) { fieldsToIndex = writeSchemaWithMetaFields.getFields(); } else { - Set columnsToIndex = Stream.of(config.getColumnsEnabledForColumnStatsIndex().split(",")) - .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet()); + Set columnsToIndexSet = new HashSet<>(config.getColumnsEnabledForColumnStatsIndex()); fieldsToIndex = writeSchemaWithMetaFields.getFields().stream() - .filter(field -> columnsToIndex.contains(field.name())).collect(Collectors.toList()); + .filter(field -> columnsToIndexSet.contains(field.name())) + .collect(Collectors.toList()); } Map> columnRangesMetadataMap = diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 8d4cb17c56951..4faac22a841fe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -18,6 +18,11 @@ package org.apache.hudi.metadata; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieIndexPartitionInfo; import org.apache.hudi.avro.model.HoodieInstantInfo; @@ -55,7 +60,6 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; @@ -66,12 +70,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieMetadataException; - -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -729,12 +727,14 @@ public void dropMetadataPartitions(List metadataPartition private MetadataRecordsGenerationParams getRecordsGenerationParams() { return new MetadataRecordsGenerationParams( - dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), + dataMetaClient, + enabledPartitionTypes, + dataWriteConfig.getBloomFilterType(), dataWriteConfig.getMetadataBloomFilterIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(), dataWriteConfig.getColumnStatsIndexParallelism(), - StringUtils.toList(dataWriteConfig.getColumnsEnabledForColumnStatsIndex()), - StringUtils.toList(dataWriteConfig.getColumnsEnabledForBloomFilterIndex())); + dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), + dataWriteConfig.getColumnsEnabledForBloomFilterIndex()); } /** @@ -1021,6 +1021,7 @@ private void initialCommit(String createInstantTime, List }) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); List partitions = new ArrayList<>(partitionToFilesMap.keySet()); if (partitionTypes.contains(MetadataPartitionType.FILES)) { @@ -1031,19 +1032,19 @@ private void initialCommit(String createInstantTime, List partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); } - if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { + if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); } - if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { + if 
(partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) { final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); } - LOG.info("Committing " + partitions.size() + " partitions and " + partitionToFilesMap.values().size() + " files to metadata"); + LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata"); commit(createInstantTime, partitionToRecordsMap, false); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index eb48ebc97864e..f6f73f633ef5d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -18,6 +18,11 @@ package org.apache.hudi.table; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; @@ -60,7 +65,6 @@ import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -78,12 +82,6 @@ import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.storage.HoodieLayoutFactory; import org.apache.hudi.table.storage.HoodieStorageLayout; - -import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -888,7 +886,7 @@ private boolean shouldExecuteMetadataTableDeletion() { return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath()) && !config.isMetadataTableEnabled() && (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS) - || !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions())); + || !metaClient.getTableConfig().getMetadataPartitions().isEmpty()); } /** diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 54de53581792a..34f470eb1b64a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,6 +18,14 @@ package org.apache.hudi.client.functional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.util.Time; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; @@ -70,7 +78,6 @@ import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.hash.ColumnIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; @@ -98,15 +105,6 @@ import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.testutils.MetadataMergeWriteStatus; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -362,7 +360,7 @@ public void testTurnOffMetadataTableAfterEnable() throws Exception { HoodieTableConfig hoodieTableConfig2 = new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass()); - assertEquals(StringUtils.EMPTY_STRING, hoodieTableConfig2.getMetadataPartitions()); + assertEquals(Collections.emptyList(), hoodieTableConfig2.getMetadataPartitions()); // Assert metadata table folder is deleted assertFalse(metaClient.getFs().exists( new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath())))); @@ -623,7 +621,6 @@ public void testTableOperationsWithMetadataIndex(HoodieTableType tableType) thro .withMetadataIndexBloomFilterFileGroups(4) .withMetadataIndexColumnStats(true) .withMetadataIndexBloomFilterFileGroups(2) - .withMetadataIndexForAllColumns(true) .build()) .build(); init(tableType, writeConfig); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java index c4308f79d710c..c77e292b4775f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java @@ -38,6 +38,8 @@ public class HoodieConfig implements Serializable { private static final Logger LOG = LogManager.getLogger(HoodieConfig.class); + protected static final String CONFIG_VALUES_DELIMITER = ","; + public static HoodieConfig create(FSDataInputStream inputStream) throws IOException { HoodieConfig config = new HoodieConfig(); config.props.load(inputStream); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index c5753cc3da6a6..14a055cb17cc0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.config; import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieNotSupportedException; import 
javax.annotation.concurrent.Immutable; @@ -26,6 +27,7 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.util.List; import java.util.Properties; /** @@ -173,15 +175,6 @@ public final class HoodieMetadataConfig extends HoodieConfig { + "log files and read parallelism in the column stats index partition. The recommendation is to size the " + "file group count such that the base files are under 1GB."); - public static final ConfigProperty ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS = ConfigProperty - .key(METADATA_PREFIX + ".index.column.stats.all_columns.enable") - .defaultValue(true) - .sinceVersion("0.11.0") - .withDocumentation("Enable indexing column ranges of user data files for all columns under " - + "metadata table key lookups. When enabled, metadata table will have a partition to " - + "store the column ranges and will be used for pruning files during the index lookups. " - + "Only applies if " + ENABLE_METADATA_INDEX_COLUMN_STATS.key() + " is enabled."); - public static final ConfigProperty COLUMN_STATS_INDEX_PARALLELISM = ConfigProperty .key(METADATA_PREFIX + ".index.column.stats.parallelism") .defaultValue(10) @@ -249,16 +242,12 @@ public boolean isColumnStatsIndexEnabled() { return getBooleanOrDefault(ENABLE_METADATA_INDEX_COLUMN_STATS); } - public boolean isMetadataColumnStatsIndexForAllColumnsEnabled() { - return getBooleanOrDefault(ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS); - } - - public String getColumnsEnabledForColumnStatsIndex() { - return getString(COLUMN_STATS_INDEX_FOR_COLUMNS); + public List getColumnsEnabledForColumnStatsIndex() { + return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER); } - public String getColumnsEnabledForBloomFilterIndex() { - return getString(BLOOM_FILTER_INDEX_FOR_COLUMNS); + public List getColumnsEnabledForBloomFilterIndex() { + return StringUtils.split(getString(BLOOM_FILTER_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER); } public int getBloomFilterIndexFileGroupCount() { @@ -353,11 +342,6 @@ public Builder withColumnStatsIndexParallelism(int parallelism) { return this; } - public Builder withMetadataIndexForAllColumns(boolean enable) { - metadataConfig.setValue(ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS, String.valueOf(enable)); - return this; - } - public Builder withColumnStatsIndexForColumns(String columns) { metadataConfig.setValue(COLUMN_STATS_INDEX_FOR_COLUMNS, columns); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index c158372f9820d..254044bd28371 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -618,12 +618,18 @@ private Long getTableChecksum() { return getLong(TABLE_CHECKSUM); } - public String getMetadataPartitionsInflight() { - return getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING); - } - - public String getMetadataPartitions() { - return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING); + public List getMetadataPartitionsInflight() { + return StringUtils.split( + getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), + CONFIG_VALUES_DELIMITER + ); + } + + public List getMetadataPartitions() { + return StringUtils.split( + getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING), + CONFIG_VALUES_DELIMITER + ); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java index 4a092aa6f3400..3e1a1a9cc7f5e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java @@ -19,12 +19,8 @@ package org.apache.hudi.common.util; import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.HashSet; +import java.util.Collections; import java.util.List; -import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -34,10 +30,6 @@ public class StringUtils { public static final String EMPTY_STRING = ""; - private static final Function> STRING_TO_SET = (str) -> Stream.of(str.split(",")) - .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet()); - private static final Function> STRING_TO_LIST = (str) -> Stream.of(str.split(",")) - .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); /** *

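For context on the hunk right below: the patch replaces the comma-only StringUtils.toSet/toList helpers with a single delimiter-aware StringUtils.split, which HoodieMetadataConfig and HoodieTableConfig then use to expose the comma-separated column-list and metadata-partition settings as List<String> rather than raw strings. A minimal standalone sketch of that splitting behavior; the class name, the helper name splitAndTrim, and the sample value are illustrative only, not code from this PR:

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConfigListSketch {

  // Mirrors the intent of the new split helper: trim tokens and drop empty ones,
  // so trailing commas or stray whitespace in a config value are ignored.
  static List<String> splitAndTrim(String input, String delimiter) {
    if (input == null || input.isEmpty()) {
      return Collections.emptyList();
    }
    return Stream.of(input.split(delimiter))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Hypothetical value of a comma-separated column-list setting.
    System.out.println(splitAndTrim("rider, driver,, fare ", ","));  // [rider, driver, fare]
    System.out.println(splitAndTrim(null, ","));                     // []
  }
}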
@@ -114,22 +106,13 @@ private static boolean stringIsNullOrEmpty(@Nullable String string) { } /** - * Converts the input string, delimited by comma, to a set of strings. - * - * @param input - * @return - */ - public static Set toSet(@Nullable String input) { - return isNullOrEmpty(input) ? new HashSet<>() : STRING_TO_SET.apply(input); - } - - /** - * Converts the input string, delimited by comma, to a list of strings. - * - * @param input - * @return + * Splits input string, delimited {@code delimiter} into a list of non-empty strings + * (skipping any empty string produced during splitting) */ - public static List toList(@Nullable String input) { - return isNullOrEmpty(input) ? new ArrayList<>() : STRING_TO_LIST.apply(input); + public static List split(@Nullable String input, String delimiter) { + if (isNullOrEmpty(input)) { + return Collections.emptyList(); + } + return Stream.of(input.split(delimiter)).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index 412b7e4a54cbb..899c2475da26c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -42,7 +42,7 @@ import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.util.LazyRef; +import org.apache.hudi.util.Lazy; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -79,7 +79,7 @@ public class HoodieHFileReader implements HoodieFileRea private final Path path; - private final LazyRef schema; + private final Lazy schema; // NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in Positional Read ("pread") // mode (ie created w/ "pread = true") @@ -110,8 +110,8 @@ public HoodieHFileReader(Path path, HFile.Reader reader, Option schemaOp // For shared scanner, which is primarily used for point-lookups, we're caching blocks // by default, to minimize amount of traffic to the underlying storage this.sharedScanner = getHFileScanner(reader, true); - this.schema = schemaOpt.map(LazyRef::eager) - .orElseGet(() -> LazyRef.lazy(() -> fetchSchema(reader))); + this.schema = schemaOpt.map(Lazy::eagerly) + .orElseGet(() -> Lazy.lazily(() -> fetchSchema(reader))); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 24ac935f88abf..3904ff6f832c9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -49,7 +49,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -64,6 +63,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.util.Lazy; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -74,11 +74,11 @@ import 
java.math.RoundingMode; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -94,6 +94,7 @@ import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema; import static org.apache.hudi.avro.HoodieAvroUtils.resolveNullableSchema; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper; import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; @@ -445,21 +446,24 @@ public static HoodieData convertMetadataToBloomFilterRecords( /** * Convert the clean action to metadata records. */ - public static Map> convertMetadataToRecords( - HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata, - MetadataRecordsGenerationParams recordsGenerationParams, String instantTime) { + public static Map> convertMetadataToRecords(HoodieEngineContext engineContext, + HoodieCleanMetadata cleanMetadata, + MetadataRecordsGenerationParams recordsGenerationParams, + String instantTime) { final Map> partitionToRecordsMap = new HashMap<>(); final HoodieData filesPartitionRecordsRDD = engineContext.parallelize( convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { - final HoodieData metadataBloomFilterRecordsRDD = convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, recordsGenerationParams); + final HoodieData metadataBloomFilterRecordsRDD = + convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, recordsGenerationParams); partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); } if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { - final HoodieData metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, recordsGenerationParams); + final HoodieData metadataColumnStatsRDD = + convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, recordsGenerationParams); partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } @@ -554,8 +558,9 @@ public static HoodieData convertMetadataToColumnStatsRecords(Hoodi HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); - List columnsToIndex = getColumnsToIndex(recordsGenerationParams, - dataTableMetaClient.getTableConfig(), tryResolveSchemaForTable(dataTableMetaClient)); + List columnsToIndex = + getColumnsToIndex(recordsGenerationParams, + Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); if (columnsToIndex.isEmpty()) { // In case there are no columns to index, bail @@ -597,7 +602,8 @@ public static Map> convertMetada } if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { - final HoodieData metadataColumnStatsRDD = convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, 
recordsGenerationParams); + final HoodieData metadataColumnStatsRDD = + convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams); partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } return partitionToRecordsMap; @@ -803,7 +809,7 @@ private static List convertFilesToFilesPartitionRecords(Map convertFilesToColumnStatsRecords(HoodieEn HoodieData allRecordsRDD = engineContext.emptyHoodieData(); HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); - final List columnsToIndex = getColumnsToIndex(recordsGenerationParams, - dataTableMetaClient.getTableConfig(), tryResolveSchemaForTable(dataTableMetaClient)); + final List columnsToIndex = + getColumnsToIndex(recordsGenerationParams, + Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); if (columnsToIndex.isEmpty()) { // In case there are no columns to index, bail return engineContext.emptyHoodieData(); } - final List>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet() - .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList()); - int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); - final HoodieData>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism); + final List>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream() + .map(e -> Pair.of(e.getKey(), e.getValue())) + .collect(Collectors.toList()); + + int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + final HoodieData>> partitionToDeletedFilesRDD = + engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism); HoodieData deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> { - final String partitionName = partitionToDeletedFilesPair.getLeft(); - final String partition = getPartitionIdentifier(partitionName); + final String partitionPath = partitionToDeletedFilesPair.getLeft(); + final String partitionId = getPartitionIdentifier(partitionPath); final List deletedFileList = partitionToDeletedFilesPair.getRight(); return deletedFileList.stream().flatMap(deletedFile -> { - final String filePathWithPartition = partitionName + "/" + deletedFile; - return getColumnStatsRecords(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, true); + final String filePathWithPartition = partitionPath + "/" + deletedFile; + return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true); }).iterator(); }); + allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD); - final List>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet() - .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList()); - parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); - final HoodieData>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism); + final List>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream() + .map(entry -> Pair.of(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + + int appendedFilesTargetParallelism = 
Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + final HoodieData>> partitionToAppendedFilesRDD = + engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism); HoodieData appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> { - final String partitionName = partitionToAppendedFilesPair.getLeft(); - final String partition = getPartitionIdentifier(partitionName); + final String partitionPath = partitionToAppendedFilesPair.getLeft(); + final String partitionId = getPartitionIdentifier(partitionPath); final Map appendedFileMap = partitionToAppendedFilesPair.getRight(); return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> { @@ -944,11 +958,11 @@ public static HoodieData convertFilesToColumnStatsRecords(HoodieEn || !appendedFileNameLengthEntry.getKey().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { return Stream.empty(); } - final String filePathWithPartition = partitionName + "/" + appendedFileNameLengthEntry.getKey(); - return getColumnStatsRecords(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, false); + final String filePathWithPartition = partitionPath + "/" + appendedFileNameLengthEntry.getKey(); + return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false); }).iterator(); - }); + allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD); return allRecordsRDD; @@ -1091,7 +1105,7 @@ public static HoodieData convertMetadataToColumnStatsRecords(Hoodi tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema); List columnsToIndex = getColumnsToIndex(recordsGenerationParams, - tableConfig, tableSchema); + Lazy.eagerly(tableSchema)); if (columnsToIndex.isEmpty()) { // In case there are no columns to index, bail @@ -1108,19 +1122,24 @@ public static HoodieData convertMetadataToColumnStatsRecords(Hoodi } /** - * Get the latest columns for the table for column stats indexing. 
+ * Get the list of columns for the table for column stats indexing */ private static List getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams, - HoodieTableConfig tableConfig, - Option writerSchemaOpt) { - if (recordsGenParams.isAllColumnStatsIndexEnabled() && writerSchemaOpt.isPresent()) { - return writerSchemaOpt.get().getFields().stream() - .map(Schema.Field::name).collect(Collectors.toList()); + Lazy> lazyWriterSchemaOpt) { + checkState(recordsGenParams.isColumnStatsIndexEnabled()); + + List targetColumns = recordsGenParams.getTargetColumnsForColumnStatsIndex(); + if (!targetColumns.isEmpty()) { + return targetColumns; } - // In case no writer schema could be obtained we fall back to only index primary key - // columns - return Arrays.asList(tableConfig.getRecordKeyFields().get()); + Option writerSchemaOpt = lazyWriterSchemaOpt.get(); + return writerSchemaOpt + .map(writerSchema -> + writerSchema.getFields().stream() + .map(Schema.Field::name) + .collect(Collectors.toList())) + .orElse(Collections.emptyList()); } private static Stream translateWriteStatToColumnStats(HoodieWriteStat writeStat, @@ -1331,11 +1350,11 @@ private static boolean canCompare(Schema schema) { } public static Set getInflightMetadataPartitions(HoodieTableConfig tableConfig) { - return StringUtils.toSet(tableConfig.getMetadataPartitionsInflight()); + return new HashSet<>(tableConfig.getMetadataPartitionsInflight()); } public static Set getCompletedMetadataPartitions(HoodieTableConfig tableConfig) { - return StringUtils.toSet(tableConfig.getMetadataPartitions()); + return new HashSet<>(tableConfig.getMetadataPartitions()); } public static Set getInflightAndCompletedMetadataPartitions(HoodieTableConfig tableConfig) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java index d28be75586b82..72a8bf4cd26f8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java @@ -26,28 +26,33 @@ /** * Encapsulates all parameters required to generate metadata index for enabled index types. 
+ *
+ * @deprecated this component currently duplicates configuration coming from the {@code HoodieWriteConfig}
+ *             which is problematic; instead we should break this component down and use source of truth
+ *             for each respective data-point directly ({@code HoodieWriteConfig}, {@code HoodieTableMetaClient}, etc)
  */
+@Deprecated
 public class MetadataRecordsGenerationParams implements Serializable {
 
   private final HoodieTableMetaClient dataMetaClient;
   private final List<MetadataPartitionType> enabledPartitionTypes;
   private final String bloomFilterType;
   private final int bloomIndexParallelism;
-  private final boolean isAllColumnStatsIndexEnabled;
+  private final boolean isColumnStatsIndexEnabled;
   private final int columnStatsIndexParallelism;
-  private final List<String> columnsToIndex;
-  private final List<String> bloomSecondaryKeys;
+  private final List<String> targetColumnsForColumnStatsIndex;
+  private final List<String> targetColumnsForBloomFilterIndex;
 
   MetadataRecordsGenerationParams(HoodieTableMetaClient dataMetaClient, List<MetadataPartitionType> enabledPartitionTypes, String bloomFilterType, int bloomIndexParallelism,
-      boolean isAllColumnStatsIndexEnabled, int columnStatsIndexParallelism, List<String> columnsToIndex, List<String> bloomSecondaryKeys) {
+      boolean isColumnStatsIndexEnabled, int columnStatsIndexParallelism, List<String> targetColumnsForColumnStatsIndex, List<String> targetColumnsForBloomFilterIndex) {
     this.dataMetaClient = dataMetaClient;
     this.enabledPartitionTypes = enabledPartitionTypes;
     this.bloomFilterType = bloomFilterType;
     this.bloomIndexParallelism = bloomIndexParallelism;
-    this.isAllColumnStatsIndexEnabled = isAllColumnStatsIndexEnabled;
+    this.isColumnStatsIndexEnabled = isColumnStatsIndexEnabled;
     this.columnStatsIndexParallelism = columnStatsIndexParallelism;
-    this.columnsToIndex = columnsToIndex;
-    this.bloomSecondaryKeys = bloomSecondaryKeys;
+    this.targetColumnsForColumnStatsIndex = targetColumnsForColumnStatsIndex;
+    this.targetColumnsForBloomFilterIndex = targetColumnsForBloomFilterIndex;
   }
 
   public HoodieTableMetaClient getDataMetaClient() {
@@ -62,8 +67,8 @@ public String getBloomFilterType() {
     return bloomFilterType;
   }
 
-  public boolean isAllColumnStatsIndexEnabled() {
-    return isAllColumnStatsIndexEnabled;
+  public boolean isColumnStatsIndexEnabled() {
+    return isColumnStatsIndexEnabled;
   }
 
   public int getBloomIndexParallelism() {
@@ -74,11 +79,11 @@ public int getColumnStatsIndexParallelism() {
     return columnStatsIndexParallelism;
  }
 
-  public List<String> getColumnsToIndex() {
-    return columnsToIndex;
+  public List<String> getTargetColumnsForColumnStatsIndex() {
+    return targetColumnsForColumnStatsIndex;
   }
 
-  public List<String> getBloomSecondaryKeys() {
-    return bloomSecondaryKeys;
+  public List<String> getSecondaryKeysForBloomFilterIndex() {
+    return targetColumnsForBloomFilterIndex;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
similarity index 64%
rename from hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java
rename to hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
index e4c4f881fc65d..106969b70ff6c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java
+++ b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
@@ -20,21 +20,25 @@
 
 import java.util.function.Supplier;
 
-// TODO java-doc
-public class LazyRef<T> {
+/**
+ * Utility implementing lazy semantics in Java
+ *
+ * @param <T> type of the object being held by {@link Lazy}
+ */
+public class Lazy<T> {
 
   private volatile boolean initialized;
 
   private Supplier<T> initializer;
   private T ref;
 
-  private LazyRef(Supplier<T> initializer) {
+  private Lazy(Supplier<T> initializer) {
     this.initializer = initializer;
     this.ref = null;
     this.initialized = false;
   }
 
-  private LazyRef(T ref) {
+  private Lazy(T ref) {
     this.initializer = null;
     this.ref = ref;
     this.initialized = true;
@@ -54,11 +58,20 @@ public T get() {
     return ref;
   }
 
-  public static <T> LazyRef<T> lazy(Supplier<T> initializer) {
-    return new LazyRef<>(initializer);
+  /**
+   * Executes provided {@code initializer} lazily, while providing for "exactly once" semantic,
+   * to instantiate value of type {@link T} being subsequently held by the returned instance of
+   * {@link Lazy}
+   */
+  public static <T> Lazy<T> lazily(Supplier<T> initializer) {
+    return new Lazy<>(initializer);
   }
 
-  public static <T> LazyRef<T> eager(T ref) {
-    return new LazyRef<>(ref);
+  /**
+   * Instantiates {@link Lazy} in an "eagerly" fashion setting it w/ the provided value of
+   * type {@link T} directly, bypassing lazy initialization sequence
+   */
+  public static <T> Lazy<T> eagerly(T ref) {
+    return new Lazy<>(ref);
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index 83a84a3cc9cc5..5f1bcd3c066ef 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -22,9 +22,6 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -69,18 +66,10 @@ public void testStringNullOrEmpty() {
   }
 
   @Test
-  public void testStringToSet() {
-    assertEquals(new HashSet<>(), StringUtils.toSet(null));
-    assertEquals(new HashSet<>(), StringUtils.toSet(""));
-    Set<String> expected = new HashSet<>(Arrays.asList("a", "b", "c"));
-    assertEquals(expected, StringUtils.toSet("a,b, c"));
-  }
-
-  @Test
-  public void testStringToList() {
-    assertEquals(new ArrayList<>(), StringUtils.toList(null));
-    assertEquals(new ArrayList<>(), StringUtils.toList(""));
-    List<String> expected = Arrays.asList("a", "b", "c");
-    assertEquals(expected, StringUtils.toList("a,b, c"));
+  public void testSplit() {
+    assertEquals(new ArrayList<>(), StringUtils.split(null, ","));
+    assertEquals(new ArrayList<>(), StringUtils.split("", ","));
+    assertEquals(Arrays.asList("a", "b", "c"), StringUtils.split("a,b, c", ","));
+    assertEquals(Arrays.asList("a", "b", "c"), StringUtils.split("a,b,, c ", ","));
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 309050094113a..feed6fd334062 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -351,7 +351,6 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
       PRECOMBINE_FIELD.key -> "id",
       HoodieMetadataConfig.ENABLE.key -> "true",
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
     )
 
@@ -375,8 +374,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
       DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
       // NOTE: Metadata Table has to be enabled on the read path as well
       HoodieMetadataConfig.ENABLE.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true"
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
     )
 
     val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index dcc34aa389caf..e3cde53951077 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -82,7 +82,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
     PRECOMBINE_FIELD.key -> "c1",
     HoodieMetadataConfig.ENABLE.key -> "true",
     HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
-    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
     HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
     HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
   )
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index d648471351e1f..11705f9eb1aa7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -51,8 +51,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
 
   val metadataOpts: Map[String, String] = Map(
     HoodieMetadataConfig.ENABLE.key -> "true",
-    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
-    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true"
+    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
   )
 
   val combinedOpts: Map[String, String] = commonOpts ++ metadataOpts ++
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 8dcc3904d5451..a693bb4c65e47 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -926,7 +926,6 @@ public HoodieMetadataValidationContext(
         .enable(enableMetadataTable)
         .withMetadataIndexBloomFilter(enableMetadataTable)
         .withMetadataIndexColumnStats(enableMetadataTable)
-        .withMetadataIndexForAllColumns(enableMetadataTable)
         .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
        .build();
     this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
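Beyond the diff itself, the LazyRef to Lazy rename above is what lets HoodieTableMetadataUtil.getColumnsToIndex defer table-schema resolution until it is actually needed. A small usage sketch of the renamed utility, assuming hudi-common is on the classpath; the resolveSchema supplier and the flag are stand-ins for illustration, while Lazy.lazily, Lazy.eagerly and get() are the API introduced by this patch:

import org.apache.hudi.util.Lazy;

public class LazyUsageSketch {

  public static void main(String[] args) {
    // The supplier is not invoked here; resolution is deferred until get() is called.
    Lazy<String> lazySchema = Lazy.lazily(LazyUsageSketch::resolveSchema);

    // getColumnsToIndex short-circuits on an explicitly configured column list,
    // in which case the (potentially expensive) schema lookup never runs.
    boolean explicitColumnsConfigured = false; // hypothetical flag
    if (!explicitColumnsConfigured) {
      System.out.println("columns derived from: " + lazySchema.get());
    }

    // When the value is already known, wrap it eagerly and skip deferral altogether.
    Lazy<String> eagerSchema = Lazy.eagerly("known-schema");
    System.out.println(eagerSchema.get());
  }

  private static String resolveSchema() {
    // Stand-in for a costly operation such as reading the schema from commit metadata.
    return "resolved-schema";
  }
}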