@@ -73,6 +73,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -213,6 +214,10 @@ public int getDeleteShuffleParallelism() {
return Math.max(Integer.parseInt(props.getProperty(DELETE_PARALLELISM)), 1);
}

public int getFileListingParallelism() {
return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
}

public int getRollbackParallelism() {
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
}
@@ -870,6 +875,11 @@ public Builder withDeleteParallelism(int parallelism) {
return this;
}

public Builder withFileListingParallelism(int parallelism) {
props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
return this;
}

public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism));
props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism));
@@ -1024,6 +1034,8 @@ protected void setDefaults() {
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
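For reference, a minimal sketch of how a client could set the new hoodie.file.listing.parallelism option through the builder introduced above. The surrounding calls (newBuilder(), withPath(), build()) are assumed from the existing HoodieWriteConfig builder API and are not part of this change.

import org.apache.hudi.config.HoodieWriteConfig;

public class FileListingParallelismExample {
  public static HoodieWriteConfig buildConfig(String basePath) {
    // withFileListingParallelism() stores hoodie.file.listing.parallelism;
    // getFileListingParallelism() clamps any value below 1 back up to 1.
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withFileListingParallelism(200)
        .build();
  }
}

If the option is never set, setDefaults() falls back to DEFAULT_PARALLELISM, the same default used for the other shuffle parallelism settings.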
@@ -52,6 +52,7 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
Contributor:

Can you re-use this same thing for getAllPartitionPaths() here: https://github.com/apache/hudi/blob/rfc-15/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java#L149

The default mechanism is really slow. I found that FSUtils.getAllPartitionPaths() is also used in multiple places across the Hudi code-base, which can benefit from RFC-15. So, what I am thinking is: by default we can let FSUtils.getAllPartitionPaths() always default to the metadata table, and internally, if the table is not present, fall back to this parallelized/optimized listing of partition paths.

That way it benefits both customers who use the metadata table and those who don't.
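A rough sketch of the fallback being suggested here, written as a hypothetical getAllPartitionPaths() in FSUtils; the helpers metadataTableExists(), readPartitionPathsFromMetadataTable() and listPartitionPathsInParallel() are placeholders for illustration, not existing Hudi APIs.

public static List<String> getAllPartitionPaths(SerializableConfiguration conf, String basePath) throws IOException {
  if (metadataTableExists(conf, basePath)) {
    // Served from the RFC-15 metadata table: one read instead of a recursive filesystem scan.
    return readPartitionPathsFromMetadataTable(conf, basePath);
  }
  // Metadata table not present: fall back to the parallelized/optimized listing of partition paths.
  return listPartitionPathsInParallel(conf, basePath);
}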

Member:

> So, what I am thinking is: by default we can let FSUtils.getAllPartitionPaths() always default to the metadata table, and internally, if the table is not present, fall back to this parallelized/optimized listing of partition paths.

+1 on this. There is a JIRA to track this specifically.

Member (author):

I wanted to do that but noticed that HoodieBackedTableMetadata is in hudi-common, which does not have a Spark dependency.

Some new interface / refactoring will be required. I can handle this in a different ticket.

Member:

We can pass in the HoodieEngineContext, which can also be plain old Java if needed. I'll take a closer look in #2351 at how best we can do this. We can scope this PR to just what you fixed now.

Member:

https://issues.apache.org/jira/browse/HUDI-1479 filed to track the follow-up. This needs to be done on master, since it has the HoodieEngineContext abstraction that can be moved to hudi-common.
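To illustrate that direction, the parallel listing step added in this PR (see getPartitionsToFilesMapping() below) could later be expressed against HoodieEngineContext rather than JavaSparkContext. The engineContext.map(list, function, parallelism) call is assumed from the master-branch engine abstraction, and the variable names mirror the method below; this is only a sketch of the follow-up, not part of this change.

// Hypothetical engine-agnostic variant of the parallel directory listing.
List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
  FileSystem fs = path.getFileSystem(conf.get());
  return Pair.of(path, fs.listStatus(path));
}, listingParallelism);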

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -67,7 +68,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

@@ -82,8 +82,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import scala.Tuple2;

import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -196,6 +194,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
.withFileListingParallelism(writeConfig.getFileListingParallelism())
.withFinalizeWriteParallelism(parallelism);

if (writeConfig.isMetricsOn()) {
@@ -311,43 +310,17 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {
initTableMetadata();

// List all partitions in the basePath of the containing dataset
FileSystem fs = datasetMetaClient.getFs();
FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
datasetWriteConfig.shouldAssumeDatePartitioning());
List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");

// List all partitions in parallel and collect the files in them
int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
.mapToPair(partition -> {
FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
return new Tuple2<>(partition, statuses);
});

// Collect the list of partitions and file lists
List<Tuple2<String, FileStatus[]>> partitionFileList = partitionFileListRDD.collect();
LOG.info("Initializing metadata table by using file listings in " + datasetWriteConfig.getBasePath());
Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(jsc, datasetMetaClient);

// Create a HoodieCommitMetadata with writeStats for all discovered files
int[] stats = {0};
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();

partitionFileList.forEach(t -> {
final String partition = t._1;
try {
if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
return;
}
} catch (IOException e) {
throw new HoodieMetadataException("Failed to check partition " + partition, e);
}

partitionToFileStatus.forEach((partition, statuses) -> {
// Filter the statuses to only include files which were created before or on createInstantTime
Arrays.stream(t._2).filter(status -> {
statuses.stream().filter(status -> {
String filename = status.getPath().getName();
if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
return false;
}
if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
createInstantTime)) {
return false;
@@ -370,10 +343,59 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {
}
});

LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}

/**
* Function to find hoodie partitions and list files in them in parallel.
*
* @param jsc               JavaSparkContext used to parallelize the directory listings
* @param datasetMetaClient meta client for the dataset whose partitions are to be listed
* @return Map of partition names to a list of FileStatus for all the files in the partition
*/
private Map<String, List<FileStatus>> getPartitionsToFilesMapping(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {

List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(datasetWriteConfig.getBasePath()));

Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());

while (!pathsToList.isEmpty()) {
int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
List<Pair<Path, FileStatus[]>> dirToFileListing = jsc.parallelize(pathsToList, listingParallelism)
.map(path -> {
FileSystem fs = path.getFileSystem(conf.get());
return Pair.of(path, fs.listStatus(path));
}).collect();
pathsToList.clear();

// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
// the results.
dirToFileListing.forEach(p -> {
List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
.filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
.collect(Collectors.toList());

if (p.getRight().length > filesInDir.size()) {
// Is a partition. Add all data files to result.
partitionToFileStatus.put(p.getLeft().getName(), filesInDir);
} else {
// Add sub-dirs to the queue
pathsToList.addAll(Arrays.stream(p.getRight())
.filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
.map(fs -> fs.getPath())
.collect(Collectors.toList()));
}
});
}

return partitionToFileStatus;
}

/**
* Sync the Metadata Table from the instants created on the dataset.
*
@@ -454,7 +476,9 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
writeStats.forEach(hoodieWriteStat -> {
String pathWithPartition = hoodieWriteStat.getPath();
if (pathWithPartition == null) {
throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
// Empty partition
LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
return;
}

int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;