@@ -142,12 +142,6 @@ private FlinkOptions() {
+ "2) payload_combine: read the base file records first, for each record in base file, checks whether the key is in the\n"
+ " log file records(combines the two records with same key for base and log file records), then read the left log file records");

public static final ConfigOption<Boolean> HIVE_STYLE_PARTITION = ConfigOptions
.key("hoodie.datasource.hive_style_partition")
.booleanType()
.defaultValue(false)
.withDescription("Whether the partition path is with Hive style, e.g. '{partition key}={partition value}', default false");

public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions
.key("read.utc-timezone")
.booleanType()
@@ -260,12 +254,20 @@ private FlinkOptions() {
.withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+ "Actual value obtained by invoking .toString(), default ''");

public static final ConfigOption<Boolean> PARTITION_PATH_URL_ENCODE = ConfigOptions
.key("write.partition.url_encode")
public static final ConfigOption<Boolean> URL_ENCODE_PARTITIONING = ConfigOptions
.key(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY)
.booleanType()
.defaultValue(false)
.withDescription("Whether to encode the partition path url, default false");

public static final ConfigOption<Boolean> HIVE_STYLE_PARTITIONING = ConfigOptions
.key(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY)
.booleanType()
.defaultValue(false)
.withDescription("Whether to use Hive style partitioning.\n"
+ "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ "By default false (the names of partition folders are only partition values)");

public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP)
.stringType()
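Taken together, the two FlinkOptions hunks drop the Flink-local "hoodie.datasource.hive_style_partition" key and re-key both partition-path options on the shared KeyGeneratorOptions constants. A minimal sketch of enabling them on a job configuration follows, assuming FlinkOptions lives in the org.apache.hudi.configuration package (the package is not shown in this diff):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions; // assumed package, not shown in the diff

public class HiveStylePartitioningSketch {
  public static void main(String[] args) {
    // With hive-style partitioning on, a partition folder is named
    // "<partition_column_name>=<partition_value>" (e.g. "partition=par1") instead of "par1".
    Configuration conf = new Configuration();
    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
    // URL encoding of the partition path stays off by default.
    conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, false);
  }
}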
@@ -554,7 +554,7 @@ private boolean hasData() {
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}

private String instantToWrite() {
private String instantToWrite(boolean hasData) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
@@ -565,7 +565,7 @@ private String instantToWrite() {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
while (instant == null || (instant.equals(this.currentInstant) && hasData())) {
while (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
try {
if (waitingTime > ckpTimeout) {
@@ -588,7 +588,7 @@ private String instantToWrite() {

@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite();
String instant = instantToWrite(true);

if (instant == null) {
// in case there are empty checkpoints that has no input data
@@ -619,7 +619,7 @@ private boolean flushBucket(DataBucket bucket) {

@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean isEndInput) {
this.currentInstant = instantToWrite();
this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!");
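The refactor threads data availability through as a parameter instead of re-querying it inside the wait loop: flushBucket(...) only ever flushes a non-empty bucket, so it passes true, while flushRemaining(...) can run on an empty checkpoint and passes hasData(), which lets it reuse the current inflight instant instead of blocking for a new one. A simplified, hypothetical sketch of that wait loop (names, sleep interval, and the timeout handling are assumptions; the real method is only partially visible in this diff):

import java.util.function.Supplier;

final class InstantWaitSketch {
  // Block while there is no inflight instant, or while the inflight instant is unchanged and
  // there is still buffered data that must go into a fresh instant. With hasData == false the
  // unchanged instant is returned immediately.
  static String instantToWrite(Supplier<String> lastPendingInstant, String currentInstant,
                               boolean hasData, long ckpTimeoutMs) throws InterruptedException {
    long waitingTime = 0L;
    String instant = lastPendingInstant.get();
    while (instant == null || (instant.equals(currentInstant) && hasData)) {
      if (waitingTime > ckpTimeoutMs) {
        return null; // give up after the checkpoint timeout; the callers treat this as an error
      }
      Thread.sleep(100L);
      waitingTime += 100L;
      instant = lastPendingInstant.get();
    }
    return instant;
  }
}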
@@ -83,7 +83,7 @@ private static HiveSyncConfig buildSyncConfig(Configuration conf) {
hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
hiveSyncConfig.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE);
hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
return hiveSyncConfig;
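Pointing decodePartition at the renamed URL_ENCODE_PARTITIONING option keeps Hive sync consistent with the writer: when partition values are URL-encoded into folder names at write time, sync has to decode them before registering the partitions. A standalone round-trip illustration using plain JDK encoding (the exact escaping Hudi applies is not part of this diff):

import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class PartitionEncodeRoundTrip {
  public static void main(String[] args) throws Exception {
    String value = "2021/03/19";                         // partition value containing '/'
    String folder = URLEncoder.encode(value, StandardCharsets.UTF_8.name());
    String decoded = URLDecoder.decode(folder, StandardCharsets.UTF_8.name());
    System.out.println(folder + " -> " + decoded);       // 2021%2F03%2F19 -> 2021/03/19
  }
}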
@@ -164,6 +164,11 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
public Boolean writePartitionUrlEncode;

@Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n"
+ "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ "By default false (the names of partition folders are only partition values)")
public Boolean hiveStylePartitioning = false;

@Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB")
public Double writeTaskMaxSize = 1024D;
@@ -314,7 +319,8 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.avroSchemaPath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, config.avroSchema);
conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
conf.setBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE, config.writePartitionUrlEncode);
conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize);
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, config.writeBatchSize);
conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, config.writeLogBlockSize);
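On the streamer side the new --hive-style-partitioning flag mirrors the table option: it populates a public Boolean field that toFlinkConfig(...) copies onto FlinkOptions.HIVE_STYLE_PARTITIONING. A minimal, hypothetical sketch of setting it programmatically (the import path is assumed; the CLI path goes through the @Parameter-driven argument parsing instead):

import org.apache.hudi.streamer.FlinkStreamerConfig; // assumed package, not shown in the diff

public class StreamerFlagSketch {
  public static void main(String[] args) {
    FlinkStreamerConfig config = new FlinkStreamerConfig();
    config.hiveStylePartitioning = true; // defaults to false, matching the table option
    // FlinkStreamerConfig.toFlinkConfig(config) then sets FlinkOptions.HIVE_STYLE_PARTITIONING.
  }
}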
@@ -218,7 +218,7 @@ public Result applyFilters(List<ResolvedExpression> filters) {
@Override
public Optional<List<Map<String, String>>> listPartitions() {
List<Map<String, String>> partitions = FilePathUtils.getPartitions(path, hadoopConf,
partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
return Optional.of(partitions);
}

@@ -446,7 +446,7 @@ public Path[] getReadPaths() {
return partitionKeys.isEmpty()
? new Path[] {path}
: FilePathUtils.partitionPath2ReadPath(path, partitionKeys, getOrFetchPartitions(),
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
}

private static class LatestFileFilter extends FilePathFilter {
@@ -347,7 +347,7 @@ public static Path[] getReadPaths(
return new Path[] {path};
} else {
final String defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION);
final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
List<Map<String, String>> partitionPaths =
getPartitions(path, hadoopConf, partitionKeys, defaultParName, hivePartition);
return partitionPath2ReadPath(path, partitionKeys, partitionPaths, hivePartition);
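The read path now consults HIVE_STYLE_PARTITIONING everywhere a partition directory name has to be turned back into a key/value spec: partition listing, read-path resolution, and the per-split partition extraction in the next hunk. A self-contained illustration of the two layouts being parsed; this is not Hudi code, only the naming convention the flag switches between:

import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionLayoutSketch {
  // Hypothetical parser for illustration: with hive-style partitioning the key comes from the
  // folder name itself; otherwise the configured partition key supplies it positionally.
  static Map<String, String> parse(String dirName, String partitionKey, boolean hiveStyle) {
    Map<String, String> spec = new LinkedHashMap<>();
    if (hiveStyle) {
      String[] kv = dirName.split("=", 2);   // "partition=par1" -> {partition=par1}
      spec.put(kv[0], kv[1]);
    } else {
      spec.put(partitionKey, dirName);       // "par1"           -> {partition=par1}
    }
    return spec;
  }

  public static void main(String[] args) {
    System.out.println(parse("partition=par1", "partition", true));
    System.out.println(parse("par1", "partition", false));
  }
}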
@@ -272,7 +272,7 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
// generate partition specs.
LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(
new org.apache.hadoop.fs.Path(path).getParent(),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
FilePathUtils.extractPartitionKeys(this.conf));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
@@ -45,7 +45,9 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.File;
import java.util.Collection;
@@ -56,6 +58,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.utils.TestData.assertRowsEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -254,11 +257,14 @@ void testStreamReadWithDeletes() throws Exception {
}

@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndRead(ExecMode execMode) {
@MethodSource("configParams")
void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
if (hiveStylePartitioning) {
options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
}
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 values\n"
@@ -576,6 +582,19 @@ private enum ExecMode {
BATCH, STREAM
}

/**
* Return test params => (execution mode, hive style partitioning).
*/
private static Stream<Arguments> configParams() {
Object[][] data =
new Object[][] {
{ExecMode.BATCH, false},
{ExecMode.BATCH, true},
{ExecMode.STREAM, false},
{ExecMode.STREAM, true}};
return Stream.of(data).map(Arguments::of);
}

private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish