@@ -141,6 +141,6 @@ public void close() {
}

public enum IndexType {
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE
}
}
@@ -56,6 +56,10 @@ public static int getBucketId(List<String> hashKeyFields, int numBuckets) {
return hashKeyFields.hashCode() % numBuckets;
}

public static String partitionBucketIdStr(String partition, int bucketId) {
return String.format("%s_%s", partition, bucketIdStr(bucketId));
}

public static int bucketIdFromFileId(String fileId) {
return Integer.parseInt(fileId.substring(0, 8));
}
@@ -64,11 +68,19 @@ public static String bucketIdStr(int n) {
return String.format("%08d", n);
}

public static String newBucketFileIdPrefix(int bucketId) {
return newBucketFileIdPrefix(bucketIdStr(bucketId));
}

public static String newBucketFileIdPrefix(String bucketId) {
return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
}

public static boolean isBucketFileName(String name) {
return BUCKET_NAME.matcher(name).matches();
}

public static int mod(int x, int y) {
return x % y;
}
}
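
A small illustrative sketch (not part of the diff) of how the new and existing helpers compose; the partition value and bucket id below are hypothetical:

int bucketId = 3;
String bucketStr = BucketIdentifier.bucketIdStr(bucketId);                          // "00000003"
String partitionBucket = BucketIdentifier.partitionBucketIdStr("par1", bucketId);   // "par1_00000003"
String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);             // bucket id replaces the first 8 chars of a fresh file id prefix
int parsedBack = BucketIdentifier.bucketIdFromFileId(fileIdPrefix);                 // 3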
@@ -142,6 +142,10 @@ private synchronized FileSystemViewManager getViewManager() {
return viewManager;
}

public HoodieTableMetadata getMetadata() {
return metadata;
}

/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
* @param context HoodieEngineContext
@@ -23,9 +23,11 @@
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;

@@ -106,6 +108,12 @@ private FlinkOptions() {
// ------------------------------------------------------------------------
// Index Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> INDEX_TYPE = ConfigOptions
.key("index.type")
.stringType()
.defaultValue(HoodieIndex.IndexType.FLINK_STATE.name())
.withDescription("Index type of Flink write job, default is using state backed index.");

public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
.key("index.bootstrap.enabled")
.booleanType()
@@ -310,6 +318,20 @@ private FlinkOptions() {
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");

public static final ConfigOption<String> INDEX_KEY_FIELD = ConfigOptions
.key(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key())
.stringType()
.defaultValue("")
.withDescription("Index key field. Value to be used as hashing to find the bucket ID. Should be a subset of or equal to the recordKey fields.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");

Contributor: Must this key be the same as the primary key? All the changes of a key must belong to one data bucket.

Member Author: It can be a subset of the primary keys. For example, if the primary key is "id1,id2", the index key can be "id1", "id2", or "id1,id2".

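As a hedged illustration of the "subset of the record key" answer above (the record key format and all values are made up):

// Primary key is "id1,id2" but only "id1" is configured as the index key,
// so every change of a given id1 value hashes to the same bucket.
HoodieKey key = new HoodieKey("id1:3,id2:7", "par1");
int bucket = BucketIdentifier.getBucketId(key, "id1", 4);
String partitionBucket = BucketIdentifier.partitionBucketIdStr(key.getPartitionPath(), bucket);
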
public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigOptions
.key(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())
.intType()
.defaultValue(4) // default 4 buckets per partition
.withDescription("Hudi bucket number per partition. Only affected if using Hudi bucket index.");
Contributor: Is there any reason the default value is 256 here? It seems to generate many small files for small data sets.

Member Author: Sure, will change it to a smaller number.

Contributor: If people change BUCKET_INDEX_NUM_BUCKETS or the write function parallelism, does the hash index still work?

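For intuition on the parallelism question above, a tiny hypothetical walk-through of the bucket-to-subtask assignment used by the new BucketStreamWriteFunction below when it bootstraps its index (bucket i is handled by the subtask where i % parallelism == taskID):

// Hypothetical numbers: 4 buckets, writer parallelism 2.
int bucketNum = 4;
int parallelism = 2;
for (int taskID = 0; taskID < parallelism; taskID++) {
  for (int i = 0; i < bucketNum; i++) {
    if (BucketIdentifier.mod(i, parallelism) == taskID) {
      System.out.printf("subtask %d loads bucket %d%n", taskID, i);
    }
  }
}
// subtask 0 loads buckets 0 and 2; subtask 1 loads buckets 1 and 3.
// Re-scaling the job only re-distributes buckets across subtasks, while changing the bucket
// number would change which bucket a key hashes to, which is why the latter is not supported.
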
public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
.key(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.stringType()
@@ -20,6 +20,7 @@

import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.format.FilePathUtils;

import org.apache.flink.configuration.Configuration;
@@ -101,6 +102,10 @@ public static boolean isPartitionedTable(Configuration conf) {
return FilePathUtils.extractPartitionKeys(conf).length > 0;
}

public static boolean isBucketIndexType(Configuration conf) {
return conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name());
}
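
A minimal sketch (hypothetical values) of switching a Flink write job to the new bucket index through these options; the helper is called unqualified here because the enclosing utility class name is not visible in this view:

Configuration conf = new Configuration();
conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());  // default is FLINK_STATE
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 8);                     // buckets per partition
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id1");                           // subset of the record key fields
boolean useBucketIndex = isBucketIndexType(conf);                              // true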

/**
* Returns whether the source should emit changelog.
*
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.table.HoodieFlinkTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static java.util.stream.Collectors.toList;

public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {

private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);

private int maxParallelism;

private int parallelism;

private int bucketNum;

protected transient HoodieFlinkTable table;

private String indexKeyFields;

private HashMap<String, String> bucketToFileIDMap;

/**
* Constructs a BucketStreamWriteFunction.
*
* @param config The config options
*/
public BucketStreamWriteFunction(Configuration config) {
super(config);
this.bucketToFileIDMap = new HashMap<String, String>();
}

@Override
public void open(Configuration parameters) throws IOException {
super.open(parameters);
this.bucketNum = config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
bootstrapIndex();
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
super.initializeState(context);
this.table = this.writeClient.getHoodieTable();
}

@Override
public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
HoodieRecord<?> record = (HoodieRecord<?>) i;
final HoodieKey hoodieKey = record.getKey();
final HoodieRecordLocation location;

final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);

if (bucketToFileIDMap.containsKey(partitionBucketId)) {
Contributor: Partition changes are not supported?

Member Author: You can change the job parallelism, but you cannot change the bucket index number at this point.

Contributor: I guess @loukey-lj wants to ask how we send a delete record to the old partition when a record switches to a new partition.

location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId));
} else {
String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
location = new HoodieRecordLocation("I", newFileId);
bucketToFileIDMap.put(partitionBucketId, newFileId);
}
record.unseal();
record.setCurrentLocation(location);
record.seal();
bufferRecord(record);
}

/**
* Get partition_bucket -> fileID mapping from the existing hudi table.
* This is a required operation for each restart to avoid having duplicate file ids for one bucket.
*/
private void bootstrapIndex() throws IOException {
Option<HoodieInstant> latestCommitTime = table.getFileSystemView().getTimeline().filterCompletedInstants().lastInstant();
if (!latestCommitTime.isPresent()) {
return;
}
// bootstrap bucket info from existing file system
// a bucket belongs to this task if bucketId % parallelism == taskID
HashSet<Integer> bucketToLoad = new HashSet<>();
for (int i = 0; i < bucketNum; i++) {
int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
if (partitionOfBucket == taskID) {
LOG.info(String.format("Bootstrapping index. Adding bucket %s , "
+ "Current parallelism: %s , Max parallelism: %s , Current task id: %s",
i, parallelism, maxParallelism, taskID));
bucketToLoad.add(i);
}
}
bucketToLoad.stream().forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));

LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(),
table.getMetaClient().getBasePath()));

// Iterate through all existing partitions to load existing fileID belongs to this task
List<String> partitions = table.getMetadata().getAllPartitionPaths();
Contributor: This may perform poorly at application start when the number of partitions is huge. Loading at runtime may be better, especially when most of the partitions in the table are frozen.

Member Author: Let's do the optimization part in a separate PR.

Contributor: Thanks, can we file an issue to track this improvement?

Contributor: Fixed with #5093.


for (String partitionPath : partitions) {
List<FileSlice> latestFileSlices = table.getSliceView()
.getLatestFileSlices(partitionPath)
.collect(toList());
for (FileSlice fileslice : latestFileSlices) {
String fileID = fileslice.getFileId();
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
if (bucketToLoad.contains(bucketNumber)) {
String partitionBucketId = BucketIdentifier.partitionBucketIdStr(partitionPath, bucketNumber);
LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID));
if (bucketToFileIDMap.containsKey(partitionBucketId)) {
throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
+ "during the fileGroupPerPartitionedBucketState initialization.", fileID, partitionBucketId));
} else {
LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
bucketToFileIDMap.put(partitionBucketId, fileID);
}
Contributor: The bucketToFileIDMap never seems to be cleared; is there a possibility of putting this map into the state?

Member Author: Yes, it could be stored in the state, but we need to think about the consistency issue between the state and the actual file system view. Any difference could lead to incorrect data.

Contributor: The checkpoint can keep the correctness, I think.

}
}
}
}
}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;

public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {

public BucketStreamWriteOperator(Configuration conf) {
super(new BucketStreamWriteFunction<>(conf));
}

public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
return WriteOperatorFactory.instance(conf, new BucketStreamWriteOperator<>(conf));
}
}
@@ -371,7 +371,7 @@ private String getBucketID(HoodieRecord<?> record) {
*
* @param value HoodieRecord
*/
private void bufferRecord(HoodieRecord<?> value) {
protected void bufferRecord(HoodieRecord<?> value) {
final String bucketID = getBucketID(value);

DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink.partitioner;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketIndexPartitioner implements Partitioner {

private final int bucketNum;
private final String indexKeyFields;

public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
this.bucketNum = bucketNum;
this.indexKeyFields = indexKeyFields;
}

@Override
public int partition(Object key, int numPartitions) {
int curBucket = BucketIdentifier.getBucketId((HoodieKey) key, indexKeyFields, bucketNum);
return BucketIdentifier.mod(curBucket, numPartitions);
}
}
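
A rough wiring sketch (not part of this PR) of how the partitioner and the new write operator could fit together in a Flink pipeline; the upstream stream variable, the operator name, and the transform call are assumptions modeled on the existing stream write wiring:

int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);

DataStream<Object> pipeline = hoodieRecords  // assumed DataStream<HoodieRecord> from the upstream conversion/bootstrap operators
    // hash the index key fields into a bucket, then map the bucket onto the writer parallelism
    .partitionCustom(new BucketIndexPartitioner(bucketNum, indexKeyFields), HoodieRecord::getKey)
    // tag each record with "I" (new file group) or "U" (existing one) and buffer it for writing
    .transform("bucket_write", TypeInformation.of(Object.class), BucketStreamWriteOperator.getFactory(conf));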