Merged
@@ -35,7 +35,6 @@
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
@@ -70,8 +69,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
extends KeyedProcessFunction<K, I, O>
implements CheckpointedFunction, CheckpointListener {

private BucketAssignOperator.Context context;

/**
* Index cache(speed-up) state for the underneath file based(BloomFilter) indices.
* When a record came in, we do these check:
@@ -158,7 +155,6 @@ public void initializeState(FunctionInitializationContext context) {
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
if (value instanceof IndexRecord) {
IndexRecord<?> indexRecord = (IndexRecord<?>) value;
this.context.setCurrentKey(indexRecord.getRecordKey());
this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
} else {
processRecord((HoodieRecord<?>) value, out);
@@ -198,7 +194,6 @@ private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exce
}
} else {
location = getNewRecordLocation(partitionPath);
this.context.setCurrentKey(recordKey);
}
// always refresh the index
if (isChangingRecords) {
@@ -243,13 +238,4 @@ public void notifyCheckpointComplete(long checkpointId) {
public void close() throws Exception {
this.bucketAssigner.close();
}

public void setContext(BucketAssignOperator.Context context) {
this.context = context;
}

@VisibleForTesting
public void clearIndexState() {
this.indexState.clear();
}
}
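The dropped setCurrentKey calls are the heart of this change: once BucketAssignFunction runs inside Flink's stock KeyedProcessOperator, the framework sets the current key from the upstream keyBy before each processElement call, so the ValueState reads and writes above are already scoped to the record key. A minimal, self-contained sketch of that behaviour, with illustrative types and names rather than Hudi code:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Used as: stream.keyBy(record -> recordKeyOf(record)).process(new LocationCache())
public class LocationCache extends KeyedProcessFunction<String, String, String> {

  private transient ValueState<String> location;

  @Override
  public void open(Configuration parameters) {
    // One state handle in code, but each key sees its own value.
    location = getRuntimeContext().getState(
        new ValueStateDescriptor<>("location", String.class));
  }

  @Override
  public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
    // Flink has already switched the state backend to ctx.getCurrentKey(),
    // so no manual setCurrentKey(...) is needed before touching the state.
    if (location.value() == null) {
      location.update(value);
    }
    out.collect(ctx.getCurrentKey() + " -> " + location.value());
  }
}

Because the key is fixed by the keyBy selector, restoring a cached location and assigning a bucket for a later record with the same key hit the same state slot without the function ever reaching into operator internals.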

This file was deleted.
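The deleted file is the custom BucketAssignOperator that the pipeline no longer needs. Its contents are not shown here, so the following is only a reconstruction from the call sites visible in this diff (setContext, Context.setCurrentKey): the operator wrapped BucketAssignFunction and handed it a Context whose setCurrentKey forwarded to the operator's keyed state backend, which is exactly the bookkeeping KeyedProcessOperator already performs for each incoming element.

package org.apache.hudi.sink.partitioner;

import org.apache.hudi.common.model.HoodieRecord;

import org.apache.flink.streaming.api.operators.KeyedProcessOperator;

// Assumed shape of the removed operator; reconstructed from its usages, not copied.
public class BucketAssignOperator<K, I, O extends HoodieRecord<?>>
    extends KeyedProcessOperator<K, I, O> {

  private final BucketAssignFunction<K, I, O> function;

  public BucketAssignOperator(BucketAssignFunction<K, I, O> function) {
    super(function);
    this.function = function;
  }

  @Override
  public void open() throws Exception {
    super.open();
    // Hook that let the function pin the keyed state key manually,
    // e.g. while restoring IndexRecord locations into the index state.
    this.function.setContext(this::setCurrentKey);
  }

  // The context interface the function depended on before this PR.
  public interface Context {
    void setCurrentKey(Object key);
  }
}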

@@ -35,7 +35,6 @@
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;

@@ -44,6 +43,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -163,7 +163,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
// shuffle by fileId(bucket id)
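With the custom operator gone, the bucket assigner is wired like any other keyed function: the stream is keyed (presumably by the Hoodie record key, so the index state in BucketAssignFunction is partitioned per key) and the function is wrapped in the stock KeyedProcessOperator, exactly as the replaced line shows. A self-contained toy of the same wiring pattern, using plain strings instead of HoodieRecords:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.util.Collector;

public class TransformWiringSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-in for the record stream; "key:partition" strings keyed by the key part.
    DataStream<String> records = env.fromElements("id1:par1", "id2:par1", "id1:par2");

    records
        .keyBy(line -> line.split(":")[0])
        .transform(
            "bucket_assigner",
            Types.STRING,
            // Same shape as the diff: the KeyedProcessFunction goes into the stock operator.
            new KeyedProcessOperator<>(new PassThrough()))
        .print();

    env.execute("wiring-sketch");
  }

  private static class PassThrough extends KeyedProcessFunction<String, String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
      out.collect(ctx.getCurrentKey() + " handled " + value);
    }
  }
}

The remaining hunks adapt the StreamWriteFunctionWrapper test harness to the same change.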
@@ -27,7 +27,6 @@
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -91,7 +90,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
/**
* BucketAssignOperator context.
**/
private MockBucketAssignOperatorContext bucketAssignOperatorContext;
private final MockBucketAssignFunctionContext bucketAssignFunctionContext;
/**
* Stream write function.
*/
@@ -125,11 +124,10 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
this.stateInitializationContext = new MockStateInitializationContext();
Contributor: Rename to bucketAssignFunctionContext.

this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
this.output = new CollectorOutput<>(new ArrayList<>());
this.streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
@@ -155,7 +153,6 @@ public void openFunction() throws Exception {
bucketAssignerFunction = new BucketAssignFunction<>(conf);
bucketAssignerFunction.setRuntimeContext(runtimeContext);
bucketAssignerFunction.open(conf);
bucketAssignerFunction.setContext(bucketAssignOperatorContext);
bucketAssignerFunction.initializeState(this.stateInitializationContext);

setupWriteFunction();
@@ -187,15 +184,16 @@ public void close() {
if (streamElement.isRecord()) {
HoodieRecord<?> bootstrapRecord = (HoodieRecord<?>) streamElement.asRecord().getValue();
bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey());
}
}

bootstrapOperator.processElement(new StreamRecord<>(hoodieRecord));
list.clear();
this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey());
}

bucketAssignerFunction.processElement(hoodieRecord, null, collector);
bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
writeFunction.processElement(hoodieRecords[0], null, null);
}

@@ -267,13 +265,8 @@ public MockOperatorCoordinatorContext getCoordinatorContext() {
return coordinatorContext;
}

public void clearIndexState() {
this.bucketAssignerFunction.clearIndexState();
this.bucketAssignOperatorContext.clearIndexState();
}

public boolean isKeyInState(HoodieKey hoodieKey) {
return this.bucketAssignOperatorContext.isKeyInState(hoodieKey.getRecordKey());
return this.bucketAssignFunctionContext.isKeyInState(hoodieKey.getRecordKey());
}

public boolean isConforming() {
@@ -303,18 +296,13 @@ private void setupWriteFunction() throws Exception {
// Inner Class
// -------------------------------------------------------------------------

private static class MockBucketAssignOperatorContext implements BucketAssignOperator.Context {
private static class MockBucketAssignFunctionContext {
private final Set<Object> updateKeys = new HashSet<>();

@Override
public void setCurrentKey(Object key) {
this.updateKeys.add(key);
}

public void clearIndexState() {
this.updateKeys.clear();
}

public boolean isKeyInState(String key) {
return this.updateKeys.contains(key);
}
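Taken together, the harness now drives the function directly and keeps its own record of the keys that passed through it, instead of asking an operator context. Condensed from the hunks above (mock objects come from the wrapper; the null Context mirrors the wrapper, which never hands the function a real one):

bucketAssignerFunction = new BucketAssignFunction<>(conf);
bucketAssignerFunction.setRuntimeContext(runtimeContext);               // mock runtime context
bucketAssignerFunction.open(conf);
bucketAssignerFunction.initializeState(stateInitializationContext);     // mock state backend
bucketAssignerFunction.processElement(hoodieRecord, null, collector);
bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey()); // test-side bookkeeping

A test can then check whether a given record key was routed through the assigner with the wrapper's isKeyInState(hoodieKey).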