Merged

30 commits
2014048
Cleaned up conditional that should not be hit;
Feb 3, 2022
17a72c8
Made `BaseFileWithLogsSplit` implement `RealtimeSplit`;
Feb 3, 2022
258bd27
Tidying up `HoodieRealtimeFileSplit`
Feb 3, 2022
a13d3ac
`PathWithLogFilePath` > `RealtimePath`;
Feb 3, 2022
cf87b0e
Refactored `RealtimePath` to produce `HoodieRealtimeFileSplit`
Feb 3, 2022
4770a89
Prefix w/ "Hoodie"
Feb 3, 2022
f0a86b5
Fixed `HoodieRealtimeFileSplit` to bear `belongsToIncrementalQuery` flag
Feb 3, 2022
6ae1a71
Tidying up
Feb 3, 2022
04c024a
Removed `BaseFileWithLogsSplit`
Feb 3, 2022
c2d8b04
Cleaned up dead-code
Feb 3, 2022
5fda93e
Fixed `RealtimeBootstrapBaseFileSplit`
Feb 3, 2022
ba49892
`RealtimeBootstrapBaseFileSplit` > `HoodieRealtimeBootstrapBaseFileSp…
Feb 3, 2022
ce26a38
Added comments
Feb 3, 2022
ffe1451
Propagate missing Virtual Key info into `RealtimeSplit`s
Feb 11, 2022
20b8380
Cleaned up `RealtimeFileStatus`, `HoodieRealtimePath` to be mostly im…
Feb 11, 2022
f07469c
XXX
Feb 11, 2022
3d78efd
Properly set up `HoodieVirtualKeyInfo`
Feb 11, 2022
3e83820
Separate COW/MOR `InputFormat`s to make sure that no MOR-specific log…
Feb 11, 2022
ee70c2f
Tidying up
Feb 11, 2022
f2dde38
Made sure Virtual Key info is fetched once for individual table
Feb 11, 2022
4a68cff
Tidying up
Feb 11, 2022
d357ea2
Reverting inadvertent change
Feb 11, 2022
a124630
Fixed incorrect `blockSize` being set in `FileStatus` object when lis…
Feb 11, 2022
5d842ac
Cleaned up deprecated code
Feb 12, 2022
d8beebf
Extracted base `HoodieTableInputFormat`
Feb 12, 2022
b3a5264
Missing license
Feb 14, 2022
d1f40f1
Added missing validation for whether instant is present in the timeline
Feb 16, 2022
3ebaec5
Killing dead-code
Feb 16, 2022
91262ed
Fixed tests
Feb 16, 2022
cb19f84
Make `BaseHoodieTableFileIndex` only validate instants when required …
Feb 16, 2022
@@ -36,6 +36,7 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -74,6 +75,7 @@ public abstract class BaseHoodieTableFileIndex {
protected final List<Path> queryPaths;

private final boolean shouldIncludePendingCommits;
private final boolean shouldValidateInstant;

private final HoodieTableType tableType;
protected final String basePath;
@@ -98,6 +100,7 @@
* @param queryPaths target DFS paths being queried
* @param specifiedQueryInstant instant as of which table is being queried
* @param shouldIncludePendingCommits flags whether file-index should include any pending operations
* @param shouldValidateInstant flags whether to validate that the query instant is present in the timeline
* @param fileStatusCache transient cache of fetched [[FileStatus]]es
*/
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
@@ -107,6 +110,7 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
List<Path> queryPaths,
Option<String> specifiedQueryInstant,
boolean shouldIncludePendingCommits,
boolean shouldValidateInstant,
FileStatusCache fileStatusCache) {
this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
.orElse(new String[0]);
@@ -122,6 +126,7 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
this.queryPaths = queryPaths;
this.specifiedQueryInstant = specifiedQueryInstant;
this.shouldIncludePendingCommits = shouldIncludePendingCommits;
this.shouldValidateInstant = shouldValidateInstant;

this.tableType = metaClient.getTableType();
this.basePath = metaClient.getBasePath();
@@ -142,6 +147,13 @@ public Option<HoodieInstant> getLatestCompletedInstant() {
return getActiveTimeline().filterCompletedInstants().lastInstant();
}

/**
* Returns table's base-path
*/
public String getBasePath() {
return metaClient.getBasePath();
}

/**
* Fetch list of latest base files and log files per partition.
*
@@ -264,6 +276,8 @@ private void doRefresh() {
Option<String> queryInstant =
specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));

validate(activeTimeline, queryInstant);

if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
cachedAllInputFileSlices = partitionFiles.keySet().stream()
.collect(Collectors.toMap(
@@ -277,15 +291,15 @@ private void doRefresh() {
)
);
} else {
// TODO re-align with the (MOR, snapshot) branch
cachedAllInputFileSlices = partitionFiles.keySet().stream()
.collect(Collectors.toMap(
Function.identity(),
partitionPath ->
specifiedQueryInstant.map(instant ->
fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true))
.orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
queryInstant.map(instant ->
fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
)
.orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
)
);
}
@@ -303,6 +317,14 @@ private void doRefresh() {
LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
}

private void validate(HoodieTimeline activeTimeline, Option<String> queryInstant) {
if (shouldValidateInstant) {
if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
}
}
}

private static long fileSliceSize(FileSlice fileSlice) {
long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
.filter(s -> s > 0)
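The new shouldValidateInstant flag above makes doRefresh fail fast when the query instant is missing from the active timeline. A minimal, self-contained sketch of the intended behavior (hypothetical names; java.util.Optional and IllegalStateException stand in for Hudi's Option and HoodieIOException):

import java.util.Optional;
import java.util.Set;

// Illustrative stand-in for the validation added to BaseHoodieTableFileIndex#doRefresh.
public class InstantValidationSketch {

  static void validate(Set<String> completedInstants, Optional<String> queryInstant, boolean shouldValidateInstant) {
    // Only enforce presence of the query instant when validation was requested
    // (e.g. the Hive integration passes `true`, see HiveHoodieTableFileIndex below).
    if (shouldValidateInstant && queryInstant.isPresent() && !completedInstants.contains(queryInstant.get())) {
      throw new IllegalStateException(
          String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
    }
  }

  public static void main(String[] args) {
    Set<String> timeline = Set.of("20220203093000", "20220211101500");
    validate(timeline, Optional.of("20220211101500"), true);  // ok: instant is on the timeline
    validate(timeline, Optional.empty(), true);               // ok: no instant specified, latest is used
    validate(timeline, Optional.of("20990101000000"), true);  // throws: instant not on the timeline
  }
}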
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
@@ -147,18 +148,21 @@ public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath
* been touched multiple times in the given commits, the return value will keep the one
* from the latest commit.
*
*
* @param hadoopConf
* @param basePath The base path
* @return the file full path to file status mapping
*/
public Map<String, FileStatus> getFullPathToFileStatus(String basePath) {
public Map<String, FileStatus> getFullPathToFileStatus(Configuration hadoopConf, String basePath) {
Map<String, FileStatus> fullPathToFileStatus = new HashMap<>();
for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
// Iterate through all the written files.
for (HoodieWriteStat stat : stats) {
String relativeFilePath = stat.getPath();
Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
if (fullPath != null) {
FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0,
long blockSize = FSUtils.getFs(fullPath.toString(), hadoopConf).getDefaultBlockSize(fullPath);
Contributor:

Should the block size be extracted outside the loop based on the file system of base path, since it's a file-system config?

Contributor Author:

It's based on path, so don't want to bake in the assumption that all paths are homogeneous (might be pointing at different HDFS nodes, w/ different block settings).

Member:

Block size is a per-file thing in HDFS, but rarely ever set differently. For cloud storage it's all 0s. We need to see if getDefaultBlockSize() is an RPC call; if so, then we need to avoid this, even if it assumes things.

Contributor Author:

Let me check. The reason why I'm fixing this is b/c with block-size 0 Hive will slice the file into 1-byte blocks.

Contributor Author:

@vinothchandar there's no RPC in the path (config stored w/in DFSClient).

Contributor:

@alexeykudinkin I think I confused it with the getDefaultBlockSize() call, which is based on the file system (see below), not the file status, and it only fetches from the config. This is fine.

Even if the block size is 0, should Hive still honor the actual block size of the file? At least that's my understanding for the Trino Hive connector.

/**
   * Return the number of bytes that large input files should be optimally
   * be split into to minimize i/o time.
   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
   */
  @Deprecated
  public long getDefaultBlockSize() {
    // default to 32MB: large enough to minimize the impact of seeks
    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
  }

FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, blockSize,
0, fullPath);
fullPathToFileStatus.put(fullPath.getName(), fileStatus);
}
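To make the reviewers' concern above concrete: Hadoop's input formats derive split sizes from the block size reported in FileStatus, so reporting 0 collapses the computed split size to the 1-byte minimum. Below is a minimal, self-contained sketch (illustrative only, not Hudi or Hive code) that mirrors FileInputFormat.computeSplitSize semantics under default min/max split settings:

// Mirrors FileInputFormat.computeSplitSize(blockSize, minSize, maxSize), i.e.
// Math.max(minSize, Math.min(maxSize, blockSize)), to show why a block size of 0
// in FileStatus degenerates into 1-byte splits. All names here are illustrative.
public class SplitSizeSketch {

  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long fileLength = 256L * 1024 * 1024;  // a 256 MB base file
    long minSize = 1L;                     // default split.minsize
    long maxSize = Long.MAX_VALUE;         // default split.maxsize (effectively unbounded)

    // Block size of 0, as previously reported: split size collapses to 1 byte.
    long zeroBlockSplit = computeSplitSize(0L, minSize, maxSize);
    System.out.println("splits with blockSize=0:     " + (fileLength / zeroBlockSplit));

    // Block size resolved from the file system's default (e.g. 128 MB on HDFS).
    long defaultBlockSplit = computeSplitSize(128L * 1024 * 1024, minSize, maxSize);
    System.out.println("splits with blockSize=128MB: " + (fileLength / defaultBlockSplit));
  }
}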
@@ -172,14 +176,16 @@ public Map<String, FileStatus> getFullPathToFileStatus(String basePath) {
* been touched multiple times in the given commits, the return value will keep the one
* from the latest commit by file group ID.
*
* <p>Note: different with {@link #getFullPathToFileStatus(String)},
* <p>Note: different with {@link #getFullPathToFileStatus(Configuration, String)},
* only the latest commit file for a file group is returned,
* this is an optimization for COPY_ON_WRITE table to eliminate legacy files for filesystem view.
*
*
* @param hadoopConf
* @param basePath The base path
* @return the file ID to file status mapping
*/
public Map<String, FileStatus> getFileIdToFileStatus(String basePath) {
public Map<String, FileStatus> getFileIdToFileStatus(Configuration hadoopConf, String basePath) {
Map<String, FileStatus> fileIdToFileStatus = new HashMap<>();
for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
// Iterate through all the written files.
@@ -18,6 +18,10 @@

package org.apache.hudi.sink.partitioner.profile;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -29,11 +33,6 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -117,7 +116,7 @@ private static Map<String, FileStatus> getFilesToReadOfInstant(
HoodieCommitMetadata metadata,
FileSystem fs,
HoodieTableType tableType) {
return getFilesToRead(metadata, basePath.toString(), tableType).entrySet().stream()
return getFilesToRead(fs.getConf(), metadata, basePath.toString(), tableType).entrySet().stream()
// filter out the file paths that do not exist, some files may be cleaned by
// the cleaner.
.filter(entry -> {
@@ -133,14 +132,16 @@ private static Map<String, FileStatus> getFilesToReadOfInstant(
}

private static Map<String, FileStatus> getFilesToRead(
Configuration hadoopConf,
HoodieCommitMetadata metadata,
String basePath,
HoodieTableType tableType) {
HoodieTableType tableType
) {
switch (tableType) {
case COPY_ON_WRITE:
return metadata.getFileIdToFileStatus(basePath);
return metadata.getFileIdToFileStatus(hadoopConf, basePath);
case MERGE_ON_READ:
return metadata.getFullPathToFileStatus(basePath);
return metadata.getFullPathToFileStatus(hadoopConf, basePath);
default:
throw new AssertionError();
}

This file was deleted.

@@ -36,9 +36,7 @@ public class BootstrapBaseFileSplit extends FileSplit {
* NOTE: This ctor is necessary for Hive to be able to serialize and
* then instantiate it when deserializing back
*/
public BootstrapBaseFileSplit() {
super();
}
public BootstrapBaseFileSplit() {}

public BootstrapBaseFileSplit(FileSplit baseSplit, FileSplit bootstrapFileSplit)
throws IOException {
@@ -53,6 +53,7 @@ public HiveHoodieTableFileIndex(HoodieEngineContext engineContext,
queryPaths,
specifiedQueryInstant,
shouldIncludePendingCommits,
true,
new NoopCache());
}
