Merged

Changes from all commits (21 commits)
b71143d  [HUDI-3173] Add INDEX action type and corresponding commit metadata (codope, Jan 6, 2022)
3d0b5f0  Take lock and initialize filegroup while scheduling (codope, Mar 12, 2022)
93d8b17  Support indexing subset of columns (codope, Mar 14, 2022)
03600ac  [HUDI-3368] Add support for bloom index for secondary keys (codope, Mar 14, 2022)
e9c528d  [HUDI-3382] Add support for drop index in metadata writer (codope, Mar 14, 2022)
ab0b369  Handle upgrade downgrade and consider archival timeline (codope, Mar 17, 2022)
b4d4100  Add drop index action to utility and avoid fs.exists check (codope, Mar 21, 2022)
3b85bb0  Minor fix for empty partition path (codope, Mar 24, 2022)
3e37433  Minor fix for no columns configured (codope, Mar 24, 2022)
808934d  Fix active timeline test (codope, Mar 24, 2022)
2b4871b  Take lock before writing index completed to data timeline (codope, Mar 25, 2022)
80fee23  Add test for indexer with continuous deltastreamer (codope, Mar 25, 2022)
6d6178c  Address feedback from second review (codope, Mar 28, 2022)
a8ab116  Cleanup and fix one bug in index catchup (codope, Mar 29, 2022)
d25a8fb  Update table configs for files partition (codope, Mar 29, 2022)
010de76  Fix processAndCommit to consider partitions from table config (codope, Mar 29, 2022)
a3ee4cd  Tidying up, renames, refactoring (codope, Mar 30, 2022)
514c051  Handle corner cases related to partial failures (codope, Mar 30, 2022)
18b9acd  Check for existing indexes in HoodieIndexer (codope, Mar 30, 2022)
fc9ac46  Rename index_action and other nits (codope, Mar 30, 2022)
01120c1  Address some minors from last pass (codope, Mar 31, 2022)
20 changes: 13 additions & 7 deletions hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -18,7 +18,6 @@

package org.apache.hudi.cli.commands;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.cli.DedupeSparkJob;
@@ -52,9 +51,10 @@
import org.apache.hudi.utilities.HoodieCompactionAdminTool;
import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation;
import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -65,6 +65,12 @@
import java.util.List;
import java.util.Locale;

import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.buildProperties;
import static org.apache.hudi.utilities.UtilHelpers.readConfig;

/**
* This class deals with initializing spark context based on command entered to hudi-cli.
*/
@@ -192,7 +198,7 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(9, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
Integer.parseInt(args[7]), HoodieClusteringJob.EXECUTE, propsFilePath, configs);
Integer.parseInt(args[7]), EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE_AND_EXECUTE:
assert (args.length >= 8);
@@ -205,7 +211,7 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(8, args.length));
}
returnCode = cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2],
Integer.parseInt(args[6]), HoodieClusteringJob.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
Integer.parseInt(args[6]), SCHEDULE_AND_EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE:
assert (args.length >= 7);
@@ -218,7 +224,7 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(7, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
0, HoodieClusteringJob.SCHEDULE, propsFilePath, configs);
0, SCHEDULE, propsFilePath, configs);
break;
case CLEAN:
assert (args.length >= 5);
@@ -411,8 +417,8 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
String bootstrapIndexClass, String selectorClass, String keyGenerator, String fullBootstrapInputProvider,
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {

TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
: UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);
TypedProperties properties = propsFilePath == null ? buildProperties(configs)
: readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);

properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

@@ -24,6 +24,8 @@
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -62,11 +64,13 @@
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
@@ -400,7 +404,6 @@ protected void rollbackFailedBootstrap() {
public abstract O bulkInsert(I records, final String instantTime,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);


/**
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk loads into a Hoodie
* table for the very first time (e.g: converting an existing table to Hoodie). The input records should contain no
@@ -925,6 +928,53 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
}


/**
* Schedules INDEX action.
*
* @param partitionTypes - list of {@link MetadataPartitionType} which needs to be indexed
* @return instant time for the requested INDEX action
*/
public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
Member: Should this api also take additional args for what kind of indexes to build?

Member: Consistent use of indexing vs index.

String instantTime = HoodieActiveTimeline.createNewInstantTime();
Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Contributor: What happens if someone tries to trigger indexing twice? I expect we would fail the second trigger, conveying that an indexing is already in progress.

Member Author: We check the table config for inflight/completed indexes, so this would return false if triggered twice.

.scheduleIndexing(context, instantTime, partitionTypes);
return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
}

/**
* Runs INDEX action to build out the metadata partitions as planned for the given instant time.
*
* @param indexInstantTime - instant time for the requested INDEX action
* @return {@link Option<HoodieIndexCommitMetadata>} after successful indexing.
*/
public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime);
}

/**
* Drops the index and removes the metadata partitions.
*
* @param partitionTypes - list of {@link MetadataPartitionType} whose index is to be dropped
*/
public void dropIndex(List<MetadataPartitionType> partitionTypes) {
Member: Are there tests for these APIs?

Member Author: Will add a test for dropIndex. The scheduleIndex and buildIndex APIs are covered in a deltastreamer test; I'll add more failure scenarios in TestHoodieIndexer.

HoodieTable table = createTable(config, hadoopConf);
String dropInstant = HoodieActiveTimeline.createNewInstantTime();
this.txnManager.beginTransaction();
try {
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
table.getMetadataWriter(dropInstant).ifPresent(w -> {
try {
((HoodieTableMetadataWriter) w).dropMetadataPartitions(partitionTypes);
} catch (IOException e) {
throw new HoodieIndexException("Failed to drop metadata index. ", e);
}
});
} finally {
this.txnManager.endTransaction();
}
}
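
Taken together, the three new write-client entry points form a schedule, execute, and (optionally) drop lifecycle, and the scheduling step returning an empty Option is how a second trigger gets rejected, per the review exchange above. Below is a minimal editorial sketch of how a caller might drive them; the concrete SparkRDDWriteClient and the MetadataPartitionType.COLUMN_STATS value are assumptions not shown in this hunk.

// Editorial sketch (not part of this PR's diff): driving the new indexing APIs end to end.
// SparkRDDWriteClient and MetadataPartitionType.COLUMN_STATS are assumed here.
import java.util.Arrays;
import java.util.List;

import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.MetadataPartitionType;

public class AsyncIndexingExample {
  public static void buildColumnStatsIndex(SparkRDDWriteClient<?> writeClient) {
    List<MetadataPartitionType> partitions = Arrays.asList(MetadataPartitionType.COLUMN_STATS);

    // Schedule: persists an INDEX plan and returns its instant time; an empty Option means the
    // index already exists or is already inflight (the table-config check discussed above).
    Option<String> indexInstant = writeClient.scheduleIndexing(partitions);
    if (!indexInstant.isPresent()) {
      return;
    }

    // Execute: builds the planned metadata partitions for that instant.
    Option<HoodieIndexCommitMetadata> result = writeClient.index(indexInstant.get());
    if (result.isPresent()) {
      System.out.println("Indexing completed at instant " + indexInstant.get());
    }

    // Drop (optional): removes the metadata partitions again, under the transaction lock.
    // writeClient.dropIndex(partitions);
  }
}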

/**
* Performs Compaction for the workload stored in instant-time.
*
@@ -1507,8 +1507,20 @@ public boolean isMetadataBloomFilterIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isBloomFilterIndexEnabled();
}

public boolean isMetadataIndexColumnStatsForAllColumnsEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled();
public boolean isMetadataColumnStatsIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled();
}

public String getColumnsEnabledForColumnStatsIndex() {
return getMetadataConfig().getColumnsEnabledForColumnStatsIndex();
}

public String getColumnsEnabledForBloomFilterIndex() {
return getMetadataConfig().getColumnsEnabledForBloomFilterIndex();
}

public int getIndexingCheckTimeoutSeconds() {
return getMetadataConfig().getIndexingCheckTimeoutSeconds();
}

public int getColumnStatsIndexParallelism() {
@@ -1892,6 +1904,10 @@ public boolean isMetadataAsyncClean() {
return getBoolean(HoodieMetadataConfig.ASYNC_CLEAN_ENABLE);
}

public boolean isMetadataAsyncIndex() {
return getBooleanOrDefault(HoodieMetadataConfig.ASYNC_INDEX_ENABLE);
}

public int getMetadataMaxCommitsToKeep() {
return getInt(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP);
}
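
Both column-list getters above return the raw comma-separated strings from the metadata config, and the split/trim handling is left to the call sites (one of which appears in the append-handle hunk further down). A small hedged sketch of that parsing follows; the helper class name is illustrative only.

// Editorial sketch: parsing the comma-separated column lists returned by the new getters.
// An empty result is treated downstream as "index all columns".
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hudi.config.HoodieWriteConfig;

final class MetadataIndexColumns {
  static Set<String> columnStatsColumns(HoodieWriteConfig config) {
    return parse(config.getColumnsEnabledForColumnStatsIndex());
  }

  static Set<String> bloomFilterColumns(HoodieWriteConfig config) {
    return parse(config.getColumnsEnabledForBloomFilterIndex());
  }

  private static Set<String> parse(String commaSeparated) {
    if (commaSeparated == null || commaSeparated.trim().isEmpty()) {
      return Collections.emptySet();
    }
    return Arrays.stream(commaSeparated.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toSet());
  }
}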
@@ -120,7 +120,7 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList;
if (config.getBloomIndexPruneByRanges()) {
fileInfoList = (config.getMetadataConfig().isColumnStatsIndexEnabled()
fileInfoList = (config.isMetadataColumnStatsIndexEnabled()
? loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable)
: loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable));
} else {
@@ -50,12 +50,14 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
@@ -69,8 +71,10 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.accumulateColumnRanges;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.aggregateColumnStats;
@@ -343,16 +347,27 @@ private void processAppendResult(AppendResult result, List<IndexedRecord> record
updateWriteStatus(stat, result);
}

if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
if (config.isMetadataColumnStatsIndexEnabled()) {
Member: Note to self: follow up on all this code; it needs to be more modular.

final List<Schema.Field> fieldsToIndex;
if (!StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) {
Set<String> columnsToIndex = Stream.of(config.getColumnsEnabledForColumnStatsIndex().split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
.filter(field -> columnsToIndex.contains(field.name())).collect(Collectors.toList());
} else {
// if column stats index is enabled but columns not configured then we assume that all columns should be indexed
fieldsToIndex = writeSchemaWithMetaFields.getFields();
}

Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
? stat.getRecordsStats().get().getStats() : new HashMap<>();
final String filePath = stat.getPath();
// initialize map of column name to map of stats name to stats value
Map<String, Map<String, Object>> columnToStats = new HashMap<>();
writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
fieldsToIndex.forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
// collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields.
recordList.forEach(record -> aggregateColumnStats(record, writeSchemaWithMetaFields, columnToStats, config.isConsistentLogicalTimestampEnabled()));
writeSchemaWithMetaFields.getFields().forEach(field -> accumulateColumnRanges(field, filePath, columnRangeMap, columnToStats));
recordList.forEach(record -> aggregateColumnStats(record, fieldsToIndex, columnToStats, config.isConsistentLogicalTimestampEnabled()));
fieldsToIndex.forEach(field -> accumulateColumnRanges(field, filePath, columnRangeMap, columnToStats));
stat.setRecordsStats(new HoodieDeltaWriteStat.RecordsStats<>(columnRangeMap));
}

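The fields-to-index computation above is the crux of the "subset of columns" support: the configured names are matched against the write schema (with meta fields), and an empty configuration falls back to every field. Below is a standalone editorial sketch of that rule against a plain Avro schema; the helper class name and the sample schema are hypothetical.

// Editorial sketch: the field-filtering rule from the hunk above, isolated from the handle.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

final class ColumnStatsFieldFilter {
  static List<Schema.Field> fieldsToIndex(Schema writeSchemaWithMetaFields, String configuredColumns) {
    if (configuredColumns == null || configuredColumns.trim().isEmpty()) {
      // No columns configured: index every field in the write schema.
      return writeSchemaWithMetaFields.getFields();
    }
    Set<String> columnsToIndex = Stream.of(configuredColumns.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toSet());
    return writeSchemaWithMetaFields.getFields().stream()
        .filter(field -> columnsToIndex.contains(field.name()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("rec").fields()
        .requiredString("uuid").requiredLong("ts").requiredString("rider")
        .endRecord();
    // Only "uuid" and "ts" are selected; "rider" is skipped.
    System.out.println(fieldsToIndex(schema, "uuid, ts"));
  }
}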