Skip to content

Commit 4a5238c

Browse files
authored
[FLINK-37172][table-runtime] Add logs in all existing async state ops to check if they are under async state when running
This closes #26014
1 parent d98051f commit 4a5238c

File tree

6 files changed

+46
-0
lines changed

6 files changed

+46
-0
lines changed

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java

+7
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,16 @@
3131
import org.apache.flink.table.types.logical.LogicalType;
3232
import org.apache.flink.util.Collector;
3333

34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
3437
/** Aggregate Function used for the groupby (without window) aggregate with async state api. */
3538
public class AsyncStateGroupAggFunction extends GroupAggFunctionBase {
3639

3740
private static final long serialVersionUID = 1L;
3841

42+
private static final Logger LOG = LoggerFactory.getLogger(AsyncStateGroupAggFunction.class);
43+
3944
// stores the accumulators
4045
private transient ValueState<RowData> accState = null;
4146

@@ -73,6 +78,8 @@ public AsyncStateGroupAggFunction(
7378
public void open(OpenContext openContext) throws Exception {
7479
super.open(openContext);
7580

81+
LOG.info("Group agg is using async state");
82+
7683
InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes);
7784
ValueStateDescriptor<RowData> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo);
7885
if (ttlConfig.isEnabled()) {

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java

+9
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceSharedAssigner;
3333
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
3434

35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
3538
import java.time.ZoneId;
3639
import java.util.ArrayList;
3740
import java.util.List;
@@ -46,6 +49,9 @@ public abstract class AbstractAsyncStateSliceWindowAggProcessor
4649
extends AbstractAsyncStateWindowAggProcessor<Long>
4750
implements AsyncStateSlicingWindowProcessor<Long> {
4851

52+
private static final Logger LOG =
53+
LoggerFactory.getLogger(AbstractAsyncStateSliceWindowAggProcessor.class);
54+
4955
protected final AsyncStateWindowBuffer.Factory windowBufferFactory;
5056
protected final SliceAssigner sliceAssigner;
5157
protected final long windowInterval;
@@ -80,6 +86,9 @@ public AbstractAsyncStateSliceWindowAggProcessor(
8086
@Override
8187
public void open(AsyncStateContext<Long> context) throws Exception {
8288
super.open(context);
89+
90+
LOG.info("Slice window agg is using async state");
91+
8392
this.windowBuffer =
8493
windowBufferFactory.create(
8594
ctx.getOperatorOwner(),

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java

+8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
2828
import org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase;
2929

30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
3033
import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
3134

3235
/**
@@ -42,6 +45,9 @@ abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, OUT>
4245

4346
private static final long serialVersionUID = 1L;
4447

48+
private static final Logger LOG =
49+
LoggerFactory.getLogger(AsyncStateDeduplicateFunctionBase.class);
50+
4551
// state stores previous message under the key.
4652
protected ValueState<T> state;
4753

@@ -54,6 +60,8 @@ public AsyncStateDeduplicateFunctionBase(
5460
public void open(OpenContext openContext) throws Exception {
5561
super.open(openContext);
5662

63+
LOG.info("Deduplicate is using async state");
64+
5765
ValueStateDescriptor<T> stateDesc =
5866
new ValueStateDescriptor<>("deduplicate-state", typeInfo);
5967
StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java

+8
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
3535
import org.apache.flink.types.RowKind;
3636

37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
3740
/**
3841
* Streaming unbounded Join operator based on async state api, which supports INNER/LEFT/RIGHT/FULL
3942
* JOIN.
@@ -42,6 +45,9 @@ public class AsyncStateStreamingJoinOperator extends AbstractAsyncStateStreaming
4245

4346
private static final long serialVersionUID = 1L;
4447

48+
private static final Logger LOG =
49+
LoggerFactory.getLogger(AsyncStateStreamingJoinOperator.class);
50+
4551
// whether left side is outer side, e.g. left is outer but right is not when LEFT OUTER JOIN
4652
private final boolean leftIsOuter;
4753
// whether right side is outer side, e.g. right is outer but left is not when RIGHT OUTER JOIN
@@ -86,6 +92,8 @@ public AsyncStateStreamingJoinOperator(
8692
public void open() throws Exception {
8793
super.open();
8894

95+
LOG.info("Join is using async state");
96+
8997
this.outRow = new JoinedRowData();
9098
this.leftNullRow = new GenericRowData(leftType.toRowSize());
9199
this.rightNullRow = new GenericRowData(rightType.toRowSize());

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java

+7
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
4949
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
5050

51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
53+
5154
import java.time.ZoneId;
5255

5356
/**
@@ -69,6 +72,8 @@ public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator<
6972

7073
private static final long serialVersionUID = 1L;
7174

75+
private static final Logger LOG = LoggerFactory.getLogger(AsyncStateWindowJoinOperator.class);
76+
7277
private static final String LEFT_RECORDS_STATE_NAME = "left-records";
7378
private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
7479

@@ -119,6 +124,8 @@ public AsyncStateWindowJoinOperator(
119124
public void open() throws Exception {
120125
super.open();
121126

127+
LOG.info("Window join is using async state");
128+
122129
this.collector = new TimestampedCollector<>(output);
123130
collector.eraseTimestamp();
124131

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java

+7
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,16 @@
3434
import org.apache.flink.table.runtime.operators.rank.RankType;
3535
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
3636

37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
3740
import java.util.Objects;
3841

3942
/** Base class for TopN Function with async state api. */
4043
public abstract class AbstractAsyncStateTopNFunction extends AbstractTopNFunction {
4144

45+
private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncStateTopNFunction.class);
46+
4247
private ValueState<Long> rankEndState;
4348

4449
public AbstractAsyncStateTopNFunction(
@@ -65,6 +70,8 @@ public AbstractAsyncStateTopNFunction(
6570
public void open(OpenContext openContext) throws Exception {
6671
super.open(openContext);
6772

73+
LOG.info("Top-N is using async state");
74+
6875
if (!isConstantRankEnd) {
6976
ValueStateDescriptor<Long> rankStateDesc =
7077
new ValueStateDescriptor<>("rankEnd", Types.LONG);

0 commit comments

Comments
 (0)