Merged
Changes from all commits
17 commits
922f765
[HUDI-4100] CTAS failed to clean up when given an illegal MANAGED tab…
jinxing64 May 21, 2022
32a5d26
[HUDI-3890] fix rat plugin issue with sql files (#5644)
uday08bce May 21, 2022
271d1a7
[HUDI-4051] Allow nested field as primary key and preCombineField in …
xushiyan May 22, 2022
3ef137d
[HUDI-4129] Initializes a new fs view for WriteProfile#reload (#5640)
YuangZhang May 23, 2022
42c7129
[HUDI-4142] Claim RFC-54 for new table APIs (#5665)
codope May 23, 2022
752f956
[HUDI-3933] Add UT cases to cover different key gen (#5638)
yihua May 23, 2022
716e995
[MINOR] Removing redundant semicolons and line breaks (#5662)
felixYyu May 23, 2022
47b764e
[HUDI-4134] Fix Method naming consistency issues in FSUtils (#5655)
h1ap May 23, 2022
af1128a
[HUDI-4084] Add support to test async table services with integ test …
nsivabalan May 24, 2022
676d5ce
[HUDI-4138] Fix the concurrency modification of hoodie table config f…
danny0405 May 24, 2022
c05ebf2
[HUDI-2473] Fixing compaction write operation in commit metadata (#5203)
nsivabalan May 24, 2022
eb21901
[HUDI-4145] Archives the metadata file in HoodieInstant.State sequenc…
danny0405 May 24, 2022
0caa55e
[HUDI-4135] remove netty and netty-all (#5663)
liujinhui1994 May 24, 2022
c20db99
[HUDI-2207] Support independent flink hudi clustering function
yuzhaojing May 21, 2022
10363c1
[HUDI-4132] Fixing determining target table schema for delta sync wit…
nsivabalan May 24, 2022
18635b5
Merge pull request #3599 from yuzhaojing/HUDI-2207
yuzhaojing May 24, 2022
f30b3ae
[MINOR] Fix a potential NPE and some finer points of hudi cli (#5656)
May 24, 2022
@@ -74,10 +74,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: true
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: second_hive_sync
last_validate:
config:
execute_itr_count: 50
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
@@ -62,10 +62,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 30
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
@@ -62,10 +62,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 50
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
@@ -64,10 +64,12 @@ dag_content:
config:
validate_hive: false
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 20
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
@@ -65,10 +65,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: false
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 20
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
@@ -62,10 +62,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 50
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
@@ -45,10 +45,12 @@ dag_content:
config:
validate_hive: false
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 6
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
2 changes: 2 additions & 0 deletions docker/demo/config/test-suite/spark-long-running.yaml
@@ -46,10 +46,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 30
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
@@ -364,7 +364,7 @@ private HoodieWriteConfig getWriteConfig() {

private void initJavaSparkContext(Option<String> userDefinedMaster) {
if (jsc == null) {
jsc = SparkUtil.initJavaSparkConf(SparkUtil.getDefaultConf("HoodieCLI", userDefinedMaster));
jsc = SparkUtil.initJavaSparkContext(SparkUtil.getDefaultConf("HoodieCLI", userDefinedMaster));
}
}
}
@@ -95,7 +95,7 @@ public static void main(String[] args) throws Exception {
LOG.info("Invoking SparkMain: " + commandString);
final SparkCommand cmd = SparkCommand.valueOf(commandString);

JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + commandString,
JavaSparkContext jsc = SparkUtil.initJavaSparkContext("hoodie-cli-" + commandString,
Option.of(args[1]), Option.of(args[2]));

int returnCode = 0;
@@ -56,7 +56,7 @@ public String upgradeHoodieTable(
if (exitCode != 0) {
return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion);
}
return String.format("Hoodie table upgraded/downgraded to ", toVersion);
return String.format("Hoodie table upgraded/downgraded to %s", toVersion);
}

@CliCommand(value = "downgrade table", help = "Downgrades a table")
@@ -78,6 +78,6 @@ public String downgradeHoodieTable(
if (exitCode != 0) {
return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion);
}
return String.format("Hoodie table upgraded/downgraded to ", toVersion);
return String.format("Hoodie table upgraded/downgraded to %s", toVersion);
}
}
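The fix above matters because String.format silently ignores arguments that have no matching format specifier, so the old message dropped the version entirely. A minimal, self-contained illustration (class name and sample version are invented for the demo):

public class FormatDemo {
  public static void main(String[] args) {
    // Missing %s: the second argument is silently ignored and no exception is thrown.
    System.out.println(String.format("Hoodie table upgraded/downgraded to ", "SIX"));
    // prints: Hoodie table upgraded/downgraded to

    // With %s the target version actually appears in the message.
    System.out.println(String.format("Hoodie table upgraded/downgraded to %s", "SIX"));
    // prints: Hoodie table upgraded/downgraded to SIX
  }
}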
19 changes: 11 additions & 8 deletions hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -32,8 +32,8 @@

import java.io.File;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

/**
@@ -56,9 +56,12 @@ public static SparkLauncher initLauncher(String propertiesFile) throws URISyntax
if (!StringUtils.isNullOrEmpty(propertiesFile)) {
sparkLauncher.setPropertiesFile(propertiesFile);
}

File libDirectory = new File(new File(currentJar).getParent(), "lib");
for (String library : Objects.requireNonNull(libDirectory.list())) {
sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath());
// This lib directory may not be required, e.g. when libraries are provided through a bundle jar
if (libDirectory.exists()) {
Arrays.stream(libDirectory.list()).forEach(library ->
sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath()));
}
return sparkLauncher;
}
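The existence check above replaces the old Objects.requireNonNull(libDirectory.list()) loop, which threw a NullPointerException whenever the lib directory was absent. A stripped-down sketch of the same guard pattern, using a stand-in addJar instead of the real SparkLauncher:

import java.io.File;
import java.util.Arrays;

public class OptionalLibDirDemo {
  public static void main(String[] args) {
    File libDirectory = new File(args.length > 0 ? args[0] : "lib");
    // File.list() returns null when the directory does not exist,
    // so only iterate after an explicit existence check.
    if (libDirectory.exists()) {
      Arrays.stream(libDirectory.list())
          .forEach(library -> addJar(new File(libDirectory, library).getAbsolutePath()));
    }
  }

  // Stand-in for SparkLauncher#addJar, used only to keep this demo self-contained.
  private static void addJar(String path) {
    System.out.println("adding jar: " + path);
  }
}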
@@ -99,20 +102,20 @@ public static SparkConf getDefaultConf(final String appName, final Option<String
return sparkConf;
}

public static JavaSparkContext initJavaSparkConf(String name) {
return initJavaSparkConf(name, Option.empty(), Option.empty());
public static JavaSparkContext initJavaSparkContext(String name) {
return initJavaSparkContext(name, Option.empty(), Option.empty());
}

public static JavaSparkContext initJavaSparkConf(String name, Option<String> master, Option<String> executorMemory) {
public static JavaSparkContext initJavaSparkContext(String name, Option<String> master, Option<String> executorMemory) {
SparkConf sparkConf = getDefaultConf(name, master);
if (executorMemory.isPresent()) {
sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
}

return initJavaSparkConf(sparkConf);
return initJavaSparkContext(sparkConf);
}

public static JavaSparkContext initJavaSparkConf(SparkConf sparkConf) {
public static JavaSparkContext initJavaSparkContext(SparkConf sparkConf) {
SparkRDDWriteClient.registerClasses(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
@@ -88,11 +88,11 @@ public void init() throws IOException {
// Write date files and log file
String testWriteToken = "1-0-1";
Files.createFile(Paths.get(fullPartitionPath, FSUtils
.makeDataFileName(commitTime1, testWriteToken, fileId1)));
.makeBaseFileName(commitTime1, testWriteToken, fileId1)));
Files.createFile(Paths.get(fullPartitionPath, FSUtils
.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0, testWriteToken)));
Files.createFile(Paths.get(fullPartitionPath, FSUtils
.makeDataFileName(commitTime2, testWriteToken, fileId1)));
.makeBaseFileName(commitTime2, testWriteToken, fileId1)));
Files.createFile(Paths.get(fullPartitionPath, FSUtils
.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));

@@ -22,11 +22,22 @@
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.spark.SparkConf;

import org.apache.spark.launcher.SparkLauncher;
import org.junit.jupiter.api.Test;

import java.net.URISyntaxException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class SparkUtilTest {

@Test
public void testInitSparkLauncher() throws URISyntaxException {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(null);
assertNotNull(sparkLauncher);
}

@Test
public void testGetDefaultSparkConf() {
SparkConf sparkConf = SparkUtil.getDefaultConf("test-spark-app", Option.of(""));
@@ -1379,7 +1379,7 @@ protected Option<String> inlineScheduleClustering(Option<Map<String, String>> ex
return scheduleClustering(extraMetadata);
}

protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
@@ -72,7 +72,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -507,10 +506,16 @@ private Stream<HoodieInstant> getInstantsToArchive() {
List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction())));
if (instantsToStream != null) {
return instantsToStream.stream();
// Sorts the instants in natural order so that the metadata files are removed
// in HoodieInstant.State sequence: requested -> inflight -> completed.
// This is important because if a COMPLETED metadata file were removed first,
// other monitors of the timeline (such as the compaction or clustering services) would
// mistakenly recognize the remaining requested/inflight file as a pending operation,
// leading to all kinds of subtle bugs.
return instantsToStream.stream().sorted();
} else {
// if a concurrent writer archived the instant
return Collections.EMPTY_LIST.stream();
return Stream.empty();
}
});
}
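A self-contained illustration of why natural ordering yields the requested -> inflight -> completed removal sequence; the enum below is a simplified stand-in for HoodieInstant.State, not the real class the archiver sorts:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ArchiveOrderDemo {
  // Simplified stand-in: declaration order drives the enum's natural ordering.
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  public static void main(String[] args) {
    List<State> unordered = Arrays.asList(State.COMPLETED, State.REQUESTED, State.INFLIGHT);
    // Sorting before deletion means the REQUESTED and INFLIGHT metadata files are removed
    // before the COMPLETED one, so a timeline observer never sees an instant that looks
    // pending only because its completed file vanished first.
    List<State> removalOrder = unordered.stream().sorted().collect(Collectors.toList());
    System.out.println(removalOrder);  // [REQUESTED, INFLIGHT, COMPLETED]
  }
}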
@@ -51,6 +51,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
@@ -180,7 +180,7 @@ private void init(HoodieRecord record) {
// base file to denote some log appends happened on a slice. writeToken will still fence concurrent
// writers.
// https://issues.apache.org/jira/browse/HUDI-1517
createMarkerFile(partitionPath, FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));
createMarkerFile(partitionPath, FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));

this.writer = createLogWriter(fileSlice, baseInstantTime);
} catch (Exception e) {
@@ -100,7 +100,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config,
writeSchemaWithMetaFields, this.taskContextSupplier);
} catch (IOException e) {
@@ -174,7 +174,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave(getPartitionId());

String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);

LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
@@ -164,7 +164,7 @@ public Path makeNewPath(String partitionPath) {
throw new HoodieIOException("Failed to make dir " + path, e);
}

return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId,
return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime, writeToken, fileId,
hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension()));
}
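For context on the renamed helper: a Hudi base file name combines the file id, write token, and instant time with the format extension. The sketch below mimics that layout; the exact separator and field order are an assumption for illustration, not taken from this diff, so verify against FSUtils before relying on it:

public class BaseFileNameDemo {
  // Hypothetical re-creation of the naming scheme behind FSUtils.makeBaseFileName.
  static String makeBaseFileName(String instantTime, String writeToken, String fileId, String extension) {
    return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, extension);
  }

  public static void main(String[] args) {
    // e.g. 7f2d-0001_1-0-1_20220524093000.parquet
    System.out.println(makeBaseFileName("20220524093000", "1-0-1", "7f2d-0001", ".parquet"));
  }
}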

@@ -25,20 +25,20 @@
import java.io.Serializable;

/**
* Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. - Average records per output spark
* partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
* Repartition input records into at least expected number of output partitions. It should give below guarantees -
* Output partition will have records from only one hoodie partition. - Average records per output
* partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews.
*/
public interface BulkInsertPartitioner<I> extends Serializable {

/**
* Repartitions the input records into at least expected number of output spark partitions.
* Repartitions the input records into at least expected number of output partitions.
*
* @param records Input Hoodie records
* @param outputSparkPartitions Expected number of output partitions
* @param records Input Hoodie records
* @param outputPartitions Expected number of output partitions
* @return
*/
I repartitionRecords(I records, int outputSparkPartitions);
I repartitionRecords(I records, int outputPartitions);

/**
* @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
@@ -48,6 +48,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
/**
* Return file group id prefix for the given data partition.
* By default, return a new file group id prefix, so that incoming records will route to a fresh new file group
*
* @param partitionId data partition
* @return
*/
@@ -57,6 +58,7 @@ default String getFileIdPfx(int partitionId) {

/**
* Return write handle factory for the given partition.
*
* @param partitionId data partition
* @return
*/
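To make the engine-agnostic wording above concrete, here is a minimal sketch of the repartitioning contract using a plain List as the collection type. It is a simplified stand-in, not the real Hudi interface, which also exposes the sortedness, file id prefix, and write handle factory hooks shown in the diff:

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class PartitionerSketch {
  // Simplified stand-in for org.apache.hudi.table.BulkInsertPartitioner<I>.
  interface SimpleBulkInsertPartitioner<I> extends Serializable {
    I repartitionRecords(I records, int outputPartitions);
  }

  public static void main(String[] args) {
    // A no-op partitioner: keeps the incoming layout and ignores the requested partition count.
    SimpleBulkInsertPartitioner<List<String>> passThrough = (records, outputPartitions) -> records;
    System.out.println(passThrough.repartitionRecords(Arrays.asList("r1", "r2", "r3"), 4));
  }
}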
@@ -885,24 +885,22 @@ private boolean shouldExecuteMetadataTableDeletion() {
// partitions are ready to use
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
&& !config.isMetadataTableEnabled()
&& (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
|| !metaClient.getTableConfig().getMetadataPartitions().isEmpty());
&& !metaClient.getTableConfig().getMetadataPartitions().isEmpty();
}

/**
* Clears hoodie.table.metadata.partitions in hoodie.properties
*/
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
if (clearAll) {
Set<String> partitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
if (clearAll && partitions.size() > 0) {
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
return;
} else if (partitions.remove(partitionType.get().getPartitionPath())) {
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
}
Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
completedPartitions.remove(partitionType.get().getPartitionPath());
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
}
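The rewritten method above blanks hoodie.table.metadata.partitions when clearing everything, and otherwise drops a single partition path from the comma-separated value before rewriting it. A self-contained sketch of that string/set manipulation, with made-up partition names standing in for MetadataPartitionType paths:

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class MetadataPartitionsConfigDemo {
  public static void main(String[] args) {
    String current = "files,column_stats,bloom_filters";  // hypothetical existing property value

    // clearAll == true and something is set: blank the whole property.
    String clearedAll = "";

    // clearAll == false: remove only one completed partition and re-join the rest.
    Set<String> completedPartitions = new LinkedHashSet<>(Arrays.asList(current.split(",")));
    completedPartitions.remove("column_stats");
    String updated = String.join(",", completedPartitions);

    System.out.println("cleared: '" + clearedAll + "'");  // cleared: ''
    System.out.println("updated: " + updated);            // updated: files,bloom_filters
  }
}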

public HoodieTableMetadata getMetadataTable() {