Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
Expand Down Expand Up @@ -63,7 +64,7 @@ public String showArchivedCommits(
throws IOException {
System.out.println("===============> Showing only " + limit + " archived commits <===============");
String basePath = HoodieCLI.getTableMetaClient().getBasePath();
Path archivePath = new Path(basePath + "/.hoodie/.commits_.archive*");
Path archivePath = new Path(HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
if (folder != null && !folder.isEmpty()) {
archivePath = new Path(basePath + "/.hoodie/" + folder);
}
Expand Down Expand Up @@ -138,9 +139,11 @@ public String showCommits(
throws IOException {

System.out.println("===============> Showing only " + limit + " archived commits <===============");
String basePath = HoodieCLI.getTableMetaClient().getBasePath();
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
String basePath = metaClient.getBasePath();
Path archivePath = new Path(metaClient.getArchivePath() + "/.commits_.archive*");
FileStatus[] fsStatuses =
FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(new Path(basePath + "/.hoodie/.commits_.archive*"));
FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
List<Comparable[]> allCommits = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
// read the archived file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private List<Comparable[]> convertBootstrapSourceFileMapping(List<BootstrapFileM
final List<Comparable[]> rows = new ArrayList<>();
for (BootstrapFileMapping mapping : mappingList) {
rows.add(new Comparable[] {mapping.getPartitionPath(), mapping.getFileId(),
mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBoostrapFileStatus().getPath().getUri()});
mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBootstrapFileStatus().getPath().getUri()});
}
return rows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
private String tablePath;

@BeforeEach
public void init() throws IOException {
public void init() throws Exception {
initDFS();
jsc.hadoopConfiguration().addResource(dfs.getConf());
HoodieCLI.conf = dfs.getConf();
Expand Down Expand Up @@ -156,7 +156,7 @@ public void testShowArchivedCommits() {
* Test for command: show archived commits.
*/
@Test
public void testShowCommits() throws IOException {
public void testShowCommits() throws Exception {
CommandResult cr = getShell().executeCommand("show archived commits");
assertTrue(cr.isSuccess());
final List<Comparable[]> rows = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
private URL propsFilePath;

@BeforeEach
public void init() throws IOException {
public void init() throws Exception {
HoodieCLI.conf = jsc.hadoopConfiguration();

String tableName = "test_table";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,19 @@ public void init() throws IOException {
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
}

private LinkedHashMap<String, Integer[]> generateData() {
private LinkedHashMap<String, Integer[]> generateData() throws Exception {
// generate data and metadata
LinkedHashMap<String, Integer[]> data = new LinkedHashMap<>();
data.put("102", new Integer[] {15, 10});
data.put("101", new Integer[] {20, 10});
data.put("100", new Integer[] {15, 15});

data.forEach((key, value) -> {
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
});
}

metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
assertEquals(3, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(),
Expand Down Expand Up @@ -138,7 +140,7 @@ private String generateExpectData(int records, Map<String, Integer[]> data) thro
* Test case of 'commits show' command.
*/
@Test
public void testShowCommits() throws IOException {
public void testShowCommits() throws Exception {
Map<String, Integer[]> data = generateData();

CommandResult cr = getShell().executeCommand("commits show");
Expand All @@ -154,7 +156,7 @@ public void testShowCommits() throws IOException {
* Test case of 'commits showarchived' command.
*/
@Test
public void testShowArchivedCommits() throws IOException {
public void testShowArchivedCommits() throws Exception {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
Expand All @@ -168,10 +170,12 @@ public void testShowArchivedCommits() throws IOException {
data.put("102", new Integer[] {25, 45});
data.put("101", new Integer[] {35, 15});

data.forEach((key, value) -> {
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
});
}

// archive
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
Expand All @@ -198,7 +202,7 @@ public void testShowArchivedCommits() throws IOException {
* Test case of 'commit showpartitions' command.
*/
@Test
public void testShowCommitPartitions() {
public void testShowCommitPartitions() throws Exception {
Map<String, Integer[]> data = generateData();

String commitInstant = "101";
Expand Down Expand Up @@ -235,7 +239,7 @@ public void testShowCommitPartitions() {
* Test case of 'commit showfiles' command.
*/
@Test
public void testShowCommitFiles() {
public void testShowCommitFiles() throws Exception {
Map<String, Integer[]> data = generateData();

String commitInstant = "101";
Expand Down Expand Up @@ -270,18 +274,20 @@ public void testShowCommitFiles() {
* Test case of 'commits compare' command.
*/
@Test
public void testCompareCommits() throws IOException {
public void testCompareCommits() throws Exception {
Map<String, Integer[]> data = generateData();

String tableName2 = "test_table2";
String tablePath2 = basePath + File.separator + tableName2;
HoodieTestUtils.init(jsc.hadoopConfiguration(), tablePath2, getTableType());

data.remove("102");
data.forEach((key, value) -> {
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath2, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
});
}

CommandResult cr = getShell().executeCommand(String.format("commits compare --path %s", tablePath2));
assertTrue(cr.isSuccess());
Expand All @@ -298,18 +304,20 @@ public void testCompareCommits() throws IOException {
* Test case of 'commits sync' command.
*/
@Test
public void testSyncCommits() throws IOException {
public void testSyncCommits() throws Exception {
Map<String, Integer[]> data = generateData();

String tableName2 = "test_table2";
String tablePath2 = basePath + File.separator + tableName2;
HoodieTestUtils.init(jsc.hadoopConfiguration(), tablePath2, getTableType(), tableName2);

data.remove("102");
data.forEach((key, value) -> {
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath2, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
});
}

CommandResult cr = getShell().executeCommand(String.format("commits sync --path %s", tablePath2));
assertTrue(cr.isSuccess());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
Expand All @@ -41,14 +41,18 @@
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -59,39 +63,37 @@
public class TestRollbacksCommand extends AbstractShellIntegrationTest {

@BeforeEach
public void init() throws IOException {
public void init() throws Exception {
String tableName = "test_table";
String tablePath = basePath + File.separator + tableName;
String tablePath = Paths.get(basePath, tableName).toString();
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");

metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
//Create some commits files and parquet files
String commitTime1 = "100";
String commitTime2 = "101";
String commitTime3 = "102";
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath);

// two commit files
HoodieTestUtils.createCommitFiles(tablePath, commitTime1, commitTime2);
// one .inflight commit file
HoodieTestUtils.createInflightCommitFiles(tablePath, commitTime3);

// generate commit files for commits
for (String commitTime : Arrays.asList(commitTime1, commitTime2, commitTime3)) {
HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1");
HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2");
HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3");
}

Map<String, String> partitionAndFileId = new HashMap<String, String>() {
{
put(DEFAULT_FIRST_PARTITION_PATH, "file-1");
put(DEFAULT_SECOND_PARTITION_PATH, "file-2");
put(DEFAULT_THIRD_PARTITION_PATH, "file-3");
}
};
HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
.withBaseFilesInPartitions(partitionAndFileId)
.addCommit("101")
.withBaseFilesInPartitions(partitionAndFileId)
.addInflightCommit("102")
.withBaseFilesInPartitions(partitionAndFileId);
// generate two rollback
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();

try (HoodieWriteClient client = getHoodieWriteClient(config)) {
// Rollback inflight commit3 and commit2
client.rollback(commitTime3);
client.rollback(commitTime2);
client.rollback("102");
client.rollback("101");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,19 @@ public void init() throws IOException {
* Test case for command 'stats wa'.
*/
@Test
public void testWriteAmplificationStats() {
public void testWriteAmplificationStats() throws Exception {
// generate data and metadata
Map<String, Integer[]> data = new LinkedHashMap<>();
data.put("100", new Integer[] {15, 10});
data.put("101", new Integer[] {20, 10});
data.put("102", new Integer[] {15, 15});

data.forEach((key, value) -> {
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
});
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String k = entry.getKey();
Integer[] v = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, k, jsc.hadoopConfiguration(),
Option.of(v[0]), Option.of(v[1]));
}

CommandResult cr = getShell().executeCommand("stats wa");
assertTrue(cr.isSuccess());
Expand All @@ -93,7 +95,7 @@ public void testWriteAmplificationStats() {
DecimalFormat df = new DecimalFormat("#.00");
data.forEach((key, value) -> {
// there are two partitions, so need to *2
rows.add(new Comparable[]{key, value[1] * 2, value[0] * 2, df.format((float) value[0] / value[1])});
rows.add(new Comparable[] {key, value[1] * 2, value[0] * 2, df.format((float) value[0] / value[1])});
});
int totalWrite = data.values().stream().map(integers -> integers[0] * 2).mapToInt(s -> s).sum();
int totalUpdate = data.values().stream().map(integers -> integers[1] * 2).mapToInt(s -> s).sum();
Expand Down
Loading