@@ -1557,11 +1557,11 @@ public boolean isMetadataColumnStatsIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled();
}

public String getColumnsEnabledForColumnStatsIndex() {
public List<String> getColumnsEnabledForColumnStatsIndex() {
return getMetadataConfig().getColumnsEnabledForColumnStatsIndex();
}

public String getColumnsEnabledForBloomFilterIndex() {
public List<String> getColumnsEnabledForBloomFilterIndex() {
return getMetadataConfig().getColumnsEnabledForBloomFilterIndex();
}

@@ -54,7 +54,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
@@ -66,14 +65,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;

@@ -348,15 +347,16 @@ private void processAppendResult(AppendResult result, List<IndexedRecord> record

if (config.isMetadataColumnStatsIndexEnabled()) {
final List<Schema.Field> fieldsToIndex;
if (StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) {
// If column stats index is enabled but columns not configured then we assume that all columns should be indexed
// If column stats index is enabled but columns not configured then we assume that
// all columns should be indexed
if (config.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
fieldsToIndex = writeSchemaWithMetaFields.getFields();
} else {
Set<String> columnsToIndex = Stream.of(config.getColumnsEnabledForColumnStatsIndex().split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
Set<String> columnsToIndexSet = new HashSet<>(config.getColumnsEnabledForColumnStatsIndex());

fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
.filter(field -> columnsToIndex.contains(field.name())).collect(Collectors.toList());
.filter(field -> columnsToIndexSet.contains(field.name()))
.collect(Collectors.toList());
}

Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
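For reference, a minimal standalone sketch of the column-stats field filtering introduced in the hunk above, with plain strings standing in for Avro `Schema.Field` and invented sample values (the Hudi types are not used here):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ColumnStatsFilterSketch {
  public static void main(String[] args) {
    // Stand-in for writeSchemaWithMetaFields.getFields(): all field names in the write schema.
    List<String> schemaFields = Arrays.asList("_hoodie_commit_time", "uuid", "rider", "fare");
    // Stand-in for config.getColumnsEnabledForColumnStatsIndex(): the configured column list.
    List<String> configuredColumns = Arrays.asList("rider", "fare");

    List<String> fieldsToIndex;
    if (configuredColumns.isEmpty()) {
      // Columns not configured: index every field in the schema.
      fieldsToIndex = schemaFields;
    } else {
      // A HashSet keeps the membership check O(1) per field.
      Set<String> columnsToIndexSet = new HashSet<>(configuredColumns);
      fieldsToIndex = schemaFields.stream()
          .filter(columnsToIndexSet::contains)
          .collect(Collectors.toList());
    }
    System.out.println(fieldsToIndex); // prints [rider, fare]
  }
}
```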
@@ -18,6 +18,11 @@

package org.apache.hudi.metadata;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieInstantInfo;
@@ -55,7 +60,6 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -66,12 +70,6 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -729,12 +727,14 @@ public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartition

private MetadataRecordsGenerationParams getRecordsGenerationParams() {
return new MetadataRecordsGenerationParams(
dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataMetaClient,
enabledPartitionTypes,
dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getMetadataBloomFilterIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism(),
StringUtils.toList(dataWriteConfig.getColumnsEnabledForColumnStatsIndex()),
StringUtils.toList(dataWriteConfig.getColumnsEnabledForBloomFilterIndex()));
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getColumnsEnabledForBloomFilterIndex());
}

/**
@@ -1021,6 +1021,7 @@ private void initialCommit(String createInstantTime, List<MetadataPartitionType>
})
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());

if (partitionTypes.contains(MetadataPartitionType.FILES)) {
@@ -1031,19 +1032,19 @@
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
}

if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
}

if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
}

LOG.info("Committing " + partitions.size() + " partitions and " + partitionToFilesMap.values().size() + " files to metadata");
LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");

commit(createInstantTime, partitionToRecordsMap, false);
}
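As an aside, a simplified sketch of the new guard in `initialCommit` above: the data-file count is summed once from the partition-to-files map, and the bloom-filter / column-stats bootstrap is skipped when there are no files (map types simplified for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class InitialCommitGuardSketch {
  public static void main(String[] args) {
    // Stand-in for partitionToFilesMap: partition path -> (file name -> file size).
    Map<String, Map<String, Long>> partitionToFilesMap = new HashMap<>();
    partitionToFilesMap.put("2022/01/01", new HashMap<>()); // partition exists, but no data files yet

    int totalDataFilesCount = partitionToFilesMap.values().stream()
        .mapToInt(Map::size)
        .sum();

    // Mirrors the added "&& totalDataFilesCount > 0" checks: with zero data files there is
    // nothing to convert into bloom-filter or column-stats metadata records.
    if (totalDataFilesCount > 0) {
      System.out.println("bootstrap BLOOM_FILTERS and COLUMN_STATS partitions");
    } else {
      System.out.println("skip index bootstrap: " + totalDataFilesCount + " data files");
    }
  }
}
```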
@@ -18,6 +18,11 @@

package org.apache.hudi.table;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -60,7 +65,6 @@
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -78,12 +82,6 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -888,7 +886,7 @@ private boolean shouldExecuteMetadataTableDeletion() {
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
&& !config.isMetadataTableEnabled()
&& (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
|| !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
|| !metaClient.getTableConfig().getMetadataPartitions().isEmpty());
}

/**
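A standalone restatement of `shouldExecuteMetadataTableDeletion()` as changed above, with plain parameters in place of the Hudi calls, to make the three conditions explicit (names invented for illustration):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MetadataDeletionPredicateSketch {
  // Delete the metadata table only for the data table itself, only when metadata is
  // disabled in the write config, and only when the table config either never recorded
  // the metadata-partitions property or still lists at least one completed partition.
  static boolean shouldDelete(boolean isMetadataTable,
                              boolean metadataEnabledInConfig,
                              boolean hasMetadataPartitionsProp,
                              List<String> completedMetadataPartitions) {
    return !isMetadataTable
        && !metadataEnabledInConfig
        && (!hasMetadataPartitionsProp || !completedMetadataPartitions.isEmpty());
  }

  public static void main(String[] args) {
    System.out.println(shouldDelete(false, false, true, Arrays.asList("files")));   // true
    System.out.println(shouldDelete(false, false, true, Collections.emptyList()));  // false
    System.out.println(shouldDelete(false, true, true, Arrays.asList("files")));    // false
  }
}
```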
@@ -18,6 +18,14 @@

package org.apache.hudi.client.functional;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.util.Time;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -70,7 +78,6 @@
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
@@ -98,15 +105,6 @@
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -362,7 +360,7 @@ public void testTurnOffMetadataTableAfterEnable() throws Exception {

HoodieTableConfig hoodieTableConfig2 =
new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
assertEquals(StringUtils.EMPTY_STRING, hoodieTableConfig2.getMetadataPartitions());
assertEquals(Collections.emptyList(), hoodieTableConfig2.getMetadataPartitions());
// Assert metadata table folder is deleted
assertFalse(metaClient.getFs().exists(
new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));
@@ -623,7 +621,6 @@ public void testTableOperationsWithMetadataIndex(HoodieTableType tableType) thro
.withMetadataIndexBloomFilterFileGroups(4)
.withMetadataIndexColumnStats(true)
.withMetadataIndexBloomFilterFileGroups(2)
.withMetadataIndexForAllColumns(true)
.build())
.build();
init(tableType, writeConfig);
@@ -38,6 +38,8 @@ public class HoodieConfig implements Serializable {

private static final Logger LOG = LogManager.getLogger(HoodieConfig.class);

protected static final String CONFIG_VALUES_DELIMITER = ",";

public static HoodieConfig create(FSDataInputStream inputStream) throws IOException {
HoodieConfig config = new HoodieConfig();
config.props.load(inputStream);
@@ -19,13 +19,15 @@
package org.apache.hudi.common.config;

import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieNotSupportedException;

import javax.annotation.concurrent.Immutable;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.Properties;

/**
@@ -173,15 +175,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
+ "log files and read parallelism in the column stats index partition. The recommendation is to size the "
+ "file group count such that the base files are under 1GB.");

public static final ConfigProperty<Boolean> ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS = ConfigProperty
.key(METADATA_PREFIX + ".index.column.stats.all_columns.enable")
.defaultValue(true)
.sinceVersion("0.11.0")
.withDocumentation("Enable indexing column ranges of user data files for all columns under "
+ "metadata table key lookups. When enabled, metadata table will have a partition to "
+ "store the column ranges and will be used for pruning files during the index lookups. "
+ "Only applies if " + ENABLE_METADATA_INDEX_COLUMN_STATS.key() + " is enabled.");

public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_PARALLELISM = ConfigProperty
.key(METADATA_PREFIX + ".index.column.stats.parallelism")
.defaultValue(10)
@@ -249,16 +242,12 @@ public boolean isColumnStatsIndexEnabled() {
return getBooleanOrDefault(ENABLE_METADATA_INDEX_COLUMN_STATS);
}

public boolean isMetadataColumnStatsIndexForAllColumnsEnabled() {
return getBooleanOrDefault(ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS);
}

public String getColumnsEnabledForColumnStatsIndex() {
return getString(COLUMN_STATS_INDEX_FOR_COLUMNS);
public List<String> getColumnsEnabledForColumnStatsIndex() {
return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
}

public String getColumnsEnabledForBloomFilterIndex() {
return getString(BLOOM_FILTER_INDEX_FOR_COLUMNS);
public List<String> getColumnsEnabledForBloomFilterIndex() {
return StringUtils.split(getString(BLOOM_FILTER_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
}

public int getBloomFilterIndexFileGroupCount() {
@@ -353,11 +342,6 @@ public Builder withColumnStatsIndexParallelism(int parallelism) {
return this;
}

public Builder withMetadataIndexForAllColumns(boolean enable) {
metadataConfig.setValue(ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS, String.valueOf(enable));
return this;
}

public Builder withColumnStatsIndexForColumns(String columns) {
metadataConfig.setValue(COLUMN_STATS_INDEX_FOR_COLUMNS, columns);
return this;
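A hedged usage sketch of the reworked `HoodieMetadataConfig` accessors (the two `with…` builder calls appear in this diff; `newBuilder()` and `enable()` are assumed to match the existing builder API, and the wiring is simplified):

```java
import java.util.List;

import org.apache.hudi.common.config.HoodieMetadataConfig;

public class MetadataConfigUsageSketch {
  public static void main(String[] args) {
    // Assumed builder entry point; the two index-related setters are from this diff.
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withMetadataIndexColumnStats(true)
        .withColumnStatsIndexForColumns("rider, fare")   // raw comma-separated value
        .build();

    // Now returns a parsed, trimmed list (["rider", "fare"]) instead of the raw string;
    // when the property is unset, the result is an empty list, which downstream code
    // interprets as "index all columns".
    List<String> columns = metadataConfig.getColumnsEnabledForColumnStatsIndex();
    System.out.println(columns);
  }
}
```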
@@ -618,12 +618,18 @@ private Long getTableChecksum() {
return getLong(TABLE_CHECKSUM);
}

public String getMetadataPartitionsInflight() {
return getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING);
}

public String getMetadataPartitions() {
return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING);
public List<String> getMetadataPartitionsInflight() {
return StringUtils.split(
getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING),
CONFIG_VALUES_DELIMITER
);
}

public List<String> getMetadataPartitions() {
return StringUtils.split(
getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING),
CONFIG_VALUES_DELIMITER
);
}

/**
@@ -19,12 +19,8 @@
package org.apache.hudi.common.util;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -34,10 +30,6 @@
public class StringUtils {

public static final String EMPTY_STRING = "";
private static final Function<String, Set<String>> STRING_TO_SET = (str) -> Stream.of(str.split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
private static final Function<String, List<String>> STRING_TO_LIST = (str) -> Stream.of(str.split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());

/**
* <p>
@@ -114,22 +106,13 @@ private static boolean stringIsNullOrEmpty(@Nullable String string) {
}

/**
* Converts the input string, delimited by comma, to a set of strings.
*
* @param input
* @return
*/
public static Set<String> toSet(@Nullable String input) {
return isNullOrEmpty(input) ? new HashSet<>() : STRING_TO_SET.apply(input);
}

/**
* Converts the input string, delimited by comma, to a list of strings.
*
* @param input
* @return
* Splits the input string, delimited by {@code delimiter}, into a list of non-empty strings
* (skipping any empty string produced during splitting)
*/
public static List<String> toList(@Nullable String input) {
return isNullOrEmpty(input) ? new ArrayList<>() : STRING_TO_LIST.apply(input);
public static List<String> split(@Nullable String input, String delimiter) {
if (isNullOrEmpty(input)) {
return Collections.emptyList();
}
return Stream.of(input.split(delimiter)).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
}
}
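Finally, a quick illustration of the contract of the new `StringUtils.split` helper above, re-implemented as a self-contained snippet (this is not the Hudi class, just the same trimming and empty-skipping behavior):

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SplitSketch {
  // Same behavior as the new split(...) above: null or empty input yields an empty
  // list, each token is trimmed, and tokens that trim to empty are dropped.
  static List<String> split(String input, String delimiter) {
    if (input == null || input.isEmpty()) {
      return Collections.emptyList();
    }
    return Stream.of(input.split(delimiter))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(split(" col_a, ,col_b ,", ","));  // [col_a, col_b]
    System.out.println(split("", ","));                  // []
    System.out.println(split(null, ","));                // []
  }
}
```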