@@ -35,6 +35,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.util.FlinkClientUtil;

import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -708,32 +709,45 @@ private FlinkOptions() {

// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";

/**
* Collects the config options that start with 'properties.' into a 'key'='value' list.
*/
public static Map<String, String> getHoodieProperties(Map<String, String> options) {
return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX);
}
private static final String HADOOP_PREFIX = "hadoop.";
private static final String PARQUET_PREFIX = "parquet.";

/**
* Collects the config options that start with the specified prefix {@code prefix} into a 'key'='value' list.
*/
public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) {
public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String prefix) {
final Map<String, String> hoodieProperties = new HashMap<>();

if (hasPropertyOptions(options)) {
if (hasPropertyOptions(options, prefix)) {
options.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.filter(key -> key.startsWith(prefix))
.forEach(key -> {
final String value = options.get(key);
final String subKey = key.substring((prefix).length());
final String subKey = key.substring(prefix.length());
hoodieProperties.put(subKey, value);
});
}
return hoodieProperties;
}

public static org.apache.hadoop.conf.Configuration getParquetConf(
org.apache.flink.configuration.Configuration options,
org.apache.hadoop.conf.Configuration hadoopConf) {
org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
Map<String, String> parquetOptions = getPropertiesWithPrefix(options.toMap(), PARQUET_PREFIX);
parquetOptions.forEach((k, v) -> copy.set(PARQUET_PREFIX + k, v));
return copy;
}

/**
* Create a new hadoop configuration that is initialized with the given flink configuration.
*/
public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
Map<String, String> options = getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
options.forEach((k, v) -> hadoopConf.set(k, v));
return hadoopConf;
}

/**
* Collects all the config options, the 'properties.' prefix would be removed if the option key starts with it.
*/
@@ -746,11 +760,11 @@ public static Configuration flatOptions(Configuration conf) {
: key;
propsMap.put(subKey, value);
});
return fromMap(propsMap);
return Configuration.fromMap(propsMap);
}

private static boolean hasPropertyOptions(Map<String, String> options) {
return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
private static boolean hasPropertyOptions(Map<String, String> options, String prefix) {
return options.keySet().stream().anyMatch(k -> k.startsWith(prefix));
}

/**
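As context for the new helpers above, here is a minimal usage sketch (not part of the diff; the option keys and values are illustrative) showing how 'hadoop.' and 'parquet.' prefixed table options flow into the Hadoop and Parquet configurations:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class PrefixedOptionsSketch {
  public static void main(String[] args) {
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put("path", "file:///tmp/hudi_table");          // placeholder table path
    tableOptions.put("hadoop.fs.defaultFS", "hdfs://nn:8020");   // forwarded to the Hadoop conf
    tableOptions.put("parquet.compression", "GZIP");             // forwarded to the Parquet conf

    Configuration conf = Configuration.fromMap(tableOptions);

    // getHadoopConf applies 'hadoop.' options with the prefix stripped.
    org.apache.hadoop.conf.Configuration hadoopConf = FlinkOptions.getHadoopConf(conf);
    System.out.println(hadoopConf.get("fs.defaultFS"));          // hdfs://nn:8020

    // getParquetConf copies 'parquet.' options with the prefix retained.
    org.apache.hadoop.conf.Configuration parquetConf = FlinkOptions.getParquetConf(conf, hadoopConf);
    System.out.println(parquetConf.get("parquet.compression"));  // GZIP
  }
}
```

Note the asymmetry in the two forEach calls above: 'hadoop.' keys are set with the prefix removed, while 'parquet.' keys are re-applied with the prefix kept.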
@@ -49,9 +49,10 @@ public static class Config {

private Schema targetSchema;

@Deprecated
public FilebasedSchemaProvider(TypedProperties props) {
StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), FlinkOptions.getHadoopConf(new Configuration()));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
@@ -65,7 +66,7 @@ public FilebasedSchemaProvider(TypedProperties props) {

public FilebasedSchemaProvider(Configuration conf) {
final String sourceSchemaPath = conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
final FileSystem fs = FSUtils.getFs(sourceSchemaPath, StreamerUtil.getHadoopConf());
final FileSystem fs = FSUtils.getFs(sourceSchemaPath, FlinkOptions.getHadoopConf(conf));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaPath)));
} catch (IOException ioe) {
@@ -122,7 +122,7 @@ public void initializeState(StateInitializationContext context) throws Exception
}
}

this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = FlinkOptions.getHadoopConf(this.conf);
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
@@ -113,7 +113,7 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) {
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.ckpMetadata = CkpMetadata.getInstance(config.getString(FlinkOptions.PATH));
this.ckpMetadata = CkpMetadata.getInstance(config);
this.initInstant = lastPendingInstant();
sendBootstrapEvent();
initWriterHelper();
@@ -18,11 +18,12 @@

package org.apache.hudi.sink.meta;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.util.StreamerUtil;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -70,8 +71,8 @@ public class CkpMetadata implements Serializable {
private List<CkpMessage> messages;
private List<String> instantCache;

private CkpMetadata(String basePath) {
this(FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()), basePath);
private CkpMetadata(Configuration config) {
this(FSUtils.getFs(config.getString(FlinkOptions.PATH), FlinkOptions.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
}

private CkpMetadata(FileSystem fs, String basePath) {
@@ -196,8 +197,8 @@ public boolean isAborted(String instant) {
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
public static CkpMetadata getInstance(String basePath) {
return new CkpMetadata(basePath);
public static CkpMetadata getInstance(Configuration config) {
return new CkpMetadata(config);
}

public static CkpMetadata getInstance(FileSystem fs, String basePath) {
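A hedged sketch of the new Configuration-based factory introduced here: both the base path and the Hadoop configuration are now resolved from the same Flink Configuration (the path below is a placeholder):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.meta.CkpMetadata;

public class CkpMetadataSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hudi_table"); // placeholder base path
    conf.setString("hadoop.fs.defaultFS", "file:///");           // optional 'hadoop.' override

    // Replaces the removed getInstance(String basePath) overload, which relied on
    // a globally resolved Hadoop configuration.
    CkpMetadata ckpMetadata = CkpMetadata.getInstance(conf);
    System.out.println(ckpMetadata);
  }
}
```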
@@ -116,7 +116,7 @@ public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
new SerializableConfiguration(FlinkOptions.getHadoopConf(this.conf)),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = BucketAssigners.create(
getRuntimeContext().getIndexOfThisSubtask(),
@@ -26,7 +26,6 @@
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -60,7 +59,7 @@ public HiveSyncTool hiveSyncTool() {

public static HiveSyncContext create(Configuration conf) {
HiveSyncConfig syncConfig = buildSyncConfig(conf);
org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
org.apache.hadoop.conf.Configuration hadoopConf = FlinkOptions.getHadoopConf(conf);
String path = conf.getString(FlinkOptions.PATH);
FileSystem fs = FSUtils.getFs(path, hadoopConf);
HiveConf hiveConf = new HiveConf();
@@ -54,7 +54,7 @@ public class FileIndex {
private FileIndex(Path path, Configuration conf) {
this.path = path;
this.metadataConfig = metadataConfig(conf);
this.tableExists = StreamerUtil.tableExists(path.toString(), StreamerUtil.getHadoopConf());
this.tableExists = StreamerUtil.tableExists(path.toString(), FlinkOptions.getHadoopConf(conf));
}

public static FileIndex instance(Path path, Configuration conf) {
@@ -157,7 +157,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = FlinkOptions.getHadoopConf(parameters);
}

@Override
@@ -67,7 +67,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
Configuration conf = Configuration.fromMap(context.getCatalogTable().getOptions());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
@@ -84,7 +84,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
Configuration conf = Configuration.fromMap(context.getCatalogTable().getOptions());
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
"Option [path] should not be empty.");
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
@@ -92,7 +92,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.hudi.table.format.FormatUtils.getParquetConf;
import static org.apache.hudi.configuration.FlinkOptions.getParquetConf;

/**
* Hoodie batch table source that always read the latest snapshot of the underneath table.
@@ -155,7 +155,7 @@ public HoodieTableSource(
: requiredPos;
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
this.filters = filters == null ? Collections.emptyList() : filters;
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = FlinkOptions.getHadoopConf(conf);
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
}
@@ -93,7 +93,7 @@ public class HoodieCatalog extends AbstractCatalog {
public HoodieCatalog(String name, Configuration options) {
super(name, options.get(DEFAULT_DATABASE));
this.catalogPathStr = options.get(CATALOG_PATH);
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = FlinkOptions.getHadoopConf(options);
this.tableCommonOptions = CatalogOptions.tableCommonOptions(options);
}

@@ -30,7 +30,6 @@
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -49,7 +48,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

/**
@@ -253,14 +251,4 @@ public static HoodieMergedLogRecordScanner logScanner(
private static Boolean string2Boolean(String s) {
return "true".equals(s.toLowerCase(Locale.ROOT));
}

public static org.apache.hadoop.conf.Configuration getParquetConf(
org.apache.flink.configuration.Configuration options,
org.apache.hadoop.conf.Configuration hadoopConf) {
final String prefix = "parquet.";
org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
Map<String, String> parquetOptions = FlinkOptions.getHoodiePropertiesWithPrefix(options.toMap(), prefix);
parquetOptions.forEach((k, v) -> copy.set(prefix + k, v));
return copy;
}
}
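Because getParquetConf moved from FormatUtils to FlinkOptions (see the FlinkOptions hunk above), readers that built a Parquet configuration migrate as sketched below; the wrapper method and variable names are hypothetical:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class ParquetConfMigrationSketch {
  // Before this change: FormatUtils.getParquetConf(flinkConf, hadoopConf)
  // After this change:  FlinkOptions.getParquetConf(flinkConf, hadoopConf)
  static org.apache.hadoop.conf.Configuration resolveParquetConf(
      Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
    return FlinkOptions.getParquetConf(flinkConf, hadoopConf);
  }
}
```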
@@ -36,7 +36,6 @@
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.StringToRowDataConverter;

import org.apache.avro.Schema;
@@ -167,7 +166,7 @@ public static Builder builder() {
public void open(MergeOnReadInputSplit split) throws IOException {
this.currentReadCount = 0L;
this.closed = false;
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = FlinkOptions.getHadoopConf(this.conf);
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
if (split.getInstantRange() != null) {
// base file only with commit time filtering
@@ -306,7 +305,7 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
return ParquetSplitReaderUtil.genPartColumnarRowReader(
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
true,
FormatUtils.getParquetConf(this.conf, hadoopConf),
FlinkOptions.getParquetConf(this.conf, hadoopConf),
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new DataType[0]),
partObjects,
@@ -27,7 +27,7 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;

import static org.apache.hudi.util.StreamerUtil.getHadoopConf;
import static org.apache.hudi.configuration.FlinkOptions.getHadoopConf;
import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig;

/**
@@ -44,7 +44,7 @@ private FlinkTables() {
*/
public static HoodieFlinkTable<?> createTable(Configuration conf, RuntimeContext runtimeContext) {
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
new SerializableConfiguration(getHadoopConf()),
new SerializableConfiguration(getHadoopConf(conf)),
new FlinkTaskContextSupplier(runtimeContext));
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
return HoodieFlinkTable.create(writeConfig, context);
@@ -101,7 +101,7 @@ public static TypedProperties getProps(FlinkStreamerConfig cfg) {
return new TypedProperties();
}
return readConfig(
getHadoopConf(),
FlinkOptions.getHadoopConf(cfg),
new Path(cfg.propsFilePath), cfg.configs).getProps();
}

@@ -140,11 +140,6 @@ public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Confi
return conf;
}

// Keep the redundant to avoid too many modifications.
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
return FlinkClientUtil.getHadoopConf();
}

/**
* Mainly used for tests.
*/
@@ -215,7 +210,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
HoodieWriteConfig writeConfig = builder.build();
if (loadFsViewStorageConfig) {
// do not use the builder to give a change for recovering the original fs view storage config
FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), conf);
writeConfig.setViewStorageConfig(viewStorageConfig);
}
return writeConfig;
@@ -255,7 +250,7 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
*/
public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException {
final String basePath = conf.getString(FlinkOptions.PATH);
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
final org.apache.hadoop.conf.Configuration hadoopConf = FlinkOptions.getHadoopConf(conf);
if (!tableExists(basePath, hadoopConf)) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
@@ -348,18 +343,11 @@ public static HoodieTableMetaClient createMetaClient(String basePath, org.apache
return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build();
}

/**
* Creates the meta client.
*/
public static HoodieTableMetaClient createMetaClient(String basePath) {
return createMetaClient(basePath, FlinkClientUtil.getHadoopConf());
}

/**
* Creates the meta client.
*/
public static HoodieTableMetaClient createMetaClient(Configuration conf) {
return createMetaClient(conf.getString(FlinkOptions.PATH));
return createMetaClient(conf.getString(FlinkOptions.PATH), FlinkOptions.getHadoopConf(conf));
}

/**
@@ -382,7 +370,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) {
HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(
new SerializableConfiguration(getHadoopConf()),
new SerializableConfiguration(FlinkOptions.getHadoopConf(conf)),
new FlinkTaskContextSupplier(runtimeContext));

HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig);
@@ -410,7 +398,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
.build();
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
return writeClient;
}

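To illustrate the StreamerUtil changes (the path-only createMetaClient overload is removed, and the Hadoop configuration is now derived from the job configuration), a hedged end-to-end sketch; the path is a placeholder and the table is assumed to already exist:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

public class StreamerUtilSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hudi_table");        // placeholder; table assumed to exist
    conf.setString("hadoop.dfs.client.use.datanode.hostname", "true");  // example 'hadoop.' option

    // createMetaClient(Configuration) now delegates to
    // createMetaClient(basePath, FlinkOptions.getHadoopConf(conf)), so the
    // 'hadoop.' option above is honored when the filesystem is resolved.
    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
    System.out.println(metaClient.getBasePath());
  }
}
```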