Skip to content

Commit b6ae903

Browse files
JingGe authored and AHeise committed
[FLINK-24077][HBase/IT] use MiniClusterWithClientResource as @ClassRule.
While using TableEnvironment in the ITCase, a Flink MiniCluster is started and stopped automatically in the background. Since the shutdown of the MiniCluster is invoked asynchronously, CollectResultFetcher sometimes loses data due to race conditions, and an unchecked java.lang.IllegalStateException is thrown that we were not aware of. The solution is to control the lifecycle of the MiniCluster manually in this test; MiniClusterWithClientResource is a good fit for this. (cherry picked from commit fca04c3)
1 parent 8e3b291 commit b6ae903

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.api.java.ExecutionEnvironment;
2626
import org.apache.flink.api.java.tuple.Tuple1;
2727
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.apache.flink.configuration.Configuration;
2829
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
2930
import org.apache.flink.connector.hbase.util.PlannerType;
3031
import org.apache.flink.connector.hbase2.source.AbstractTableInputFormat;
@@ -33,6 +34,7 @@
3334
import org.apache.flink.connector.hbase2.source.HBaseRowInputFormat;
3435
import org.apache.flink.connector.hbase2.source.HBaseTableSource;
3536
import org.apache.flink.connector.hbase2.util.HBaseTestBase;
37+
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
3638
import org.apache.flink.streaming.api.datastream.DataStream;
3739
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3840
import org.apache.flink.table.api.DataTypes;
@@ -46,6 +48,7 @@
4648
import org.apache.flink.table.descriptors.HBase;
4749
import org.apache.flink.table.descriptors.Schema;
4850
import org.apache.flink.table.functions.ScalarFunction;
51+
import org.apache.flink.test.util.MiniClusterWithClientResource;
4952
import org.apache.flink.test.util.TestBaseUtils;
5053
import org.apache.flink.types.Row;
5154
import org.apache.flink.types.RowKind;
@@ -57,6 +60,7 @@
5760
import org.apache.hadoop.hbase.client.Result;
5861
import org.apache.hadoop.hbase.client.Scan;
5962
import org.apache.hadoop.hbase.util.Bytes;
63+
import org.junit.ClassRule;
6064
import org.junit.Test;
6165
import org.junit.runner.RunWith;
6266
import org.junit.runners.Parameterized;
@@ -77,6 +81,13 @@
7781
@RunWith(Parameterized.class)
7882
public class HBaseConnectorITCase extends HBaseTestBase {
7983

84+
@ClassRule
85+
public static final MiniClusterWithClientResource MINI_CLUSTER =
86+
new MiniClusterWithClientResource(
87+
new MiniClusterResourceConfiguration.Builder()
88+
.setConfiguration(new Configuration())
89+
.build());
90+
8091
@Parameterized.Parameter public PlannerType planner;
8192

8293
@Parameterized.Parameter(1)
@@ -436,8 +447,6 @@ public void testTableSink() throws Exception {
436447
+ " AS h");
437448

438449
TableResult tableResult2 = table.execute();
439-
// wait to finish
440-
tableResult2.getJobClient().get().getJobExecutionResult().get();
441450

442451
List<Row> results = CollectionUtil.iteratorToList(tableResult2.collect());
443452

@@ -529,8 +538,6 @@ public void testTableSourceSinkWithDDL() throws Exception {
529538
+ " AS h";
530539

531540
TableResult tableResult3 = batchEnv.executeSql(query);
532-
// wait to finish
533-
tableResult3.getJobClient().get().getJobExecutionResult().get();
534541

535542
List<String> result =
536543
Lists.newArrayList(tableResult3.collect()).stream()

0 commit comments

Comments (0)