From 30f8e8f0d853e9fb5c0fa032fcbddd65809e6918 Mon Sep 17 00:00:00 2001 From: Ho Tien Vu Date: Sun, 18 Oct 2020 15:01:06 +0800 Subject: [PATCH] Make sure factory method is used to instanciate DFSPathSelector * Move createSourceSelector into DFSPathSelector factory method * Replace constructor call with factory method * Added some javadoc --- .../apache/hudi/utilities/UtilHelpers.java | 18 ------------ .../hudi/utilities/sources/AvroDFSSource.java | 3 +- .../hudi/utilities/sources/CsvDFSSource.java | 2 +- .../hudi/utilities/sources/JsonDFSSource.java | 2 +- .../utilities/sources/ParquetDFSSource.java | 2 +- .../sources/helpers/DFSPathSelector.java | 29 +++++++++++++++++++ 6 files changed, 33 insertions(+), 23 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 29fc195722469..e76790918754a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -18,7 +18,6 @@ package org.apache.hudi.utilities; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -46,7 +45,6 @@ import org.apache.hudi.utilities.sources.AvroKafkaSource; import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.Source; -import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; import org.apache.hudi.utilities.transform.ChainedTransformer; import org.apache.hudi.utilities.transform.Transformer; @@ -377,22 +375,6 @@ public static Schema getJDBCSchema(Map options) throws Exception } } - public static DFSPathSelector createSourceSelector(TypedProperties props, - Configuration conf) throws IOException { - String sourceSelectorClass = - props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName()); - try { - DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass, - new Class[]{TypedProperties.class, Configuration.class}, - props, conf); - - LOG.info("Using path selector " + selector.getClass().getName()); - return selector; - } catch (Throwable e) { - throw new IOException("Could not load source selector class " + sourceSelectorClass, e); - } - } - public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) { SchemaProvider originalProvider = schemaProvider; if (schemaProvider instanceof SchemaProviderWithPostProcessor) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java index b8f29e8b13be1..1152cd65cabcd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; @@ -46,7 +45,7 @@ public class AvroDFSSource extends AvroSource { public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException { super(props, sparkContext, sparkSession, schemaProvider); - this.pathSelector = UtilHelpers + this.pathSelector = DFSPathSelector .createSourceSelector(props, sparkContext.hadoopConfiguration()); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java index 3d158baff658b..dc40b47dd16cf 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java @@ -79,7 +79,7 @@ public CsvDFSSource(TypedProperties props, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); - this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration()); + this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration()); if (schemaProvider != null) { sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema()) .dataType(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java index e66bfdf080229..d34289daa0942 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java @@ -38,7 +38,7 @@ public class JsonDFSSource extends JsonSource { public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); - this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration()); + this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration()); } @Override diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java index dea410ab8bda3..55d2de2d4c360 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java @@ -39,7 +39,7 @@ public class ParquetDFSSource extends RowSource { public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); - this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration()); + this.pathSelector = DFSPathSelector.createSourceSelector(props, this.sparkContext.hadoopConfiguration()); } @Override diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java index c8690f553e2f3..6b58003e843c0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java @@ -22,8 +22,10 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hadoop.conf.Configuration; @@ -66,6 +68,33 @@ public DFSPathSelector(TypedProperties props, Configuration hadoopConf) { this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf); } + /** + * Factory method for creating custom DFSPathSelector. Default selector + * to use is {@link DFSPathSelector} + */ + public static DFSPathSelector createSourceSelector(TypedProperties props, + Configuration conf) { + String sourceSelectorClass = props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, + DFSPathSelector.class.getName()); + try { + DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass, + new Class[]{TypedProperties.class, Configuration.class}, + props, conf); + + log.info("Using path selector " + selector.getClass().getName()); + return selector; + } catch (Exception e) { + throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e); + } + } + + /** + * Get the list of files changed since last checkpoint. + * + * @param lastCheckpointStr the last checkpoint time string, empty if first run + * @param sourceLimit max bytes to read each time + * @return the list of files concatenated and their latest modified time + */ public Pair, String> getNextFilePathsAndMaxModificationTime(Option lastCheckpointStr, long sourceLimit) {