Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions conf/hudi-defaults.conf.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running Hudi jobs.
# This is useful for setting default environmental settings.

# Example:
# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
# hoodie.datasource.hive_sync.use_jdbc true
# hoodie.datasource.hive_sync.support_timestamp false
# hoodie.index.type BLOOM
# hoodie.metadata.enable false
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {

TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getProps(true);

properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@

package org.apache.hudi.common.config;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

Expand All @@ -43,72 +49,110 @@ public class DFSPropertiesConfiguration {

private static final Logger LOG = LogManager.getLogger(DFSPropertiesConfiguration.class);

public static final String DEFAULT_PROPERTIES_FILE = "hudi-defaults.conf";

public static final String CONF_FILE_DIR_ENV_NAME = "HUDI_CONF_DIR";

public static final String DEFAULT_CONF_FILE_DIR = "file:/etc/hudi/conf";

// props read from hudi-defaults.conf
private static TypedProperties GLOBAL_PROPS = loadGlobalProps();

private final FileSystem fs;

private final Path rootFile;
private Path currentFilePath;

private final TypedProperties props;
// props read from user defined configuration file or input stream
private final HoodieConfig hoodieConfig;

// Keep track of files visited, to detect loops
private final Set<String> visitedFiles;
private final Set<String> visitedFilePaths;

public DFSPropertiesConfiguration(FileSystem fs, Path rootFile, TypedProperties defaults) {
public DFSPropertiesConfiguration(FileSystem fs, Path filePath) {
this.fs = fs;
this.rootFile = rootFile;
this.props = defaults;
this.visitedFiles = new HashSet<>();
visitFile(rootFile);
}

public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
this(fs, rootFile, new TypedProperties());
this.currentFilePath = filePath;
this.hoodieConfig = new HoodieConfig();
this.visitedFilePaths = new HashSet<>();
addPropsFromFile(filePath);
}

public DFSPropertiesConfiguration() {
this.fs = null;
this.rootFile = null;
this.props = new TypedProperties();
this.visitedFiles = new HashSet<>();
this.currentFilePath = null;
this.hoodieConfig = new HoodieConfig();
this.visitedFilePaths = new HashSet<>();
}

private String[] splitProperty(String line) {
int ind = line.indexOf('=');
String k = line.substring(0, ind).trim();
String v = line.substring(ind + 1).trim();
return new String[] {k, v};
/**
 * Loads the global properties from hudi-defaults.conf.
 *
 * <p>The file is resolved from the directory named by the HUDI_CONF_DIR
 * environment variable; when that variable is unset, a best-effort read from
 * the default conf dir (file:/etc/hudi/conf) is attempted and a missing file
 * there is tolerated (an empty property set is returned).
 *
 * @return the loaded global properties (possibly empty)
 */
public static TypedProperties loadGlobalProps() {
  DFSPropertiesConfiguration globalConf = new DFSPropertiesConfiguration();
  Option<Path> envConfPath = getConfPathFromEnv();
  if (envConfPath.isPresent()) {
    // HUDI_CONF_DIR is set: a failure to read this file propagates to the caller.
    globalConf.addPropsFromFile(envConfPath.get());
    return globalConf.getProps();
  }
  // No HUDI_CONF_DIR: fall back to the default location, ignoring absence.
  try {
    globalConf.addPropsFromFile(new Path(DEFAULT_CONF_FILE_DIR));
  } catch (Exception ignored) {
    LOG.debug("Didn't find config file under default conf file dir: " + DEFAULT_CONF_FILE_DIR);
  }
  return globalConf.getProps();
}

/**
 * Re-reads hudi-defaults.conf and replaces the cached GLOBAL_PROPS.
 * Useful after the environment (e.g. HUDI_CONF_DIR) has changed, as done in tests.
 */
public static void refreshGlobalProps() {
GLOBAL_PROPS = loadGlobalProps();
}

/**
 * Resets the cached global properties to an empty set, so configuration loaded
 * from hudi-defaults.conf cannot leak between callers (used by test cleanup).
 */
public static void clearGlobalProps() {
GLOBAL_PROPS = new TypedProperties();
}

private void visitFile(Path file) {
/**
* Add properties from external configuration files.
*
* @param filePath File path for configuration file
*/
public void addPropsFromFile(Path filePath) {
if (visitedFilePaths.contains(filePath.toString())) {
throw new IllegalStateException("Loop detected; file " + filePath + " already referenced");
}
FileSystem fileSystem;
try {
if (visitedFiles.contains(file.getName())) {
throw new IllegalStateException("Loop detected; file " + file + " already referenced");
}
visitedFiles.add(file.getName());
BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
addProperties(reader);
fileSystem = fs != null ? fs : filePath.getFileSystem(new Configuration());
} catch (IOException e) {
throw new IllegalArgumentException("Cannot get the file system from file path", e);
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(filePath)))) {
visitedFilePaths.add(filePath.toString());
currentFilePath = filePath;
addPropsFromStream(reader);
} catch (IOException ioe) {
LOG.error("Error reading in properies from dfs", ioe);
LOG.error("Error reading in properties from dfs");
throw new IllegalArgumentException("Cannot read properties from dfs", ioe);
}
}

/**
* Add properties from input stream.
*
* Add properties from buffered reader.
*
* @param reader Buffered Reader
* @throws IOException
*/
public void addProperties(BufferedReader reader) throws IOException {
public void addPropsFromStream(BufferedReader reader) throws IOException {
try {
reader.lines().forEach(line -> {
if (line.startsWith("#") || line.equals("") || !line.contains("=")) {
if (!isValidLine(line)) {
return;
}
String[] split = splitProperty(line);
if (line.startsWith("include=") || line.startsWith("include =")) {
visitFile(new Path(rootFile.getParent(), split[1]));
Path includeFilePath = new Path(currentFilePath.getParent(), split[1]);
addPropsFromFile(includeFilePath);
} else {
props.setProperty(split[0], split[1]);
hoodieConfig.setValue(split[0], split[1]);
}
});

Expand All @@ -117,7 +161,46 @@ public void addProperties(BufferedReader reader) throws IOException {
}
}

public TypedProperties getConfig() {
return props;
/**
 * Returns a defensive copy of the cached global props so that callers cannot
 * mutate the shared GLOBAL_PROPS instance.
 *
 * @return copy of the props loaded from hudi-defaults.conf
 */
public static TypedProperties getGlobalProps() {
  TypedProperties copyOfGlobals = new TypedProperties();
  copyOfGlobals.putAll(GLOBAL_PROPS);
  return copyOfGlobals;
}

/**
 * @return the props read from the user-supplied file/stream, wrapped in a
 *         fresh TypedProperties; global props are NOT included.
 */
public TypedProperties getProps() {
return new TypedProperties(hoodieConfig.getProps());
}

/**
 * @param includeGlobalProps when true, also merge in the global props from
 *        hudi-defaults.conf; file-local values win on key conflicts
 * @return the configured props wrapped in a fresh TypedProperties
 */
public TypedProperties getProps(boolean includeGlobalProps) {
return new TypedProperties(hoodieConfig.getProps(includeGlobalProps));
}

/**
 * Resolves the full path of hudi-defaults.conf from the directory named by
 * the HUDI_CONF_DIR environment variable.
 *
 * @return path to the conf file, or Option.empty() when HUDI_CONF_DIR is unset
 */
private static Option<Path> getConfPathFromEnv() {
  String confDir = System.getenv(CONF_FILE_DIR_ENV_NAME);
  if (confDir == null) {
    LOG.warn("Cannot find " + CONF_FILE_DIR_ENV_NAME + ", please set it as the dir of " + DEFAULT_PROPERTIES_FILE);
    return Option.empty();
  }
  // NOTE(review): URI.create can throw on malformed values (e.g. spaces) — confirm
  // whether HUDI_CONF_DIR should be sanitized before this point.
  if (StringUtils.isNullOrEmpty(URI.create(confDir).getScheme())) {
    // Scheme-less value: treat it as a local filesystem directory.
    confDir = "file://" + confDir;
  }
  // Join with Hadoop's Path(parent, child) instead of File.separator: the latter
  // is '\' on Windows, which would corrupt the URI-style path built above.
  return Option.of(new Path(confDir, DEFAULT_PROPERTIES_FILE));
}

/**
 * Splits a config line into {key, value}. Runs of whitespace are first
 * collapsed to single spaces; '=' is used as the delimiter when present,
 * otherwise the first space splits key from value.
 */
private String[] splitProperty(String line) {
  String normalized = line.replaceAll("\\s+", " ");
  String delimiter = normalized.contains("=") ? "=" : " ";
  int splitAt = normalized.indexOf(delimiter);
  String key = normalized.substring(0, splitAt).trim();
  String value = normalized.substring(splitAt + 1).trim();
  return new String[] {key, value};
}

/**
 * A line is parseable when it is neither a '#' comment nor empty, and contains
 * either an '=' delimiter or whitespace separating a key from a value.
 */
private boolean isValidLine(String line) {
  ValidationUtils.checkArgument(line != null, "passed line is null");
  if (line.isEmpty() || line.startsWith("#")) {
    return false;
  }
  return line.contains("=") || line.matches(".*\\s.*");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public <T> void setValue(ConfigProperty<T> cfg, String val) {
props.setProperty(cfg.key(), val);
}

/**
 * Sets a raw key/value pair on the underlying properties.
 * NOTE(review): the type parameter {@code <T>} is unused in this overload —
 * consider dropping it; kept here for signature stability.
 */
public <T> void setValue(String key, String val) {
props.setProperty(key, val);
}

/**
 * Copies all entries of the given properties into this config,
 * overwriting any existing values for the same keys.
 */
public void setAll(Properties properties) {
props.putAll(properties);
}

public <T> void setDefaultValue(ConfigProperty<T> configProperty) {
if (!contains(configProperty)) {
Option<T> inferValue = Option.empty();
Expand Down Expand Up @@ -167,7 +175,17 @@ public <T> String getStringOrDefault(ConfigProperty<T> configProperty, String de
}

public Properties getProps() {
return props;
return getProps(false);
}

/**
 * Returns this config's properties, optionally layered on top of the global
 * hudi-defaults.conf props (this config's own values win on key conflicts).
 *
 * @param includeGlobalProps whether to merge in the global props
 * @return a merged copy when includeGlobalProps is true, otherwise the
 *         internal props instance
 */
public Properties getProps(boolean includeGlobalProps) {
  if (!includeGlobalProps) {
    return props;
  }
  Properties merged = DFSPropertiesConfiguration.getGlobalProps();
  merged.putAll(props);
  return merged;
}

public void setDefaultOnCondition(boolean condition, HoodieConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Rule;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;

Expand All @@ -47,6 +51,10 @@ public class TestDFSPropertiesConfiguration {
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;

@Rule
public static final EnvironmentVariables ENVIRONMENT_VARIABLES
= new EnvironmentVariables();

@BeforeAll
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
Expand All @@ -73,12 +81,17 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanupClass() throws Exception {
public static void cleanupClass() {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
}

@AfterEach
public void cleanupGlobalConfig() {
// Reset the static GLOBAL_PROPS cache so a test that loaded an external
// hudi-defaults.conf does not leak configuration into later tests.
DFSPropertiesConfiguration.clearGlobalProps();
}

private static void writePropertiesFile(Path path, String[] lines) throws IOException {
PrintStream out = new PrintStream(dfs.create(path, true));
for (String line : lines) {
Expand All @@ -91,7 +104,7 @@ private static void writePropertiesFile(Path path, String[] lines) throws IOExce
@Test
public void testParsing() {
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t1.props"));
TypedProperties props = cfg.getConfig();
TypedProperties props = cfg.getProps();
assertEquals(5, props.size());
assertThrows(IllegalArgumentException.class, () -> {
props.getString("invalid.key");
Expand Down Expand Up @@ -119,15 +132,38 @@ public void testParsing() {
@Test
public void testIncludes() {
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t3.props"));
TypedProperties props = cfg.getConfig();
TypedProperties props = cfg.getProps();

assertEquals(123, props.getInteger("int.prop"));
assertEquals(243.4, props.getDouble("double.prop"), 0.001);
assertTrue(props.getBoolean("boolean.prop"));
assertEquals("t3.value", props.getString("string.prop"));
assertEquals(1354354354, props.getLong("long.prop"));
assertThrows(IllegalStateException.class, () -> {
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t4.props"));
cfg.addPropsFromFile(new Path(dfsBasePath + "/t4.props"));
}, "Should error out on a self-included file.");
}

@Test
public void testNoGlobalConfFileConfigured() {
// Unset HUDI_CONF_DIR so loading falls back to the default conf dir,
// which does not exist in the test environment.
ENVIRONMENT_VARIABLES.clear(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME);
// Should not throw any exception when no external configuration file configured
DFSPropertiesConfiguration.refreshGlobalProps();
assertEquals(0, DFSPropertiesConfiguration.getGlobalProps().size());
}

@Test
public void testLoadGlobalConfFile() {
  // Point HUDI_CONF_DIR at the bundled test resources so refreshGlobalProps()
  // picks up src/test/resources/external-config/hudi-defaults.conf.
  String externalConfDir = new File("src/test/resources/external-config").getAbsolutePath();
  ENVIRONMENT_VARIABLES.set(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME, externalConfDir);

  DFSPropertiesConfiguration.refreshGlobalProps();
  TypedProperties globalProps = DFSPropertiesConfiguration.getGlobalProps();
  assertEquals(5, globalProps.size());
  assertEquals("jdbc:hive2://localhost:10000", globalProps.get("hoodie.datasource.hive_sync.jdbcurl"));
  assertEquals("true", globalProps.get("hoodie.datasource.hive_sync.use_jdbc"));
  assertEquals("false", globalProps.get("hoodie.datasource.hive_sync.support_timestamp"));
  assertEquals("BLOOM", globalProps.get("hoodie.index.type"));
  assertEquals("true", globalProps.get("hoodie.metadata.enable"));
}
}
Loading