@@ -38,11 +38,15 @@ public static int getBucketId(HoodieRecord record, String indexKeyFields, int nu
   }
 
   public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
+    return getBucketId(hoodieKey.getRecordKey(), indexKeyFields, numBuckets);
+  }
+
+  public static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) {
     List<String> hashKeyFields;
-    if (!hoodieKey.getRecordKey().contains(":")) {
-      hashKeyFields = Collections.singletonList(hoodieKey.getRecordKey());
+    if (!recordKey.contains(":")) {
+      hashKeyFields = Collections.singletonList(recordKey);
     } else {
-      Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(","))
+      Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
           .map(p -> p.split(":"))
           .collect(Collectors.toMap(p -> p[0], p -> p[1]));
       hashKeyFields = Arrays.stream(indexKeyFields.split(","))
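Note on the hunk above: the String-keyed overload is extracted so that callers which only have a record key string (such as the Flink row-data bulk insert path added later in this diff) can compute a bucket without materializing a HoodieKey. A minimal sketch of how the two overloads relate, using illustrative values ("uuid:id1" record key, "par1" partition path, "uuid" index key field, 4 buckets):

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketIdSketch {
  public static void main(String[] args) {
    // Illustrative values only; not taken from the PR's tests.
    HoodieKey key = new HoodieKey("uuid:id1", "par1");
    int fromHoodieKey = BucketIdentifier.getBucketId(key, "uuid", 4);
    int fromRecordKey = BucketIdentifier.getBucketId(key.getRecordKey(), "uuid", 4);
    // The HoodieKey overload now simply delegates to the String overload, so the two agree.
    System.out.println(fromHoodieKey == fromRecordKey); // true
  }
}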
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink.bucket;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.table.HoodieTable;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* Helper class for bucket index bulk insert used by Flink.
*/
public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
  private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);

  private final int bucketNum;
  private final String indexKeyFields;

  public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
                                      String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
    super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
    this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
    this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
  }

  public void write(RowData record) throws IOException {
    try {
      String recordKey = keyGen.getRecordKey(record);
      String partitionPath = keyGen.getPartitionPath(record);
      final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
      String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
      getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
    } catch (Throwable throwable) {
      LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
      throw throwable;
    }
  }

  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
    if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
      HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
          instantTime, taskPartitionId, taskId, taskEpochId, rowType);
      handles.put(fileId, rowCreateHandle);
    }
    return handles.get(fileId);
  }
}
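The new helper's write(RowData) boils down to deterministic routing: derive the record key and partition path from the row, hash the configured index key fields onto one of bucketNum buckets, and turn that bucket into a fixed file id prefix so every record of a bucket lands in the same file group. A rough sketch of that routing, with "uuid" as index key field and 4 buckets as illustrative assumptions:

import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketRoutingSketch {
  // Sketch only; mirrors the routing inside write(RowData) above with hard-coded inputs.
  static String routeToFileId(String recordKey, String indexKeyFields, int numBuckets) {
    int bucket = BucketIdentifier.getBucketId(recordKey, indexKeyFields, numBuckets);
    // Deterministic prefix per bucket: the same bucket always maps to the same file group.
    return BucketIdentifier.newBucketFileIdPrefix(bucket);
  }

  public static void main(String[] args) {
    // The helper then looks up getRowCreateHandle(partitionPath, fileId) and appends the row.
    System.out.println(routeToFileId("uuid:id1", "uuid", 4));
  }
}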
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.sink.common.AbstractWriteOperator;
 import org.apache.hudi.sink.common.WriteOperatorFactory;
@@ -167,7 +167,7 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
 
   private void initWriterHelper() {
     String instant = instantToWrite();
-    this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+    this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
         instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
         this.rowType);
   }
@@ -50,21 +50,21 @@ public class BulkInsertWriterHelper {
 
   private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class);
 
-  private final String instantTime;
-  private final int taskPartitionId;
-  private final long taskId;
-  private final long taskEpochId;
-  private final HoodieTable hoodieTable;
-  private final HoodieWriteConfig writeConfig;
-  private final RowType rowType;
+  protected final String instantTime;
+  protected final int taskPartitionId;
+  protected final long taskId;
+  protected final long taskEpochId;
+  protected final HoodieTable hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+  protected final RowType rowType;
   private final Boolean arePartitionRecordsSorted;
   private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
   private HoodieRowDataCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private final String fileIdPrefix;
   private int numFilesWritten = 0;
-  private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
-  private final RowDataKeyGen keyGen;
+  protected final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
+  protected final RowDataKeyGen keyGen;
 
   public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                 String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink.bulk;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.table.HoodieTable;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;

/**
* Factory clazz to generate bulk insert writer helpers.
*/
public class WriterHelpers {
  public static BulkInsertWriterHelper getWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
                                                       String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
    return OptionsResolver.isBucketIndexType(conf)
        ? new BucketBulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType)
        : new BulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
  }
}
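BulkInsertWriteFunction#initWriterHelper (changed earlier in this diff) now calls this factory, so the bucket-index helper is chosen purely from configuration. A hedged sketch of the switch, assuming the "BUCKET" index type value that the integration test below also uses; the Configuration here is org.apache.flink.configuration.Configuration:

// Sketch, not part of the PR: selecting the bucket-index bulk insert helper via configuration.
Configuration conf = new Configuration();
conf.setString(FlinkOptions.INDEX_TYPE.key(), "BUCKET");
// OptionsResolver.isBucketIndexType(conf) is now true, so getWriterHelper(...) returns a
// BucketBulkInsertWriterHelper; any other index type falls back to the base BulkInsertWriterHelper.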
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.sink.partitioner;
 
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 
 import org.apache.flink.api.common.functions.Partitioner;
@@ -29,7 +28,7 @@
  *
  * @param <T> The type of obj to hash
  */
-public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
+public class BucketIndexPartitioner<T extends String> implements Partitioner<T> {
 
   private final int bucketNum;
   private final String indexKeyFields;
@@ -40,7 +39,7 @@ public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
   }
 
   @Override
-  public int partition(HoodieKey key, int numPartitions) {
+  public int partition(String key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
     return BucketIdentifier.mod(curBucket, numPartitions);
   }
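With the partitioner keyed on the plain record key string, the streaming write path and the new bulk insert path can share the same shuffle. A small sketch of how a key lands on a writer subtask, with illustrative numbers (8 buckets, 4 subtasks, "uuid" as the index key field):

// Sketch only: the key is hashed onto one of 8 buckets, then the bucket id is folded
// onto the 4 subtasks via BucketIdentifier.mod, exactly as partition() does above.
BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(8, "uuid");
int subtask = partitioner.partition("uuid:id1", 4); // always in [0, 4)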
@@ -18,16 +18,15 @@
 
 package org.apache.hudi.sink.utils;
 
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.sink.BucketStreamWriteOperator;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperator;
 import org.apache.hudi.sink.append.AppendWriteOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
 import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -88,6 +87,18 @@ public class Pipelines {
    */
   public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
     WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+    if (OptionsResolver.isBucketIndexType(conf)) {
+      int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+      String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+      BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+          .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
+          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+          .addSink(DummySink.INSTANCE)
+          .name("dummy");
+    }
 
     final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
     if (partitionFields.length > 0) {
@@ -278,8 +289,8 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
     WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
     int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
     String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
-    BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
-    return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
+    BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+    return dataStream.partitionCustom(partitioner, HoodieRecord::getRecordKey)
         .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
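Taken together, Pipelines.bulkInsert only takes the new branch when the bucket index is configured. A hedged sketch of write options that would drive a job into it; the option handles are the ones read in the code above, while the concrete values ("uuid", 8 buckets, 4 write tasks) are illustrative assumptions:

// Sketch, not part of the PR: configuration that routes Pipelines.bulkInsert through the bucket-index branch.
Configuration conf = new Configuration();
conf.setString(FlinkOptions.OPERATION.key(), "bulk_insert");
conf.setString(FlinkOptions.INDEX_TYPE.key(), "BUCKET");
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), 8);
conf.setString(FlinkOptions.INDEX_KEY_FIELD.key(), "uuid");
conf.setInteger(FlinkOptions.WRITE_TASKS.key(), 4);
// Pipelines.bulkInsert(conf, rowType, dataStream) then shuffles rows with BucketIndexPartitioner
// keyed by RowDataKeyGen::getRecordKey before the "bucket_bulk_insert" write operator.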
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.streamer;
 
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -32,6 +30,8 @@
 
 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -18,12 +18,13 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.exception.HoodieException;
+
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.ParameterException;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
-import org.apache.hudi.exception.HoodieException;
 
 /**
  * Converter that converts a string into Flink StateBackend.
@@ -906,8 +906,8 @@ void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testBulkInsert(boolean hiveStylePartitioning) {
+  @MethodSource("indexAndPartitioningParams")
+  void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = batchTableEnv;
     // csv source
     String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
@@ -917,6 +917,7 @@ void testBulkInsert(boolean hiveStylePartitioning) {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.OPERATION, "bulk_insert")
         .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
+        .option(FlinkOptions.INDEX_TYPE, indexType)
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
@@ -1262,6 +1263,19 @@ private static Stream<Arguments> tableTypeAndPartitioningParams() {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (index type, hive style partitioning).
+   */
+  private static Stream<Arguments> indexAndPartitioningParams() {
+    Object[][] data =
+        new Object[][] {
+            {"FLINK_STATE", false},
+            {"FLINK_STATE", true},
+            {"BUCKET", false},
+            {"BUCKET", true}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish