diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index c944f6a299144..729f0147b5940 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -709,25 +709,17 @@ private FlinkOptions() { // Prefix for Hoodie specific properties. private static final String PROPERTIES_PREFIX = "properties."; - /** - * Collects the config options that start with 'properties.' into a 'key'='value' list. - */ - public static Map<String, String> getHoodieProperties(Map<String, String> options) { - return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX); - } - /** * Collects the config options that start with specified prefix {@code prefix} into a 'key'='value' list. */ - public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) { + public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String prefix) { final Map<String, String> hoodieProperties = new HashMap<>(); - - if (hasPropertyOptions(options)) { + if (hasPropertyOptions(options, prefix)) { options.keySet().stream() - .filter(key -> key.startsWith(PROPERTIES_PREFIX)) + .filter(key -> key.startsWith(prefix)) .forEach(key -> { final String value = options.get(key); - final String subKey = key.substring((prefix).length()); + final String subKey = key.substring(prefix.length()); hoodieProperties.put(subKey, value); }); } @@ -749,8 +741,8 @@ public static Configuration flatOptions(Configuration conf) { return fromMap(propsMap); } - private static boolean hasPropertyOptions(Map<String, String> options) { - return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); + private static boolean hasPropertyOptions(Map<String, String> options, String prefix) { + return options.keySet().stream().anyMatch(k -> k.startsWith(prefix)); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java new file mode 100644 index 0000000000000..7784e7caaae2a --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.configuration; + +import org.apache.flink.configuration.Configuration; +import org.apache.hudi.util.FlinkClientUtil; + +import java.util.Map; + +public class HadoopConfigurations { + private static final String HADOOP_PREFIX = "hadoop."; + private static final String PARQUET_PREFIX = "parquet."; + + public static org.apache.hadoop.conf.Configuration getParquetConf( + org.apache.flink.configuration.Configuration options, + org.apache.hadoop.conf.Configuration hadoopConf) { + org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf); + Map<String, String> parquetOptions = FlinkOptions.getPropertiesWithPrefix(options.toMap(), PARQUET_PREFIX); + parquetOptions.forEach((k, v) -> copy.set(PARQUET_PREFIX + k, v)); + return copy; + } + + /** + * Create a new hadoop configuration that is initialized with the given flink configuration. + */ + public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) { + org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf(); + Map<String, String> options = FlinkOptions.getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX); + options.forEach((k, v) -> hadoopConf.set(k, v)); + return hadoopConf; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java index 1443a68cf0fc2..a349314b7a111 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.util.StreamerUtil; @@ -49,9 +50,10 @@ public static class Config { private Schema targetSchema; + @Deprecated public FilebasedSchemaProvider(TypedProperties props) { StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP)); - FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf()); + FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), HadoopConfigurations.getHadoopConf(new Configuration())); try { this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP)))); if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) { @@ -65,7 +67,7 @@ public FilebasedSchemaProvider(TypedProperties props) { public FilebasedSchemaProvider(Configuration conf) { final String sourceSchemaPath = conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH); - final FileSystem fs = FSUtils.getFs(sourceSchemaPath, StreamerUtil.getHadoopConf()); + final FileSystem fs = FSUtils.getFs(sourceSchemaPath, HadoopConfigurations.getHadoopConf(conf)); try { this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaPath))); } catch (IOException ioe) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 1fc8d393be6a9..7c3f5a9329bb1 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction; import org.apache.hudi.sink.meta.CkpMetadata; @@ -122,7 +123,7 @@ public void initializeState(StateInitializationContext context) throws Exception } } - this.hadoopConf = StreamerUtil.getHadoopConf(); + this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf); this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext()); this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index 6c8dcef0f3925..06d9fcd851c22 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -113,7 +113,7 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) { public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); - this.ckpMetadata = CkpMetadata.getInstance(config.getString(FlinkOptions.PATH)); + this.ckpMetadata = CkpMetadata.getInstance(config); this.initInstant = lastPendingInstant(); sendBootstrapEvent(); initWriterHelper(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index ff1277d7b7e74..45a4e04bab285 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -18,11 +18,13 @@ package org.apache.hudi.sink.meta; +import org.apache.flink.configuration.Configuration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.util.StreamerUtil; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -70,8 +72,8 @@ public class CkpMetadata implements Serializable { private List messages; private List instantCache; - private CkpMetadata(String basePath) { - this(FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()), basePath); + private CkpMetadata(Configuration config) { + this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH)); } private CkpMetadata(FileSystem fs, String basePath) { @@ -196,8 
+198,8 @@ public boolean isAborted(String instant) { // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- - public static CkpMetadata getInstance(String basePath) { - return new CkpMetadata(basePath); + public static CkpMetadata getInstance(Configuration config) { + return new CkpMetadata(config); } public static CkpMetadata getInstance(FileSystem fs, String basePath) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index c4b83bf51aace..89f89cf5c0a9f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.action.commit.BucketInfo; @@ -116,7 +117,7 @@ public void open(Configuration parameters) throws Exception { super.open(parameters); HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new SerializableConfiguration(HadoopConfigurations.getHadoopConf(this.conf)), new FlinkTaskContextSupplier(getRuntimeContext())); this.bucketAssigner = BucketAssigners.create( getRuntimeContext().getIndexOfThisSubtask(), diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index 536a0282fbcc4..bd837efc8737d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -22,11 +22,11 @@ import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.table.format.FilePathUtils; -import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -60,7 +60,7 @@ public HiveSyncTool hiveSyncTool() { public static HiveSyncContext create(Configuration conf) { HiveSyncConfig syncConfig = buildSyncConfig(conf); - org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); + org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf); String path = conf.getString(FlinkOptions.PATH); FileSystem fs = FSUtils.getFs(path, hadoopConf); HiveConf hiveConf = new HiveConf(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java index 07383ef7fea5f..d7125b414352d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -54,7 +55,7 @@ public class FileIndex { private FileIndex(Path path, Configuration conf) { this.path = path; this.metadataConfig = metadataConfig(conf); - this.tableExists = StreamerUtil.tableExists(path.toString(), StreamerUtil.getHadoopConf()); + this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf)); } public static FileIndex instance(Path path, Configuration conf) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index 8138e931e54e7..8bfde209360ac 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.StreamerUtil; @@ -157,7 +158,7 @@ public void initializeState(FunctionInitializationContext context) throws Except @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - this.hadoopConf = StreamerUtil.getHadoopConf(); + this.hadoopConf = HadoopConfigurations.getHadoopConf(parameters); } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index d00eb3e3ec700..6c45191076ae1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.HoodieROTablePathFilter; @@ -92,7 +93,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.hudi.table.format.FormatUtils.getParquetConf; +import static org.apache.hudi.configuration.HadoopConfigurations.getParquetConf; /** * Hoodie batch table source that always read the latest snapshot of the underneath table. @@ -155,7 +156,7 @@ public HoodieTableSource( : requiredPos; this.limit = limit == null ? NO_LIMIT_CONSTANT : limit; this.filters = filters == null ? 
Collections.emptyList() : filters; - this.hadoopConf = StreamerUtil.getHadoopConf(); + this.hadoopConf = HadoopConfigurations.getHadoopConf(conf); this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf); this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java index 3317967006101..956d61cc3c2a4 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; @@ -93,7 +94,7 @@ public class HoodieCatalog extends AbstractCatalog { public HoodieCatalog(String name, Configuration options) { super(name, options.get(DEFAULT_DATABASE)); this.catalogPathStr = options.get(CATALOG_PATH); - this.hadoopConf = StreamerUtil.getHadoopConf(); + this.hadoopConf = HadoopConfigurations.getHadoopConf(options); this.tableCommonOptions = CatalogOptions.tableCommonOptions(options); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index fce9b75f764ea..478f94cb71f73 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -30,7 +30,6 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.StreamerUtil; @@ -49,7 +48,6 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.function.Function; /** @@ -253,14 +251,4 @@ public static HoodieMergedLogRecordScanner logScanner( private static Boolean string2Boolean(String s) { return "true".equals(s.toLowerCase(Locale.ROOT)); } - - public static org.apache.hadoop.conf.Configuration getParquetConf( - org.apache.flink.configuration.Configuration options, - org.apache.hadoop.conf.Configuration hadoopConf) { - final String prefix = "parquet."; - org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf); - Map parquetOptions = FlinkOptions.getHoodiePropertiesWithPrefix(options.toMap(), prefix); - parquetOptions.forEach((k, v) -> copy.set(prefix + k, v)); - return copy; - } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 202b14404aa35..4f2de3648ed56 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.KeyGenUtils; @@ -36,7 +37,6 @@ import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.RowDataProjection; import org.apache.hudi.util.RowDataToAvroConverters; -import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StringToRowDataConverter; import org.apache.avro.Schema; @@ -167,7 +167,7 @@ public static Builder builder() { public void open(MergeOnReadInputSplit split) throws IOException { this.currentReadCount = 0L; this.closed = false; - this.hadoopConf = StreamerUtil.getHadoopConf(); + this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf); if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) { if (split.getInstantRange() != null) { // base file only with commit time filtering @@ -306,7 +306,7 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) return ParquetSplitReaderUtil.genPartColumnarRowReader( this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), true, - FormatUtils.getParquetConf(this.conf, hadoopConf), + HadoopConfigurations.getParquetConf(this.conf, hadoopConf), fieldNames.toArray(new String[0]), fieldTypes.toArray(new DataType[0]), partObjects, diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java index 6918a06b186b8..d440588b642e5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java @@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; -import static org.apache.hudi.util.StreamerUtil.getHadoopConf; +import static org.apache.hudi.configuration.HadoopConfigurations.getHadoopConf; import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig; /** @@ -44,7 +44,7 @@ private FlinkTables() { */ public static HoodieFlinkTable createTable(Configuration conf, RuntimeContext runtimeContext) { HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( - new SerializableConfiguration(getHadoopConf()), + new SerializableConfiguration(getHadoopConf(conf)), new FlinkTaskContextSupplier(runtimeContext)); HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true); return HoodieFlinkTable.create(writeConfig, context); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index dfbe0efd67c70..b977dfd7c5343 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -43,6 +43,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import 
org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -101,7 +102,7 @@ public static TypedProperties getProps(FlinkStreamerConfig cfg) { return new TypedProperties(); } return readConfig( - getHadoopConf(), + HadoopConfigurations.getHadoopConf(cfg), new Path(cfg.propsFilePath), cfg.configs).getProps(); } @@ -140,11 +141,6 @@ public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Confi return conf; } - // Keep the redundant to avoid too many modifications. - public static org.apache.hadoop.conf.Configuration getHadoopConf() { - return FlinkClientUtil.getHadoopConf(); - } - /** * Mainly used for tests. */ @@ -215,7 +211,7 @@ public static HoodieWriteConfig getHoodieClientConfig( HoodieWriteConfig writeConfig = builder.build(); if (loadFsViewStorageConfig) { // do not use the builder to give a change for recovering the original fs view storage config - FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH)); + FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), conf); writeConfig.setViewStorageConfig(viewStorageConfig); } return writeConfig; @@ -255,7 +251,7 @@ public static void checkRequiredProperties(TypedProperties props, List c */ public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException { final String basePath = conf.getString(FlinkOptions.PATH); - final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); + final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf); if (!tableExists(basePath, hadoopConf)) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA)) @@ -348,18 +344,11 @@ public static HoodieTableMetaClient createMetaClient(String basePath, org.apache return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build(); } - /** - * Creates the meta client. - */ - public static HoodieTableMetaClient createMetaClient(String basePath) { - return createMetaClient(basePath, FlinkClientUtil.getHadoopConf()); - } - /** * Creates the meta client. 
*/ public static HoodieTableMetaClient createMetaClient(Configuration conf) { - return createMetaClient(conf.getString(FlinkOptions.PATH)); + return createMetaClient(conf.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(conf)); } /** @@ -382,7 +371,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) { HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( - new SerializableConfiguration(getHadoopConf()), + new SerializableConfiguration(HadoopConfigurations.getHadoopConf(conf)), new FlinkTaskContextSupplier(runtimeContext)); HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig); @@ -410,7 +399,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()) .withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs()) .build(); - ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt); + ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf); return writeClient; } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java index da55e27f0c03b..91662e47077c7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java @@ -18,8 +18,10 @@ package org.apache.hudi.util; +import org.apache.flink.configuration.Configuration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieIOException; import org.apache.hadoop.fs.FSDataInputStream; @@ -48,9 +50,10 @@ public class ViewStorageProperties { */ public static void createProperties( String basePath, - FileSystemViewStorageConfig config) throws IOException { + FileSystemViewStorageConfig config, + Configuration flinkConf) throws IOException { Path propertyPath = getPropertiesFilePath(basePath); - FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); + FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(flinkConf)); fs.delete(propertyPath, false); try (FSDataOutputStream outputStream = fs.create(propertyPath)) { config.getProps().store(outputStream, @@ -61,10 +64,10 @@ public static void createProperties( /** * Read the {@link FileSystemViewStorageConfig} with given table base path. 
*/ - public static FileSystemViewStorageConfig loadFromProperties(String basePath) { + public static FileSystemViewStorageConfig loadFromProperties(String basePath, Configuration conf) { Path propertyPath = getPropertiesFilePath(basePath); LOG.info("Loading filesystem view storage properties from " + propertyPath); - FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); + FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(conf)); Properties props = new Properties(); try { try (FSDataInputStream inputStream = fs.open(propertyPath)) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 7a8aeff97b560..55885dcab5837 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.MockCoordinatorExecutor; @@ -103,7 +104,7 @@ void testInstantState() { @Test public void testTableInitialized() throws IOException { - final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); + final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new Configuration()); String basePath = tempFile.getAbsolutePath(); try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { assertTrue(fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))); @@ -201,7 +202,7 @@ void testSyncMetadataTable() throws Exception { assertNotEquals("", instant); final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()); - HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath); + HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf)); HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L)); assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java index c4eecd7e4941b..a6fb493b9bdda 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java @@ -19,6 +19,7 @@ package org.apache.hudi.sink.meta; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -47,7 +48,7 @@ public class TestCkpMetadata { @BeforeEach public void 
beforeEach() throws Exception { String basePath = tempFile.getAbsolutePath(); - FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf()); + FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(new Configuration())); Configuration conf = TestConfigurations.getDefaultConf(basePath); StreamerUtil.initTableIfNotExists(conf); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 4f4b5499530cc..0748739064cf3 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.sink.partitioner.profile.WriteProfile; import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketType; @@ -71,7 +72,7 @@ public void before() throws IOException { writeConfig = StreamerUtil.getHoodieClientConfig(conf); context = new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new SerializableConfiguration(HadoopConfigurations.getHadoopConf(conf)), new FlinkTaskContextSupplier(null)); StreamerUtil.initTableIfNotExists(conf); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java index a03f870296db7..ba60ff9469d73 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.WriteMetadataEvent; @@ -345,7 +346,7 @@ public TestHarness checkWrittenData( } private void checkWrittenDataMor(File baseFile, Map expected, int partitions) throws Exception { - HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, HadoopConfigurations.getHadoopConf(conf)); Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); String latestInstant = lastCompleteInstant(); FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration()); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index db45a75977f5e..9f2aba77c1105 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -21,6 +21,7 @@ import 
org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; @@ -239,7 +240,7 @@ private List generateSplits(StreamReadMonitoringFunction private OneInputStreamOperatorTestHarness createReader() throws Exception { final String basePath = tempFile.getAbsolutePath(); - final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); + final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new Configuration()); final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(hadoopConf).setBasePath(basePath).build(); final List partitionKeys = Collections.singletonList("partition"); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 6fbbab81fa4a6..8d2f3585cd942 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.table.HoodieTableSource; import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; @@ -400,7 +401,7 @@ void testReadIncrementally(HoodieTableType tableType) throws Exception { TestData.writeData(dataset, conf); } - HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tempFile.getAbsolutePath()); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(conf)); List commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants() .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index 57297c50ee82b..43b59bdf9e8bc 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -106,7 +106,7 @@ void testInstantTimeDiff() { void testDumpRemoteViewStorageConfig() throws IOException { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); StreamerUtil.createWriteClient(conf); - FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH)); + FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), new Configuration()); assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST)); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java index 466ccdfd01e72..c3aa9c25c61a2 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.source.StreamReadMonitoringFunction; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.StreamerUtil; @@ -39,19 +40,19 @@ public class TestUtils { public static String getLastPendingInstant(String basePath) { final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build(); return StreamerUtil.getLastPendingInstant(metaClient); } public static String getLastCompleteInstant(String basePath) { final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build(); return StreamerUtil.getLastCompletedInstant(metaClient); } public static String getLastDeltaCompleteInstant(String basePath) { final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build(); return metaClient.getCommitsTimeline().filterCompletedInstants() .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) .lastInstant() @@ -61,7 +62,7 @@ public static String getLastDeltaCompleteInstant(String basePath) { public static String getFirstCompleteInstant(String basePath) { final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build(); return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant() .map(HoodieInstant::getTimestamp).orElse(null); } @@ -69,7 +70,7 @@ public static String getFirstCompleteInstant(String basePath) { @Nullable public static String getNthCompleteInstant(String basePath, int n, boolean isDelta) { final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build(); return metaClient.getActiveTimeline() .filterCompletedInstants() .filter(instant -> isDelta ? 
HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()) : HoodieTimeline.COMMIT_ACTION.equals(instant.getAction())) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java index f80760bf1fd85..121a1c6785f30 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java @@ -18,6 +18,7 @@ package org.apache.hudi.utils; +import org.apache.flink.configuration.Configuration; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.util.ViewStorageProperties; @@ -45,11 +46,12 @@ void testReadWriteProperties() throws IOException { .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK) .withRemoteServerHost("host1") .withRemoteServerPort(1234).build(); - ViewStorageProperties.createProperties(basePath, config); - ViewStorageProperties.createProperties(basePath, config); - ViewStorageProperties.createProperties(basePath, config); + Configuration flinkConfig = new Configuration(); + ViewStorageProperties.createProperties(basePath, config, flinkConfig); + ViewStorageProperties.createProperties(basePath, config, flinkConfig); + ViewStorageProperties.createProperties(basePath, config, flinkConfig); - FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath); + FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath, new Configuration()); assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK)); assertThat(readConfig.getRemoteViewServerHost(), is("host1")); assertThat(readConfig.getRemoteViewServerPort(), is(1234));
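
Usage sketch (not part of the diff): the net effect of this change is that per-job Hadoop and Parquet settings carried in the Flink configuration now reach the Hadoop Configuration used by the readers, writers, catalogs and meta clients, instead of every call site falling back to the cluster-wide FlinkClientUtil.getHadoopConf(). Below is a minimal sketch of that flow, assuming the HadoopConfigurations class and the renamed FlinkOptions.getPropertiesWithPrefix introduced above; the concrete option keys ("hadoop.dfs.client.use.datanode.hostname", "parquet.page.size") are purely illustrative examples of prefixed settings, not values mandated by this patch.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.HadoopConfigurations;

public class HadoopConfPropagationSketch {
  public static void main(String[] args) {
    // Options as a Hudi Flink job might carry them; the keys after the
    // "hadoop."/"parquet." prefixes are illustrative only.
    Configuration flinkConf = new Configuration();
    flinkConf.setString("hadoop.dfs.client.use.datanode.hostname", "true");
    flinkConf.setString("parquet.page.size", "1048576");

    // "hadoop."-prefixed keys are stripped of the prefix and copied onto the
    // Hadoop configuration created from the cluster defaults.
    org.apache.hadoop.conf.Configuration hadoopConf =
        HadoopConfigurations.getHadoopConf(flinkConf);
    System.out.println(hadoopConf.get("dfs.client.use.datanode.hostname")); // true

    // "parquet."-prefixed keys keep their prefix, since Parquet looks them up
    // verbatim in the Hadoop configuration handed to the split readers.
    org.apache.hadoop.conf.Configuration parquetConf =
        HadoopConfigurations.getParquetConf(flinkConf, hadoopConf);
    System.out.println(parquetConf.get("parquet.page.size")); // 1048576
  }
}

The asymmetric prefix handling mirrors the two forEach loops in HadoopConfigurations above: getHadoopConf strips "hadoop." before applying a key, while getParquetConf re-adds "parquet." because Parquet expects the prefixed key.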