@@ -27,6 +27,7 @@
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -39,7 +40,6 @@
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -51,7 +51,6 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -68,6 +67,8 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -168,7 +169,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
.withAutoCommit(true)
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
@@ -379,92 +380,68 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi

// List all partitions in the basePath of the containing dataset
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(dataMetaClient);

// Create a HoodieCommitMetadata with writeStats for all discovered files
int[] stats = {0};
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();

partitionToFileStatus.forEach((partition, statuses) -> {
// Filter the statuses to only include files which were created before or on createInstantTime
statuses.stream().filter(status -> {
String filename = status.getPath().getName();
return !HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
createInstantTime);
}).forEach(status -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPath((partition.isEmpty() ? "" : partition + Path.SEPARATOR) + status.getPath().getName());
writeStat.setPartitionPath(partition);
writeStat.setTotalWriteBytes(status.getLen());
commitMetadata.addWriteStat(partition, writeStat);
stats[0] += 1;
});

// If the partition has no files then create a writeStat with no file path
if (commitMetadata.getWriteStats(partition) == null) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partition);
commitMetadata.addWriteStat(partition, writeStat);
}
});
List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);

LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime, false);
// During bootstrap, the list of files to be committed can be huge. Creating a HoodieCommitMetadata out of such a
// large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well.
// Hence, we have a special commit just for the bootstrap scenario.
bootstrapCommit(dirInfoList, createInstantTime);
return true;
}

/**
* Function to find hoodie partitions and list files in them in parallel.
*
* @param dataMetaClient
* @param datasetMetaClient data set meta client instance.
* @return List of {@code DirectoryInfo} for each partition of the dataset.
*/
private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(dataWriteConfig.getBasePath()));

Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
final String datasetBasePath = datasetMetaClient.getBasePath();

while (!pathsToList.isEmpty()) {
int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
// In each round we will list a section of directories
int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
FileSystem fs = path.getFileSystem(conf.get());
return Pair.of(path, fs.listStatus(path));
}, listingParallelism);
pathsToList.clear();
String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
}, numDirsToList);

pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));

// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
// the results.
dirToFileListing.forEach(p -> {
if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
return;
for (DirectoryInfo dirInfo : processedDirectories) {
if (!dirFilterRegex.isEmpty()) {
final String relativePath = dirInfo.getRelativePath();
if (!relativePath.isEmpty()) {
Path partitionPath = new Path(datasetBasePath, relativePath);
if (partitionPath.getName().matches(dirFilterRegex)) {
LOG.info("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex);
continue;
}
}
}

List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
.filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
.collect(Collectors.toList());

if (p.getRight().length > filesInDir.size()) {
String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
// deal with Non-partition table, we should exclude .hoodie
partitionToFileStatus.put(partitionName, filesInDir.stream()
.filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
if (dirInfo.isHoodiePartition()) {
// Add to result
partitionsToBootstrap.add(dirInfo);
} else {
// Add sub-dirs to the queue
pathsToList.addAll(Arrays.stream(p.getRight())
.filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
.map(fs -> fs.getPath())
.collect(Collectors.toList()));
pathsToList.addAll(dirInfo.getSubDirectories());
}
});
}
}

return partitionToFileStatus;
return partitionsToBootstrap;
}

/**
@@ -528,7 +505,7 @@ private interface ConvertMetadataFunction {
private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
if (enabled && metadata != null) {
List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
}
}

@@ -590,7 +567,7 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime)

List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), rollbackMetadata, instantTime,
metadata.getSyncedInstantTime(), wasSynced);
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, false);
commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, false);
}
}

@@ -603,12 +580,12 @@ public void close() throws Exception {

/**
* Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
* @param records The list of records to be written.
* @param records The HoodieData of records to be written.
* @param partitionName The partition to which the records are to be written.
* @param instantTime The timestamp to use for the deltacommit.
* @param canTriggerTableService true if table services can be scheduled and executed. false otherwise.
*/
protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);
protected abstract void commit(HoodieData<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);

/**
* Perform a compaction on the Metadata Table.
@@ -647,4 +624,96 @@ protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime
// metadata table.
writeClient.clean(instantTime + "002");
}

/**
* This is invoked to bootstrap the metadata table for a dataset. The bootstrap commit has a special handling mechanism
* due to its scale compared to other regular commits.
*
*/
protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {
Note to reviewer: I have added HoodieData abstractions to commit() in the metadata writer, so bootstrap is a single code path across all engines. The actual commit() implementations still differ between Spark and Flink, so I did not try to generalize those in this patch.
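For context, here is a condensed sketch (not part of the diff) of the flow this note describes, pieced together from the hunks in this PR: the shared bootstrap path builds a HoodieData&lt;HoodieRecord&gt; and hands it to the abstract commit(), and each engine then unwraps it in its own way.

// Shared bootstrap path: build the records engine-agnostically and commit them.
HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
partitionRecords = partitionRecords.union(fileListRecords);
commit(partitionRecords, MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);

// Spark commit() implementation: keep the records distributed as an RDD.
JavaRDD<HoodieRecord> recordRDD = (JavaRDD<HoodieRecord>) hoodieDataRecords.get();

// Flink commit() implementation: materialize the records as a List.
List<HoodieRecord> records = (List<HoodieRecord>) hoodieDataRecords.get();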

List<String> partitions = partitionInfoList.stream().map(p -> p.getRelativePath()).collect(Collectors.toList());
final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();

// Record which saves the list of all partitions
HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
if (partitions.isEmpty()) {
// in case of bootstrapping a fresh table, there won't be any partitions, but we still need to make a bootstrap commit
commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord), 1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
return;
}
HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
if (!partitionInfoList.isEmpty()) {
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
// Record which saves files within a partition
return HoodieMetadataPayload.createPartitionFilesRecord(
partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
});
partitionRecords = partitionRecords.union(fileListRecords);
}

LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
ValidationUtils.checkState(partitionRecords.count() == (partitions.size() + 1));
commit(partitionRecords, MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
}

/**
* A class which represents a directory and the files and directories inside it.
*
* A {@code DirectoryInfo} object saves the name of the partition and the properties of each file required for
* bootstrapping the metadata table. Saving only a limited set of properties reduces the total memory footprint when
* a very large number of files are present in the dataset being bootstrapped.
*/
static class DirectoryInfo implements Serializable {
// Relative path of the directory (relative to the base directory)
private final String relativePath;
// Map of filenames within this partition to their respective sizes
private HashMap<String, Long> filenameToSizeMap;
// List of directories within this partition
private final List<Path> subDirectories = new ArrayList<>();
// Is this a hoodie partition
private boolean isHoodiePartition = false;

public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
this.relativePath = relativePath;

// Pre-allocate with the maximum length possible
filenameToSizeMap = new HashMap<>(fileStatus.length);

for (FileStatus status : fileStatus) {
if (status.isDirectory()) {
// Ignore .hoodie directory as there cannot be any partitions inside it
if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
this.subDirectories.add(status.getPath());
}
} else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
// Presence of partition meta file implies this is a HUDI partition
this.isHoodiePartition = true;
} else if (FSUtils.isDataFile(status.getPath())) {
// Regular HUDI data file (base file or log file)
filenameToSizeMap.put(status.getPath().getName(), status.getLen());
}
}
}

String getRelativePath() {
return relativePath;
}

int getTotalFiles() {
return filenameToSizeMap.size();
}

boolean isHoodiePartition() {
return isHoodiePartition;
}

List<Path> getSubDirectories() {
return subDirectories;
}

// Returns a map of filenames mapped to their lengths
Map<String, Long> getFileNameToSizeMap() {
return filenameToSizeMap;
}
}
}
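For illustration, a minimal sketch (not part of the patch) of how a DirectoryInfo condenses a raw FileStatus listing into the compact per-partition state used during bootstrap; the file system, path, and relative partition name below are assumptions made up for the example.

// Assumed setup: list one partition directory of the dataset being bootstrapped.
FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] statuses = fs.listStatus(new Path("/tmp/hudi_table/2021/09/01"));

// Only file names and sizes are retained, plus sub-directories and the hoodie-partition flag.
DirectoryInfo dirInfo = new DirectoryInfo("2021/09/01", statuses);
Map<String, Long> fileSizes = dirInfo.getFileNameToSizeMap();
int totalFiles = dirInfo.getTotalFiles();
boolean isPartition = dirInfo.isHoodiePartition();
List<Path> subDirs = dirInfo.getSubDirectories();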
@@ -21,6 +21,7 @@
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
@@ -90,8 +91,9 @@ protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext eng
}

@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
List<HoodieRecord> records = (List<HoodieRecord>) hoodieDataRecords.get();
List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);

try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
@@ -130,6 +130,11 @@ public HoodieData<T> distinct() {
return HoodieJavaRDD.of(rddData.distinct());
}

@Override
public HoodieData<T> union(HoodieData<T> other) {
return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
}

@Override
public List<T> collectAsList() {
return rddData.collect();
@@ -22,6 +22,7 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
@@ -39,7 +40,6 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;
import java.util.List;
@@ -102,9 +102,9 @@ protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext eng
}
}

@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) hoodieDataRecords.get();
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);

try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
@@ -147,12 +147,11 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i
*
* Each record is tagged with its respective file slice's location based on its record key.
*/
private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));

JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
return jsc.parallelize(records, 1).map(r -> {
return recordsRDD.map(r -> {
FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
return r;
@@ -97,6 +97,13 @@ public abstract <O> HoodieData<O> mapPartitions(
*/
public abstract HoodieData<T> distinct();

/**
* Unions this {@link HoodieData} with the other {@link HoodieData}.
* @param other the other {@link HoodieData} to union with.
* @return the union of the two as an instance of {@link HoodieData}.
*/
public abstract HoodieData<T> union(HoodieData<T> other);

/**
* @return collected results in {@link List<T>}.
*/
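For reference, a minimal usage sketch of the new union contract (not part of the diff). The record lists recordsA and recordsB are hypothetical, and the sketch assumes an engine context whose parallelize returns a HoodieData implementation such as HoodieJavaRDD.

// Wrap two hypothetical record lists and concatenate them with union().
HoodieData<HoodieRecord> first = engineContext.parallelize(recordsA, 1);
HoodieData<HoodieRecord> second = engineContext.parallelize(recordsB, 1);
HoodieData<HoodieRecord> combined = first.union(second);
// The result holds every element of both inputs:
// combined.count() == recordsA.size() + recordsB.size()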