@@ -112,6 +112,7 @@
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
import static org.apache.hudi.table.catalog.TableOptionProperties.loadFromHoodiePropertieFile;

/**
* A catalog implementation for Hoodie based on MetaStore.
@@ -382,6 +383,8 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
parameters.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hiveTable));
String path = hiveTable.getSd().getLocation();
parameters.put(PATH.key(), path);
Map<String, String> hoodieProps = loadFromHoodiePropertieFile(path, hiveConf);
parameters.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hoodieProps));
if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
Contributor:
Do we need to put the explicit table options here? Which function needs this config, and where do we miss it then?

Contributor (@danny0405, Nov 24, 2022):
We can read the table config through StreamerUtil.createMetaClient(xx).getTableConfig()

We actually get the original Hive table params first here: parameters = hiveTable.getParameters(). Do you mean there are options missing for Spark?
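For illustration, a minimal sketch of this suggestion, assuming the 0.12-era Hudi Flink API (StreamerUtil.createMetaClient(String, Configuration), HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, and HoodieConfig#contains/getString; the exact names should be verified against the release in use):

// Read the persisted table config instead of parsing hoodie.properties by hand.
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hiveConf);
HoodieTableConfig tableConfig = metaClient.getTableConfig();
// Only fill in the Flink option when the table config actually carries it.
if (tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) {
  parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(),
      tableConfig.getString(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
}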

Contributor Author:

Yes, after Spark creates the table, some table properties are stored in .hoodie/hoodie.properties, such as hoodie.datasource.write.hive_style_partitioning. These properties are not included in the Hive table parameters.
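For context, the relevant part of a hoodie.properties file written by Spark could look roughly like this (keys taken from the table config, values purely illustrative):

hoodie.table.name=hudi_test
hoodie.table.type=COPY_ON_WRITE
hoodie.table.recordkey.fields=id
hoodie.table.partition.fields=dt
hoodie.datasource.write.hive_style_partitioning=true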

Contributor:

But we only append the Hive options here; curious why this can cause an error?

Contributor Author:

Because Spark sets hoodie.datasource.write.hive_style_partitioning to true when creating a non-partitioned Hudi table, and records hoodie.datasource.write.hive_style_partitioning=true in hoodie.properties. But HoodieHiveCatalog does not see this property, and the inference it does instead is wrong, so hoodie.datasource.write.hive_style_partitioning is assigned false and uploaded to the Hive options. As a result, Spark sees two different values of hoodie.datasource.write.hive_style_partitioning when reading the table, and reports an error.

Contributor Author:

if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
Path hoodieTablePath = new Path(path);
boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
.map(fileStatus -> fileStatus.getPath().getName())
.filter(f -> !f.equals(".hoodie") && !f.equals("default"))
.anyMatch(FilePathUtils::isHiveStylePartitioning);
parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), String.valueOf(hiveStyle));
}

The params in this snippet are not inferred from the hoodie.properties file, which leads to an incorrect inference.

Contributor:

Got it. I think we should not put extra options like FlinkOptions.HIVE_STYLE_PARTITIONING here; the right fix is to supplement the table config options for the Flink read/write path. I have actually applied a patch.

Path hoodieTablePath = new Path(path);
boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
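For illustration only, a rough sketch of what supplementing the table config options could look like on the Flink path; supplementTableConfig is a hypothetical helper name and this is not the actual patch referenced above:

// Hypothetical helper: merge options persisted in hoodie.properties (the table
// config) into the Flink options, letting explicitly set Flink options win.
private static void supplementTableConfig(
    org.apache.flink.configuration.Configuration flinkConf, HoodieTableConfig tableConfig) {
  for (Map.Entry<Object, Object> entry : tableConfig.getProps().entrySet()) {
    String key = entry.getKey().toString();
    if (!flinkConf.containsKey(key)) {
      flinkConf.setString(key, entry.getValue().toString());
    }
  }
}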
@@ -808,7 +811,7 @@ public void dropPartition(
try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, table)) {
boolean hiveStylePartitioning = Boolean.parseBoolean(table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
writeClient.deletePartitions(
Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
HoodieActiveTimeline.createNewInstantTime())
.forEach(writeStatus -> {
if (writeStatus.hasErrors()) {
@@ -50,6 +50,7 @@

import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;

/**
* Helper class to read/write flink table options as a map.
@@ -64,6 +65,7 @@ public class TableOptionProperties {
static final Map<String, String> KEY_MAPPING = new HashMap<>();

private static final String FILE_NAME = "table_option.properties";
private static final String HOODUE_PROP_FILE_NAME = "hoodie.properties";

public static final String PK_CONSTRAINT_NAME = "pk.constraint.name";
public static final String PK_COLUMNS = "pk.columns";
@@ -89,6 +91,7 @@ public class TableOptionProperties {
KEY_MAPPING.put(FlinkOptions.RECORD_KEY_FIELD.key(), "primaryKey");
KEY_MAPPING.put(FlinkOptions.PRECOMBINE_FIELD.key(), "preCombineField");
KEY_MAPPING.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), "payloadClass");
KEY_MAPPING.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), FlinkOptions.HIVE_STYLE_PARTITIONING.key());
}

/**
@@ -113,6 +116,18 @@ public static void createProperties(String basePath,
*/
public static Map<String, String> loadFromProperties(String basePath, Configuration hadoopConf) {
Path propertiesFilePath = getPropertiesFilePath(basePath);
return getPropsFromFile(basePath, hadoopConf, propertiesFilePath);
}

/**
* Read the options map from the hoodie.properties file under the given table base path.
*/
public static Map<String, String> loadFromHoodiePropertieFile(String basePath, Configuration hadoopConf) {
Path propertiesFilePath = getHoodiePropertiesFilePath(basePath);
return getPropsFromFile(basePath, hadoopConf, propertiesFilePath);
}
Contributor:

Hi, gentle ping :)

Contributor Author:

Sorry, just saw it. Is it better to replace loadFromHoodiePropertieFile with StreamerUtil.getTableConfig here?


private static Map<String, String> getPropsFromFile(String basePath, Configuration hadoopConf, Path propertiesFilePath) {
Map<String, String> options = new HashMap<>();
Properties props = new Properties();

@@ -134,6 +149,11 @@ private static Path getPropertiesFilePath(String basePath) {
return new Path(auxPath, FILE_NAME);
}

private static Path getHoodiePropertiesFilePath(String basePath) {
String auxPath = basePath + Path.SEPARATOR + METAFOLDER_NAME;
return new Path(auxPath, HOODUE_PROP_FILE_NAME);
}

public static String getPkConstraintName(Map<String, String> options) {
return options.get(PK_CONSTRAINT_NAME);
}