@@ -193,6 +193,12 @@ public class HoodieIndexConfig extends HoodieConfig {
.key("hoodie.simple.index.update.partition.path")
.defaultValue("false")
.withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH + ", but for simple index.");


public static final ConfigProperty<Boolean> RANGE_INDEX_ENABLED = ConfigProperty
.key("hoodie.range.index.enabled")
.defaultValue(false)
.withDocumentation("Enable storing range index");

private EngineType engineType;

@@ -245,6 +251,11 @@ public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig hBaseIndexConfig) {
return this;
}

public Builder withRangeIndexEnabled(boolean enableRangeIndex) {
hoodieIndexConfig.setValue(RANGE_INDEX_ENABLED, String.valueOf(enableRangeIndex));
return this;
}

public Builder bloomFilterNumEntries(int numEntries) {
hoodieIndexConfig.setValue(BLOOM_FILTER_NUM_ENTRIES, String.valueOf(numEntries));
return this;
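As a usage illustration only (not part of this change), the new flag could be enabled through the index config builder; the newBuilder() and build() calls shown here are assumed from the existing config API:

import org.apache.hudi.config.HoodieIndexConfig;

public class RangeIndexConfigExample {
  // Hypothetical sketch: builds an index config with the new range-index flag turned on.
  public static HoodieIndexConfig rangeIndexEnabledConfig() {
    return HoodieIndexConfig.newBuilder()
        .withRangeIndexEnabled(true) // sets hoodie.range.index.enabled=true
        .build();
  }
}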
@@ -373,6 +373,11 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. "
+ "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data");

public static final ConfigProperty<Boolean> TABLE_REQUIRES_SORTING = ConfigProperty
.key("hoodie.table.sort.key")
.defaultValue(false)
.withDocumentation("optional configuration to require sorting based on key");

private ConsistencyGuardConfig consistencyGuardConfig;

// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -933,6 +938,13 @@ public boolean getGlobalSimpleIndexUpdatePartitionPath() {
return getBoolean(HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH);
}

/**
* Secondary index config.
*/
public Boolean getIsRangeIndexEnabled() {
return getBoolean(HoodieIndexConfig.RANGE_INDEX_ENABLED);
}

/**
* storage properties.
*/
@@ -1304,6 +1316,10 @@ public String getPreCommitValidatorInequalitySqlQueries() {
public boolean allowEmptyCommit() {
return getBooleanOrDefault(ALLOW_EMPTY_COMMIT);
}

public boolean requiresSorting() {
return getBooleanOrDefault(TABLE_REQUIRES_SORTING);
}

public static class Builder {

@@ -1611,6 +1627,11 @@ public Builder withPopulateMetaFields(boolean populateMetaFields) {
return this;
}

public Builder withRequiresSorting(boolean tableNeedsSorting) {
writeConfig.setValue(HoodieWriteConfig.TABLE_REQUIRES_SORTING, Boolean.toString(tableNeedsSorting));
return this;
}

public Builder withProperties(Properties properties) {
this.writeConfig.getProps().putAll(properties);
return this;
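Likewise, a hedged sketch of wiring the new sort flag into a write config; withPath() and withIndexConfig() are assumed from the existing HoodieWriteConfig builder API, while withRequiresSorting() and withRangeIndexEnabled() come from this change:

import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class SortedMetadataWriteConfigExample {
  // Hypothetical sketch: a write config that requests key-sorted writes and the range index.
  public static HoodieWriteConfig sortedWriteConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withRequiresSorting(true) // sets hoodie.table.sort.key=true
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withRangeIndexEnabled(true)
            .build())
        .build();
  }
}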
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.index;

import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import java.io.Serializable;

/**
* Base class for different types of secondary indexes.
* This differs from the primary index because:
* a) the index lookup operation is very different (predicates are used instead of sending all keys), and
* b) multiple secondary indexes can be configured for the same table (one column may use a range index while another uses a bloom index).
*
* @param <T> Sub type of HoodieRecordPayload
* @param <I> Type of inputs
* @param <K> Type of keys
* @param <O> Type of outputs
*/
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class HoodieSecondaryIndex<T extends HoodieRecordPayload, I, K, O> implements Serializable {

protected final HoodieWriteConfig config;

protected final HoodieEngineContext engineContext;

protected HoodieSecondaryIndex(HoodieWriteConfig config, HoodieEngineContext engineContext) {
this.config = config;
this.engineContext = engineContext;
}

/**
* Update the index so that matching records can be identified quickly.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract void updateIndex(HoodieWriteMetadata<O> writeMetadata, String instantTime,
HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException;

/**
* Rollback the effects of the commit made at instantTime.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract boolean rollbackCommit(String instantTime);

/**
* TODO figure out the signature for this:
* public abstract List<HoodieFileGroup> findMatchingDataFiles(List<Predicate> predicates)
*/
/**
* Each index type should implement its own logic to release any resources acquired during the process.
*/
public void close() {
}

public enum SecondaryIndexType {
/* TODO: also support multiple secondary indexes on the same table - different columns can use different index types */
RANGE,
BLOOM, /* TODO: we can leverage the same bloom index used for the primary key */
CUCKOO, /* TODO: implementation. For medium cardinality, a cuckoo index likely performs better than a bloom index: https://www.vldb.org/pvldb/vol13/p3559-kipf.pdf */
POINT /* TODO: find an exact value quickly; maybe useful for primary key lookups too. */
}

public HoodieWriteConfig getWriteConfig() {
return this.config;
}

public HoodieEngineContext getEngineContext() {
return this.engineContext;
}
}
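To illustrate how this contract might be used, here is a hypothetical minimal subclass (not part of this PR); the class name and the no-op bodies are purely illustrative assumptions layered on the abstract class above:

package org.apache.hudi.index;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

/**
 * Hypothetical range-index sketch: records per-file column min/max values for files touched
 * by a commit so that predicate-based lookups can skip files.
 */
public class HoodieRangeSecondaryIndexExample<T extends HoodieRecordPayload, I, K, O>
    extends HoodieSecondaryIndex<T, I, K, O> {

  public HoodieRangeSecondaryIndexExample(HoodieWriteConfig config, HoodieEngineContext engineContext) {
    super(config, engineContext);
  }

  @Override
  public void updateIndex(HoodieWriteMetadata<O> writeMetadata, String instantTime,
                          HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException {
    // Illustrative only: a real implementation would extract per-file column ranges
    // from writeMetadata and persist them (e.g. to the metadata table).
  }

  @Override
  public boolean rollbackCommit(String instantTime) {
    // Illustrative only: a real implementation would remove index entries written at instantTime.
    return true;
  }
}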
@@ -18,6 +18,10 @@

package org.apache.hudi.metadata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -29,6 +33,7 @@
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -51,16 +56,13 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -98,10 +100,17 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteC
this.hadoopConf = new SerializableConfiguration(hadoopConf);

if (writeConfig.useFileListingMetadata()) {
enabled = true;
} else if (writeConfig.getIsRangeIndexEnabled()) {
enabled = true;
} else {
enabled = false;
this.metrics = Option.empty();
}

if (enabled) {
this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
enabled = true;

// Inline compaction and auto clean are required as we don't expose this table outside
ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
@@ -112,18 +121,13 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteC
initRegistry();
HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
initialize(engineContext, datasetMetaClient);
if (enabled) {
// This is always called even in case the table was created for the first time. This is because
// initFromFilesystem() does file listing and hence may take a long time during which some new updates
// may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
// with the active timeline.
HoodieTimer timer = new HoodieTimer().startTimer();
syncFromInstants(datasetMetaClient);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, timer.endTimer()));
}
} else {
enabled = false;
this.metrics = Option.empty();
// This is always called even in case the table was created for the first time. This is because
// initFromFilesystem() does file listing and hence may take a long time during which some new updates
// may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
// with the active timeline.
HoodieTimer timer = new HoodieTimer().startTimer();
syncFromInstants(datasetMetaClient);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, timer.endTimer()));
}
}

@@ -169,6 +173,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
.withRequiresSorting(true)
.withFinalizeWriteParallelism(parallelism);

if (writeConfig.isMetricsOn()) {
@@ -283,15 +288,16 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
// We can only bootstrap if there are no pending operations on the dataset
Option<HoodieInstant> pendingInstantOption = Option.fromJavaOptional(datasetMetaClient.getActiveTimeline()
.getReverseOrderedInstants().filter(i -> !i.isCompleted()).findFirst());
if (pendingInstantOption.isPresent()) {
// This filterCompletedInstants.firstInstant.isPresent check is a hack to make 'quick start' work when metadata is enabled.
if (datasetMetaClient.getCommitsTimeline().filterCompletedInstants().firstInstant().isPresent() && pendingInstantOption.isPresent()) {
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
LOG.warn("Cannot bootstrap metadata table as operation is in progress: " + pendingInstantOption.get());
return false;
}

// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the latest commit timestamp.
String createInstantTime = datasetMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst()
String createInstantTime = datasetMetaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst()
.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);

@@ -336,8 +342,18 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
});

Map<String, Collection<HoodieColumnRangeMetadata<Comparable>>> rangeStats;
if (datasetWriteConfig.getIsRangeIndexEnabled()) {
List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(entry -> entry.stream()).collect(Collectors.toList());
// TODO: this result may be too large to fit in memory. We may need to convert it to a dataframe and move this to spark-client
rangeStats = HoodieTableMetadataUtil.createRangeIndexInfoFromWriteStats(engineContext, datasetMetaClient, allWriteStats);
} else {
rangeStats = Collections.emptyMap();
}

LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
update(commitMetadata, rangeStats, createInstantTime);
return true;
}

@@ -379,6 +395,8 @@ private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMet

if (p.getRight().length > filesInDir.size()) {
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), p.getLeft());
LOG.info("Processing partition " + partitionName + " total files: " + filesInDir.size());

// deal with Non-partition table, we should exclude .hoodie
partitionToFileStatus.put(partitionName, filesInDir.stream()
.filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
@@ -416,10 +434,15 @@ private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
for (HoodieInstant instant : instantsToSync) {
LOG.info("Syncing instant " + instant + " to metadata table");

Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime());
if (records.isPresent()) {
commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
// TODO: Can we combine the two lines below and generate a proper RDD/Dataframe here? We may have to move this to hudi-spark-client
Option<List<HoodieRecord>> fileRecords = HoodieTableMetadataUtil.convertInstantToFilesMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime());
List<HoodieRecord> rangeIndexRecords;
if (datasetWriteConfig.getIsRangeIndexEnabled()) {
rangeIndexRecords = HoodieTableMetadataUtil.convertInstantToRangeMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime());
} else {
rangeIndexRecords = Collections.emptyList();
}
commit(fileRecords.orElse(Collections.emptyList()), rangeIndexRecords, instant.getTimestamp());
}
initTableMetadata();
} catch (IOException ioe) {
@@ -497,6 +520,29 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime)
}
}

/**
* Update column range info for new files created by instant.
*/
@Override
public void update(Map<String, Collection<HoodieColumnRangeMetadata<Comparable>>> fileToColumnRangeInfo, String instantTime) {
if (enabled) {
List<HoodieRecord> rangeIndexRecords = HoodieTableMetadataUtil.convertMetadataToRecords(fileToColumnRangeInfo, instantTime, metadata.getSyncedInstantTime());
commit(Collections.emptyList(), rangeIndexRecords, instantTime);
}
}

public void update(HoodieCommitMetadata commitMetadata, Map<String, Collection<HoodieColumnRangeMetadata<Comparable>>> fileToColumnRangeInfo, String instantTime) {
List<HoodieRecord> fileListRecords = HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime);
List<HoodieRecord> rangeIndexRecords;
if (!fileToColumnRangeInfo.isEmpty()) {
rangeIndexRecords = HoodieTableMetadataUtil.convertMetadataToRecords(fileToColumnRangeInfo, instantTime, metadata.getSyncedInstantTime());
} else {
rangeIndexRecords = Collections.emptyList();
}

commit(fileListRecords, rangeIndexRecords, instantTime);
}

@Override
public void close() throws Exception {
if (metadata != null) {
@@ -509,4 +555,7 @@ public void close() throws Exception {
*
*/
protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);

protected abstract void commit(List<HoodieRecord> fileListRecords, List<HoodieRecord> rangeIndexRecords, String instantTime);

}
@@ -22,10 +22,13 @@
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

/**
* Interface that supports updating metadata for a given table, as actions complete.
@@ -42,6 +45,8 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {

void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);

void update(Map<String, Collection<HoodieColumnRangeMetadata<Comparable>>> fileToColumnRangeInfo, String instantTime);

/**
* Return the timestamp of the latest instant synced to the metadata table.
*/
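For illustration, a hedged caller-side sketch of the new interface overload; publishRanges() and its surrounding class are hypothetical, while getIsRangeIndexEnabled() and update() come from this change:

import java.util.Collection;
import java.util.Map;

import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

public class RangeIndexUpdateExample {
  // Hypothetical sketch: pushes per-file column range info to the metadata table writer
  // only when the range index is enabled on the dataset write config.
  public static void publishRanges(HoodieTableMetadataWriter writer,
                                   HoodieWriteConfig writeConfig,
                                   Map<String, Collection<HoodieColumnRangeMetadata<Comparable>>> fileToColumnRangeInfo,
                                   String instantTime) {
    if (writeConfig.getIsRangeIndexEnabled()) {
      writer.update(fileToColumnRangeInfo, instantTime);
    }
  }
}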
@@ -671,7 +671,7 @@ public String getBaseFileExtension() {
}

public boolean requireSortedRecords() {
return getBaseFileFormat() == HoodieFileFormat.HFILE;
return getBaseFileFormat() == HoodieFileFormat.HFILE || config.requiresSorting();
Review comment (Contributor, nsivabalan):
@satishkotha: may I know why we need this extra condition? The base file is always going to be HFile for the metadata table, right? Or do we have plans for any other partition where the base file will not be HFile in the metadata table?

Reply (Member, Author, satishkotha):
@nsivabalan We were initially considering the Parquet file format as an option for the range index, so I added this as a config to support sorting on Parquet files in the metadata table. Now that we have finalized the HFile format (at least for the short term), we can probably remove the config. Sorry for the delay, I missed your mention. Feel free to ping me on Slack if you have any follow-ups.

}

public HoodieEngineContext getContext() {