@@ -18,8 +18,6 @@

package org.apache.hudi.metadata;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -55,6 +53,9 @@
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -71,6 +72,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
@@ -227,9 +229,14 @@ public void testOnlyValidPartitionsAdded() throws Exception {
"Must not contain the filtered directory " + filteredDirectoryThree);

FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
assertTrue(statuses.length == 2);
assertEquals(2, statuses.length);
statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
assertTrue(statuses.length == 5);
assertEquals(5, statuses.length);
Map<String, FileStatus[]> partitionsToFilesMap = metadata(client).getAllFilesInPartitions(
Arrays.asList(basePath + "/p1", basePath + "/p2"));
assertEquals(2, partitionsToFilesMap.size());
assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length);
assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length);
}
}

@@ -881,6 +888,10 @@ private void validateMetadata(SparkRDDWriteClient testClient) throws IOException
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, engineContext);
TableFileSystemView tableView = table.getHoodieView();
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
assertEquals(fsPartitions.size(), partitionToFilesMap.size());

fsPartitions.forEach(partition -> {
try {
Path partitionPath;
@@ -899,6 +910,8 @@ private void validateMetadata(SparkRDDWriteClient testClient) throws IOException
Collections.sort(fsFileNames);
Collections.sort(metadataFilenames);

assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);

// File sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));

18 changes: 11 additions & 7 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -52,6 +52,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
@@ -277,13 +278,16 @@ public static List<String> getAllPartitionPaths(HoodieEngineContext engineContext
}
}

public static FileStatus[] getFilesInPartition(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String basePathStr, Path partitionPath) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext,
metadataConfig, basePathStr, FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) {
return tableMetadata.getAllFilesInPartition(partitionPath);
} catch (Exception e) {
throw new HoodieException("Error get files in partition: " + partitionPath, e);
public static Map<String, FileStatus[]> getFilesInPartitions(HoodieEngineContext engineContext,
HoodieMetadataConfig metadataConfig,
String basePathStr,
String[] partitionPaths,
String spillableMapPath) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
spillableMapPath, true)) {
return tableMetadata.getAllFilesInPartitions(Arrays.asList(partitionPaths));
} catch (Exception ex) {
throw new HoodieException("Error get files in partitions: " + String.join(",", partitionPaths), ex);
}
}

@@ -44,7 +44,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public abstract class BaseTableMetadata implements HoodieTableMetadata {
@@ -134,6 +136,26 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath)
.getAllFilesInPartition(partitionPath);
}

@Override
public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
throws IOException {
if (enabled) {
Map<String, FileStatus[]> partitionsFilesMap = new HashMap<>();

try {
for (String partitionPath : partitionPaths) {
partitionsFilesMap.put(partitionPath, fetchAllFilesInPartition(new Path(partitionPath)));
}
return partitionsFilesMap;
} catch (Exception e) {
throw new HoodieMetadataException("Failed to retrieve files in partition from metadata", e);
}
}

return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, datasetBasePath, metadataConfig.shouldAssumeDatePartitioning())
.getAllFilesInPartitions(partitionPaths);
}

/**
* Returns a list of all partitions.
*/
@@ -33,8 +33,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
@@ -105,6 +107,24 @@ public List<String> getAllPartitionPaths() throws IOException {
return partitionPaths;
}

@Override
public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
throws IOException {
if (partitionPaths == null || partitionPaths.isEmpty()) {
return Collections.emptyMap();
}

int parallelism = Math.min(DEFAULT_LISTING_PARALLELISM, partitionPaths.size());

List<Pair<String, FileStatus[]>> partitionToFiles = engineContext.map(partitionPaths, partitionPathStr -> {
Path partitionPath = new Path(partitionPathStr);
FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
return Pair.of(partitionPathStr, FSUtils.getAllDataFilesInPartition(fs, partitionPath));
@pengzhiwei2018 Jun 17, 2021

FileStatus is not a Serializable class under the default Spark serializer. If the user has not specified Kryo as the serializer when querying a Hudi table, a NotSerializableException will be thrown. FSUtils.getAllPartitionPaths has the same problem, but I think we can fix that in another PR.

Contributor Author

I see, yeah, FileStatus is not serializable in Hadoop 2, but it has been made Serializable in Hadoop 3. We should fix this in a separate PR for all methods by introducing a SerializableFileStatus similar to Spark's https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L347.
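For illustration only, here is a minimal sketch of the kind of SerializableFileStatus wrapper suggested above; the class name, the captured fields, and the constructor choice are assumptions, not code from this PR.

// Hypothetical sketch, not part of this PR: a Serializable carrier for the FileStatus
// fields that engineContext.map() would need to ship between tasks.
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.Serializable;

public class SerializableFileStatus implements Serializable {
  private final String path;
  private final long length;
  private final boolean isDirectory;
  private final long modificationTime;

  private SerializableFileStatus(String path, long length, boolean isDirectory, long modificationTime) {
    this.path = path;
    this.length = length;
    this.isDirectory = isDirectory;
    this.modificationTime = modificationTime;
  }

  public static SerializableFileStatus fromFileStatus(FileStatus status) {
    return new SerializableFileStatus(status.getPath().toString(), status.getLen(),
        status.isDirectory(), status.getModificationTime());
  }

  public FileStatus toFileStatus() {
    // Only the captured fields survive the round trip; block size, replication, owner,
    // and permissions are intentionally dropped in this sketch.
    return new FileStatus(length, isDirectory, 0, 0, modificationTime, new Path(path));
  }
}

A driver-side caller could then convert each task's results back with toFileStatus() before assembling the partition-to-files map.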

}, parallelism);

return partitionToFiles.stream().collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}

@Override
public Option<String> getSyncedInstantTime() {
throw new UnsupportedOperationException();
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
* Interface that supports querying various pieces of metadata about a hudi table.
Expand Down Expand Up @@ -95,6 +96,11 @@ static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetad
*/
List<String> getAllPartitionPaths() throws IOException;

/**
* Fetch all files for given partition paths.
*/
Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths) throws IOException;
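As a usage sketch only (the Hadoop configuration, engine context, and partition paths below are illustrative, not taken from this PR), the new method can be exercised through the file-system backed implementation:

// Illustrative only: batch-list files for several partitions via the new API.
// `hadoopConf` is assumed to be an existing org.apache.hadoop.conf.Configuration.
HoodieTableMetadata metadata = new FileSystemBackedTableMetadata(
    new HoodieLocalEngineContext(hadoopConf), new SerializableConfiguration(hadoopConf), basePath, false);
Map<String, FileStatus[]> filesByPartition =
    metadata.getAllFilesInPartitions(Arrays.asList(basePath + "/2021/06/16", basePath + "/2021/06/17"));
filesByPartition.forEach((partition, files) ->
    System.out.println(partition + " -> " + files.length + " files"));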

/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
*/
@@ -18,6 +18,7 @@

package org.apache.hudi.metadata;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -30,7 +31,10 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
@@ -63,8 +67,10 @@ public void testNonPartitionedTable() throws Exception {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 0);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath)).length == 10);
Assertions.assertEquals(0, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath)).length);
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartitions(
Collections.singletonList(basePath)).get(basePath).length);
}

/**
@@ -86,8 +92,14 @@ public void testDatePartitionedTable() throws Exception {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, true);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + DATE_PARTITIONS.get(0))).length == 10);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + DATE_PARTITIONS.get(0))).length);

List<String> fullPartitionPaths = DATE_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(10, partitionToFilesMap.get(p).length);
}
}

/**
@@ -101,15 +113,23 @@ public void testDatePartitionedTableWithAssumeDateIsFalse() throws Exception {
// Generate 10 files under each partition
DATE_PARTITIONS.stream().forEach(p -> {
try {
hoodieTestTable = hoodieTestTable.withBaseFilesInPartition(p, IntStream.range(0, 10).toArray());
hoodieTestTable = hoodieTestTable
.withPartitionMetaFiles(p)
.withBaseFilesInPartition(p, IntStream.range(0, 10).toArray());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 0);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());

List<String> fullPartitionPaths = DATE_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(10, partitionToFilesMap.get(p).length);
}
}

@Test
@@ -128,8 +148,14 @@ public void testOneLevelPartitionedTable() throws Exception {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + ONE_LEVEL_PARTITIONS.get(0))).length == 10);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + ONE_LEVEL_PARTITIONS.get(0))).length);

List<String> fullPartitionPaths = ONE_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(10, partitionToFilesMap.get(p).length);
}
}

@Test
@@ -148,8 +174,14 @@ public void testMultiLevelPartitionedTable() throws Exception {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length == 10);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length);

List<String> fullPartitionPaths = MULTI_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(10, partitionToFilesMap.get(p).length);
}
}

@Test
@@ -167,8 +199,14 @@ public void testMultiLevelEmptyPartitionTable() throws Exception {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length == 0);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(0, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length);

List<String> fullPartitionPaths = MULTI_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(0, partitionToFilesMap.get(p).length);
}
}

}
@@ -17,11 +17,13 @@

package org.apache.hudi

import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.common.config.ConfigProperty
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
@@ -471,4 +473,16 @@ object DataSourceOptionsHelper {
})
translatedOpt.toMap
}

def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
// First check whether ConfigUtils.IS_QUERY_AS_RO_TABLE has been set by HiveSyncTool;
// otherwise use the query type from QUERY_TYPE_OPT_KEY.
val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue()))

Map(
QUERY_TYPE_OPT_KEY.key -> queryType
) ++ translateConfigurations(parameters)
}
}
@@ -72,7 +72,7 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
schema: StructType): BaseRelation = {
// Add default options for unspecified read options keys.
val parameters = DataSourceOptionsHelper.translateConfigurations(optParams)
val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams)

val path = parameters.get("path")
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY.key)
@@ -106,12 +107,7 @@ class DefaultSource extends RelationProvider
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
val tableType = metaClient.getTableType

// First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
// or else use query type from QUERY_TYPE_OPT_KEY.
val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue()))
val queryType = parameters(QUERY_TYPE_OPT_KEY.key)

log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
