@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.util.StreamerUtil;

@@ -130,6 +131,150 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
public Integer writeTaskNum = 4;

@Parameter(names = {"--partition-default-name"},
description = "The default partition name in case the dynamic partition column value is null/empty string")
public String partitionDefaultName = "__DEFAULT_PARTITION__";

@Parameter(names = {"--index-bootstrap-enabled"},
description = "Whether to bootstrap the index state from existing hoodie table, default false")
public Boolean indexBootstrapEnabled = false;

@Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default 1.5 day")
public Double indexStateTtl = 1.5D;

@Parameter(names = {"--index-global-enabled"}, description = "Whether to update index for the old partition path "
+ "if same key record with different partition path came in, default false")
public Boolean indexGlobalEnabled = false;

@Parameter(names = {"--index-partition-regex"},
description = "Whether to load partitions in state if partition path matching, default *")
public String indexPartitionRegex = ".*";

@Parameter(names = {"--avro-schema-path"}, description = "Avro schema file path, the parsed schema is used for deserialization")
public String avroSchemaPath = "";

@Parameter(names = {"--avro-schema"}, description = "Avro schema string, the parsed schema is used for deserialization")
public String avroSchema = "";

@Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch"
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+ " use UTC timezone, by default true")
public Boolean utcTimezone = true;

@Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
public Boolean writePartitionUrlEncode;

@Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB")
public Double writeTaskMaxSize = 1024D;

@Parameter(names = {"--write-batch-size"},
description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
public Double writeBatchSize = 64D;

@Parameter(names = {"--write-log-block-size"}, description = "Max log block size in MB for log file, default 128MB")
public Integer writeLogBlockSize = 128;

@Parameter(names = {"--write-log-max-size"},
description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB")
public Integer writeLogMaxSize = 1024;

@Parameter(names = {"--write-merge-max-memory"}, description = "Max memory in MB for merge, default 100MB")
public Integer writeMergeMaxMemory = 100;

@Parameter(names = {"--compaction-async-enabled"}, description = "Async Compaction, enabled by default for MOR")
public Boolean compactionAsyncEnabled = true;

@Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10")
public Integer compactionTasks = 10;

@Parameter(names = {"--compaction-trigger-strategy"},
description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
+ "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
+ "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
+ "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
+ "Default is 'num_commits'")
public String compactionTriggerStrategy = FlinkOptions.NUM_COMMITS;
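// Illustrative example (not part of this patch): with 'num_or_time' and the
// defaults below (5 delta commits, 3600 seconds), a compaction plan is scheduled
// once 5 delta commits accumulate OR an hour elapses since the last compaction,
// whichever comes first.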

@Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits")
public Integer compactionDeltaCommits = 5;

@Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour")
public Integer compactionDeltaSeconds = 3600;

@Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB")
public Integer compactionMaxMemory = 100;

@Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 5 GB")
public Long compactionTargetIo = 5120L;

@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
public Boolean cleanAsyncEnabled = true;

@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10")
public Integer cleanRetainCommits = 10;
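// Illustrative math (assumption: commits land at checkpoint cadence): with the
// default of 10 retained commits and a 5-minute commit interval, roughly
// 10 * 5 = 50 minutes of history stays readable for incremental pulls.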

@Parameter(names = {"--archive-max-commits"},
description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
public Integer archiveMaxCommits = 30;

@Parameter(names = {"--archive-min-commits"},
description = "Min number of commits to keep before archiving older commits into a sequential log, default 20")
public Integer archiveMinCommits = 20;

@Parameter(names = {"--hive-sync-enable"}, description = "Asynchronously sync Hive meta to HMS, default false")
public Boolean hiveSyncEnabled = false;

@Parameter(names = {"--hive-sync-db"}, description = "Database name for hive sync, default 'default'")
public String hiveSyncDb = "default";

@Parameter(names = {"--hive-sync-table"}, description = "Table name for hive sync, default 'unknown'")
public String hiveSyncTable = "unknown";

@Parameter(names = {"--hive-sync-file-format"}, description = "File format for hive sync, default 'PARQUET'")
public String hiveSyncFileFormat = "PARQUET";

@Parameter(names = {"--hive-sync-username"}, description = "Username for hive sync, default 'hive'")
public String hiveSyncUsername = "hive";

@Parameter(names = {"--hive-sync-password"}, description = "Password for hive sync, default 'hive'")
public String hiveSyncPassword = "hive";

@Parameter(names = {"--hive-sync-jdbc-url"}, description = "Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'")
public String hiveSyncJdbcUrl = "jdbc:hive2://localhost:10000";

@Parameter(names = {"--hive-sync-metastore-uris"}, description = "Metastore uris for hive sync, default ''")
public String hiveSyncMetastoreUri = "";

@Parameter(names = {"--hive-sync-partition-fields"}, description = "Partition fields for hive sync, default ''")
public String hiveSyncPartitionFields = "";

@Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'")
public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();

@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
public Boolean hiveSyncAssumeDatePartition = false;

@Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization is enabled, default true")
public Boolean hiveSyncUseJdbc = true;

@Parameter(names = {"--hive-sync-auto-create-db"}, description = "Auto create hive database if it does not exists, default true")
public Boolean hiveSyncAutoCreateDb = true;

@Parameter(names = {"--hive-sync-ignore-exceptions"}, description = "Ignore exceptions during hive synchronization, default false")
public Boolean hiveSyncIgnoreExceptions = false;

@Parameter(names = {"--hive-sync-skip-ro-suffix"}, description = "Skip the _ro suffix for Read optimized table when registering, default false")
public Boolean hiveSyncSkipRoSuffix = false;

@Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
+ "Disabled by default for backward compatibility.")
public Boolean hiveSyncSupportTimestamp = false;

/**
* Transforms a {@code FlinkStreamerConfig} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties
@@ -162,7 +307,47 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
}
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);

conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, config.indexBootstrapEnabled);
conf.setDouble(FlinkOptions.INDEX_STATE_TTL, config.indexStateTtl);
conf.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, config.indexGlobalEnabled);
conf.setString(FlinkOptions.INDEX_PARTITION_REGEX, config.indexPartitionRegex);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.avroSchemaPath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, config.avroSchema);
conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
conf.setBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE, config.writePartitionUrlEncode);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize);
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, config.writeBatchSize);
conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, config.writeLogBlockSize);
conf.setInteger(FlinkOptions.WRITE_LOG_MAX_SIZE, config.writeLogMaxSize);
conf.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, config.writeMergeMaxMemory);
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, config.compactionAsyncEnabled);
conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);
conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, config.compactionTriggerStrategy);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, config.hiveSyncEnabled);
conf.setString(FlinkOptions.HIVE_SYNC_DB, config.hiveSyncDb);
conf.setString(FlinkOptions.HIVE_SYNC_TABLE, config.hiveSyncTable);
conf.setString(FlinkOptions.HIVE_SYNC_FILE_FORMAT, config.hiveSyncFileFormat);
conf.setString(FlinkOptions.HIVE_SYNC_USERNAME, config.hiveSyncUsername);
conf.setString(FlinkOptions.HIVE_SYNC_PASSWORD, config.hiveSyncPassword);
conf.setString(FlinkOptions.HIVE_SYNC_JDBC_URL, config.hiveSyncJdbcUrl);
conf.setString(FlinkOptions.HIVE_SYNC_METASTORE_URIS, config.hiveSyncMetastoreUri);
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, config.hiveSyncPartitionFields);
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS, config.hiveSyncPartitionExtractorClass);
conf.setBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION, config.hiveSyncAssumeDatePartition);
conf.setBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC, config.hiveSyncUseJdbc);
conf.setBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB, config.hiveSyncAutoCreateDb);
conf.setBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS, config.hiveSyncIgnoreExceptions);
conf.setBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX, config.hiveSyncSkipRoSuffix);
conf.setBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP, config.hiveSyncSupportTimestamp);
return conf;
}
}
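
Usage note: a minimal sketch of how a driver could wire these flags together. The wrapper class name and the sample arguments are illustrative assumptions; FlinkStreamerConfig, FlinkOptions, and toFlinkConfig come from the code above, and the JCommander builder API is already a dependency via the @Parameter annotations.

import com.beust.jcommander.JCommander;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

// Hypothetical driver class, for illustration only.
public class FlinkStreamerConfigExample {
  public static void main(String[] args) {
    // JCommander populates the fields via their @Parameter annotations.
    // e.g. args = {"--compaction-tasks", "20", "--index-bootstrap-enabled"}
    // (Boolean options are arity-0 flags in JCommander, so they take no value.)
    FlinkStreamerConfig config = new FlinkStreamerConfig();
    JCommander.newBuilder().addObject(config).build().parse(args);

    // toFlinkConfig copies every flag onto a Flink Configuration, so the
    // table APIs and downstream operators read one uniform options object.
    Configuration conf = FlinkStreamerConfig.toFlinkConfig(config);
    int compactionTasks = conf.getInteger(FlinkOptions.COMPACTION_TASKS); // 20 for the args above
  }
}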