Merged
@@ -156,7 +156,7 @@ public String overwriteHoodieProperties(
newProps.load(new FileInputStream(new File(overwriteFilePath)));
Map<String, String> oldProps = client.getTableConfig().propsMap();
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
HoodieTableConfig.createHoodieProperties(client.getFs(), metaPathDir, newProps);
HoodieTableConfig.create(client.getFs(), metaPathDir, newProps);

TreeSet<String> allPropKeys = new TreeSet<>();
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
@@ -20,13 +20,16 @@

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.exception.TableNotFoundException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.shell.core.CommandMarker;
@@ -35,12 +38,21 @@
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;

/**
* CLI command to display hudi table options.
@@ -170,6 +182,67 @@ public String fetchTableSchema(
}
}

@CliCommand(value = "table recover-configs", help = "Recover table configs, from update/delete that failed midway.")
public String recoverTableConfig() throws IOException {
HoodieCLI.refreshTableMetadata();
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
HoodieTableConfig.recover(client.getFs(), metaPathDir);
return descTable();
}

@CliCommand(value = "table update-configs", help = "Update the table configs with configs with provided file.")
Review comment (Member): IMO, this is more like replace-configs rather than updating existing ones.
public String updateTableConfig(
@CliOption(key = {"props-file"}, mandatory = true, help = "Path to a properties file on local filesystem")
final String updatePropsFilePath) throws IOException {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
Map<String, String> oldProps = client.getTableConfig().propsMap();

Properties updatedProps = new Properties();
updatedProps.load(new FileInputStream(updatePropsFilePath));
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
HoodieTableConfig.update(client.getFs(), metaPathDir, updatedProps);

HoodieCLI.refreshTableMetadata();
Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
return renderOldNewProps(newProps, oldProps);
}

@CliCommand(value = "table delete-configs", help = "Delete the supplied table configs from the table.")
public String deleteTableConfig(
@CliOption(key = {"comma-separated-configs"}, mandatory = true, help = "Comma separated list of configs to delete.")
final String csConfigs) {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
Map<String, String> oldProps = client.getTableConfig().propsMap();

Set<String> deleteConfigs = Arrays.stream(csConfigs.split(",")).collect(Collectors.toSet());
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
HoodieTableConfig.delete(client.getFs(), metaPathDir, deleteConfigs);

HoodieCLI.refreshTableMetadata();
Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
return renderOldNewProps(newProps, oldProps);
}

private static String renderOldNewProps(Map<String, String> newProps, Map<String, String> oldProps) {
TreeSet<String> allPropKeys = new TreeSet<>();
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
allPropKeys.addAll(oldProps.keySet());

String[][] rows = new String[allPropKeys.size()][];
int ind = 0;
for (String propKey : allPropKeys) {
String[] row = new String[]{
propKey,
oldProps.getOrDefault(propKey, "null"),
newProps.getOrDefault(propKey, "null")
};
rows[ind++] = row;
}
return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
}

/**
* Use Streams when you are dealing with raw data.
* @param filePath output file path.
@@ -23,21 +23,16 @@
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
@@ -107,69 +102,38 @@ public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) {
* @param instantTime current instant time that should not be touched.
*/
public void run(HoodieTableVersion toVersion, String instantTime) {
try {
// Fetch version from property file and current version
HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
if (!needsUpgradeOrDowngrade(toVersion)) {
return;
}

if (fs.exists(updatedPropsFilePath)) {
// this can be left over .updated file from a failed attempt before. Many cases exist here.
// a) We failed while writing the .updated file and it's content is partial (e.g hdfs)
// b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway
// c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file
// All cases, it simply suffices to delete the file and proceed.
LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath)));
fs.delete(updatedPropsFilePath, false);
}

// Perform the actual upgrade/downgrade; this has to be idempotent, for now.
LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
Map<ConfigProperty, String> tableProps = new HashMap<>();
if (fromVersion.versionCode() < toVersion.versionCode()) {
// upgrade
while (fromVersion.versionCode() < toVersion.versionCode()) {
HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
fromVersion = nextVersion;
}
} else {
// downgrade
while (fromVersion.versionCode() > toVersion.versionCode()) {
HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
fromVersion = prevVersion;
}
}
// Fetch version from property file and current version
HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
if (!needsUpgradeOrDowngrade(toVersion)) {
return;
}

// Write out the current version in hoodie.properties.updated file
for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
// Perform the actual upgrade/downgrade; this has to be idempotent, for now.
LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
Map<ConfigProperty, String> tableProps = new HashMap<>();
if (fromVersion.versionCode() < toVersion.versionCode()) {
// upgrade
while (fromVersion.versionCode() < toVersion.versionCode()) {
HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
fromVersion = nextVersion;
}
metaClient.getTableConfig().setTableVersion(toVersion);
createUpdatedFile(metaClient.getTableConfig().getProps());

// because for different fs the fs.rename have different action,such as:
// a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false
// b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture
// c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException
// so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent
if (fs.exists(propsFilePath)) {
fs.delete(propsFilePath, false);
} else {
// downgrade
while (fromVersion.versionCode() > toVersion.versionCode()) {
HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
fromVersion = prevVersion;
}
// Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
// But as long as this does not leave a partial hoodie.properties file, we are okay.
fs.rename(updatedPropsFilePath, propsFilePath);
} catch (IOException e) {
throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
}
}

private void createUpdatedFile(Properties props) throws IOException {
try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) {
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
// Write out the current version in hoodie.properties.updated file
for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
}
metaClient.getTableConfig().setTableVersion(toVersion);

HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
}

protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
@@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
@@ -46,6 +47,8 @@
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
@@ -68,6 +71,7 @@ public class HoodieTableConfig extends HoodieConfig {
private static final Logger LOG = LogManager.getLogger(HoodieTableConfig.class);

public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup";

public static final ConfigProperty<String> NAME = ConfigProperty
.key("hoodie.table.name")
@@ -172,12 +176,11 @@ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
LOG.info("Loading table properties from " + propertyPath);
try {
try (FSDataInputStream inputStream = fs.open(propertyPath)) {
props.load(inputStream);
}
fetchConfigs(fs, metaPath);
if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null
&& !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) {
setValue(PAYLOAD_CLASS_NAME, payloadClassName);
// FIXME(vc): wonder if this can be removed. Need to look into history.
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
}
@@ -191,16 +194,103 @@ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName

/**
* For serializing and de-serializing.
*
*/
public HoodieTableConfig() {
super();
}

private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
try (FSDataInputStream is = fs.open(cfgPath)) {
props.load(is);
} catch (IOException ioe) {
if (!fs.exists(cfgPath)) {
LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
// try the backup. this way no query ever fails if update fails midway.
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
try (FSDataInputStream is = fs.open(backupCfgPath)) {
props.load(is);
}
} else {
throw ioe;
}
}
}

public static void recover(FileSystem fs, Path metadataFolder) throws IOException {
Path cfgPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE);
Path backupCfgPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE_BACKUP);
recoverIfNeeded(fs, cfgPath, backupCfgPath);
}

static void recoverIfNeeded(FileSystem fs, Path cfgPath, Path backupCfgPath) throws IOException {
if (!fs.exists(cfgPath)) {
Review comment (Contributor): Are there chances of a partial/corrupted HOODIE_PROPERTIES_FILE during updates? If yes, can we do something like: irrespective of whether HOODIE_PROPERTIES_FILE exists or not, if HOODIE_PROPERTIES_FILE_BACKUP exists, we override HOODIE_PROPERTIES_FILE with the contents of HOODIE_PROPERTIES_FILE_BACKUP.

Review comment (Member Author): There is no such window on cloud storage; the write is atomic there. On HDFS there is a small window for this, but it is very rare since the write fits within the block size. It is solvable by adding a checksum entry to the file and validating it; I have a note in the description around this.

Review comment (Member Author): cc @prashantwason I filed HUDI-2809 for this. If you have time, please help! :)

// copy over from backup
try (FSDataInputStream in = fs.open(backupCfgPath);
FSDataOutputStream out = fs.create(cfgPath, false)) {
FileIOUtils.copy(in, out);
}
}
// regardless, we don't need the backup anymore.
fs.delete(backupCfgPath, false);
}
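
The checksum fix discussed in the review thread above (and tracked in HUDI-2809) could look roughly like the following. This is a minimal sketch, not the eventual implementation: the key name "hoodie.table.checksum" and both helper methods are hypothetical, and it assumes a CRC32 over a sorted rendering of the property entries.

  private static final String TABLE_CHECKSUM_KEY = "hoodie.table.checksum"; // hypothetical key name

  static Properties withChecksum(Properties props) {
    java.util.zip.CRC32 crc = new java.util.zip.CRC32();
    // hash a sorted rendering of the entries so the checksum is deterministic
    props.stringPropertyNames().stream().sorted().forEach(k ->
        crc.update((k + "=" + props.getProperty(k)).getBytes(java.nio.charset.StandardCharsets.UTF_8)));
    props.setProperty(TABLE_CHECKSUM_KEY, String.valueOf(crc.getValue()));
    return props;
  }

  static boolean hasValidChecksum(Properties props) {
    String stored = props.getProperty(TABLE_CHECKSUM_KEY);
    if (stored == null) {
      return false; // no checksum entry; treat the file as suspect and fall back to the backup
    }
    Properties copy = new Properties();
    props.forEach((k, v) -> copy.setProperty(k.toString(), v.toString()));
    copy.remove(TABLE_CHECKSUM_KEY);
    return stored.equals(withChecksum(copy).getProperty(TABLE_CHECKSUM_KEY));
  }

With that in place, recoverIfNeeded could restore from the backup not only when hoodie.properties is missing but also when it fails validation, closing the HDFS partial-write window mentioned above.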

private static void upsertProperties(Properties current, Properties updated) {
updated.forEach((k, v) -> current.setProperty(k.toString(), v.toString()));
}

private static void deleteProperties(Properties current, Properties deleted) {
deleted.forEach((k, v) -> current.remove(k.toString()));
}

private static void modify(FileSystem fs, Path metadataFolder, Properties modifyProps, BiConsumer<Properties, Properties> modifyFn) {
Path cfgPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE);
Path backupCfgPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE_BACKUP);
try {
// 0. do any recovery from prior attempts.
recoverIfNeeded(fs, cfgPath, backupCfgPath);

// 1. backup the existing properties.
try (FSDataInputStream in = fs.open(cfgPath);
FSDataOutputStream out = fs.create(backupCfgPath, false)) {
FileIOUtils.copy(in, out);
}
// 2. delete the properties file; reads will go to the backup until we are done.
fs.delete(cfgPath, false);
// 3. read current props, upsert and save back.
try (FSDataInputStream in = fs.open(backupCfgPath);
FSDataOutputStream out = fs.create(cfgPath, true)) {
Properties props = new Properties();
props.load(in);
modifyFn.accept(props, modifyProps);
props.store(out, "Updated at " + System.currentTimeMillis());
}
// 4. verify and remove backup.
// FIXME(vc): generate a hash for verification.
fs.delete(backupCfgPath, false);
} catch (IOException e) {
throw new HoodieIOException("Error updating table configs.", e);
}
}

/**
* Upserts the table config with the set of properties passed in. We implement a fail-safe backup protocol
* here for safely updating with recovery and also ensuring the table config continues to be readable.
*/
public static void update(FileSystem fs, Path metadataFolder, Properties updatedProps) {
modify(fs, metadataFolder, updatedProps, HoodieTableConfig::upsertProperties);
}

public static void delete(FileSystem fs, Path metadataFolder, Set<String> deletedProps) {
Properties props = new Properties();
deletedProps.forEach(p -> props.setProperty(p, ""));
modify(fs, metadataFolder, props, HoodieTableConfig::deleteProperties);
}
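
For illustration, a caller could exercise the new APIs end to end as sketched below. This is a usage sketch only: it assumes a Hadoop Configuration named hadoopConf is in scope, and the base path and config key are invented.

  HoodieTableMetaClient client = HoodieTableMetaClient.builder()
      .setConf(hadoopConf).setBasePath("/tmp/hoodie_table").build();
  Path metaPathDir = new Path(client.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME);

  // upsert one config; modify() backs up hoodie.properties before rewriting it
  Properties updatedProps = new Properties();
  updatedProps.setProperty("hoodie.cleaner.commits.retained", "12");
  HoodieTableConfig.update(client.getFs(), metaPathDir, updatedProps);

  // delete it again; same backup-then-rewrite protocol underneath
  HoodieTableConfig.delete(client.getFs(), metaPathDir,
      java.util.Collections.singleton("hoodie.cleaner.commits.retained"));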

/**
* Initialize the hoodie meta directory and any necessary files inside the meta (including the hoodie.properties).
*/
public static void createHoodieProperties(FileSystem fs, Path metadataFolder, Properties properties)
public static void create(FileSystem fs, Path metadataFolder, Properties properties)
throws IOException {
if (!fs.exists(metadataFolder)) {
fs.mkdirs(metadataFolder);
@@ -377,7 +377,7 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hadoopConf
}

initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
HoodieTableConfig.create(fs, metaPathDir, props);
// We should not use fs.getConf as this might be different from the original configuration
// used to create the fs in unit tests
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
@@ -94,7 +94,7 @@ public void testNoOpBootstrapIndex() throws IOException {
props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), "false");
Properties properties = new Properties();
properties.putAll(props);
HoodieTableConfig.createHoodieProperties(metaClient.getFs(), new Path(metaClient.getMetaPath()), properties);
HoodieTableConfig.create(metaClient.getFs(), new Path(metaClient.getMetaPath()), properties);

metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
BootstrapIndex bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);