diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml index 27a4a0b453cb7..7338b23bf4b27 100644 --- a/hudi-flink/pom.xml +++ b/hudi-flink/pom.xml @@ -174,6 +174,12 @@ ${flink.version} provided + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + ${flink.version} + provided + org.apache.parquet diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 1d7111f495c58..c5d73036eda60 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -18,6 +18,8 @@ package org.apache.hudi.streamer; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -25,6 +27,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.apache.hudi.util.FlinkStateBackendConverter; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.Parameter; @@ -53,6 +56,10 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.") public String flinkCheckPointPath; + @Parameter(names = {"--flink-state-backend-type"}, description = "Flink state backend type, support only hashmap and rocksdb by now," + + " default hashmap.", converter = FlinkStateBackendConverter.class) + public StateBackend stateBackend = new HashMapStateBackend(); + @Parameter(names = {"--instant-retry-times"}, description = "Times to retry when latest instant has not completed.") public String instantRetryTimes = "10"; diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 851931f0d76ba..192de91d238a3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -32,7 +32,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; @@ -61,8 +60,9 @@ public static void main(String[] args) throws Exception { // There can only be one checkpoint at one time. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.setStateBackend(cfg.stateBackend); if (cfg.flinkCheckPointPath != null) { - env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath)); + env.getCheckpointConfig().setCheckpointStorage(cfg.flinkCheckPointPath); } TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java new file mode 100644 index 0000000000000..b46ab14e46384 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.ParameterException; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.hudi.exception.HoodieException; + +/** + * Converter that converts a string into Flink StateBackend. + */ +public class FlinkStateBackendConverter implements IStringConverter { + @Override + public StateBackend convert(String value) throws ParameterException { + switch (value) { + case "hashmap" : return new HashMapStateBackend(); + case "rocksdb" : return new EmbeddedRocksDBStateBackend(); + default: + throw new HoodieException(String.format("Unknown flink state backend %s.", value)); + } + } +}