diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 19de67f7b6994..3904765036f0c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -43,8 +43,6 @@ import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor; import org.apache.hudi.utilities.schema.SparkAvroPostProcessor; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; -import org.apache.hudi.utilities.sources.AvroKafkaSource; -import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.transform.ChainedTransformer; import org.apache.hudi.utilities.transform.Transformer; @@ -96,19 +94,21 @@ public class UtilHelpers { private static final Logger LOG = LogManager.getLogger(UtilHelpers.class); public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, - SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException { - + SparkSession sparkSession, SchemaProvider schemaProvider, + HoodieDeltaStreamerMetrics metrics) throws IOException { try { - if (JsonKafkaSource.class.getName().equals(sourceClass) - || AvroKafkaSource.class.getName().equals(sourceClass)) { + try { + return (Source) ReflectionUtils.loadClass(sourceClass, + new Class[]{TypedProperties.class, JavaSparkContext.class, + SparkSession.class, SchemaProvider.class, + HoodieDeltaStreamerMetrics.class}, + cfg, jssc, sparkSession, schemaProvider, metrics); + } catch (HoodieException e) { return (Source) ReflectionUtils.loadClass(sourceClass, - new Class[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class}, cfg, - jssc, sparkSession, schemaProvider, metrics); + new Class[]{TypedProperties.class, JavaSparkContext.class, + SparkSession.class, SchemaProvider.class}, + cfg, jssc, sparkSession, schemaProvider); } - - return (Source) ReflectionUtils.loadClass(sourceClass, - new Class[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg, - jssc, sparkSession, schemaProvider); } catch (Throwable e) { throw new IOException("Could not load source class " + sourceClass, e); }