Merged

33 commits
edeea57
Abstracted & unified `buildScan` functionality for COW/MOR Relations;
Feb 22, 2022
b5cf9f0
`BaseFileOnlyViewRelation` > `BaseFileRelation`
Feb 22, 2022
787b6a3
Fixing compilation
Feb 22, 2022
d5d3a3a
Extracted common converter utils to `HoodieCommonUtils`;
Feb 22, 2022
1bf0933
Abstracted common functionality;
Feb 22, 2022
bc639ed
Extracted common functionality to lists latest base files into `Hoodi…
Feb 23, 2022
c86bba7
Streamlined `MergeOnReadSnapshotRelation` to re-use common functional…
Feb 23, 2022
70356e5
Killing dead code;
Feb 23, 2022
0b2d604
Further simplified `MergeOnReadSnapshotRelation`
Feb 23, 2022
1bc09d3
`lint`
Feb 23, 2022
f8aa085
Cleaned up & streamlined `MergeOnReadIncrementalRelation`
Feb 23, 2022
b9fa316
Tidying up
Feb 23, 2022
804bb96
Extract most of the incremental-specific aspects into a trait that co…
Feb 23, 2022
899db46
Fixing compilation
Feb 23, 2022
48af420
Cleaning up unnecessary filtering
Mar 1, 2022
6027652
After rebase fixes
Mar 11, 2022
1d45bf0
Scaffolded `HoodieInMemoryFileIndex` and replicated `HoodieHadoopFSUt…
Mar 12, 2022
b7a4f8b
Fixed usages
Mar 12, 2022
40b0c05
Moved tests
Mar 12, 2022
0ab3b9b
Missing licenses
Mar 12, 2022
86e8fe3
Disabling linter
Mar 12, 2022
83bd0ea
Fixed compilation for Spark 2.x
Mar 12, 2022
fe8c7a8
Added missing scala-docs
Mar 14, 2022
dcd693d
Fixed incorrect casting
Mar 14, 2022
35eb6df
Fixed partition path handling for MOR Incremental Relation
Mar 14, 2022
eee5151
Fixed `HoodieIncrementalRelationTrait` to extend `HoodieBaseRelation`…
Mar 14, 2022
d85be0b
Handle the case when there are no commits to handle in Incremental Re…
Mar 14, 2022
f966aec
Return empty RDD in case there's no file-splits to handle
Mar 14, 2022
71b1435
Cleaned up `listLatestBaseFiles`
Mar 15, 2022
e39f963
Added TODO
Mar 15, 2022
f37854b
Fixing file handle leak
Mar 15, 2022
b0aa03e
Disabled vectorized reader to make sure MOR Incremental Relation work…
Mar 15, 2022
40e5a85
Fixed Parquet column-projection tests
Mar 16, 2022
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

object HoodieConversionUtils {

def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else org.apache.hudi.common.util.Option.empty()

def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
if (opt.isPresent) Some(opt.get) else None

}
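
A minimal usage sketch of the two converters above; the `instantTime` value below is hypothetical, purely for illustration:

import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}

// Hypothetical value used only to demonstrate the round-trip
val instantTime: Option[String] = Some("20220316120000")

// Scala Option -> Hudi's Java Option (what Hudi's common APIs expect)
val javaOpt: org.apache.hudi.common.util.Option[String] = toJavaOption(instantTime)

// ... and back to a Scala Option
assert(toScalaOption(javaOpt) == instantTime)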
@@ -118,11 +118,6 @@ object HoodieSparkUtils extends SparkAdapterSupport {
})
}

def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
}

/**
* @deprecated please use other overload [[createRdd]]
*/
@@ -198,7 +198,7 @@ protected HoodieTimeline getActiveTimeline() {
// that is under the pending compaction process, new log-file will bear the compaction's instant (on the
// timeline) in its name, as opposed to the base-file's commit instant. To make sure we're not filtering
// such log-file we have to _always_ include pending compaction instants into consideration
-    // TODO(HUDI-3302) re-evaluate whether we should not filter any commits in here
+    // TODO(HUDI-3302) re-evaluate whether we should filter any commits in here
HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
if (shouldIncludePendingCommits) {
return timeline;
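
The remainder of this method is collapsed in the diff above. As a hedged illustration only (not the file's hidden branch), the distinction the comment describes can be expressed in a short Scala sketch using `filterCompletedAndCompactionInstants()`, which appears elsewhere in this PR and retains compaction instants even while they are still pending:

// Sketch only; `metaClient` and `shouldIncludePendingCommits` are assumed to be in scope.
val timeline = metaClient.getCommitsAndCompactionTimeline()
val effectiveTimeline =
  if (shouldIncludePendingCommits) timeline
  else timeline.filterCompletedAndCompactionInstants() // completed commits + (possibly pending) compactions,
                                                       // so log files written under a pending compaction stay visible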
@@ -509,19 +509,16 @@ public MessageType readSchemaFromLogFile(Path path) throws IOException {
* @return
*/
public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
-    HoodieDataBlock lastBlock = null;
-    while (reader.hasNext()) {
-      HoodieLogBlock block = reader.next();
-      if (block instanceof HoodieDataBlock) {
-        lastBlock = (HoodieDataBlock) block;
-      }
-    }
-    reader.close();
-    if (lastBlock != null) {
-      return new AvroSchemaConverter().convert(lastBlock.getSchema());
-    }
-    return null;
+    try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null)) {
+      HoodieDataBlock lastBlock = null;
+      while (reader.hasNext()) {
+        HoodieLogBlock block = reader.next();
+        if (block instanceof HoodieDataBlock) {
+          lastBlock = (HoodieDataBlock) block;
+        }
+      }
+      return lastBlock != null ? new AvroSchemaConverter().convert(lastBlock.getSchema()) : null;
+    }
}
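
A hedged caller sketch for the reworked method above; the enclosing class name is not visible in this hunk, so `TableSchemaResolver` is an assumption here, as are `fs` and `logFilePath`:

// Hypothetical caller, in Scala. Returns the Parquet MessageType of the last
// data block in the log file, or null if the file contains no data blocks.
val schema = TableSchemaResolver.readSchemaFromLogFile(fs, logFilePath) // class name assumed
val maybeSchema = Option(schema) // wrap the null case into a Scala Option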

public boolean isHasOperationField() {
@@ -19,36 +19,18 @@
package org.apache.hudi.hadoop.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.TypeUtils.unsafeCast;

public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
@@ -67,41 +49,6 @@ public static boolean doesBelongToIncrementalQuery(FileSplit s) {
return false;
}

// Return parquet file with a list of log files in the same file group.
public static List<Pair<Option<HoodieBaseFile>, List<HoodieLogFile>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) {
Set<Path> partitionSet = new HashSet<>(partitionPaths);
// TODO(vc): Should we handle also non-hoodie splits here?
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);

// Get all the base file and it's log files pairs in required partition paths.
List<Pair<Option<HoodieBaseFile>, List<HoodieLogFile>>> baseAndLogsList = new ArrayList<>();
partitionSet.forEach(partitionPath -> {
// for each partition path obtain the data & log file groupings, then map back to inputsplits
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);

try {
// Both commit and delta-commits are included - pick the latest completed one
Option<HoodieInstant> latestCompletedInstant =
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();

Stream<FileSlice> latestFileSlices = latestCompletedInstant
.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
.orElse(Stream.empty());

latestFileSlices.forEach(fileSlice -> {
List<HoodieLogFile> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
baseAndLogsList.add(Pair.of(fileSlice.getBaseFile(), logFilePaths));
});
} catch (Exception e) {
throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
}
});
return baseAndLogsList;
}
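
For reference, the per-file-slice pairing that the removed method produced (the optional base file plus the file group's sorted log files) can be written compactly against the same Hudi APIs; a minimal Scala sketch, using `HoodieConversionUtils` from this PR to bridge the `Option` types:

import scala.collection.JavaConverters._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile}

// For a single file slice: its (optional) base file paired with the log files
// of the same file group, sorted by the standard log-file comparator.
def baseAndLogs(fileSlice: FileSlice): (Option[HoodieBaseFile], List[HoodieLogFile]) = {
  val baseFile = toScalaOption(fileSlice.getBaseFile())
  val logFiles = fileSlice.getLogFiles()
    .sorted(HoodieLogFile.getLogFileComparator())
    .iterator()
    .asScala
    .toList
  (baseFile, logFiles)
}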


/**
* Add a field to the existing fields projected.
*/
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType

/**
* [[BaseRelation]] implementation that reads only the base files of Hudi tables, essentially supporting the following
* query modes:
* <ul>
* <li>For COW tables: Snapshot</li>
* <li>For MOR tables: Read-optimized</li>
* </ul>
*
* NOTE: The reason this Relation is used in lieu of Spark's default [[HadoopFsRelation]] is primarily that the latter
* injects the partition path as the value of the partition field, which Hudi ultimately persists as part of the
* record payload. In some cases, however, the partition path might not be equal to the verbatim value of the
* partition-path field (for example, when a custom [[KeyGenerator]] is used), which would lead to incorrect
* partition field values being written
*/
class BaseFileOnlyRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType],
globPaths: Seq[Path])
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {

override type FileSplit = HoodieBaseFileSplit

protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieUnsafeRDD = {
val baseFileReader = createBaseFileReader(
spark = sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
filters = filters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = new Configuration(conf)
)

new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
val fileSplits = partitions.values.toSeq.flatMap { files =>
files.flatMap { file =>
// TODO move to adapter
// TODO fix, currently assuming parquet as underlying format
HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession,
file = file,
// TODO clarify why this is required
partitionValues = InternalRow.empty
)
}
}

val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes

sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes).map(HoodieBaseFileSplit.apply)
}
}
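
A hedged sketch of the read paths this relation ends up serving; the table paths are placeholders, and the option strings follow Hudi's `DataSourceReadOptions` as assumed here:

// Placeholder paths; `spark` is an active SparkSession with the Hudi bundle on the classpath.
val cowSnapshot = spark.read.format("hudi")
  .load("/tmp/hudi/cow_table")                               // COW table: snapshot query, base files only

val morReadOptimized = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "read_optimized")  // MOR table: read-optimized query
  .load("/tmp/hudi/mor_table")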

This file was deleted.
