Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
Expand Down Expand Up @@ -96,19 +94,21 @@ public class UtilHelpers {
private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);

public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException {

SparkSession sparkSession, SchemaProvider schemaProvider,
HoodieDeltaStreamerMetrics metrics) throws IOException {
try {
if (JsonKafkaSource.class.getName().equals(sourceClass)
|| AvroKafkaSource.class.getName().equals(sourceClass)) {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if handling this via exception is making this more readable. wdyt?
For e.g the current impl makes it clear, in what case a certain constructor is called,.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about readability, I would say it looks fine to me. The goal is to add an ability to override the parent class that expects "metrics" as a parameter, otherwise I simply can't do that and I can't include that class into the hudi source.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay my bad. I understand what you tried to do now. So you have your own source class, that implements has a constructor with the metrics argument.
I was thinking along the lines of having all sources take in the metrics field. We can deal with that separately

return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[]{TypedProperties.class, JavaSparkContext.class,
SparkSession.class, SchemaProvider.class,
HoodieDeltaStreamerMetrics.class},
cfg, jssc, sparkSession, schemaProvider, metrics);
} catch (HoodieException e) {
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class}, cfg,
jssc, sparkSession, schemaProvider, metrics);
new Class<?>[]{TypedProperties.class, JavaSparkContext.class,
SparkSession.class, SchemaProvider.class},
cfg, jssc, sparkSession, schemaProvider);
}

return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg,
jssc, sparkSession, schemaProvider);
} catch (Throwable e) {
throw new IOException("Could not load source class " + sourceClass, e);
}
Expand Down