From a842ab5880d3ae7e4a5494551c009ed4f7d92ab8 Mon Sep 17 00:00:00 2001 From: liufangqi <1059023054@qq.com> Date: Mon, 14 Mar 2022 14:34:25 +0800 Subject: [PATCH 1/3] [HUDI-3607] Support backend switch in HoodieFlinkStreamer --- hudi-flink/pom.xml | 6 +++ .../hudi/streamer/FlinkStreamerConfig.java | 7 ++++ .../hudi/streamer/HoodieFlinkStreamer.java | 3 +- .../hudi/util/FlinkStateBackendConverter.java | 41 +++++++++++++++++++ 4 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml index 27a4a0b453cb7..7338b23bf4b27 100644 --- a/hudi-flink/pom.xml +++ b/hudi-flink/pom.xml @@ -174,6 +174,12 @@ ${flink.version} provided + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + ${flink.version} + provided + org.apache.parquet diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 1d7111f495c58..ff32817b9a873 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -18,6 +18,8 @@ package org.apache.hudi.streamer; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -25,6 +27,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.apache.hudi.util.FlinkStateBackendConverter; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.Parameter; @@ -53,6 +56,10 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.") public String flinkCheckPointPath; + @Parameter(names = {"--flink-state-backend-type"}, description = "Flink state backend type : HashMapStateBackend " + + "(default).", converter = FlinkStateBackendConverter.class) + public StateBackend stateBackend = new HashMapStateBackend(); + @Parameter(names = {"--instant-retry-times"}, description = "Times to retry when latest instant has not completed.") public String instantRetryTimes = "10"; diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 851931f0d76ba..8d76e10588492 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -61,8 +61,9 @@ public static void main(String[] args) throws Exception { // There can only be one checkpoint at one time. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.setStateBackend(cfg.stateBackend); if (cfg.flinkCheckPointPath != null) { - env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath)); + env.getCheckpointConfig().setCheckpointStorage(cfg.flinkCheckPointPath); } TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java new file mode 100644 index 0000000000000..5bc942461df69 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.ParameterException; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.hudi.exception.HoodieException; + +/** + * Converter that converts a string into Flink StateBackend. + */ +public class FlinkStateBackendConverter implements IStringConverter { + @Override + public StateBackend convert(String value) throws ParameterException { + switch (value) { + case "hashmap" : return new HashMapStateBackend(); + case "rocksdb" : return new EmbeddedRocksDBStateBackend(); + default: + throw new HoodieException("Can not convert flink state backend."); + } + } +} From caaf05c0a829d858889e33ce0a545ba4e290c6a1 Mon Sep 17 00:00:00 2001 From: liufangqi <1059023054@qq.com> Date: Tue, 15 Mar 2022 12:06:32 +0800 Subject: [PATCH 2/3] [HUDI-3607] Support backend switch in HoodieFlinkStreamer 1. checkstyle fix --- .../java/org/apache/hudi/streamer/FlinkStreamerConfig.java | 4 ++-- .../java/org/apache/hudi/streamer/HoodieFlinkStreamer.java | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index ff32817b9a873..3c6ccd530def3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -56,8 +56,8 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.") public String flinkCheckPointPath; - @Parameter(names = {"--flink-state-backend-type"}, description = "Flink state backend type : HashMapStateBackend " + - "(default).", converter = FlinkStateBackendConverter.class) + @Parameter(names = {"--flink-state-backend-type"}, description = "Flink state backend type : HashMapStateBackend " + + "(default).", converter = FlinkStateBackendConverter.class) public StateBackend stateBackend = new HashMapStateBackend(); @Parameter(names = {"--instant-retry-times"}, description = "Times to retry when latest instant has not completed.") diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 8d76e10588492..192de91d238a3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -32,7 +32,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; From 87b7a0fa831fa06ef3888d94cd732df0d418dd1a Mon Sep 17 00:00:00 2001 From: liufangqi <1059023054@qq.com> Date: Wed, 16 Mar 2022 11:20:10 +0800 Subject: [PATCH 3/3] [HUDI-3607] Support backend switch in HoodieFlinkStreamer 1. change the msg --- .../java/org/apache/hudi/streamer/FlinkStreamerConfig.java | 4 ++-- .../java/org/apache/hudi/util/FlinkStateBackendConverter.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 3c6ccd530def3..c5d73036eda60 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -56,8 +56,8 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.") public String flinkCheckPointPath; - @Parameter(names = {"--flink-state-backend-type"}, description = "Flink state backend type : HashMapStateBackend " - + "(default).", converter = FlinkStateBackendConverter.class) + @Parameter(names = {"--flink-state-backend-type"}, description = "Flink state backend type, support only hashmap and rocksdb by now," + + " default hashmap.", converter = FlinkStateBackendConverter.class) public StateBackend stateBackend = new HashMapStateBackend(); @Parameter(names = {"--instant-retry-times"}, description = "Times to retry when latest instant has not completed.") diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java index 5bc942461df69..b46ab14e46384 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java @@ -35,7 +35,7 @@ public StateBackend convert(String value) throws ParameterException { case "hashmap" : return new HashMapStateBackend(); case "rocksdb" : return new EmbeddedRocksDBStateBackend(); default: - throw new HoodieException("Can not convert flink state backend."); + throw new HoodieException(String.format("Unknown flink state backend %s.", value)); } } }