diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 27f19f4a66192..43d842241abf5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.index.HoodieIndex; @@ -41,8 +40,6 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; -import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -921,18 +918,6 @@ public static Set> optionalOptions() { * Returns all the config options. */ public static List> allOptions() { - Field[] declaredFields = FlinkOptions.class.getDeclaredFields(); - List> options = new ArrayList<>(); - for (Field field : declaredFields) { - if (java.lang.reflect.Modifier.isStatic(field.getModifiers()) - && field.getType().equals(ConfigOption.class)) { - try { - options.add((ConfigOption) field.get(ConfigOption.class)); - } catch (IllegalAccessException e) { - throw new HoodieException("Error while fetching static config option", e); - } - } - } - return options; + return OptionsResolver.allOptions(FlinkOptions.class); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 93044f6ca34b3..e0102ef7abaaa 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -22,11 +22,16 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Map; @@ -198,4 +203,27 @@ public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Con String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE); return HoodieCDCSupplementalLoggingMode.parse(mode); } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + /** + * Returns all the config options with the given class {@code clazz}. + */ + public static List> allOptions(Class clazz) { + Field[] declaredFields = clazz.getDeclaredFields(); + List> options = new ArrayList<>(); + for (Field field : declaredFields) { + if (java.lang.reflect.Modifier.isStatic(field.getModifiers()) + && field.getType().equals(ConfigOption.class)) { + try { + options.add((ConfigOption) field.get(ConfigOption.class)); + } catch (IllegalAccessException e) { + throw new HoodieException("Error while fetching static config option", e); + } + } + } + return options; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java index 58eb3171dad01..7bdfc2b866463 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java @@ -18,15 +18,13 @@ package org.apache.hudi.table.catalog; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.CommonCatalogOptions; -import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -69,19 +67,7 @@ public class CatalogOptions { * Returns all the config options. */ public static List> allOptions() { - Field[] declaredFields = CatalogOptions.class.getDeclaredFields(); - List> options = new ArrayList<>(); - for (Field field : declaredFields) { - if (java.lang.reflect.Modifier.isStatic(field.getModifiers()) - && field.getType().equals(ConfigOption.class)) { - try { - options.add((ConfigOption) field.get(ConfigOption.class)); - } catch (IllegalAccessException e) { - throw new HoodieException("Error while fetching static config option", e); - } - } - } - return options; + return OptionsResolver.allOptions(CatalogOptions.class); } /**