Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<dep.pinot.version>0.10.0</dep.pinot.version>
<dep.druid.version>0.19.0</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
<dep.hudi.version>0.10.1</dep.hudi.version>
<dep.hudi.version>0.11.0</dep.hudi.version>
<dep.testcontainers.version>1.15.1</dep.testcontainers.version>
<dep.docker-java.version>3.2.12</dep.docker-java.version>
<!--
Expand Down Expand Up @@ -1237,6 +1237,14 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr-bundle</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- End dependencies for querying Hudi table-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -45,11 +46,11 @@ public class HudiRealtimeBootstrapBaseFileSplitConverter
@Override
public Optional<Map<String, String>> extractCustomSplitInfo(FileSplit split)
{
if (split instanceof RealtimeBootstrapBaseFileSplit) {
if (split instanceof HoodieRealtimeBootstrapBaseFileSplit) {
ImmutableMap.Builder<String, String> customSplitInfo = ImmutableMap.builder();
RealtimeBootstrapBaseFileSplit hudiSplit = (RealtimeBootstrapBaseFileSplit) split;
HoodieRealtimeBootstrapBaseFileSplit hudiSplit = (HoodieRealtimeBootstrapBaseFileSplit) split;

customSplitInfo.put(CUSTOM_FILE_SPLIT_CLASS_KEY, RealtimeBootstrapBaseFileSplit.class.getName());
customSplitInfo.put(CUSTOM_FILE_SPLIT_CLASS_KEY, HoodieRealtimeBootstrapBaseFileSplit.class.getName());
customSplitInfo.put(BASE_PATH_KEY, hudiSplit.getBasePath());
customSplitInfo.put(MAX_COMMIT_TIME_KEY, hudiSplit.getMaxCommitTime());
customSplitInfo.put(DELTA_FILE_PATHS_KEY, String.join(",", hudiSplit.getDeltaLogPaths()));
Expand All @@ -67,7 +68,7 @@ public Optional<FileSplit> recreateFileSplitWithCustomInfo(FileSplit split, Map<
{
requireNonNull(customSplitInfo);
String customFileSplitClass = customSplitInfo.get(CUSTOM_FILE_SPLIT_CLASS_KEY);
if (!isNullOrEmpty(customFileSplitClass) && RealtimeBootstrapBaseFileSplit.class.getName().equals(customFileSplitClass)) {
if (!isNullOrEmpty(customFileSplitClass) && HoodieRealtimeBootstrapBaseFileSplit.class.getName().equals(customFileSplitClass)) {
String deltaFilePaths = customSplitInfo.get(DELTA_FILE_PATHS_KEY);
List<String> deltaLogPaths = isNullOrEmpty(deltaFilePaths) ? Collections.emptyList() : Arrays.asList(deltaFilePaths.split(","));
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
Expand All @@ -76,8 +77,8 @@ public Optional<FileSplit> recreateFileSplitWithCustomInfo(FileSplit split, Map<
parseLong(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_START)),
parseLong(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_LEN)),
(String[]) null);
split = new RealtimeBootstrapBaseFileSplit(split, customSplitInfo.get(BASE_PATH_KEY), deltaLogFiles,
customSplitInfo.get(MAX_COMMIT_TIME_KEY), bootstrapFileSplit);
split = new HoodieRealtimeBootstrapBaseFileSplit(split, customSplitInfo.get(BASE_PATH_KEY), deltaLogFiles,
customSplitInfo.get(MAX_COMMIT_TIME_KEY), bootstrapFileSplit, false, Option.empty());
return Optional.of(split);
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public Optional<FileSplit> recreateFileSplitWithCustomInfo(FileSplit split, Map<
requireNonNull(customSplitInfo.get(HUDI_BASEPATH_KEY), "HUDI_BASEPATH_KEY is missing"),
deltaLogFiles,
requireNonNull(customSplitInfo.get(HUDI_MAX_COMMIT_TIME_KEY), "HUDI_MAX_COMMIT_TIME_KEY is missing"),
false, // incremental query not supported currently
Option.empty()));
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.testng.annotations.Test;

import java.io.IOException;
Expand Down Expand Up @@ -48,7 +48,7 @@ public void testHudiRealtimeSplitConverterRoundTrip()
String expectedMaxCommitTime = "max_commit_time";

FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, deltaLogFiles, expectedMaxCommitTime, Option.empty());
FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, deltaLogFiles, expectedMaxCommitTime, false, Option.empty());

// Test conversion of HudiSplit -> customSplitInfo
Map<String, String> customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);
Expand All @@ -74,7 +74,7 @@ public void testHudiRealtimeSplitConverterNoLogRoundTrip()
String expectedMaxCommitTime = "max_commit_time";

FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, deltaLogFiles, expectedMaxCommitTime, Option.empty());
FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, deltaLogFiles, expectedMaxCommitTime, false, Option.empty());

// Test conversion of HudiSplit -> customSplitInfo
Map<String, String> customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);
Expand Down Expand Up @@ -135,14 +135,14 @@ public void testHudiRealtimeBootstrapBaseFileSplitConverter()
FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
FileSplit bootstrapSourceSplit = new FileSplit(bootstrapSourceFilePath, bootstrapSourceSplitStartPos, bootstrapSourceSplitLength,
new String[0]);
FileSplit hudiSplit = new RealtimeBootstrapBaseFileSplit(baseSplit, BASE_PATH, deltaLogFiles, maxCommitTime,
bootstrapSourceSplit);
FileSplit hudiSplit = new HoodieRealtimeBootstrapBaseFileSplit(baseSplit, BASE_PATH, deltaLogFiles, maxCommitTime,
bootstrapSourceSplit, false, Option.empty());

// Test conversion of HudiSplit -> customSplitInfo
Map<String, String> customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);

// Test conversion of (customSplitInfo + baseSplit) -> HudiSplit
RealtimeBootstrapBaseFileSplit recreatedSplit = (RealtimeBootstrapBaseFileSplit) CustomSplitConversionUtils
HoodieRealtimeBootstrapBaseFileSplit recreatedSplit = (HoodieRealtimeBootstrapBaseFileSplit) CustomSplitConversionUtils
.recreateSplitWithCustomInfo(baseSplit, customSplitInfo);

assertEquals(FILE_PATH, recreatedSplit.getPath());
Expand Down