diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index ddd95721a46b6..1a07c4063f358 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -38,11 +38,15 @@ public static int getBucketId(HoodieRecord record, String indexKeyFields, int nu
   }
 
   public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
+    return getBucketId(hoodieKey.getRecordKey(), indexKeyFields, numBuckets);
+  }
+
+  public static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) {
     List<String> hashKeyFields;
-    if (!hoodieKey.getRecordKey().contains(":")) {
-      hashKeyFields = Collections.singletonList(hoodieKey.getRecordKey());
+    if (!recordKey.contains(":")) {
+      hashKeyFields = Collections.singletonList(recordKey);
     } else {
-      Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(","))
+      Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
           .map(p -> p.split(":"))
           .collect(Collectors.toMap(p -> p[0], p -> p[1]));
       hashKeyFields = Arrays.stream(indexKeyFields.split(","))
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
new file mode 100644
index 0000000000000..c52b6f0e98243
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Helper class for bucket index bulk insert used by Flink.
+ */
+public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
+
+  private final int bucketNum;
+  private final String indexKeyFields;
+
+  public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
+                                      String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
+    super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
+    this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+    this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+  }
+
+  public void write(RowData record) throws IOException {
+    try {
+      String recordKey = keyGen.getRecordKey(record);
+      String partitionPath = keyGen.getPartitionPath(record);
+      final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
+      String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+      getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
+    } catch (Throwable throwable) {
+      LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
+      throw throwable;
+    }
+  }
+
+  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
+    if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
+      HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
+          instantTime, taskPartitionId, taskId, taskEpochId, rowType);
+      handles.put(fileId, rowCreateHandle);
+    }
+    return handles.get(fileId);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
similarity index 98%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 4c9e4dc25912b..e53d2b2402936 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
similarity index 97%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
index cf740cc2ccc59..a48ea44ddc44a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.sink.common.AbstractWriteOperator;
 import org.apache.hudi.sink.common.WriteOperatorFactory;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index 9b34c3edcd800..6c8dcef0f3925 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -167,7 +167,7 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
 
   private void initWriterHelper() {
     String instant = instantToWrite();
-    this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+    this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
         instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), this.rowType);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 4bc8ae27fb5d5..013595a9d236a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -50,21 +50,21 @@ public class BulkInsertWriterHelper {
 
   private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class);
 
-  private final String instantTime;
-  private final int taskPartitionId;
-  private final long taskId;
-  private final long taskEpochId;
-  private final HoodieTable hoodieTable;
-  private final HoodieWriteConfig writeConfig;
-  private final RowType rowType;
+  protected final String instantTime;
+  protected final int taskPartitionId;
+  protected final long taskId;
+  protected final long taskEpochId;
+  protected final HoodieTable hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+  protected final RowType rowType;
   private final Boolean arePartitionRecordsSorted;
   private final List<WriteStatus> writeStatusList = new ArrayList<>();
   private HoodieRowDataCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private final String fileIdPrefix;
   private int numFilesWritten = 0;
-  private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
-  private final RowDataKeyGen keyGen;
+  protected final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
+  protected final RowDataKeyGen keyGen;
 
   public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                 String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java
new file mode 100644
index 0000000000000..99a9ae114cd8e
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Factory clazz to generate bulk insert writer helpers.
+ */
+public class WriterHelpers {
+  public static BulkInsertWriterHelper getWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
+                                                       String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
+    return OptionsResolver.isBucketIndexType(conf)
+        ? new BucketBulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType)
+        : new BulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 0c4e2a129da7a..b9b737ce22857 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.sink.partitioner;
 
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 
 import org.apache.flink.api.common.functions.Partitioner;
@@ -29,7 +28,7 @@
  *
  * @param <T> The type of obj to hash
  */
-public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
+public class BucketIndexPartitioner<T extends String> implements Partitioner<T> {
 
   private final int bucketNum;
   private final String indexKeyFields;
@@ -40,7 +39,7 @@ public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
   }
 
   @Override
-  public int partition(HoodieKey key, int numPartitions) {
+  public int partition(String key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
     return BucketIdentifier.mod(curBucket, numPartitions);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 65d67fe95227a..4930dbe0b3937 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -18,16 +18,15 @@
 
 package org.apache.hudi.sink.utils;
 
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.sink.BucketStreamWriteOperator;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperator;
 import org.apache.hudi.sink.append.AppendWriteOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
 import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -88,6 +87,18 @@ public class Pipelines {
    */
   public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
     WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+    if (OptionsResolver.isBucketIndexType(conf)) {
+      int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+      String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+      BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+          .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
+          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+          .addSink(DummySink.INSTANCE)
+          .name("dummy");
+    }
 
     final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
     if (partitionFields.length > 0) {
@@ -278,8 +289,8 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
     WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
     int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
     String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
-    BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
-    return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
+    BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+    return dataStream.partitionCustom(partitioner, HoodieRecord::getRecordKey)
         .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index c5d73036eda60..592520bf902f8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.streamer;
 
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -32,6 +30,8 @@
 
 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
index b46ab14e46384..a6b15ffb74e32 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
@@ -18,12 +18,13 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.exception.HoodieException;
+
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.ParameterException;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
-import org.apache.hudi.exception.HoodieException;
 
 /**
  * Converter that converts a string into Flink StateBackend.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 72c0890bbf649..1ef7157ab6dd9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -906,8 +906,8 @@ void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testBulkInsert(boolean hiveStylePartitioning) {
+  @MethodSource("indexAndPartitioningParams")
+  void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = batchTableEnv;
     // csv source
     String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
@@ -917,6 +917,7 @@ void testBulkInsert(boolean hiveStylePartitioning) {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.OPERATION, "bulk_insert")
         .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
+        .option(FlinkOptions.INDEX_TYPE, indexType)
        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
@@ -1262,6 +1263,19 @@ private static Stream<Arguments> tableTypeAndPartitioningParams() {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (index type, hive style partitioning).
+   */
+  private static Stream<Arguments> indexAndPartitioningParams() {
+    Object[][] data =
+        new Object[][] {
+            {"FLINK_STATE", false},
+            {"FLINK_STATE", true},
+            {"BUCKET", false},
+            {"BUCKET", true}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish
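Not part of the patch: a minimal usage sketch of the new String-keyed BucketIdentifier overload introduced above, paired with the file id prefix helper that BucketBulkInsertWriterHelper calls. The record key, index key field, and bucket count below are made-up illustrative values.

import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketIdExample {
  public static void main(String[] args) {
    // Composite record keys arrive encoded as comma-separated "field:value" pairs,
    // which is the format the new overload parses before hashing.
    String recordKey = "uuid:id-001,name:n-001"; // hypothetical key
    String indexKeyFields = "uuid";              // hash only the uuid field
    int numBuckets = 4;                          // hypothetical bucket count

    int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, numBuckets);
    // Records hashing to the same bucket always target the same file group prefix.
    String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
    System.out.println(bucketId + " -> " + fileIdPrefix);
  }
}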