diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java index 9c947e4d407e3..7c64c81b3facf 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java @@ -108,7 +108,7 @@ public String createTable( final HoodieTableType tableType = HoodieTableType.valueOf(tableTypeStr); HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name, archiveFolder, - payloadClass, layoutVersion); + payloadClass, layoutVersion, null); // Now connect to ensure loading works return connect(path, layoutVersion, false, 0, 0, 0); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 222e1ab2ca5b2..c393bd2df6ced 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -93,6 +93,7 @@ public abstract class AbstractHoodieWriteClient> extraMetadata) { - HoodieTableMetaClient metaClient = createMetaClient(false); - String actionType = metaClient.getCommitActionType(); + String actionType = this.metaClient.getCommitActionType(); return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap()); } @@ -590,7 +591,7 @@ public String startCommit() { * @param instantTime Instant time to be generated */ public void startCommitWithTime(String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); + this.metaClient.reloadActiveTimeline(); startCommitWithTime(instantTime, metaClient.getCommitActionType(), metaClient); } @@ -598,7 +599,7 @@ public void startCommitWithTime(String instantTime) { * Completes a new commit time for a write operation (insert/update/delete) with specified action. */ public void startCommitWithTime(String instantTime, String actionType) { - HoodieTableMetaClient metaClient = createMetaClient(true); + this.metaClient.reloadActiveTimeline(); startCommitWithTime(instantTime, actionType, metaClient); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java index 6d04594cbab63..adb49b07deeb0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java @@ -31,6 +31,7 @@ import java.io.Serializable; + /** * Base class for different types of indexes to determine the mapping from uuid. * @@ -97,6 +98,11 @@ public abstract O updateLocation(O writeStatuses, HoodieEngineContext context, @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) public abstract boolean isImplicitWithStorage(); + /** + * Get the index Type. + */ + public abstract IndexType indexType(); + /** * Each index type should implement it's own logic to release any resources acquired during the process. 
*/ @@ -104,6 +110,6 @@ public void close() { } public enum IndexType { - HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE + HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, CUSTOM } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index ad7807b707dcb..a0e90989e268a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -24,8 +24,14 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex.IndexType; +import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -36,6 +42,7 @@ * Hoodie Index Utilities. */ public class HoodieIndexUtils { + private static final Logger LOG = LogManager.getLogger(HoodieIndexUtils.class); /** * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions. @@ -85,4 +92,56 @@ record = new HoodieRecord<>(inputRecord); } return record; } + + /** + * Check compatibility between the new writeIndexType and the indexType already persisted in hoodie.properties. + * @param writeIndexType the new indexType + * @param persistIndexType the indexType already persisted in hoodie.properties + */ + public static void checkIndexTypeCompatible(IndexType writeIndexType, IndexType persistIndexType) { + boolean isTypeCompatible = false; + switch (persistIndexType) { + case GLOBAL_BLOOM: + isTypeCompatible = writeIndexType.equals(IndexType.GLOBAL_BLOOM) + || writeIndexType.equals(IndexType.BLOOM) + || writeIndexType.equals(IndexType.SIMPLE) + || writeIndexType.equals(IndexType.GLOBAL_SIMPLE); + break; + case GLOBAL_SIMPLE: + isTypeCompatible = writeIndexType.equals(IndexType.GLOBAL_SIMPLE) + || writeIndexType.equals(IndexType.GLOBAL_BLOOM) + || writeIndexType.equals(IndexType.BLOOM) + || writeIndexType.equals(IndexType.SIMPLE); + break; + case SIMPLE: + isTypeCompatible = writeIndexType.equals(IndexType.SIMPLE) + || writeIndexType.equals(IndexType.BLOOM); + break; + case BLOOM: + isTypeCompatible = writeIndexType.equals(IndexType.BLOOM) + || writeIndexType.equals(IndexType.SIMPLE); + break; + case INMEMORY: + isTypeCompatible = writeIndexType.equals(IndexType.INMEMORY); + LOG.warn("Persisted index type INMEMORY cannot be used in production"); + break; + case HBASE: + isTypeCompatible = writeIndexType.equals(IndexType.HBASE); + break; + case CUSTOM: + isTypeCompatible = writeIndexType.equals(IndexType.CUSTOM); + break; + default: + throw new HoodieIndexException("Unknown index type " + persistIndexType + " in properties file"); + } + if (!isTypeCompatible) { + throw new HoodieIndexException("The new write index type " + writeIndexType + + " is not compatible with the persisted index type " + persistIndexType); + } + } + + public static IndexType getIndexType(HoodieWriteConfig config) { + // If a custom index class is configured, report CUSTOM as the type. + return StringUtils.isNullOrEmpty(config.getIndexClass()) ? config.getIndexType() : IndexType.CUSTOM; } }
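The matrix encoded in checkIndexTypeCompatible above is deliberately asymmetric: a table whose hoodie.properties records a global index type (GLOBAL_BLOOM, GLOBAL_SIMPLE) still accepts non-global writers, while a table persisted with a non-global type (BLOOM, SIMPLE) rejects global writers, and HBASE, INMEMORY and CUSTOM accept only themselves. A minimal sketch that exercises both outcomes using only the method introduced above (the class name CompatibilityMatrixDemo is illustrative, not part of the patch):

import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.HoodieIndexUtils;

public class CompatibilityMatrixDemo {
  public static void main(String[] args) {
    // Write type is the first argument, persisted type the second.
    // Persisted GLOBAL_BLOOM accepts a plain BLOOM writer: returns normally.
    HoodieIndexUtils.checkIndexTypeCompatible(IndexType.BLOOM, IndexType.GLOBAL_BLOOM);
    // Persisted SIMPLE rejects a GLOBAL_BLOOM writer: throws HoodieIndexException.
    HoodieIndexUtils.checkIndexTypeCompatible(IndexType.GLOBAL_BLOOM, IndexType.SIMPLE);
  }
}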
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index f975406e4505b..4e811b017b359 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -190,7 +190,7 @@ protected List compact(String compactionInstantTime, boolean should @Override protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); + this.metaClient.reloadActiveTimeline(); new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); return getTableAndInitCtx(metaClient, operationType); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java index d3fdf67d76a92..4ac8eb2d510dd 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java @@ -132,4 +132,9 @@ public boolean canIndexLogFiles() { public boolean isImplicitWithStorage() { return false; } + + @Override + public IndexType indexType() { + return IndexType.INMEMORY; + } } \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java index 4fb9f221cbdd7..f1f8af7c30d39 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java @@ -33,7 +33,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.index.SparkHoodieIndex; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; @@ -99,7 +98,7 @@ public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clie // Create a Hoodie table which encapsulated the commits and files visible HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true); this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient); - this.index = SparkHoodieIndex.createIndex(clientConfig); + this.index = this.hoodieTable.getIndex(); this.sqlContextOpt = Option.empty(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 10a55df9f882d..955b022c77861 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -33,9 +33,12 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import 
org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.index.SparkHoodieIndex; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieSparkTable; @@ -88,7 +91,18 @@ public static SparkConf registerClasses(SparkConf conf) { @Override protected HoodieIndex>, JavaRDD, JavaRDD> createIndex(HoodieWriteConfig writeConfig) { - return SparkHoodieIndex.createIndex(config); + String persistIndexType = null; + try { + persistIndexType = this.metaClient.getTableConfig().getProperties().getProperty(HoodieIndexConfig.INDEX_TYPE_PROP); + } catch (TableNotFoundException e) { + persistIndexType = null; + } + HoodieIndex hoodieIndex = SparkHoodieIndex.createIndex(config); + if (persistIndexType != null) { + HoodieIndexUtils.checkIndexTypeCompatible(hoodieIndex.indexType(), + HoodieIndex.IndexType.valueOf(persistIndexType)); + } + return hoodieIndex; } /** @@ -258,7 +272,7 @@ protected JavaRDD postWrite(HoodieWriteMetadata compact(String compactionInstantTime, boolean sho @Override protected HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); + this.metaClient.reloadActiveTimeline(); new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); return getTableAndInitCtx(metaClient, operationType); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkInMemoryHashIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkInMemoryHashIndex.java index 55ce8d2cc90c0..a6edce0a16bc6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkInMemoryHashIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkInMemoryHashIndex.java @@ -112,6 +112,14 @@ public boolean isImplicitWithStorage() { return false; } + /** + * Get the index Type. + */ + @Override + public IndexType indexType() { + return IndexType.INMEMORY; + } + /** * Function that tags each HoodieRecord with an existing location, if known. */ diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java index 894b41b51c6bf..18f7310376880 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java @@ -295,4 +295,12 @@ public JavaRDD updateLocation(JavaRDD writeStatusRDD, HoodieTable>, JavaRDD, JavaRDD> hoodieTable) { return writeStatusRDD; } + + /** + * Get the index Type. 
+ */ + @Override + public IndexType indexType() { + return IndexType.BLOOM; + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java index 771c01ab875de..600f7f967eed7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java @@ -147,4 +147,12 @@ protected JavaRDD> tagLocationBacktoRecords( public boolean isGlobal() { return true; } + + /** + * Get the index Type. + */ + @Override + public IndexType indexType() { + return IndexType.GLOBAL_BLOOM; + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java index 5b67f838509bd..99cd62d1a2568 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java @@ -565,6 +565,14 @@ public boolean isImplicitWithStorage() { return false; } + /** + * Get the index Type. + */ + @Override + public IndexType indexType() { + return IndexType.HBASE; + } + public void setHbaseConnection(Connection hbaseConnection) { SparkHoodieHBaseIndex.hbaseConnection = hbaseConnection; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java index bdb4991cf76e3..ac898a36b9506 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java @@ -157,4 +157,12 @@ private JavaRDD> getTaggedRecords(JavaPairRDD fetchRecordLocations(Hood return jsc.parallelize(baseFiles, fetchParallelism) .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations()); } + + /** + * Get the index Type. 
+ */ + @Override + public IndexType indexType() { + return IndexType.SIMPLE; + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index d5fb4ee018a69..bb6eb410b8b68 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -27,9 +27,11 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.index.SparkHoodieIndex; import org.apache.spark.api.java.JavaRDD; @@ -67,6 +69,12 @@ public static HoodieSparkTable create(HoodieW @Override protected HoodieIndex>, JavaRDD, JavaRDD> getIndex(HoodieWriteConfig config, HoodieEngineContext context) { - return SparkHoodieIndex.createIndex(config); + String persistIndexType = this.metaClient.getTableConfig().getProperties().getProperty(HoodieIndexConfig.INDEX_TYPE_PROP); + HoodieIndex hoodieIndex = SparkHoodieIndex.createIndex(config); + HoodieIndex.IndexType indexType = hoodieIndex.indexType(); + if (persistIndexType != null) { + HoodieIndexUtils.checkIndexTypeCompatible(indexType, HoodieIndex.IndexType.valueOf(persistIndexType)); + } + return hoodieIndex; } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index d278b08f3fc14..c02c3bc173de0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -314,7 +314,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, VERSION_0).build(); HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(), - metaClient.getTableConfig().getPayloadClass(), VERSION_0); + metaClient.getTableConfig().getPayloadClass(), VERSION_0, hoodieWriteConfig.getIndexType().name()); SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false); // Write 1 (only inserts) @@ -541,7 +541,7 @@ private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConf .build()).withTimelineLayoutVersion(VERSION_0).build(); HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(), - metaClient.getTableConfig().getPayloadClass(), VERSION_0); + metaClient.getTableConfig().getPayloadClass(), VERSION_0, indexType.name()); SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false); // Write 1 diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestIndexCompatibility.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestIndexCompatibility.java new file mode 100644 index 
0000000000000..914ec0b00537a --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestIndexCompatibility.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.index.HoodieIndex.IndexType; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.testutils.HoodieClientTestBase; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Arrays; + +public class TestIndexCompatibility extends HoodieClientTestBase { + + private static Iterable indexTypeCompatibleParameter() { + return Arrays.asList(new Object[][] { { "GLOBAL_BLOOM", "GLOBAL_BLOOM" }, { "GLOBAL_BLOOM", "BLOOM" }, + { "GLOBAL_BLOOM", "SIMPLE" }, { "GLOBAL_BLOOM", "GLOBAL_SIMPLE" }, { "GLOBAL_SIMPLE", "GLOBAL_SIMPLE" }, + { "GLOBAL_SIMPLE", "GLOBAL_BLOOM" }, { "SIMPLE", "SIMPLE" }, { "BLOOM", "BLOOM" }, { "HBASE", "HBASE" }, + { "CUSTOM", "CUSTOM" } }); + } + + private static Iterable indexTypeNotCompatibleParameter() { + return Arrays.asList(new Object[][] { { "SIMPLE", "GLOBAL_BLOOM"}, + { "BLOOM", "GLOBAL_BLOOM"}, { "CUSTOM", "BLOOM"}, { "CUSTOM", "GLOBAL_BLOOM"}, { "CUSTOM", "HBASE"}}); + } + + @ParameterizedTest + @MethodSource("indexTypeCompatibleParameter") + public void testTableIndexTypeCompatible(String persistIndexType, String writeIndexType) { + assertDoesNotThrow(() -> { + HoodieIndexUtils.checkIndexTypeCompatible(IndexType.valueOf(writeIndexType), IndexType.valueOf(persistIndexType)); + }, ""); + } + + @ParameterizedTest + @MethodSource("indexTypeNotCompatibleParameter") + public void testTableIndexTypeNotCompatible(String persistIndexType, String writeIndexType) { + assertThrows(HoodieException.class, () -> { + HoodieIndexUtils.checkIndexTypeCompatible(IndexType.valueOf(writeIndexType), IndexType.valueOf(persistIndexType)); + }, ""); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index 34daed76f1ba8..a26f9c8cc12f2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -76,7 +76,10 @@ protected HoodieWriteConfig getHoodieWriteConfig(String basePath) { public void readLocalWriteHDFS() throws Exception { // Initialize table and filesystem HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath, 
HoodieTableType.valueOf(tableType), - tableName, HoodieAvroPayload.class.getName()); + tableName, HoodieAvroPayload.class.getName(), null); + + HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType), + tableName, HoodieAvroPayload.class.getName(), null); // Create write client to write some records in HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath); @@ -101,7 +104,7 @@ public void readLocalWriteHDFS() throws Exception { // Write to local HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType), - tableName, HoodieAvroPayload.class.getName()); + tableName, HoodieAvroPayload.class.getName(), null); String writeCommitTime = localWriteClient.startCommit(); LOG.info("Starting write commit " + writeCommitTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index 8ee0c163409a5..cfcb7d6e8fdb1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -129,7 +129,7 @@ public void testMORTable() throws Exception { // Create the table HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), HoodieTableType.MERGE_ON_READ, metaClient.getTableConfig().getTableName(), - metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1); + metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1, null); HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA); SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false); @@ -274,7 +274,7 @@ public void testCopyOnWriteTable() throws Exception { // Create the table HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), HoodieTableType.COPY_ON_WRITE, metaClient.getTableConfig().getTableName(), - metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1); + metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1, null); HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA); SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index f10e845f05b55..0d5bfe53b7406 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -56,6 +56,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Random; import java.util.UUID; @@ -77,7 +78,9 @@ public class TestHoodieIndex extends HoodieClientTestHarness { private void setUp(IndexType indexType) throws Exception { this.indexType = indexType; - initResources(); + Properties properties = new Properties(); + properties.setProperty(HoodieIndexConfig.INDEX_TYPE_PROP, indexType.name()); + initResources(properties); config = getConfigBuilder() .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType) .build()).withAutoCommit(false).build(); diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java index 9175ebde51333..b6d455a28733a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java @@ -164,6 +164,11 @@ public boolean canIndexLogFiles() { public boolean isImplicitWithStorage() { return false; } + + @Override + public IndexType indexType() { + return IndexType.CUSTOM; + } } public static class IndexWithConstructor { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index b10781e3b8ec9..73a9828267c36 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -56,6 +56,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -100,11 +101,18 @@ public void setTestMethodName(TestInfo testInfo) { * Initializes resource group for the subclasses of {@link HoodieClientTestBase}. */ public void initResources() throws IOException { + initResources(new Properties()); + } + + /** + * Initializes resource group for the subclasses of {@link HoodieClientTestBase}. + */ + public void initResources(Properties properties) throws IOException { initPath(); initSparkContexts(); initTestDataGenerator(); initFileSystem(); - initMetaClient(); + initMetaClient(properties); } /** @@ -206,6 +214,15 @@ protected void cleanupFileSystem() throws IOException { * @throws IOException */ protected void initMetaClient() throws IOException { + initMetaClient(new Properties()); + } + + /** + * Initializes an instance of {@link HoodieTableMetaClient} with properties. 
+ * + * @throws IOException + */ + protected void initMetaClient(Properties properties) throws IOException { if (basePath == null) { throw new IllegalStateException("The base path has not been initialized."); } @@ -213,8 +230,7 @@ protected void initMetaClient() throws IOException { if (jsc == null) { throw new IllegalStateException("The Spark context has not been initialized."); } - - metaClient = HoodieTestUtils.init(context.getHadoopConf().get(), basePath, getTableType()); + metaClient = HoodieTestUtils.init(context.getHadoopConf().get(), basePath, getTableType(), properties); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 7e2dffbfe1362..f8da4f74deb05 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -67,6 +67,7 @@ public class HoodieTableConfig implements Serializable { public static final String HOODIE_ARCHIVELOG_FOLDER_PROP_NAME = "hoodie.archivelog.folder"; public static final String HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME = "hoodie.bootstrap.index.class"; public static final String HOODIE_BOOTSTRAP_BASE_PATH = "hoodie.bootstrap.base.path"; + public static final String HOODIE_INDEX_TYPE_PROP_NAME = "hoodie.index.type"; public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE; public static final HoodieTableVersion DEFAULT_TABLE_VERSION = HoodieTableVersion.ZERO; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 2e8857b2f5252..86d9187ebbb85 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -329,38 +329,38 @@ public synchronized HoodieArchivedTimeline getArchivedTimeline() { public static HoodieTableMetaClient initTableTypeWithBootstrap(Configuration hadoopConf, String basePath, HoodieTableType tableType, String tableName, String archiveLogFolder, String payloadClassName, String baseFileFormat, String bootstrapIndexClass, - String bootstrapBasePath) throws IOException { + String bootstrapBasePath, String indexType) throws IOException { return initTableType(hadoopConf, basePath, tableType, tableName, - archiveLogFolder, payloadClassName, null, baseFileFormat, bootstrapIndexClass, bootstrapBasePath); + archiveLogFolder, payloadClassName, null, baseFileFormat, bootstrapIndexClass, bootstrapBasePath, indexType); } public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, String tableName, String archiveLogFolder, String payloadClassName, - String baseFileFormat) throws IOException { + String baseFileFormat, String indexType) throws IOException { return initTableType(hadoopConf, basePath, tableType, tableName, - archiveLogFolder, payloadClassName, null, baseFileFormat, null, null); + archiveLogFolder, payloadClassName, null, baseFileFormat, null, null, indexType); } /** * Used primarily by tests, examples. 
*/ public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String payloadClassName) throws IOException { + String tableName, String payloadClassName, String indexType) throws IOException { return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName, - null, null, null, null); + null, null, null, null, indexType); } public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, String tableName, String archiveLogFolder, String payloadClassName, - Integer timelineLayoutVersion) throws IOException { + Integer timelineLayoutVersion, String indexType) throws IOException { return initTableType(hadoopConf, basePath, tableType, tableName, archiveLogFolder, payloadClassName, - timelineLayoutVersion, null, null, null); + timelineLayoutVersion, null, null, null, indexType); } private static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, String tableName, String archiveLogFolder, String payloadClassName, Integer timelineLayoutVersion, String baseFileFormat, - String bootstrapIndexClass, String bootstrapBasePath) throws IOException { + String bootstrapIndexClass, String bootstrapBasePath, String indexType) throws IOException { Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); @@ -389,6 +389,9 @@ private static HoodieTableMetaClient initTableType(Configuration hadoopConf, Str properties.put(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH, bootstrapBasePath); } + if (null != indexType) { + properties.put(HoodieTableConfig.HOODIE_INDEX_TYPE_PROP_NAME, indexType); + } return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index d7e3bde8cb074..7e3ef2a947957 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -111,7 +111,7 @@ public void testLoadingInstantsFromFiles() throws IOException { // Backwards compatibility testing for reading compaction plans metaClient = HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(), metaClient.getTableConfig().getTableName(), - metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_0); + metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_0, null); HoodieInstant instant6 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "9"); byte[] dummy = new byte[5]; HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(new HoodieTableMetaClient(metaClient.getHadoopConf(), diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java index b606c527b0306..e016721749f13 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java +++ 
b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java @@ -84,10 +84,6 @@ public static void main(String[] args) throws Exception { // initialize the table, if not done already Path path = new Path(tablePath); FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration()); - if (!fs.exists(path)) { - HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), - tableName, HoodieAvroPayload.class.getName()); - } // Create the write client to write some records in HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) @@ -95,6 +91,10 @@ public static void main(String[] args) throws Exception { .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); + if (!fs.exists(path)) { + HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), + tableName, HoodieAvroPayload.class.getName(), cfg.getIndexType().name()); + } SparkRDDWriteClient client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg); // inserts diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 165eeb087b207..80a35ab021294 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.api.common.state.ListState; @@ -206,7 +207,8 @@ private void doCheck() throws InterruptedException { private void initTable() throws IOException { if (!fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient.initTableType(new Configuration(serializableHadoopConf.get()), cfg.targetBasePath, - HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, 1); + HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, + 1, HoodieIndexUtils.getIndexType(this.writeClient.getConfig()).name()); LOG.info("Table initialized"); } else { LOG.info("Table already [{}/{}] exists, do nothing here", cfg.targetBasePath, cfg.targetTableName); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index 7b3324e4b569e..d11b5afb26561 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -94,7 +94,7 @@ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throw if (!fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath, - HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived"); + HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", null); } if (cfg.cleanInput) { diff --git 
a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b10a05b025e05..6671be0055820 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} +import org.apache.hudi.index.HoodieIndexUtils import org.apache.hudi.internal.HoodieDataSourceInternalWriter import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.log4j.LogManager @@ -65,7 +66,6 @@ private[hudi] object HoodieSparkSqlWriter { ) : (Boolean, common.util.Option[String], common.util.Option[String], SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { - val sparkContext = sqlContext.sparkContext val path = parameters.get("path") val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME) @@ -117,9 +117,10 @@ private[hudi] object HoodieSparkSqlWriter { if (!tableExists) { val archiveLogFolder = parameters.getOrElse( HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived") + val hoodieWriteConfig = DataSourceUtils.createHoodieConfig(Schema.create(Schema.Type.NULL).toString, path.get, tblName, mapAsJavaMap(parameters)) val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType, tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY), - null.asInstanceOf[String]) + null.asInstanceOf[String], HoodieIndexUtils.getIndexType(hoodieWriteConfig).name()) tableConfig = tableMetaClient.getTableConfig } @@ -232,7 +233,6 @@ private[hudi] object HoodieSparkSqlWriter { df: DataFrame, hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = { - val sparkContext = sqlContext.sparkContext val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set.")) val tableName = parameters.getOrElse(HoodieWriteConfig.TABLE_NAME, @@ -267,9 +267,11 @@ private[hudi] object HoodieSparkSqlWriter { if (!tableExists) { val archiveLogFolder = parameters.getOrElse( HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived") + val hoodieWriteConfig = DataSourceUtils.createHoodieConfig( + Schema.create(Schema.Type.NULL).toString, path, tableName, mapAsJavaMap(parameters)) HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path, HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY), - null, bootstrapIndexClass, bootstrapBasePath) + null, bootstrapIndexClass, bootstrapBasePath, HoodieIndexUtils.getIndexType(hoodieWriteConfig).name()) } val jsc = new JavaSparkContext(sqlContext.sparkContext) diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index 41a45b2b1f951..fcd4b8053f515 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -24,8 +24,10 @@ import java.util.{Collections, Date, UUID} import org.apache.commons.io.FileUtils import 
org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap} -import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload, HoodieTableType} +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.config.HoodieBootstrapConfig.DEFAULT_BOOTSTRAP_INDEX_CLASS import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} @@ -302,8 +304,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) + val jsc = new JavaSparkContext(sc) + HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration, path.toAbsolutePath.toString, + HoodieTableType.valueOf(tableType), hoodieFooTableName, "archived", null) val client = spy(DataSourceUtils.createHoodieClient( - new JavaSparkContext(sc), + jsc, schema.toString, path.toAbsolutePath.toString, hoodieFooTableName, @@ -373,8 +378,13 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS -> classOf[NonpartitionedKeyGenerator].getCanonicalName) val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) + val jsc = new JavaSparkContext(sc) + HoodieTableMetaClient.initTableTypeWithBootstrap(jsc.hadoopConfiguration, path.toAbsolutePath.toString, HoodieTableType.valueOf(tableType), + hoodieFooTableName, "archived", "", + "PARQUET", DEFAULT_BOOTSTRAP_INDEX_CLASS, + srcPath.toAbsolutePath.toString, null) val client = spy(DataSourceUtils.createHoodieClient( - new JavaSparkContext(sc), + jsc, null, path.toAbsolutePath.toString, hoodieFooTableName, diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index d0d1b667aea20..d3f99b16066b9 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -127,7 +127,7 @@ public static void setUp() throws IOException, InterruptedException { public static void clear() throws IOException { fileSystem.delete(new Path(hiveSyncConfig.basePath), true); HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE, - hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); + hiveSyncConfig.tableName, HoodieAvroPayload.class.getName(), null); HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), fileSystem); for (String tableName : createdTablesSet) { @@ -162,7 +162,7 @@ public static void createCOWTable(String instantTime, int numberOfPartitions, bo Path path = new Path(hiveSyncConfig.basePath); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE, - hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); + hiveSyncConfig.tableName, HoodieAvroPayload.class.getName(), null); boolean result = fileSystem.mkdirs(path); checkResult(result); DateTime dateTime = DateTime.now(); @@ -178,7 +178,7 @@ public static 
void createMORTable(String commitTime, String deltaCommitTime, int Path path = new Path(hiveSyncConfig.basePath); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ, - hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); + hiveSyncConfig.tableName, HoodieAvroPayload.class.getName(), null); boolean result = fileSystem.mkdirs(path); checkResult(result); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java index c7974b3eec204..d7ea830cffbfe 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java @@ -35,6 +35,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -139,7 +140,6 @@ public BootstrapExecutor(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, public void execute() throws IOException { initializeTable(); SparkRDDWriteClient bootstrapClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jssc), bootstrapConfig, true); - try { HashMap checkpointCommitMetadata = new HashMap<>(); checkpointCommitMetadata.put(HoodieDeltaStreamer.CHECKPOINT_KEY, cfg.checkpoint); @@ -170,10 +170,9 @@ private void initializeTable() throws IOException { throw new HoodieException("target base path already exists at " + cfg.targetBasePath + ". 
Cannot bootstrap data on top of an existing table"); } - HoodieTableMetaClient.initTableTypeWithBootstrap(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, - cfg.baseFileFormat, cfg.bootstrapIndexClass, bootstrapBasePath); + cfg.baseFileFormat, cfg.bootstrapIndexClass, bootstrapBasePath, HoodieIndexUtils.getIndexType(bootstrapConfig).name()); } public HoodieWriteConfig getBootstrapConfig() { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index a27ce996d4bc0..43b9c8e0299bb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -45,6 +45,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; +import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.hudi.utilities.UtilHelpers; @@ -188,6 +189,8 @@ public class DeltaSync implements Serializable { private transient HoodieDeltaStreamerMetrics metrics; + private transient HoodieWriteConfig hoodieClientConfig; + public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, Function onInitializingHoodieWriteClient) throws IOException { @@ -200,15 +203,14 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche this.props = props; this.userProvidedSchemaProvider = schemaProvider; this.processedSchema = new SchemaSet(); - + this.hoodieClientConfig = getHoodieClientConfig(this.schemaProvider); refreshTimeline(); // Register User Provided schema first registerAvroSchemas(schemaProvider); this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames); this.keyGenerator = DataSourceUtils.createKeyGenerator(props); - - this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider)); + this.metrics = new HoodieDeltaStreamerMetrics(this.hoodieClientConfig); this.formatAdapter = new SourceFormatAdapter( UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics)); @@ -237,7 +239,8 @@ public void refreshTimeline() throws IOException { } else { this.commitTimelineOpt = Option.empty(); HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, - HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat); + HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat, + HoodieIndexUtils.getIndexType(this.hoodieClientConfig).name()); } } @@ -317,7 +320,8 @@ public Pair>> readFromSource( } } else { HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, - HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat); + HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat, + HoodieIndexUtils.getIndexType(hoodieClientConfig).name()); } if 
(!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java index 5f511740cb1eb..9d69625b63ecb 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java @@ -81,7 +81,7 @@ public void init() throws Exception { dfs().mkdirs(new Path(sourcePath)); HoodieTableMetaClient .initTableType(jsc().hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME, - HoodieAvroPayload.class.getName()); + HoodieAvroPayload.class.getName(), null); // Prepare data as source Hudi dataset HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 0bbdb23466984..7d9c34207d183 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -180,7 +180,7 @@ private static void clearHiveDb() throws IOException { HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy"); hiveConf.addResource(hiveServer.getHiveConf()); HoodieTableMetaClient.initTableType(dfs.getConf(), hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE, - hiveSyncConfig.tableName, null); + hiveSyncConfig.tableName, null, null); HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveConf, dfs); client.updateHiveSQL("drop database if exists " + hiveSyncConfig.databaseName); client.updateHiveSQL("create database " + hiveSyncConfig.databaseName);
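End to end, the guard added by this patch works in two steps: initTableType now persists the writer's index type into hoodie.properties (the hoodie.index.type key added to HoodieTableConfig), and each later createIndex/getIndex call reads that property back and validates it with HoodieIndexUtils.checkIndexTypeCompatible. The following is a minimal standalone sketch of that read-and-validate path, mirroring SparkRDDWriteClient.createIndex above; IndexTypeGuard is an illustrative name, and the sketch assumes HoodieIndexConfig.INDEX_TYPE_PROP resolves to the same hoodie.index.type key:

import java.util.Properties;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.HoodieIndexUtils;

public class IndexTypeGuard {
  public static void validate(HoodieTableMetaClient metaClient, IndexType writeIndexType) {
    String persisted;
    try {
      Properties props = metaClient.getTableConfig().getProperties();
      persisted = props.getProperty(HoodieIndexConfig.INDEX_TYPE_PROP);
    } catch (TableNotFoundException e) {
      // First write to a new table: nothing persisted yet, so nothing to validate.
      persisted = null;
    }
    if (persisted != null) {
      // Throws HoodieIndexException when the pairing is outside the compatibility matrix.
      HoodieIndexUtils.checkIndexTypeCompatible(writeIndexType, IndexType.valueOf(persisted));
    }
  }
}

With this in place, a writer configured with an incompatible index type fails fast at client construction instead of silently tagging records under different index semantics.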