diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java index f8784841374f9..a01a67dad70dd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java @@ -23,7 +23,9 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.exception.HoodieFlinkStreamerException; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.schema.FilebasedSchemaProvider; import org.apache.hudi.util.AvroConvertor; import org.apache.hudi.util.StreamerUtil; @@ -31,9 +33,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.functions.MapFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; /** @@ -41,10 +40,7 @@ */ public class JsonStringToHoodieRecordMapFunction implements MapFunction { - private static Logger LOG = LoggerFactory.getLogger(JsonStringToHoodieRecordMapFunction.class); - private final HoodieFlinkStreamer.Config cfg; - private TypedProperties props; private KeyGenerator keyGenerator; private AvroConvertor avroConvertor; @@ -63,12 +59,13 @@ public HoodieRecord map(String value) throws Exception { } private void init() { - this.props = StreamerUtil.getProps(cfg); + TypedProperties props = StreamerUtil.getProps(cfg); avroConvertor = new AvroConvertor(new FilebasedSchemaProvider(props).getSourceSchema()); try { keyGenerator = StreamerUtil.createKeyGenerator(props); } catch (IOException e) { - LOG.error("Init keyGenerator failed ", e); + throw new HoodieFlinkStreamerException(String.format("KeyGenerator %s initialization failed", + props.getString("hoodie.datasource.write.keygenerator.class", SimpleAvroKeyGenerator.class.getName())), e); } } }