Merged
Commits (23)
cb47f97
[HUDI-53] Implementation of record_index - a HUDI index based on the …
prashantwason May 18, 2023
8d4b993
Address review comments
suryaprasanna Jun 10, 2023
88ffce9
Fix getRecordByKeys
codope Jun 10, 2023
5954232
Add PartitionIdPassthrough partitioner
codope Jun 10, 2023
367143b
Address review comments
suryaprasanna Jun 10, 2023
cde2190
Added caching of records in record index during tagLocation.
prashantwason Jun 13, 2023
ceee48b
Removed code which is passing actionMetadata to MDT constructor as it…
prashantwason Jun 13, 2023
068a198
Fixed build
prashantwason Jun 13, 2023
e80db6c
Fixed the check - if reader is already created then no need to initia…
prashantwason Jun 13, 2023
2aa74ee
Throw an exception if record index is used with MOR tables as it is n…
prashantwason Jun 13, 2023
563d751
Fixed comment to refer to 1MB block size
prashantwason Jun 13, 2023
3142956
Addressed review comments
prashantwason Jun 14, 2023
480855d
Rebased on latest master and fixed compile issues
prashantwason Jun 14, 2023
24231cd
Sorting to be done within the HFileBlock
prashantwason Jun 14, 2023
b5f1aeb
Addressed all remaining code review comments
prashantwason Jun 15, 2023
166efdd
Fixed checkstyle
prashantwason Jun 15, 2023
8da90f4
Fix tests TestHoodieBackedMetadata,TestHoodieIndex,HoodieBackedTableM…
lokeshj1703 Jun 14, 2023
98ff80f
fix code style
xushiyan Jun 19, 2023
ea60632
fix code style
xushiyan Jun 19, 2023
6b62739
fix obsolete usage for mdt deletion flow
xushiyan Jun 19, 2023
963f948
address comments
xushiyan Jun 19, 2023
9eecc9c
rename isMetadataTableEnabled to isMetadataTableAvailable
xushiyan Jun 19, 2023
220c6bb
remove hfile allow dup config
xushiyan Jun 19, 2023
==== Changed file ====
@@ -25,7 +25,6 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -34,11 +33,15 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.shell.standard.ShellComponent;
@@ -57,9 +60,6 @@
import java.util.Map;
import java.util.Set;

import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT;

/**
* CLI commands to operate on the Metadata Table.
* <p>
@@ -115,13 +115,13 @@ public String set(@ShellOption(value = {"--metadataDir"},
@ShellMethod(key = "metadata create", value = "Create the Metadata Table if it does not exist")
public String create(
@ShellOption(value = "--sparkMaster", defaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master
) throws IOException {
) throws Exception {
HoodieCLI.getTableMetaClient();
Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
try {
FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
if (statuses.length > 0) {
throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") not empty.");
throw new RuntimeException("Metadata directory (" + metadataPath + ") not empty.");
}
} catch (FileNotFoundException e) {
// Metadata directory does not exist yet
@@ -131,28 +131,32 @@ public String create(
HoodieTimer timer = HoodieTimer.start();
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext(Option.of(master));
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
}
}

@ShellMethod(key = "metadata delete", value = "Remove the Metadata Table")
public String delete() throws Exception {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
try {
FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
if (statuses.length > 0) {
HoodieCLI.fs.delete(metadataPath, true);
}
} catch (FileNotFoundException e) {
// Metadata directory does not exist
public String delete(@ShellOption(value = "--backup", help = "Backup the metadata table before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception {
HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
String backupPath = HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, new HoodieSparkEngineContext(jsc), backup);
if (backup) {
return "Metadata Table has been deleted and backed up to " + backupPath;
} else {
return "Metadata Table has been deleted from " + getMetadataTableBasePath(HoodieCLI.basePath);
}
}

LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
HoodieTableConfig.delete(metaClient.getFs(), new Path(metaClient.getMetaPath()), new HashSet<>(Arrays
.asList(TABLE_METADATA_PARTITIONS.key(), TABLE_METADATA_PARTITIONS_INFLIGHT.key())));

return String.format("Removed Metadata Table from %s", metadataPath);
@ShellMethod(key = "metadata delete-record-index", value = "Delete the record index from Metadata Table")
public String deleteRecordIndex(@ShellOption(value = "--backup", help = "Backup the record index before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception {
HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, new HoodieSparkEngineContext(jsc),
MetadataPartitionType.RECORD_INDEX, backup);
if (backup) {
return "Record Index has been deleted from the Metadata Table and backed up to " + backupPath;
} else {
return "Record Index has been deleted from the Metadata Table";
}
}
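
Usage note (editorial, not part of this diff): going by the @ShellMethod keys and @ShellOption flags above, these commands are invoked from hudi-cli as "metadata delete" and "metadata delete-record-index", and both take a --backup flag that defaults to true, so a run such as "metadata delete-record-index --backup false" would drop the record index partition without first backing it up; the exact invocation syntax otherwise follows standard Spring Shell conventions.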

@ShellMethod(key = "metadata init", value = "Update the metadata table from commits since the creation")
@@ -165,14 +169,16 @@ public String init(@ShellOption(value = "--sparkMaster", defaultValue = SparkUti
HoodieCLI.fs.listStatus(metadataPath);
} catch (FileNotFoundException e) {
// Metadata directory does not exist
throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") does not exist.");
throw new RuntimeException("Metadata directory (" + metadataPath + ") does not exist.");
}

HoodieTimer timer = HoodieTimer.start();
if (!readOnly) {
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext(Option.of(master));
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
// Empty
}
}

String action = readOnly ? "Opened" : "Initialized";
@@ -183,23 +189,23 @@ public String init(@ShellOption(value = "--sparkMaster", defaultValue = SparkUti
public String stats() throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf),
config, HoodieCLI.basePath, "/tmp");
Map<String, String> stats = metadata.stats();
try (HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf),
config, HoodieCLI.basePath)) {
Map<String, String> stats = metadata.stats();

final List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, String> entry : stats.entrySet()) {
Comparable[] row = new Comparable[2];
row[0] = entry.getKey();
row[1] = entry.getValue();
rows.add(row);
}

final List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, String> entry : stats.entrySet()) {
Comparable[] row = new Comparable[2];
row[0] = entry.getKey();
row[1] = entry.getValue();
rows.add(row);
TableHeader header = new TableHeader()
.addTableHeaderField("stat key")
.addTableHeaderField("stat value");
return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
}

TableHeader header = new TableHeader()
.addTableHeaderField("stat key")
.addTableHeaderField("stat value");
return HoodiePrintHelper.print(header, new HashMap<>(), "",
false, Integer.MAX_VALUE, false, rows);
}

@ShellMethod(key = "metadata list-partitions", value = "List all partitions from metadata")
@@ -209,60 +215,60 @@ public String listPartitions(
HoodieCLI.getTableMetaClient();
initJavaSparkContext(Option.of(master));
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
HoodieCLI.basePath, "/tmp");
try (HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
HoodieCLI.basePath)) {

if (!metadata.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
if (!metadata.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}

HoodieTimer timer = HoodieTimer.start();
List<String> partitions = metadata.getAllPartitionPaths();
LOG.debug("Took " + timer.endTimer() + " ms");
HoodieTimer timer = HoodieTimer.start();
List<String> partitions = metadata.getAllPartitionPaths();
LOG.debug("Took " + timer.endTimer() + " ms");

final List<Comparable[]> rows = new ArrayList<>();
partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
Comparable[] row = new Comparable[1];
row[0] = p;
rows.add(row);
});

TableHeader header = new TableHeader().addTableHeaderField("partition");
return HoodiePrintHelper.print(header, new HashMap<>(), "",
false, Integer.MAX_VALUE, false, rows);
final List<Comparable[]> rows = new ArrayList<>();
partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
Comparable[] row = new Comparable[1];
row[0] = p;
rows.add(row);
});

TableHeader header = new TableHeader().addTableHeaderField("partition");
return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
}
}

@ShellMethod(key = "metadata list-files", value = "Print a list of all files in a partition from the metadata")
public String listFiles(
@ShellOption(value = {"--partition"}, help = "Name of the partition to list files", defaultValue = "") final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
try (HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath)) {

if (!metaReader.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
if (!metaReader.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}

Path partitionPath = new Path(HoodieCLI.basePath);
if (!StringUtils.isNullOrEmpty(partition)) {
partitionPath = new Path(HoodieCLI.basePath, partition);
}
Path partitionPath = new Path(HoodieCLI.basePath);
if (!StringUtils.isNullOrEmpty(partition)) {
partitionPath = new Path(HoodieCLI.basePath, partition);
}

HoodieTimer timer = HoodieTimer.start();
FileStatus[] statuses = metaReader.getAllFilesInPartition(partitionPath);
LOG.debug("Took " + timer.endTimer() + " ms");
HoodieTimer timer = HoodieTimer.start();
FileStatus[] statuses = metaReader.getAllFilesInPartition(partitionPath);
LOG.debug("Took " + timer.endTimer() + " ms");

final List<Comparable[]> rows = new ArrayList<>();
Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
Comparable[] row = new Comparable[1];
row[0] = f;
rows.add(row);
});

TableHeader header = new TableHeader().addTableHeaderField("file path");
return HoodiePrintHelper.print(header, new HashMap<>(), "",
false, Integer.MAX_VALUE, false, rows);
final List<Comparable[]> rows = new ArrayList<>();
Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
Comparable[] row = new Comparable[1];
row[0] = f;
rows.add(row);
});

TableHeader header = new TableHeader().addTableHeaderField("file path");
return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
}
}

@ShellMethod(key = "metadata validate-files", value = "Validate all files in all partitions from the metadata")
@@ -271,15 +277,15 @@ public String validateFiles(
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadataReader = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath);

if (!metadataReader.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}

HoodieMetadataConfig fsConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath, "/tmp");
new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath);

HoodieTimer timer = HoodieTimer.start();
List<String> metadataPartitions = metadataReader.getAllPartitionPaths();
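
Editorial note on the pattern above: most of the metadata CLI commands in this file now open HoodieBackedTableMetadata (and, in create/init, the SparkHoodieBackedTableMetadataWriter) in a try-with-resources block, and the extra "/tmp" argument has been dropped from the reader constructor. A minimal sketch of the reader pattern as used by the stats command, where conf and basePath stand in for HoodieCLI.conf and HoodieCLI.basePath:

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;

// Build a metadata config with the table enabled, open the reader, and let
// try-with-resources release it once the stats have been printed.
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
try (HoodieBackedTableMetadata metadata =
         new HoodieBackedTableMetadata(new HoodieLocalEngineContext(conf), config, basePath)) {
  metadata.stats().forEach((key, value) -> System.out.println(key + " = " + value));
}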
==== Changed file ====
@@ -104,7 +104,7 @@ public void init() throws Exception {
metaClient.getHadoopConf(), config, context))
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
.withBaseFilesInPartitions(partitionAndFileId)
.withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addCommit("101");

hoodieTestTable.addCommit("102").withBaseFilesInPartitions(partitionAndFileId);
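
Editorial note: the .getLeft() calls added to these fixtures indicate that withBaseFilesInPartitions / withBaseFilesInPartition now return a pair rather than the test table itself; only the left element (the HoodieTestTable, used to keep the fluent chain going) is visible in this diff, so the right element below is left as an unknown. A sketch of the assumed shape, written as it would appear inside a test method that declares throws Exception:

// Assumed shape: the left element is the HoodieTestTable so chaining can continue;
// what the right element carries is not shown in this PR. "Pair" here is whichever
// pair type the fixture actually returns.
Pair<HoodieTestTable, ?> result = HoodieTestTable.of(metaClient)
    .addCommit("100")
    .withBaseFilesInPartitions(partitionAndFileId);
HoodieTestTable testTable = result.getLeft();
testTable.addCommit("101");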
==== Changed file ====
@@ -104,9 +104,9 @@ public void init() throws Exception {
metaClient.getHadoopConf(), config, context))
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
.withBaseFilesInPartitions(partitionAndFileId)
.withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addCommit("101")
.withBaseFilesInPartitions(partitionAndFileId)
.withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addInflightCommit("102")
.withBaseFilesInPartitions(partitionAndFileId);

==== Changed file ====
@@ -77,13 +77,13 @@ public void init() throws Exception {
HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
.withBaseFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "file-1")
.withBaseFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "file-2")
.withBaseFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "file-3")
.withBaseFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "file-1").getLeft()
.withBaseFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "file-2").getLeft()
.withBaseFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "file-3").getLeft()
.addInflightCommit("101")
.withBaseFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "file-1")
.withBaseFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "file-2")
.withBaseFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "file-3")
.withBaseFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "file-1").getLeft()
.withBaseFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "file-2").getLeft()
.withBaseFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "file-3").getLeft()
.withMarkerFile(DEFAULT_FIRST_PARTITION_PATH, "file-1", IOType.MERGE)
.withMarkerFile(DEFAULT_SECOND_PARTITION_PATH, "file-2", IOType.MERGE)
.withMarkerFile(DEFAULT_THIRD_PARTITION_PATH, "file-3", IOType.MERGE);
==== Changed file ====
@@ -95,9 +95,9 @@ public void testRollbackCommit() throws Exception {
HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
.withBaseFilesInPartitions(partitionAndFileId)
.withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addCommit("101")
.withBaseFilesInPartitions(partitionAndFileId)
.withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addCommit("102")
.withBaseFilesInPartitions(partitionAndFileId);

==== Changed file ====
@@ -29,6 +29,7 @@
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -494,15 +495,16 @@ protected void runAnyPendingClustering(HoodieTable table) {
/**
* Write the HoodieCommitMetadata to metadata table if available.
*
* @param table {@link HoodieTable} of interest.
* @param instantTime instant time of the commit.
* @param actionType action type of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
* @param table {@link HoodieTable} of interest.
* @param instantTime instant time of the commit.
* @param actionType action type of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
* @param writeStatuses Write statuses of the commit
*/
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
checkArgument(table.isTableServiceAction(actionType, instantTime), String.format("Unsupported action: %s.%s is not table service.", actionType, instantTime));
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime));
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, writeStatuses, instantTime));
}

/**
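
Editorial note: the new parameter threads the write statuses of the finished action into the metadata table writer, presumably so the new record index partition can be updated from the record-key to file-group mappings those statuses carry. A hypothetical call site (every name below except writeTableMetadata itself is illustrative, not taken from this PR), keeping in mind that the action type must still be a table-service action or the checkArgument above fails:

// Hypothetical helper around the changed method; the write statuses are the ones
// produced by the action being finalized.
private void commitToMetadataTable(HoodieTable table, String instantTime, String actionType,
                                   HoodieCommitMetadata commitMetadata,
                                   HoodieData<WriteStatus> writeStatuses) {
  // Pass the write statuses through so the metadata writer can update the
  // record index alongside the other metadata partitions.
  writeTableMetadata(table, instantTime, actionType, commitMetadata, writeStatuses);
}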