Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.StringUtils;
Expand Down Expand Up @@ -383,12 +384,22 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
String path = hiveTable.getSd().getLocation();
parameters.put(PATH.key(), path);
if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
Path hoodieTablePath = new Path(path);
boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
.map(fileStatus -> fileStatus.getPath().getName())
.filter(f -> !f.equals(".hoodie") && !f.equals("default"))
.anyMatch(FilePathUtils::isHiveStylePartitioning);
parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), String.valueOf(hiveStyle));
// read the table config first
final boolean hiveStyle;
HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(path, hiveConf);
if (tableConfig != null && tableConfig.contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
hiveStyle = Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
} else {
// fallback to the partition path pattern
Path hoodieTablePath = new Path(path);
hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
.map(fileStatus -> fileStatus.getPath().getName())
.filter(f -> !f.equals(".hoodie") && !f.equals("default"))
.anyMatch(FilePathUtils::isHiveStylePartitioning);
}
if (hiveStyle) {
parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
}
}
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
Expand Down Expand Up @@ -56,6 +57,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
Expand Down Expand Up @@ -282,6 +285,23 @@ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
return createMetaClient(conf.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(conf));
}

/**
 * Reads the {@link HoodieTableConfig} stored under the given table base path.
 *
 * @param basePath   Base path of the Hoodie table
 * @param hadoopConf Hadoop configuration used to resolve the file system
 * @return the table config, or null if the table does not exist
 *     (i.e. no {@code .hoodie} metadata folder under the base path)
 * @throws HoodieIOException if probing the metadata folder fails
 */
@Nullable
public static HoodieTableConfig getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
  final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
  final Path metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
  try {
    // A table exists iff its metadata folder does; otherwise report absence with null.
    return fs.exists(metaPath)
        ? new HoodieTableConfig(fs, metaPath.toString(), null, null)
        : null;
  } catch (IOException e) {
    throw new HoodieIOException("Get table config error", e);
  }
}

/**
* Returns the median instant time between the given two instant time.
*/
Expand Down