diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 289e9b473aee7..5c7c07aec324e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -112,6 +112,7 @@
 import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
 import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
 import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
+import static org.apache.hudi.table.catalog.TableOptionProperties.loadFromHoodiePropertiesFile;
 
 /**
  * A catalog implementation for Hoodie based on MetaStore.
@@ -382,6 +383,8 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
     parameters.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hiveTable));
     String path = hiveTable.getSd().getLocation();
     parameters.put(PATH.key(), path);
+    Map<String, String> hoodieProps = loadFromHoodiePropertiesFile(path, hiveConf);
+    parameters.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hoodieProps));
     if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
       Path hoodieTablePath = new Path(path);
       boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
@@ -808,7 +811,7 @@ public void dropPartition(
     try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, table)) {
       boolean hiveStylePartitioning = Boolean.parseBoolean(table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
       writeClient.deletePartitions(
-          Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
+              Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
               HoodieActiveTimeline.createNewInstantTime())
           .forEach(writeStatus -> {
             if (writeStatus.hasErrors()) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index a0864bbf3773b..231a7f7c0114f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -50,6 +50,7 @@
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 
 /**
  * Helper class to read/write flink table options as a map.
  */
@@ -64,6 +65,7 @@ public class TableOptionProperties {
   static final Map<String, String> KEY_MAPPING = new HashMap<>();
 
   private static final String FILE_NAME = "table_option.properties";
+  private static final String HOODIE_PROP_FILE_NAME = "hoodie.properties";
 
   public static final String PK_CONSTRAINT_NAME = "pk.constraint.name";
   public static final String PK_COLUMNS = "pk.columns";
@@ -89,6 +91,7 @@
     KEY_MAPPING.put(FlinkOptions.RECORD_KEY_FIELD.key(), "primaryKey");
     KEY_MAPPING.put(FlinkOptions.PRECOMBINE_FIELD.key(), "preCombineField");
     KEY_MAPPING.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), "payloadClass");
+    KEY_MAPPING.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), FlinkOptions.HIVE_STYLE_PARTITIONING.key());
   }
 
   /**
@@ -113,6 +116,18 @@
    * Read table options map from the given table base path.
    */
   public static Map<String, String> loadFromProperties(String basePath, Configuration hadoopConf) {
     Path propertiesFilePath = getPropertiesFilePath(basePath);
+    return getPropsFromFile(basePath, hadoopConf, propertiesFilePath);
+  }
+
+  /**
+   * Read the table options map from the hoodie.properties file under the given table base path.
+   */
+  public static Map<String, String> loadFromHoodiePropertiesFile(String basePath, Configuration hadoopConf) {
+    Path propertiesFilePath = getHoodiePropertiesFilePath(basePath);
+    return getPropsFromFile(basePath, hadoopConf, propertiesFilePath);
+  }
+
+  private static Map<String, String> getPropsFromFile(String basePath, Configuration hadoopConf, Path propertiesFilePath) {
     Map<String, String> options = new HashMap<>();
     Properties props = new Properties();
@@ -134,6 +149,11 @@ private static Path getPropertiesFilePath(String basePath) {
     return new Path(auxPath, FILE_NAME);
   }
 
+  private static Path getHoodiePropertiesFilePath(String basePath) {
+    String metaPath = basePath + Path.SEPARATOR + METAFOLDER_NAME;
+    return new Path(metaPath, HOODIE_PROP_FILE_NAME);
+  }
+
   public static String getPkConstraintName(Map<String, String> options) {
     return options.get(PK_CONSTRAINT_NAME);
   }