29 changes: 29 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1284,6 +1284,27 @@ public InlineElement getDescription() {
+ " related to the number of initialized bucket, too small will lead to"
+ " insufficient processing speed of assigner.");

public static final ConfigOption<Integer> DYNAMIC_BUCKET_EMPTY_BUCKET_THRESHOLD =
key("dynamic-bucket.empty-bucket-threshold")
.intType()
.defaultValue(-1)
.withDescription(
"Threshold for triggering an asynchronous refresh of non-full bucket information. "
+ "When the number of non-full buckets tracked in memory drops below this value, "
+ "the assigner asynchronously rescans the index for buckets that have become "
+ "reusable again. -1 (the default) disables the refresh. Higher values improve "
+ "responsiveness but may increase scan overhead.");

public static final ConfigOption<Duration> DYNAMIC_BUCKET_MIN_REFRESH_INTERVAL =
key("dynamic-bucket.min-refresh-interval")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription(
"Minimum interval between two refreshes of non-full bucket information. "
+ "Too short an interval causes excessive overhead from rescanning the bucket "
+ "index; too long an interval may delay discovery of reusable buckets.");

public static final ConfigOption<String> INCREMENTAL_BETWEEN =
key("incremental-between")
.stringType()
@@ -2685,6 +2706,14 @@ public Integer dynamicBucketAssignerParallelism() {
return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
}

public Integer dynamicBucketEmptyBucketThreshold() {
return options.get(DYNAMIC_BUCKET_EMPTY_BUCKET_THRESHOLD);
}

public Duration dynamicBucketMinRefreshInterval() {
return options.get(DYNAMIC_BUCKET_MIN_REFRESH_INTERVAL);
}

public List<String> sequenceField() {
return options.getOptional(SEQUENCE_FIELD)
.map(s -> Arrays.asList(s.split(",")))
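The two options above are ordinary table options, so they can be set like any other Paimon configuration. Below is a minimal sketch of how a job might enable them through dynamic options, assuming an existing Table handle and Paimon's Table#copy(Map) API; the concrete values are only illustrative:

import org.apache.paimon.table.Table;

import java.util.HashMap;
import java.util.Map;

// Hypothetical tuning: rescan the index once fewer than 5 non-full buckets remain
// in memory, but at most once every 30 minutes.
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put("dynamic-bucket.empty-bucket-threshold", "5");
dynamicOptions.put("dynamic-bucket.min-refresh-interval", "30 min");
Table tunedTable = table.copy(dynamicOptions);

Left at their defaults (-1 and 1 h), the new code paths stay disabled, so existing dynamic-bucket jobs keep their current behavior.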
HashBucketAssigner.java
@@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -47,6 +48,8 @@ public class HashBucketAssigner implements BucketAssigner {
private final long targetBucketRowNumber;
private final int maxBucketsNum;
private int maxBucketId;
private final int minEmptyBucketsBeforeAsyncCheck;
private final Duration minRefreshInterval;

private final Map<BinaryRow, PartitionIndex> partitionIndex;

@@ -58,7 +61,9 @@ public HashBucketAssigner(
int numAssigners,
int assignId,
long targetBucketRowNumber,
int maxBucketsNum) {
int maxBucketsNum,
int minEmptyBucketsBeforeAsyncCheck,
Duration minRefreshInterval) {
this.snapshotManager = snapshotManager;
this.commitUser = commitUser;
this.indexFileHandler = indexFileHandler;
@@ -68,6 +73,8 @@ public HashBucketAssigner(
this.targetBucketRowNumber = targetBucketRowNumber;
this.partitionIndex = new HashMap<>();
this.maxBucketsNum = maxBucketsNum;
this.minEmptyBucketsBeforeAsyncCheck = minEmptyBucketsBeforeAsyncCheck;
this.minRefreshInterval = minRefreshInterval;
}

/** Assign a bucket for key hash of a record. */
@@ -88,7 +95,14 @@ public int assign(BinaryRow partition, int hash) {
this.partitionIndex.put(partition, index);
}

int assigned = index.assign(hash, this::isMyBucket, maxBucketsNum, maxBucketId);
int assigned =
index.assign(
hash,
this::isMyBucket,
maxBucketsNum,
maxBucketId,
minEmptyBucketsBeforeAsyncCheck,
minRefreshInterval);
if (LOG.isDebugEnabled()) {
LOG.debug("Assign {} to the partition {} key hash {}", assigned, partition, hash);
}
PartitionIndex.java
@@ -27,13 +27,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntPredicate;

import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -43,7 +47,7 @@ public class PartitionIndex {

private static final Logger LOG = LoggerFactory.getLogger(PartitionIndex.class);

public final Int2ShortHashMap hash2Bucket;

public final Map<Integer, Long> nonFullBucketInformation;
public final ConcurrentHashMap<Integer, Long> nonFullBucketInformation;

public final Set<Integer> totalBucketSet;
public final List<Integer> totalBucketArray;
@@ -54,27 +58,51 @@ public class PartitionIndex {

public long lastAccessedCommitIdentifier;

private final IndexFileHandler indexFileHandler;

private final BinaryRow partition;

private volatile CompletableFuture<Void> refreshFuture;

private volatile Instant lastRefreshTime;

public PartitionIndex(
Int2ShortHashMap hash2Bucket,
Map<Integer, Long> bucketInformation,
long targetBucketRowNumber) {
ConcurrentHashMap<Integer, Long> bucketInformation,
long targetBucketRowNumber,
IndexFileHandler indexFileHandler,
BinaryRow partition) {
this.hash2Bucket = hash2Bucket;
this.nonFullBucketInformation = bucketInformation;
this.totalBucketSet = new LinkedHashSet<>(bucketInformation.keySet());
this.totalBucketArray = new ArrayList<>(totalBucketSet);
this.targetBucketRowNumber = targetBucketRowNumber;
this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
this.accessed = true;
this.indexFileHandler = indexFileHandler;
this.partition = partition;
this.lastRefreshTime = Instant.now();
}

public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum, int maxBucketId) {
public int assign(
int hash,
IntPredicate bucketFilter,
int maxBucketsNum,
int maxBucketId,
int minEmptyBucketsBeforeAsyncCheck,
Duration minRefreshInterval) {
accessed = true;

// 1. is it a key that has appeared before
if (hash2Bucket.containsKey(hash)) {
return hash2Bucket.get(hash);
}

// Opportunistically rescan the on-disk index when too few non-full buckets remain.
if (shouldRefreshEmptyBuckets(maxBucketId, minEmptyBucketsBeforeAsyncCheck)
&& minRefreshIntervalElapsed(minRefreshInterval)) {
refreshBucketsFromDisk();
}

// 2. find bucket from existing buckets
Iterator<Map.Entry<Integer, Long>> iterator =
nonFullBucketInformation.entrySet().iterator();
@@ -91,6 +119,7 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum, int ma
}
}

// from here onwards, try to create a new bucket
int globalMaxBucketId = (maxBucketsNum == -1 ? Short.MAX_VALUE : maxBucketsNum) - 1;
if (totalBucketSet.isEmpty() || maxBucketId < globalMaxBucketId) {
// 3. create a new bucket
@@ -110,7 +139,7 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum, int ma
maxBucketId, targetBucketRowNumber));
}
}

// todo: check this part
// 4. exceed buckets upper bound
int bucket = ListUtils.pickRandomly(totalBucketArray);
hash2Bucket.put(hash, (short) bucket);
@@ -125,7 +154,7 @@ public static PartitionIndex loadIndex(
IntPredicate bucketFilter) {
List<IndexManifestEntry> files = indexFileHandler.scanEntries(HASH_INDEX, partition);
Int2ShortHashMap.Builder mapBuilder = Int2ShortHashMap.builder();
Map<Integer, Long> buckets = new HashMap<>();
ConcurrentHashMap<Integer, Long> buckets = new ConcurrentHashMap<>();
for (IndexManifestEntry file : files) {
try (IntIterator iterator =
indexFileHandler
@@ -150,6 +179,50 @@ public static PartitionIndex loadIndex(
throw new UncheckedIOException(e);
}
}
return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber);
return new PartitionIndex(
mapBuilder.build(), buckets, targetBucketRowNumber, indexFileHandler, partition);
}

/**
 * True when the in-memory view of non-full buckets has shrunk below the configured
 * threshold, so a rescan of the index may reveal buckets that became reusable again.
 */
private boolean shouldRefreshEmptyBuckets(
int maxBucketId, int minEmptyBucketsBeforeAsyncCheck) {
return maxBucketId != -1
&& minEmptyBucketsBeforeAsyncCheck != -1
&& nonFullBucketInformation.size() < minEmptyBucketsBeforeAsyncCheck;
}

private boolean minRefreshIntervalElapsed(Duration minRefreshInterval) {
return Instant.now().isAfter(lastRefreshTime.plus(minRefreshInterval));
}

/**
 * Asynchronously rescan the hash index on disk and pick up buckets that have become
 * non-full again, without blocking the assigner thread.
 */
private void refreshBucketsFromDisk() {
// Only start a refresh if one is not already in progress.
if (refreshFuture == null || refreshFuture.isDone()) {
refreshFuture =
CompletableFuture.runAsync(
() -> {
try {
List<IndexManifestEntry> files =
indexFileHandler.scanEntries(HASH_INDEX, partition);
Map<Integer, Long> tempBucketInfo = new HashMap<>();

for (IndexManifestEntry file : files) {
long currentNumberOfRows = file.indexFile().rowCount();
if (currentNumberOfRows < targetBucketRowNumber) {
tempBucketInfo.put(file.bucket(), currentNumberOfRows);
}
}

// Only add buckets that are not tracked yet: in-memory counters of
// tracked buckets already include rows assigned since the index was
// loaded and must not be overwritten with stale on-disk counts.
tempBucketInfo.forEach(nonFullBucketInformation::putIfAbsent);
lastRefreshTime = Instant.now();
} catch (Exception e) {
// Log and keep the current in-memory state instead of failing the job.
LOG.warn(
"Failed to refresh non-full buckets from disk for partition {}.",
partition,
e);
}
});
}
}
}
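To make the interplay of the two guards concrete, here is a hypothetical timeline for one partition, written as Java comments; the numbers (threshold 3, interval 30 min) are only an illustration of the logic in assign() above:

// dynamic-bucket.empty-bucket-threshold = 3, dynamic-bucket.min-refresh-interval = 30 min
//
// t = 0       loadIndex() finds 10 non-full buckets; lastRefreshTime = now.
// t = 12 min  writes fill buckets one by one until only 2 non-full buckets remain;
//             2 < 3, but only 12 minutes have elapsed, so no rescan is scheduled.
// t = 30 min  the next assign() call sees 2 < 3 and the interval elapsed, so
//             refreshBucketsFromDisk() runs on a background thread while the call
//             itself still assigns from the buckets currently tracked in memory.
// t = 30 min+ the rescan finds a bucket whose committed row count dropped below
//             dynamic-bucket.target-row-num and adds it back, so later keys can
//             reuse it instead of forcing creation of a new bucket.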
HashBucketAssignerTest.java
@@ -66,7 +66,9 @@ private HashBucketAssigner createAssigner(int numChannels, int numAssigners, int
numAssigners,
assignId,
5,
-1);
-1,
-1,
null);
}

private HashBucketAssigner createAssigner(
@@ -79,7 +81,9 @@ private HashBucketAssigner createAssigner(
numAssigners,
assignId,
5,
maxBucketsNum);
maxBucketsNum,
-1,
null);
}

@Test
HashBucketAssignerOperator.java
@@ -35,6 +35,8 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.time.Duration;

/** Assign bucket for the input record, output record with bucket. */
public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>>
implements OneInputStreamOperator<T, Tuple2<T, Integer>> {
@@ -79,6 +81,8 @@ public void initializeState(StateInitializationContext context) throws Exception
int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
Integer minEmptyBucketsBeforeAsyncCheck =
table.coreOptions().dynamicBucketEmptyBucketThreshold();
Duration minRefreshInterval = table.coreOptions().dynamicBucketMinRefreshInterval();
this.assigner =
overwrite
? new SimpleHashBucketAssigner(
Expand All @@ -91,7 +95,9 @@ public void initializeState(StateInitializationContext context) throws Exception
MathUtils.min(numAssigners, numberTasks),
taskId,
targetRowNum,
maxBucketsNum);
maxBucketsNum,
minEmptyBucketsBeforeAsyncCheck,
minRefreshInterval);
this.extractor = extractorFunction.apply(table.schema());
}
