Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public String createTable(

final HoodieTableType tableType = HoodieTableType.valueOf(tableTypeStr);
HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name, archiveFolder,
payloadClass, layoutVersion);
payloadClass, layoutVersion, null);

// Now connect to ensure loading works
return connect(path, layoutVersion, false, 0, 0, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.io.Serializable;


/**
* Base class for different types of indexes to determine the mapping from uuid.
*
Expand Down Expand Up @@ -97,13 +98,18 @@ public abstract O updateLocation(O writeStatuses, HoodieEngineContext context,
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public abstract boolean isImplicitWithStorage();

/**
* Returns the {@link IndexType} that identifies this index implementation.
*
* @return the index type constant reported by the concrete index
*/
public abstract IndexType indexType();

/**
* Releases any resources acquired during the process. This base implementation does nothing; each index
* type should implement its own logic by overriding this method.
*/
public void close() {
}

public enum IndexType {
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, CUSTOM
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -36,6 +42,7 @@
* Hoodie Index Utilities.
*/
public class HoodieIndexUtils {
private static final Logger LOG = LogManager.getLogger(HoodieIndexUtils.class);

/**
* Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
Expand Down Expand Up @@ -85,4 +92,56 @@ record = new HoodieRecord<>(inputRecord);
}
return record;
}

/**
 * Checks that the index type requested for the current write is compatible with the index type
 * already recorded in hoodie.properties.
 *
 * <p>Rules, keyed by the persisted type:
 * <ul>
 *   <li>{@code GLOBAL_BLOOM} or {@code GLOBAL_SIMPLE}: accepts {@code GLOBAL_BLOOM}, {@code GLOBAL_SIMPLE},
 *       {@code BLOOM} and {@code SIMPLE}.</li>
 *   <li>{@code BLOOM} or {@code SIMPLE}: accepts {@code BLOOM} and {@code SIMPLE}.</li>
 *   <li>{@code INMEMORY}, {@code HBASE} or {@code CUSTOM}: accepts only the identical type.</li>
 * </ul>
 *
 * <p>A {@code null} write type is never compatible and is reported through the same exception.
 *
 * @param writeIndexType new indexType
 * @param persistIndexType indexType already in hoodie.properties
 * @throws HoodieIndexException if the write index type is not compatible with the persisted one
 */
public static void checkIndexTypeCompatible(IndexType writeIndexType, IndexType persistIndexType) {
  boolean isTypeCompatible;
  switch (persistIndexType) {
    case GLOBAL_BLOOM:
    case GLOBAL_SIMPLE:
      // A table written with a global index may be written with any bloom/simple variant.
      isTypeCompatible = writeIndexType == IndexType.GLOBAL_BLOOM
          || writeIndexType == IndexType.GLOBAL_SIMPLE
          || writeIndexType == IndexType.BLOOM
          || writeIndexType == IndexType.SIMPLE;
      break;
    case SIMPLE:
    case BLOOM:
      // Non-global indexes are interchangeable with each other but not with global ones.
      isTypeCompatible = writeIndexType == IndexType.SIMPLE
          || writeIndexType == IndexType.BLOOM;
      break;
    case INMEMORY:
      isTypeCompatible = writeIndexType == IndexType.INMEMORY;
      LOG.error("PersistIndexType INMEMORY can not be used in production");
      break;
    case HBASE:
      isTypeCompatible = writeIndexType == IndexType.HBASE;
      break;
    case CUSTOM:
      isTypeCompatible = writeIndexType == IndexType.CUSTOM;
      break;
    default:
      // Guards against enum constants added later without a compatibility rule.
      throw new HoodieIndexException("Index type " + persistIndexType + " unspecified in properties file");
  }
  if (!isTypeCompatible) {
    throw new HoodieIndexException("The new write indextype " + writeIndexType
        + " is not compatible with persistIndexType " + persistIndexType);
  }
}

/**
 * Resolves the effective index type for a write config.
 *
 * @param config write config to inspect
 * @return {@link IndexType#CUSTOM} when an index class is configured, otherwise the configured index type
 */
public static IndexType getIndexType(HoodieWriteConfig config) {
  final boolean indexClassConfigured = !StringUtils.isNullOrEmpty(config.getIndexClass());
  if (indexClassConfigured) {
    // An explicitly configured index class is always reported as CUSTOM.
    return IndexType.CUSTOM;
  }
  return config.getIndexType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,9 @@ public boolean canIndexLogFiles() {
public boolean isImplicitWithStorage() {
return false;
}

/**
* Get the index Type.
*
* @return {@link IndexType#INMEMORY}
*/
@Override
public IndexType indexType() {
return IndexType.INMEMORY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;

Expand Down Expand Up @@ -99,7 +98,7 @@ public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clie
// Create a Hoodie table which encapsulates the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
this.index = SparkHoodieIndex.createIndex(clientConfig);
this.index = this.hoodieTable.getIndex();
this.sqlContextOpt = Option.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
Expand Down Expand Up @@ -88,7 +91,18 @@ public static SparkConf registerClasses(SparkConf conf) {

@Override
protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> createIndex(HoodieWriteConfig writeConfig) {
return SparkHoodieIndex.createIndex(config);
String persistIndexType = null;
try {
persistIndexType = this.createMetaClient(false).getTableConfig().getProperties().getProperty(HoodieIndexConfig.INDEX_TYPE_PROP);
} catch (TableNotFoundException e) {
persistIndexType = null;
}
HoodieIndex hoodieIndex = SparkHoodieIndex.createIndex(config);
if (persistIndexType != null) {
HoodieIndexUtils.checkIndexTypeCompatible(hoodieIndex.indexType(),
HoodieIndex.IndexType.valueOf(persistIndexType));
}
return hoodieIndex;
}

/**
Expand Down Expand Up @@ -241,7 +255,7 @@ protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus
}

postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());

emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
}
return result.getWriteStatuses();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ public boolean isImplicitWithStorage() {
return false;
}

/**
* Get the index Type.
*
* @return {@link IndexType#INMEMORY}
*/
@Override
public IndexType indexType() {
return IndexType.INMEMORY;
}

/**
* Function that tags each HoodieRecord with an existing location, if known.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,12 @@ public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
return writeStatusRDD;
}

/**
* Get the index Type.
*
* @return {@link IndexType#BLOOM}
*/
@Override
public IndexType indexType() {
return IndexType.BLOOM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,12 @@ protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
public boolean isGlobal() {
return true;
}

/**
* Get the index Type.
*
* @return {@link IndexType#GLOBAL_BLOOM}
*/
@Override
public IndexType indexType() {
return IndexType.GLOBAL_BLOOM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,14 @@ public boolean isImplicitWithStorage() {
return false;
}

/**
* Get the index Type.
*
* @return {@link IndexType#HBASE}
*/
@Override
public IndexType indexType() {
return IndexType.HBASE;
}

/**
* Replaces the HBase connection held by this index.
*
* <p>The assignment targets the class-qualified field {@code SparkHoodieHBaseIndex.hbaseConnection}.
*
* @param hbaseConnection connection to store
*/
public void setHbaseConnection(Connection hbaseConnection) {
SparkHoodieHBaseIndex.hbaseConnection = hbaseConnection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,12 @@ private JavaRDD<HoodieRecord<T>> getTaggedRecords(JavaPairRDD<String, HoodieReco
public boolean isGlobal() {
return true;
}

/**
* Get the index Type.
*
* @return {@link IndexType#GLOBAL_SIMPLE}
*/
@Override
public IndexType indexType() {
return IndexType.GLOBAL_SIMPLE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,12 @@ protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchRecordLocations(Hood
return jsc.parallelize(baseFiles, fetchParallelism)
.flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations());
}

/**
* Get the index Type.
*
* @return {@link IndexType#SIMPLE}
*/
@Override
public IndexType indexType() {
return IndexType.SIMPLE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.SparkHoodieIndex;

import org.apache.spark.api.java.JavaRDD;
Expand Down Expand Up @@ -67,6 +69,12 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW

@Override
protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return SparkHoodieIndex.createIndex(config);
String persistIndexType = this.metaClient.getTableConfig().getProperties().getProperty(HoodieIndexConfig.INDEX_TYPE_PROP);
HoodieIndex hoodieIndex = SparkHoodieIndex.createIndex(config);
HoodieIndex.IndexType indexType = hoodieIndex.indexType();
if (persistIndexType != null) {
HoodieIndexUtils.checkIndexTypeCompatible(indexType, HoodieIndex.IndexType.valueOf(persistIndexType));
}
return hoodieIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private void testUpsertsInternal(HoodieWriteConfig config,
VERSION_0).build();
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
metaClient.getTableConfig().getPayloadClass(), VERSION_0, hoodieWriteConfig.getIndexType().name());
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);

// Write 1 (only inserts)
Expand Down Expand Up @@ -491,7 +491,7 @@ private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConf
.build()).withTimelineLayoutVersion(VERSION_0).build();
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
metaClient.getTableConfig().getPayloadClass(), VERSION_0, indexType.name());
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);

// Write 1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client;

import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Arrays;

/**
 * Tests for {@link HoodieIndexUtils#checkIndexTypeCompatible}: verifies which combinations of persisted
 * and write index types are accepted and which are rejected.
 */
public class TestIndexCompatibility extends HoodieClientTestBase {

  /**
   * Pairs of {persistIndexType, writeIndexType} that the compatibility check must accept.
   */
  private static Iterable<Object[]> indexTypeCompatibleParameter() {
    return Arrays.asList(new Object[][] {
        { "GLOBAL_BLOOM", "GLOBAL_BLOOM" }, { "GLOBAL_BLOOM", "BLOOM" },
        { "GLOBAL_BLOOM", "SIMPLE" }, { "GLOBAL_BLOOM", "GLOBAL_SIMPLE" },
        { "GLOBAL_SIMPLE", "GLOBAL_SIMPLE" }, { "GLOBAL_SIMPLE", "GLOBAL_BLOOM" },
        { "GLOBAL_SIMPLE", "BLOOM" }, { "GLOBAL_SIMPLE", "SIMPLE" },
        { "SIMPLE", "SIMPLE" }, { "SIMPLE", "BLOOM" },
        { "BLOOM", "BLOOM" }, { "BLOOM", "SIMPLE" },
        { "INMEMORY", "INMEMORY" }, { "HBASE", "HBASE" },
        { "CUSTOM", "CUSTOM" } });
  }

  /**
   * Pairs of {persistIndexType, writeIndexType} that the compatibility check must reject.
   */
  private static Iterable<Object[]> indexTypeNotCompatibleParameter() {
    return Arrays.asList(new Object[][] {
        { "SIMPLE", "GLOBAL_BLOOM" }, { "SIMPLE", "GLOBAL_SIMPLE" },
        { "BLOOM", "GLOBAL_BLOOM" }, { "BLOOM", "GLOBAL_SIMPLE" }, { "BLOOM", "HBASE" },
        { "INMEMORY", "BLOOM" }, { "HBASE", "BLOOM" },
        { "CUSTOM", "BLOOM" }, { "CUSTOM", "GLOBAL_BLOOM" }, { "CUSTOM", "HBASE" } });
  }

  @ParameterizedTest
  @MethodSource("indexTypeCompatibleParameter")
  public void testTableIndexTypeCompatible(String persistIndexType, String writeIndexType) {
    assertDoesNotThrow(() -> {
      HoodieIndexUtils.checkIndexTypeCompatible(IndexType.valueOf(writeIndexType), IndexType.valueOf(persistIndexType));
    }, "Write index type " + writeIndexType + " should be compatible with persisted index type " + persistIndexType);
  }

  @ParameterizedTest
  @MethodSource("indexTypeNotCompatibleParameter")
  public void testTableIndexTypeNotCompatible(String persistIndexType, String writeIndexType) {
    assertThrows(HoodieException.class, () -> {
      HoodieIndexUtils.checkIndexTypeCompatible(IndexType.valueOf(writeIndexType), IndexType.valueOf(persistIndexType));
    }, "Write index type " + writeIndexType + " should be rejected for persisted index type " + persistIndexType);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
public void readLocalWriteHDFS() throws Exception {
// Initialize table and filesystem
HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath, HoodieTableType.valueOf(tableType),
tableName, HoodieAvroPayload.class.getName());
tableName, HoodieAvroPayload.class.getName(), null);

// Create write client to write some records in
HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
Expand All @@ -101,7 +101,7 @@ public void readLocalWriteHDFS() throws Exception {

// Write to local
HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType),
tableName, HoodieAvroPayload.class.getName());
tableName, HoodieAvroPayload.class.getName(), null);

String writeCommitTime = localWriteClient.startCommit();
LOG.info("Starting write commit " + writeCommitTime);
Expand Down
Loading