diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 013753b6d9276..29f55f78acf17 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -69,12 +69,12 @@ public static void main(String[] args) throws Exception { TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps(); kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg)); + Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg); // Read from kafka source RowType rowType = - (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)) + (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) .getLogicalType(); - Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg); long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); int parallelism = env.getParallelism(); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index e5e30c1c987e2..4b93faeaf72d9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -116,10 +116,6 @@ public static TypedProperties getProps(FlinkStreamerConfig cfg) { new Path(cfg.propsFilePath), cfg.configs).getProps(); } - public static Schema getSourceSchema(FlinkStreamerConfig cfg) { - return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema(); - } - public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) { if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) { return new FilebasedSchemaProvider(conf).getSourceSchema();