Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.springframework.stereotype.Component;
import scala.collection.JavaConverters;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -153,10 +152,12 @@ public String overwriteHoodieProperties(

HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
Properties newProps = new Properties();
newProps.load(new FileInputStream(new File(overwriteFilePath)));
newProps.load(new FileInputStream(overwriteFilePath));
Map<String, String> oldProps = client.getTableConfig().propsMap();
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
HoodieTableConfig.create(client.getFs(), metaPathDir, newProps);
// reload new props as checksum would have been added
newProps = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();

TreeSet<String> allPropKeys = new TreeSet<>();
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
Expand All @@ -51,6 +50,14 @@
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableConfig.NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_LAYOUT_VERSION;
import static org.apache.hudi.common.table.HoodieTableConfig.TYPE;
import static org.apache.hudi.common.table.HoodieTableConfig.VERSION;
import static org.apache.hudi.common.table.HoodieTableConfig.generateChecksum;
import static org.apache.hudi.common.table.HoodieTableConfig.validateChecksum;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -104,7 +111,7 @@ public void testAddPartitionMetaWithDryRun() throws IOException {
// expected all 'No'.
String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
.stream()
.map(partition -> new String[]{partition, "No", "None"})
.map(partition -> new String[] {partition, "No", "None"})
.toArray(String[][]::new);
String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows);
Expand Down Expand Up @@ -135,7 +142,7 @@ public void testAddPartitionMetaWithRealRun() throws IOException {
List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
// after dry run, the action will be 'Repaired'
String[][] rows = paths.stream()
.map(partition -> new String[]{partition, "No", "Repaired"})
.map(partition -> new String[] {partition, "No", "Repaired"})
.toArray(String[][]::new);
String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows);
Expand All @@ -147,7 +154,7 @@ public void testAddPartitionMetaWithRealRun() throws IOException {

// after real run, Metadata is present now.
rows = paths.stream()
.map(partition -> new String[]{partition, "Yes", "None"})
.map(partition -> new String[] {partition, "Yes", "None"})
.toArray(String[][]::new);
expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows);
Expand All @@ -170,19 +177,24 @@ public void testOverwriteHoodieProperties() throws IOException {
Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();

// after overwrite, the stored value in .hoodie is equals to which read from properties.
Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().propsMap();
HoodieTableConfig tableConfig = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig();
Map<String, String> result = tableConfig.propsMap();
// validate table checksum
assertTrue(result.containsKey(TABLE_CHECKSUM.key()));
assertTrue(validateChecksum(tableConfig.getProps()));
Properties expectProps = new Properties();
expectProps.load(new FileInputStream(new File(newProps.getPath())));
expectProps.load(new FileInputStream(newProps.getPath()));

Map<String, String> expected = expectProps.entrySet().stream()
.collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
expected.putIfAbsent(TABLE_CHECKSUM.key(), String.valueOf(generateChecksum(tableConfig.getProps())));
assertEquals(expected, result);

// check result
List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", "hoodie.table.version",
"hoodie.archivelog.folder", "hoodie.timeline.layout.version");
String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key,
oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")})
List<String> allPropsStr = Arrays.asList(NAME.key(), TYPE.key(), VERSION.key(),
ARCHIVELOG_FOLDER.key(), TIMELINE_LAYOUT_VERSION.key(), TABLE_CHECKSUM.key());
String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key,
oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")})
.toArray(String[][]::new);
String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
Expand Down
1 change: 0 additions & 1 deletion hudi-client/hudi-client-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.upgrade;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.Collections;
import java.util.Map;

/**
* DowngradeHandler to assist in downgrading {@link org.apache.hudi.table.HoodieTable} from version 4 to 3.
*/
public class FourToThreeDowngradeHandler implements DowngradeHandler {

  /**
   * Downgrading from table version 4 to 3 requires no table-config rewrites;
   * returning an empty map tells the framework there is nothing to change.
   *
   * @return an immutable empty map of table properties to update.
   */
  @Override
  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
    Map<ConfigProperty, String> noPropertyChanges = Collections.emptyMap();
    return noPropertyChanges;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.upgrade;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

/**
* UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 3 to 4.
*/
public class ThreeToFourUpgradeHandler implements UpgradeHandler {

  /**
   * Upgrades a version-3 table to version 4 by computing the table checksum
   * from the write config's properties and scheduling it for persistence.
   *
   * @param config the active write config whose properties seed the checksum.
   * @return a single-entry map: {@code TABLE_CHECKSUM} -> checksum value.
   */
  @Override
  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
    // HashMap replaces the legacy synchronized Hashtable: the map is built and
    // returned within a single thread, and the lone entry is non-null, so the
    // Hashtable's locking and null-hostility buy nothing here.
    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
    tablePropsToAdd.put(HoodieTableConfig.TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
    return tablePropsToAdd;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, Ho
return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) {
return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.FOUR) {
return new ThreeToFourUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
} else {
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
}
Expand All @@ -155,6 +157,8 @@ protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion,
return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) {
return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.FOUR && toVersion == HoodieTableVersion.THREE) {
return new FourToThreeDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
} else {
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
}
Expand Down
6 changes: 3 additions & 3 deletions hudi-client/hudi-spark-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@
</dependency>
<!-- Other Utils -->
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1548,7 +1548,7 @@ public void testUpgradeDowngrade() throws IOException {
assertTrue(currentStatus.getModificationTime() > prevStatus.getModificationTime());

initMetaClient();
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode());
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath));
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
Expand Down Expand Up @@ -1630,7 +1630,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
}

