@@ -108,7 +108,7 @@ public String createTable(
 
     final HoodieTableType tableType = HoodieTableType.valueOf(tableTypeStr);
     HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name, archiveFolder,
-        payloadClass, layoutVersion);
+        payloadClass, layoutVersion, null);
 
     // Now connect to ensure loading works
     return connect(path, layoutVersion, false, 0, 0, 0);
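
For reference, the CLI passes null for initTableType's new trailing index-type argument, so CLI-created tables persist no index type and skip the compatibility check on later writes. Below is a minimal sketch of a caller that does persist one; the path, table name, and archive folder are placeholders, and the argument order follows the call sites in this diff:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.index.HoodieIndex;

public class CreateTableWithIndexType {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HoodieTableMetaClient.initTableType(conf,
        "/tmp/hoodie/demo_table",                     // base path (placeholder)
        HoodieTableType.COPY_ON_WRITE,
        "demo_table",                                 // table name (placeholder)
        "archived",                                   // archive folder
        "org.apache.hudi.common.model.HoodieAvroPayload",
        null,                                         // timeline layout version: use the default
        HoodieIndex.IndexType.BLOOM.name());          // new: index type persisted to hoodie.properties
  }
}
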
@@ -93,6 +93,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
   private transient HoodieWriteCommitCallback commitCallback;
   protected final boolean rollbackPending;
   protected transient AsyncCleanerService asyncCleanerService;
+  protected HoodieTableMetaClient metaClient;
 
   /**
    * Create a write client, without cleaning up failed/inflight commits.
@@ -128,6 +129,7 @@ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig
     super(context, writeConfig, timelineService);
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
+    this.metaClient = createMetaClient(false);
     this.index = createIndex(writeConfig);
   }
 
@@ -153,8 +155,7 @@ public boolean commit(String instantTime, O writeStatuses) {
   /**
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = createMetaClient(false);
-    String actionType = metaClient.getCommitActionType();
+    String actionType = this.metaClient.getCommitActionType();
     return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
   }
 
@@ -590,15 +591,15 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+    this.metaClient.reloadActiveTimeline();
     startCommitWithTime(instantTime, metaClient.getCommitActionType(), metaClient);
   }
 
   /**
    * Completes a new commit time for a write operation (insert/update/delete) with specified action.
    */
   public void startCommitWithTime(String instantTime, String actionType) {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+    this.metaClient.reloadActiveTimeline();
     startCommitWithTime(instantTime, actionType, metaClient);
   }
 
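
The same pattern repeats in the Spark and Flink clients later in this diff: the meta client is built once in the constructor via createMetaClient(false) and cached, and hot paths call reloadActiveTimeline() instead of re-reading hoodie.properties on every operation. A hedged usage sketch of the public API involved; the engine context and write config are placeholders supplied by the caller, and import paths may differ slightly by branch during this refactor:

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.config.HoodieWriteConfig;

public class StartCommitSketch {
  // Callers supply a real Spark engine context and write config (placeholders here).
  static <T extends HoodieRecordPayload> String startOneCommit(
      org.apache.hudi.client.common.HoodieSparkEngineContext ctx, HoodieWriteConfig cfg) {
    SparkRDDWriteClient<T> client = new SparkRDDWriteClient<>(ctx, cfg);
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    // With this change, the call below refreshes the cached metaClient's
    // timeline rather than constructing a new HoodieTableMetaClient.
    client.startCommitWithTime(instantTime);
    client.close();
    return instantTime;
  }
}
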
@@ -31,6 +31,7 @@
 
 import java.io.Serializable;
 
+
 /**
  * Base class for different types of indexes to determine the mapping from uuid.
  *
@@ -97,13 +98,18 @@ public abstract O updateLocation(O writeStatuses, HoodieEngineContext context,
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   public abstract boolean isImplicitWithStorage();
 
+  /**
+   * Get the index type.
+   */
+  public abstract IndexType indexType();
+
   /**
    * Each index type should implement its own logic to release any resources acquired during the process.
    */
   public void close() {
   }
 
   public enum IndexType {
-    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE
+    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, CUSTOM
   }
 }
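
The new abstract indexType() forces every concrete index to declare its type (see the small per-index hunks below), and the CUSTOM member exists so user-supplied index classes (configured via hoodie.index.class) can be represented in the persisted type; HoodieIndexUtils.getIndexType below maps such configs to CUSTOM. The persistence path relies on the plain enum-name round-trip, sketched here as a self-contained demo (class name is illustrative):

import org.apache.hudi.index.HoodieIndex.IndexType;

public class IndexTypeRoundTrip {
  public static void main(String[] args) {
    // initTableType persists the enum name into hoodie.properties...
    String persisted = IndexType.GLOBAL_BLOOM.name();
    // ...and the index-creation paths recover it with valueOf before running
    // the compatibility check; an unknown name throws IllegalArgumentException.
    IndexType recovered = IndexType.valueOf(persisted);
    System.out.println(recovered); // GLOBAL_BLOOM
  }
}
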
@@ -24,8 +24,14 @@
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +42,7 @@
  * Hoodie Index Utilities.
  */
 public class HoodieIndexUtils {
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexUtils.class);
 
   /**
    * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
@@ -85,4 +92,56 @@ record = new HoodieRecord<>(inputRecord);
     }
     return record;
   }
+
+  /**
+   * Check compatibility between the new write index type and the index type already persisted in hoodie.properties.
+   *
+   * @param writeIndexType the incoming writer's index type
+   * @param persistIndexType the index type already persisted in hoodie.properties
+   */
+  public static void checkIndexTypeCompatible(IndexType writeIndexType, IndexType persistIndexType) {
+    boolean isTypeCompatible = false;
+    switch (persistIndexType) {
+      case GLOBAL_BLOOM:
+        isTypeCompatible = writeIndexType.equals(IndexType.GLOBAL_BLOOM)
+            || writeIndexType.equals(IndexType.BLOOM)
+            || writeIndexType.equals(IndexType.SIMPLE)
+            || writeIndexType.equals(IndexType.GLOBAL_SIMPLE);
+        break;
+      case GLOBAL_SIMPLE:
+        isTypeCompatible = writeIndexType.equals(IndexType.GLOBAL_SIMPLE)
+            || writeIndexType.equals(IndexType.GLOBAL_BLOOM)
+            || writeIndexType.equals(IndexType.BLOOM)
+            || writeIndexType.equals(IndexType.SIMPLE);
+        break;
+      case SIMPLE:
+        isTypeCompatible = writeIndexType.equals(IndexType.SIMPLE)
+            || writeIndexType.equals(IndexType.BLOOM);
+        break;
+      case BLOOM:
+        isTypeCompatible = writeIndexType.equals(IndexType.BLOOM)
+            || writeIndexType.equals(IndexType.SIMPLE);
+        break;
+      case INMEMORY:
+        isTypeCompatible = writeIndexType.equals(IndexType.INMEMORY);
+        LOG.warn("Persisted index type INMEMORY should not be used in production");
+        break;
+      case HBASE:
+        isTypeCompatible = writeIndexType.equals(IndexType.HBASE);
+        break;
+      case CUSTOM:
+        isTypeCompatible = writeIndexType.equals(IndexType.CUSTOM);
+        break;
+      default:
+        throw new HoodieIndexException("Unsupported persisted index type: " + persistIndexType);
+    }
+    if (!isTypeCompatible) {
+      throw new HoodieIndexException("The new write index type " + writeIndexType
+          + " is not compatible with the persisted index type " + persistIndexType);
+    }
+  }
+
+  public static IndexType getIndexType(HoodieWriteConfig config) {
+    // If an index class is configured, report CUSTOM as the type.
+    return StringUtils.isNullOrEmpty(config.getIndexClass()) ? config.getIndexType() : IndexType.CUSTOM;
+  }
 }
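
A hedged usage sketch of the new helper; the demo class name is illustrative and assumes this patch's Hudi modules are on the classpath. Note the asymmetry in the matrix above: a table persisted with a global index accepts non-global writers, but a table persisted with a non-global index rejects global writers, presumably because a global writer would assume cross-partition key uniqueness that the existing data never guaranteed.

import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.HoodieIndexUtils;

public class IndexCompatDemo {
  public static void main(String[] args) {
    // Compatible: a table persisted as GLOBAL_BLOOM tolerates a plain BLOOM writer.
    HoodieIndexUtils.checkIndexTypeCompatible(IndexType.BLOOM, IndexType.GLOBAL_BLOOM);

    // Incompatible: a table persisted as BLOOM rejects a GLOBAL_BLOOM writer.
    try {
      HoodieIndexUtils.checkIndexTypeCompatible(IndexType.GLOBAL_BLOOM, IndexType.BLOOM);
    } catch (HoodieIndexException expected) {
      System.out.println(expected.getMessage());
    }
  }
}

getIndexType is the writer-side counterpart: a config with an index class set is reported as CUSTOM, so custom indexes are persisted and checked under the CUSTOM bucket.
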
@@ -190,7 +190,7 @@ protected List<WriteStatus> compact(String compactionInstantTime, boolean should
 
   @Override
   protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+    this.metaClient.reloadActiveTimeline();
     new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
     return getTableAndInitCtx(metaClient, operationType);
   }
@@ -132,4 +132,9 @@ public boolean canIndexLogFiles() {
   public boolean isImplicitWithStorage() {
     return false;
   }
+
+  @Override
+  public IndexType indexType() {
+    return IndexType.INMEMORY;
+  }
 }
@@ -33,7 +33,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.SparkHoodieIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 
@@ -99,7 +98,7 @@ public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clie
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
     this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
-    this.index = SparkHoodieIndex.createIndex(clientConfig);
+    this.index = this.hoodieTable.getIndex();
     this.sqlContextOpt = Option.empty();
   }
 
@@ -33,9 +33,12 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.index.SparkHoodieIndex;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -88,7 +91,18 @@ public static SparkConf registerClasses(SparkConf conf) {
 
   @Override
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> createIndex(HoodieWriteConfig writeConfig) {
-    return SparkHoodieIndex.createIndex(config);
+    String persistIndexType = null;
+    try {
+      persistIndexType = this.metaClient.getTableConfig().getProperties().getProperty(HoodieIndexConfig.INDEX_TYPE_PROP);
+    } catch (TableNotFoundException e) {
+      // First write to a new table: hoodie.properties does not exist yet, so there is nothing to check against.
+    }
+    HoodieIndex hoodieIndex = SparkHoodieIndex.createIndex(config);
+    if (persistIndexType != null) {
+      HoodieIndexUtils.checkIndexTypeCompatible(hoodieIndex.indexType(),
+          HoodieIndex.IndexType.valueOf(persistIndexType));
+    }
+    return hoodieIndex;
   }
 
   /**
@@ -258,7 +272,7 @@ protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus
       }
 
       postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
-
+      emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
     }
     return result.getWriteStatuses();
@@ -315,7 +329,7 @@ protected JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean sho
 
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+    this.metaClient.reloadActiveTimeline();
     new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
     return getTableAndInitCtx(metaClient, operationType);
   }
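
Seen from the user side, the check fires while the write client is being constructed. A hedged sketch of a writer config whose BLOOM type would be validated against hoodie.properties; the class name, path, and table name are placeholders, while the builder calls are standard HoodieWriteConfig API:

import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class WriterConfigSketch {
  public static HoodieWriteConfig bloomWriterConfig() {
    // If hoodie.properties already records an incompatible type (say HBASE),
    // createIndex above throws HoodieIndexException during client construction;
    // on a brand-new table nothing is persisted yet and the check is skipped.
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/demo_table")   // placeholder base path
        .forTable("demo_table")               // placeholder table name
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BLOOM)
            .build())
        .build();
  }
}
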
@@ -112,6 +112,14 @@ public boolean isImplicitWithStorage() {
     return false;
   }
 
+  /**
+   * Get the index type.
+   */
+  @Override
+  public IndexType indexType() {
+    return IndexType.INMEMORY;
+  }
+
   /**
    * Function that tags each HoodieRecord with an existing location, if known.
    */
@@ -295,4 +295,12 @@ public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
       HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
     return writeStatusRDD;
   }
+
+  /**
+   * Get the index type.
+   */
+  @Override
+  public IndexType indexType() {
+    return IndexType.BLOOM;
+  }
 }
@@ -147,4 +147,12 @@ protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
   public boolean isGlobal() {
     return true;
   }
+
+  /**
+   * Get the index type.
+   */
+  @Override
+  public IndexType indexType() {
+    return IndexType.GLOBAL_BLOOM;
+  }
 }
@@ -565,6 +565,14 @@ public boolean isImplicitWithStorage() {
     return false;
   }
 
+  /**
+   * Get the index type.
+   */
+  @Override
+  public IndexType indexType() {
+    return IndexType.HBASE;
+  }
+
   public void setHbaseConnection(Connection hbaseConnection) {
     SparkHoodieHBaseIndex.hbaseConnection = hbaseConnection;
   }
@@ -157,4 +157,12 @@ private JavaRDD<HoodieRecord<T>> getTaggedRecords(JavaPairRDD<String, HoodieReco
   public boolean isGlobal() {
     return true;
   }
+
+  /**
+   * Get the index type.
+   */
+  @Override
+  public IndexType indexType() {
+    return IndexType.GLOBAL_SIMPLE;
+  }
 }
@@ -149,4 +149,12 @@ protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchRecordLocations(Hood
     return jsc.parallelize(baseFiles, fetchParallelism)
         .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations());
   }
+
+  /**
+   * Get the index type.
+   */
+  @Override
+  public IndexType indexType() {
+    return IndexType.SIMPLE;
+  }
 }
@@ -27,9 +27,11 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.index.SparkHoodieIndex;
 
 import org.apache.spark.api.java.JavaRDD;
@@ -67,6 +69,12 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW
 
   @Override
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
-    return SparkHoodieIndex.createIndex(config);
+    String persistIndexType = this.metaClient.getTableConfig().getProperties().getProperty(HoodieIndexConfig.INDEX_TYPE_PROP);
+    HoodieIndex hoodieIndex = SparkHoodieIndex.createIndex(config);
+    HoodieIndex.IndexType indexType = hoodieIndex.indexType();
+    if (persistIndexType != null) {
+      HoodieIndexUtils.checkIndexTypeCompatible(indexType, HoodieIndex.IndexType.valueOf(persistIndexType));
+    }
+    return hoodieIndex;
   }
 }
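
For completeness, a sketch of reading the persisted value that this table-level check consults; the class name and base path are placeholders, and INDEX_TYPE_PROP resolves to the hoodie.index.type key:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieIndexConfig;

public class PersistedIndexTypeProbe {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(hadoopConf, "/tmp/hoodie/demo_table"); // placeholder path
    // Null for tables created before this change (or via the CLI path above,
    // which passes null); in that case getIndex skips the compatibility check.
    String persisted = metaClient.getTableConfig().getProperties()
        .getProperty(HoodieIndexConfig.INDEX_TYPE_PROP);
    System.out.println("persisted index type: " + persisted);
  }
}
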
@@ -314,7 +314,7 @@ private void testUpsertsInternal(HoodieWriteConfig config,
         VERSION_0).build();
     HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
         metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0, hoodieWriteConfig.getIndexType().name());
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
 
     // Write 1 (only inserts)
@@ -541,7 +541,7 @@ private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConf
         .build()).withTimelineLayoutVersion(VERSION_0).build();
     HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
         metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0, indexType.name());
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
 
     // Write 1