Merged
HoodieRealtimeInputFormatUtils.java
@@ -32,6 +32,7 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
@@ -49,11 +50,13 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -137,16 +140,14 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSpli
}

// Return parquet file with a list of log files in the same file group.
public static Map<HoodieBaseFile, List<String>> groupLogsByBaseFile(Configuration conf, List<HoodieBaseFile> fileStatuses) {
Map<Path, List<HoodieBaseFile>> partitionsToParquetSplits =
fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent()));
public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) {
Set<Path> partitionSet = new HashSet<>(partitionPaths);
// TODO(vc): Should we handle also non-hoodie splits here?
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet());
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);

// for all unique split parents, obtain all delta files based on delta commit timeline,
// grouped on file id
Map<HoodieBaseFile, List<String>> resultMap = new HashMap<>();
partitionsToParquetSplits.keySet().forEach(partitionPath -> {
// Get all base file and log file pairs in the required partition paths.
List<Pair<Option<HoodieBaseFile>, List<String>>> baseAndLogsList = new ArrayList<>();
partitionSet.forEach(partitionPath -> {
// for each partition path obtain the data & log file groupings, then map back to inputsplits
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
@@ -161,28 +162,18 @@ public static Map<HoodieBaseFile, List<String>> groupLogsByBaseFile(Configuratio
.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
.orElse(Stream.empty());

// subgroup splits again by file id & match with log files.
Map<String, List<HoodieBaseFile>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
.collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getFileStatus().getPath().getName())));
latestFileSlices.forEach(fileSlice -> {
List<HoodieBaseFile> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
dataFileSplits.forEach(split -> {
try {
List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
resultMap.put(split, logFilePaths);
} catch (Exception e) {
throw new HoodieException("Error creating hoodie real time split ", e);
}
});
baseAndLogsList.add(Pair.of(fileSlice.getBaseFile(), logFilePaths));
Member: for log only file slice, is fileSlice.getBaseFile() == null?

Author: No, for a log-only file slice, fileSlice.getBaseFile() is an empty Option.

});
} catch (Exception e) {
throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
}
});
return resultMap;
return baseAndLogsList;
}
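
A minimal Scala sketch (not part of the patch) of how the new return type is meant to be consumed; it mirrors the caller in MergeOnReadSnapshotRelation further down and assumes conf (a Hadoop Configuration) and partitionPaths (a Seq[Path] of partition directories) are already in scope:

import scala.collection.JavaConverters._
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils

// Each pair is (Option of base file, log file paths) for one file slice.
val baseAndLogsList =
  HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
baseAndLogsList.foreach { pair =>
  val baseFile = pair.getLeft                 // org.apache.hudi.common.util.Option[HoodieBaseFile]
  val logPaths = pair.getRight.asScala.toList // log files of the same file group
  if (baseFile.isPresent) {
    // Regular file slice: a base file plus zero or more log files.
  } else {
    // Log-only file slice: no base file has been written yet, so the Option is empty (see the thread above).
  }
}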


Contributor: code style

Author: done

/**
* Add a field to the existing fields projected.
HoodieFileIndex.scala
@@ -106,6 +106,12 @@ case class HoodieFileIndex(
}
}

private lazy val metadataConfig = {
val properties = new Properties()
properties.putAll(options.asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}

@transient @volatile private var fileSystemView: HoodieTableFileSystemView = _
@transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
@transient @volatile private var cachedFileSize: Long = 0L
@@ -195,8 +201,8 @@
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
private def prunePartition(partitionPaths: Seq[PartitionRowPath],
predicates: Seq[Expression]): Seq[PartitionRowPath] = {
def prunePartition(partitionPaths: Seq[PartitionRowPath],
predicates: Seq[Expression]): Seq[PartitionRowPath] = {

val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
@@ -222,26 +228,13 @@
}
}

/**
* Load all partition paths and it's files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
def getAllQueryPartitionPaths: Seq[PartitionRowPath] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val properties = new Properties()
properties.putAll(options.asJava)
val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build()

val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
// Load all the partition path from the basePath, and filter by the query partition path.
// TODO load files from the queryPartitionPath directly.
val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala
.filter(_.startsWith(queryPartitionPath))

val writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath).withProperties(properties).build()
val maxListParallelism = writeConfig.getFileListingParallelism

val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val partitionSchema = _partitionSchemaFromProperties
val timeZoneId = CaseInsensitiveMap(options)
.get(DateTimeUtils.TIMEZONE_OPTION)
@@ -250,7 +243,7 @@
val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark
.sessionState.conf)
// Convert partition path to PartitionRowPath
val partitionRowPaths = partitionPaths.map { partitionPath =>
partitionPaths.map { partitionPath =>
val partitionRow = if (partitionSchema.fields.length == 0) {
// This is a non-partitioned table
InternalRow.empty
@@ -308,7 +301,20 @@
}
PartitionRowPath(partitionRow, partitionPath)
}
}
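
A hedged sketch of how the now-public getAllQueryPartitionPaths and prunePartition are meant to be driven by a caller (this is essentially what MergeOnReadSnapshotRelation does below; hoodieFileIndex, metaClient and the optional Catalyst partitionFilterExpression are assumed to be in scope):

// List every partition path under the query path, then prune with whatever
// partition-column predicates could be converted to Catalyst expressions.
val allPartitionPaths = hoodieFileIndex.getAllQueryPartitionPaths
val prunedPartitions = hoodieFileIndex.prunePartition(
  allPartitionPaths,
  partitionFilterExpression.map(Seq(_)).getOrElse(Seq.empty))
// Resolve the pruned relative paths against the table base path.
val fullPartitionPaths = prunedPartitions.map(_.fullPartitionPath(metaClient.getBasePath))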

/**
* Load all partition paths and their files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
val properties = new Properties()
properties.putAll(options.asJava)
val writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath).withProperties(properties).build()

val maxListParallelism = writeConfig.getFileListingParallelism
val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val partitionRowPaths = getAllQueryPartitionPaths
// List files in all of the partition paths.
val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]()
val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]()
MergeOnReadSnapshotRelation.scala
@@ -19,7 +19,6 @@
package org.apache.hudi

import org.apache.avro.Schema
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
@@ -137,12 +136,15 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
}

def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = {

val fileStatuses = if (globPaths.isDefined) {
// Get all partition paths
val partitionPaths = if (globPaths.isDefined) {
// Load files from the glob paths, if defined, to be compatible with the original mode
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
inMemoryFileIndex.allFiles()
} else { // Load files by the HoodieFileIndex.
val fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline.getCommitsTimeline
.filterCompletedInstants, inMemoryFileIndex.allFiles().toArray)
fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent)
Contributor: If we only have log files, inMemoryFileIndex.allFiles will be empty, since Spark will filter out .log files. fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent) will then return empty partitionPaths, buildFileIndex will return no HoodieMergeOnReadFileSplit, and nothing will be read.

Author: Yes, reading a log-only table is currently not supported for the globPaths query, where users must specify some * in the query path; partition pruning also does not work there. In the long run we should recommend the non-globPath query, which uses the HoodieFileIndex to list files and partitions: users do not need to specify * in the query path and also get better performance from partition pruning and metadata-table listing (see the usage sketch after this file's diff).

Contributor: Thank you for your explanation, LGTM.

Member: Is there a Jira issue about the plan?

Author: We already support the non-globPath query via #2651; I will add some docs to introduce it in the 0.9 release.

Member: Oh nice. Didn't follow Spark-related changes recently. Seems like a lot of new features have landed.

} else { // Load partition path by the HoodieFileIndex.
val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))

@@ -152,34 +154,35 @@
val partitionFilterExpression =
HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema)

// if convert success to catalyst expression, use the partition prune
if (partitionFilterExpression.isDefined) {
hoodieFileIndex.listFiles(Seq(partitionFilterExpression.get), Seq.empty).flatMap(_.files)
} else {
hoodieFileIndex.allFiles
}
val allPartitionPaths = hoodieFileIndex.getAllQueryPartitionPaths
// If the filters convert to Catalyst expressions successfully, use them for partition pruning
hoodieFileIndex.prunePartition(allPartitionPaths, partitionFilterExpression.map(Seq(_)).getOrElse(Seq.empty))
.map(_.fullPartitionPath(metaClient.getBasePath))
}

if (fileStatuses.isEmpty) { // If this an empty table, return an empty split list.
if (partitionPaths.isEmpty) { // If this is an empty table, return an empty split list.
List.empty[HoodieMergeOnReadFileSplit]
} else {
val fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline.getCommitsTimeline
.filterCompletedInstants, fileStatuses.toArray)
val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList

if (!fsView.getLastInstant.isPresent) { // Return empty list if the table has no commit
val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant()
if (!lastInstant.isPresent) { // Return empty list if the table has no commit
List.empty
} else {
val latestCommit = fsView.getLastInstant.get().getTimestamp
val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
val fileSplits = fileGroup.map(kv => {
val baseFile = kv._1
val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath)

val partitionedFile = PartitionedFile(InternalRow.empty, filePath, 0, baseFile.getFileLen)
HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
val latestCommit = lastInstant.get().getTimestamp
val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
val fileSplits = baseAndLogsList.map(kv => {
val baseFile = kv.getLeft
val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList)

val baseDataPath = if (baseFile.isPresent) {
Some(PartitionedFile(
InternalRow.empty,
MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath),
0, baseFile.get.getFileLen)
)
} else {
None
}
HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
}).toList
fileSplits
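
As the thread above notes, log-only file slices are only resolved through the HoodieFileIndex path. A hedged usage sketch of the two read styles for a MOR table (the three-level glob is just an illustrative partition depth, not a requirement):

// Non-glob read: goes through HoodieFileIndex, so partition pruning applies
// and log-only file slices are picked up.
val nonGlobDf = spark.read.format("hudi").load(basePath)

// Glob read: kept for compatibility with the original mode; it lists base files
// under the glob, so a log-only table can come back with no splits here.
val globDf = spark.read.format("hudi").load(basePath + "/*/*/*")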
DataSourceTestUtils.java
@@ -18,6 +18,12 @@

package org.apache.hudi.testutils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FileIOUtils;

import org.apache.avro.Schema;
@@ -118,4 +124,27 @@ public static List<Row> updateRowsWithHigherTs(Dataset<Row> inputDf) {
}
return rows;
}

/**
* Test if only log files exist in the table.
*/
public static boolean isLogFileOnly(String basePath) throws IOException {
Configuration conf = new Configuration();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(conf).setBasePath(basePath)
.build();
String baseDataFormat = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
Path path = new Path(basePath);
FileSystem fs = path.getFileSystem(conf);
RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);
while (files.hasNext()) {
LocatedFileStatus file = files.next();
if (file.isFile()) {
if (file.getPath().toString().endsWith(baseDataFormat)) {
return false;
}
}
}
return true;
}
}
TestMORDataSource.scala
@@ -17,20 +17,24 @@

package org.apache.hudi.functional

import org.apache.hadoop.fs.Path

import scala.collection.JavaConverters._
import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
@@ -677,4 +681,23 @@ class TestMORDataSource extends HoodieClientTestBase {

assertEquals(partitionCounts("2021/03/03"), count7)
}

@Test
def testReadLogOnlyMergeOnReadTable(): Unit = {
initMetaClient(HoodieTableType.MERGE_ON_READ)
val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
val inputDF = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
inputDF.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// Use InMemoryIndex to generate a log-only MOR table.
.option(HoodieIndexConfig.INDEX_TYPE_PROP.key, IndexType.INMEMORY.toString)
.mode(SaveMode.Overwrite)
.save(basePath)
// There should be no base file in the file list.
assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))
// Test reading the log-only MOR table.
assertEquals(20, spark.read.format("hudi").load(basePath).count())
}
}