diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
index 7cf45f12e1aed..922371c4a0f45 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
@@ -141,6 +141,6 @@ public void close() {
   }
 
   public enum IndexType {
-    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET
+    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 7dee9f3cdfa33..4ab36d6a5fc23 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -56,6 +56,10 @@ public static int getBucketId(List hashKeyFields, int numBuckets) {
     return hashKeyFields.hashCode() % numBuckets;
   }
 
+  public static String partitionBucketIdStr(String partition, int bucketId) {
+    return String.format("%s_%s", partition, bucketIdStr(bucketId));
+  }
+
   public static int bucketIdFromFileId(String fileId) {
     return Integer.parseInt(fileId.substring(0, 8));
   }
@@ -64,6 +68,10 @@ public static String bucketIdStr(int n) {
     return String.format("%08d", n);
   }
 
+  public static String newBucketFileIdPrefix(int bucketId) {
+    return newBucketFileIdPrefix(bucketIdStr(bucketId));
+  }
+
   public static String newBucketFileIdPrefix(String bucketId) {
     return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
   }
@@ -71,4 +79,8 @@ public static String newBucketFileIdPrefix(String bucketId) {
   public static boolean isBucketFileName(String name) {
     return BUCKET_NAME.matcher(name).matches();
   }
+
+  public static int mod(int x, int y) {
+    return x % y;
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 639467fcdaf8f..e496989363ee2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -142,6 +142,10 @@ private synchronized FileSystemViewManager getViewManager() {
     return viewManager;
   }
 
+  public HoodieTableMetadata getMetadata() {
+    return metadata;
+  }
+
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    * @param context HoodieEngineContext
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 77c3f15e54c45..1be90603605cd 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -23,9 +23,11 @@
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -106,6 +108,12 @@ private FlinkOptions() {
   // ------------------------------------------------------------------------
   //  Index Options
   // ------------------------------------------------------------------------
+  public static final ConfigOption INDEX_TYPE = ConfigOptions
+      .key("index.type")
+      .stringType()
+      .defaultValue(HoodieIndex.IndexType.FLINK_STATE.name())
+      .withDescription("Index type of Flink write job, default is using state backed index.");
+
   public static final ConfigOption INDEX_BOOTSTRAP_ENABLED = ConfigOptions
       .key("index.bootstrap.enabled")
       .booleanType()
@@ -310,6 +318,20 @@ private FlinkOptions() {
       + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
       + "the dot notation eg: `a.b.c`");
 
+  public static final ConfigOption INDEX_KEY_FIELD = ConfigOptions
+      .key(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key())
+      .stringType()
+      .defaultValue("")
+      .withDescription("Index key field. Value to be used as hashing to find the bucket ID. Should be a subset of or equal to the recordKey fields.\n"
+          + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+          + "the dot notation eg: `a.b.c`");
+
+  public static final ConfigOption BUCKET_INDEX_NUM_BUCKETS = ConfigOptions
+      .key(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())
+      .intType()
+      .defaultValue(4) // default 4 buckets per partition
+      .withDescription("Hudi bucket number per partition. Only affected if using Hudi bucket index.");
+
   public static final ConfigOption PARTITION_PATH_FIELD = ConfigOptions
       .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
       .stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index acb4af61110fa..6ebf09069be60 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -20,6 +20,7 @@
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.format.FilePathUtils;
 
 import org.apache.flink.configuration.Configuration;
@@ -101,6 +102,10 @@ public static boolean isPartitionedTable(Configuration conf) {
     return FilePathUtils.extractPartitionKeys(conf).length > 0;
   }
 
+  public static boolean isBucketIndexType(Configuration conf) {
+    return conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name());
+  }
+
   /**
    * Returns whether the source should emit changelog.
    *
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
new file mode 100644
index 0000000000000..ac5dd630baf04
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+public class BucketStreamWriteFunction extends StreamWriteFunction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);
+
+  private int maxParallelism;
+
+  private int parallelism;
+
+  private int bucketNum;
+
+  protected transient HoodieFlinkTable table;
+
+  private String indexKeyFields;
+
+  private HashMap bucketToFileIDMap;
+
+  /**
+   * Constructs a BucketStreamWriteFunction.
+   *
+   * @param config The config options
+   */
+  public BucketStreamWriteFunction(Configuration config) {
+    super(config);
+    this.bucketToFileIDMap = new HashMap();
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+    super.open(parameters);
+    this.bucketNum = config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+    this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+    this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+    bootstrapIndex();
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    super.initializeState(context);
+    this.table = this.writeClient.getHoodieTable();
+  }
+
+  @Override
+  public void processElement(I i, ProcessFunction.Context context, Collector collector) throws Exception {
+    HoodieRecord record = (HoodieRecord) i;
+    final HoodieKey hoodieKey = record.getKey();
+    final HoodieRecordLocation location;
+
+    final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
+    final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);
+
+    if (bucketToFileIDMap.containsKey(partitionBucketId)) {
+      location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId));
+    } else {
+      String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+      location = new HoodieRecordLocation("I", newFileId);
+      bucketToFileIDMap.put(partitionBucketId, newFileId);
+    }
+    record.unseal();
+    record.setCurrentLocation(location);
+    record.seal();
+    bufferRecord(record);
+  }
+
+  /**
+   * Get partition_bucket -> fileID mapping from the existing hudi table.
+   * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
+   */
+  private void bootstrapIndex() throws IOException {
+    Option latestCommitTime = table.getFileSystemView().getTimeline().filterCompletedInstants().lastInstant();
+    if (!latestCommitTime.isPresent()) {
+      return;
+    }
+    // bootstrap bucket info from existing file system
+    // bucketNum % totalParallelism == this taskID belongs to this task
+    HashSet bucketToLoad = new HashSet<>();
+    for (int i = 0; i < bucketNum; i++) {
+      int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
+      if (partitionOfBucket == taskID) {
+        LOG.info(String.format("Bootstrapping index. Adding bucket %s , "
+            + "Current parallelism: %s , Max parallelism: %s , Current task id: %s",
+            i, parallelism, maxParallelism, taskID));
+        bucketToLoad.add(i);
+      }
+    }
+    bucketToLoad.stream().forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
+
+    LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(),
+        table.getMetaClient().getBasePath()));
+
+    // Iterate through all existing partitions to load existing fileID belongs to this task
+    List partitions = table.getMetadata().getAllPartitionPaths();
+    for (String partitionPath : partitions) {
+      List latestFileSlices = table.getSliceView()
+          .getLatestFileSlices(partitionPath)
+          .collect(toList());
+      for (FileSlice fileslice : latestFileSlices) {
+        String fileID = fileslice.getFileId();
+        int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
+        if (bucketToLoad.contains(bucketNumber)) {
+          String partitionBucketId = BucketIdentifier.partitionBucketIdStr(partitionPath, bucketNumber);
+          LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID));
+          if (bucketToFileIDMap.containsKey(partitionBucketId)) {
+            throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
+                + "during the fileGroupPerPartitionedBucketState initialization.", fileID, partitionBucketId));
+          } else {
+            LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
+            bucketToFileIDMap.put(partitionBucketId, fileID);
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
new file mode 100644
index 0000000000000..cf4ef3f7826c2
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+public class BucketStreamWriteOperator extends AbstractWriteOperator {
+
+  public BucketStreamWriteOperator(Configuration conf) {
+    super(new BucketStreamWriteFunction<>(conf));
+  }
+
+  public static WriteOperatorFactory getFactory(Configuration conf) {
+    return WriteOperatorFactory.instance(conf, new BucketStreamWriteOperator<>(conf));
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a8234a30d8c54..c2f54dd8aaffe 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -371,7 +371,7 @@ private String getBucketID(HoodieRecord record) {
    *
    * @param value HoodieRecord
    */
-  private void bufferRecord(HoodieRecord value) {
+  protected void bufferRecord(HoodieRecord value) {
     final String bucketID = getBucketID(value);
 
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
new file mode 100644
index 0000000000000..aab52718517f3
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+
+public class BucketIndexPartitioner implements Partitioner {
+
+  private final int bucketNum;
+  private final String indexKeyFields;
+
+  public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
+    this.bucketNum = bucketNum;
+    this.indexKeyFields = indexKeyFields;
+  }
+
+  @Override
+  public int partition(Object key, int numPartitions) {
+    int curBucket = BucketIdentifier.getBucketId((HoodieKey) key, indexKeyFields, bucketNum);
+    return BucketIdentifier.mod(curBucket, numPartitions);
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index f97f794223e16..7cc0cb0b84de9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.BucketStreamWriteOperator;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperator;
 import org.apache.hudi.sink.append.AppendWriteOperator;
@@ -36,6 +37,7 @@
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
+import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -135,7 +137,7 @@ public static DataStream bootstrap(
       boolean bounded,
       boolean overwrite) {
     final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
-    if (overwrite) {
+    if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
       return rowDataToHoodieRecord(conf, rowType, dataStream);
     } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
       return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
@@ -189,14 +191,24 @@ public static DataStream rowDataToHoodieRecord(Configuration conf,
   }
 
   public static DataStream hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream dataStream) {
-    WriteOperatorFactory operatorFactory = StreamWriteOperator.getFactory(conf);
-    return dataStream
+    if (OptionsResolver.isBucketIndexType(conf)) {
+      WriteOperatorFactory operatorFactory = BucketStreamWriteOperator.getFactory(conf);
+      int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+      String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+      BucketIndexPartitioner partitioner = new BucketIndexPartitioner(bucketNum, indexKeyFields);
+      return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
+          .transform("hoodie_bucket_stream_write", TypeInformation.of(Object.class), operatorFactory)
+          .uid("uid_hoodie_bucket_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+    } else {
+      WriteOperatorFactory operatorFactory = StreamWriteOperator.getFactory(conf);
+      return dataStream
         // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
         .keyBy(HoodieRecord::getRecordKey)
         .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+              "bucket_assigner",
+              TypeInformation.of(HoodieRecord.class),
+              new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
         .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
         // shuffle by fileId(bucket id)
@@ -204,6 +216,7 @@ public static DataStream hoodieStreamWrite(Configuration conf, int defau
         .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+    }
   }
 
   public static DataStreamSink compact(Configuration conf, DataStream dataStream) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index b11c44af4fa34..7543382e19df4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
@@ -199,6 +200,11 @@ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table
       // the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD
       conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
     }
+    // set index key for bucket index if not defined
+    if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())
+        && conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
+      conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+    }
     // tweak the key gen class if possible
     final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
     final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index c1e6d0c28aa06..bbbc67985c8af 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -84,7 +84,6 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
       // default parallelism
       int parallelism = dataStream.getExecutionConfig().getParallelism();
       DataStream pipeline;
 
-      // bootstrap
       final DataStream hoodieRecordDataStream =
           Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index cfbcced455fbd..eaa2d6ced67d9 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -51,6 +51,8 @@
 import org.apache.flink.util.TestLogger;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -129,10 +131,14 @@ public void testWriteToHoodieWithoutTransformer() throws Exception {
     testWriteToHoodie(null, EXPECTED);
   }
 
-  @Test
-  public void testMergeOnReadWriteWithCompaction() throws Exception {
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
+  public void testMergeOnReadWriteWithCompaction(String indexType) throws Exception {
     int parallelism = 4;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.INDEX_TYPE, indexType);
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 6d8be2153f205..a76e00816189a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
@@ -347,6 +348,16 @@ void testSetupHoodieKeyOptionsForSink() {
     final Configuration conf3 = tableSink3.getConf();
     assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
     assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS_NAME), is(NonpartitionedAvroKeyGenerator.class.getName()));
+
+    // definition of bucket index
+    this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    final MockContext sinkContext4 = MockContext.getInstance(this.conf, schema2, "");
+    final HoodieTableSink tableSink4 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext4);
+    final Configuration conf4 = tableSink4.getConf();
+    assertThat(conf4.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat(conf4.get(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1"));
+    assertThat(conf4.get(FlinkOptions.INDEX_TYPE), is(HoodieIndex.IndexType.BUCKET.name()));
+    assertThat(conf4.get(FlinkOptions.KEYGEN_CLASS_NAME), is(NonpartitionedAvroKeyGenerator.class.getName()));
   }
 
   @Test
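
The routing arithmetic introduced by this patch is easiest to see end to end with plain values. Below is a standalone sketch, not part of the patch and using no Hudi classes, that mirrors the helpers shown above (`BucketIdentifier.getBucketId`, `bucketIdStr`, `partitionBucketIdStr`, `bucketIdFromFileId`) and the subtask-ownership rule shared by `BucketIndexPartitioner.partition()` and `bootstrapIndex()`. The `abs()` call and all concrete values are illustrative additions, not behavior taken from the diff.

```java
import java.util.Arrays;
import java.util.List;

// Standalone toy, not part of the patch: mirrors the bucket-index arithmetic shown above.
public class BucketRoutingSketch {

  // Mirrors BucketIdentifier.getBucketId(List, int): hashCode of the key fields, modulo bucket count.
  // (abs() added here only so this toy never prints a negative bucket.)
  static int bucketId(List<String> hashKeyFields, int numBuckets) {
    return Math.abs(hashKeyFields.hashCode()) % numBuckets;
  }

  // Mirrors BucketIdentifier.bucketIdStr(int): zero-padded 8-digit id, e.g. 3 -> "00000003".
  static String bucketIdStr(int bucketId) {
    return String.format("%08d", bucketId);
  }

  // Mirrors BucketIdentifier.partitionBucketIdStr(String, int): the key used in bucketToFileIDMap.
  static String partitionBucketId(String partition, int bucketId) {
    return String.format("%s_%s", partition, bucketIdStr(bucketId));
  }

  // Mirrors BucketIdentifier.bucketIdFromFileId(String): the first 8 chars of a file id encode its bucket.
  static int bucketIdFromFileId(String fileId) {
    return Integer.parseInt(fileId.substring(0, 8));
  }

  public static void main(String[] args) {
    int numBuckets = 4;  // FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, default 4 per partition
    int writeTasks = 2;  // parallelism of the bucket write operator

    List<String> indexKey = Arrays.asList("id-0001"); // value of the configured index key field
    int bucket = bucketId(indexKey, numBuckets);

    // BucketIndexPartitioner.partition(): bucket i goes to subtask (i % numPartitions), the same
    // rule bootstrapIndex() uses to decide which buckets a restarted subtask reloads.
    int targetSubtask = bucket % writeTasks;

    String mapKey = partitionBucketId("par1", bucket);
    System.out.printf("bucket=%d, subtask=%d, bucketToFileIDMap key=%s%n", bucket, targetSubtask, mapKey);

    // A file id that starts with "00000003" always maps back to bucket 3.
    System.out.println(bucketIdFromFileId(bucketIdStr(3) + "-0-file"));
  }
}
```

Because the bucket id is baked into the first 8 characters of every file id, a restarted write task can rebuild its `bucketToFileIDMap` purely from the file listing, which is why no state-backed index is needed on this code path.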
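For trying the patch out, a minimal configuration fragment that switches a Flink writer onto the new bucket index; it only uses options added or referenced in this diff, and the concrete values (8 buckets, `id` as the index key) are hypothetical examples, not defaults. This is a fragment meant to sit inside a job's setup code, not a complete program.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;

// Illustrative fragment: enable the bucket index instead of the default FLINK_STATE index.
Configuration conf = new Configuration();
conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 8);   // buckets per partition
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");          // HoodieTableFactory falls back to the record key field when this is empty
```

With `INDEX_TYPE` set to `BUCKET`, `Pipelines.hoodieStreamWrite` skips the bucket-assigner/keyBy path and instead wires `BucketIndexPartitioner` in front of `BucketStreamWriteOperator`, as shown in the Pipelines.java hunk above.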