@@ -18,42 +18,38 @@

package org.apache.hudi.hadoop;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -190,7 +186,7 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}

protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();

if (baseFileOpt.isPresent()) {
@@ -215,7 +211,7 @@ private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus
@Nonnull
private List<FileStatus> listStatusForSnapshotMode(JobConf job,
Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths) throws IOException {
List<Path> snapshotPaths) {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
List<FileStatus> targetFiles = new ArrayList<>();

@@ -248,14 +244,12 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, tableMetaClient))
.collect(Collectors.toList())
);
}
@@ -290,24 +284,4 @@ protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
throw new HoodieIOException("Failed to get file-status", ioe);
}
}

protected static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (tableConfig.populateMetaFields()) {
return Option.empty();
}
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
Schema schema = tableSchemaResolver.getTableAvroSchema();
boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
return Option.of(
new HoodieVirtualKeyInfo(
tableConfig.getRecordKeyFieldProp(),
isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception ", exception);
}
}
}
@@ -18,27 +18,20 @@

package org.apache.hudi.hadoop.realtime;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
@@ -50,6 +43,18 @@
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -86,7 +91,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
}

@Override
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
@@ -96,9 +101,9 @@ protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTa

// Check if we're reading a MOR table
if (baseFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
} else if (latestLogFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
} else {
throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
}
@@ -384,5 +389,24 @@ private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFil
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
}

private static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (tableConfig.populateMetaFields()) {
return Option.empty();
}
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
Schema schema = tableSchemaResolver.getTableAvroSchema();
boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
return Option.of(
new HoodieVirtualKeyInfo(
tableConfig.getRecordKeyFieldProp(),
isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception ", exception);
}
}
}
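
For context, here is a minimal, self-contained sketch of how Avro field positions like the ones passed into the HoodieVirtualKeyInfo constructor above are resolved from a table schema when meta fields are not populated. It is illustrative only: the Trip schema, field names, and class below are hypothetical and not part of this change.

import org.apache.avro.Schema;

public class VirtualKeyPositionSketch {
  public static void main(String[] args) {
    // A toy table schema standing in for what TableSchemaResolver#getTableAvroSchema() would return.
    String avsc = "{\"type\":\"record\",\"name\":\"Trip\",\"fields\":["
        + "{\"name\":\"uuid\",\"type\":\"string\"},"
        + "{\"name\":\"city\",\"type\":\"string\"},"
        + "{\"name\":\"fare\",\"type\":\"double\"}]}";
    Schema schema = new Schema.Parser().parse(avsc);

    // With recordKeyField = "uuid" and partitionField = "city" in the table config,
    // the positions handed to HoodieVirtualKeyInfo would be 0 and 1 respectively;
    // a non-partitioned key generator would leave the partition position empty.
    System.out.println("record key position = " + schema.getField("uuid").pos());
    System.out.println("partition path position = " + schema.getField("city").pos());
  }
}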