initMetaClient();
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode());
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath));
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,43 @@ public void testUpgradeOneToTwo(HoodieTableType tableType) throws IOException {
assertTableProps(cfg);
}

/**
 * Round-trips a table between version 3 and the current version and verifies
 * that the TABLE_CHECKSUM property survives the downgrade and is regenerated
 * identically on re-upgrade.
 */
@Test
public void testUpgradeDowngradeBetweenThreeAndCurrentVersion() throws IOException {
// init config, table and client.
Map<String, String> params = new HashMap<>();
addNewTableParamsToProps(params);
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();

// write inserts
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
doInsert(client);

// current version should have TABLE_CHECKSUM key
assertEquals(HoodieTableVersion.current(), metaClient.getTableConfig().getTableVersion());
assertTableVersionFromPropertyFile(HoodieTableVersion.current());
assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key()));
String checksum = metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key());

// downgrade to version 3 and check TABLE_CHECKSUM is still present
new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.THREE, null);
assertEquals(HoodieTableVersion.THREE.versionCode(), metaClient.getTableConfig().getTableVersion().versionCode());
assertTableVersionFromPropertyFile(HoodieTableVersion.THREE);
assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key()));
assertEquals(checksum, metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key()));

// remove TABLE_CHECKSUM and upgrade to current version
// NOTE(review): removal only mutates the in-memory props; presumably the
// upgrade path re-adds the checksum via ThreeToFourUpgradeHandler -- confirm.
metaClient.getTableConfig().getProps().remove(HoodieTableConfig.TABLE_CHECKSUM.key());
new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.current(), null);

// verify upgrade and TABLE_CHECKSUM
// Rebuild the meta client so the assertions read the persisted state, not
// the mutated in-memory copy.
metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build();
assertEquals(HoodieTableVersion.current().versionCode(), metaClient.getTableConfig().getTableVersion().versionCode());
assertTableVersionFromPropertyFile(HoodieTableVersion.current());
assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key()));
assertEquals(checksum, metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key()));
}

private void addNewTableParamsToProps(Map<String, String> params) {
params.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
params.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Type-aware extension of {@link java.util.Properties}.
*/
public class TypedProperties extends Properties implements Serializable {

private final HashSet<Object> keys = new LinkedHashSet<>();

public TypedProperties() {
super(null);
}
Expand All @@ -42,6 +49,45 @@ public TypedProperties(Properties defaults) {
}
}

/**
 * Enumerates the tracked property keys in insertion order.
 *
 * <p>Wildcard return type matches {@link java.util.Properties#propertyNames()}
 * instead of the raw {@code Enumeration} the original declared.
 * NOTE(review): unlike the JDK implementation, this enumerates only locally
 * tracked keys and does not include keys from the defaults table -- confirm
 * that is intended.
 */
@Override
public Enumeration<?> propertyNames() {
  return Collections.enumeration(keys);
}

/** Enumerates keys in insertion order rather than the hashtable's internal order. */
@Override
public synchronized Enumeration<Object> keys() {
  Enumeration<Object> orderedKeys = Collections.enumeration(this.keys);
  return orderedKeys;
}

/**
 * Returns the tracked keys as strings, preserving insertion order.
 * Keys are cast, so a non-String key fails fast with a ClassCastException,
 * matching the original behavior.
 */
@Override
public Set<String> stringPropertyNames() {
  Set<String> names = new LinkedHashSet<>();
  this.keys.forEach(key -> names.add((String) key));
  return names;
}

/**
 * Stores the mapping and records the key for ordered iteration.
 * Re-inserting an existing key deliberately moves it to the end of the
 * iteration order (remove-then-add), so an overwrite reads as the latest entry.
 */
@Override
public synchronized Object put(Object key, Object value) {
  if (keys.contains(key)) {
    keys.remove(key);
  }
  keys.add(key);
  return super.put(key, value);
}

/**
 * Stores the mapping only when {@code key} is not already present, tracking
 * the key for ordered iteration. Overrides {@link java.util.Hashtable#putIfAbsent}.
 *
 * @return the previous value associated with {@code key}, or {@code null}.
 */
@Override
public synchronized Object putIfAbsent(Object key, Object value) {
  // Look the key object up directly: the previous String.valueOf(key)
  // round-trip answered incorrectly for any non-String key, so such a key
  // could be recorded in the ordering set even though the mapping was not added.
  if (!containsKey(key)) {
    keys.add(key);
  }
  return super.putIfAbsent(key, value);
}

/**
 * Removes the mapping and its ordered-key bookkeeping.
 *
 * @return the value previously associated with {@code key}, or {@code null}.
 */
@Override
public synchronized Object remove(Object key) {
  // synchronized added: put()/putIfAbsent()/keys() guard the non-thread-safe
  // LinkedHashSet with the instance monitor, so removal must take the same
  // lock or a concurrent put could corrupt the set.
  keys.remove(key);
  return super.remove(key);
}

private void checkKey(String property) {
if (!containsKey(property)) {
throw new IllegalArgumentException("Property " + property + " not found");
Expand Down
Loading