Merged
@@ -65,6 +65,12 @@ public FileSystem getRawFileSystem()
return fs;
}

@Override
public String getScheme()
{
return fs.getScheme();
Contributor

Why do we need this API?

Member Author

If we don't fix this method, MOR (Merge-On-Read) tables cannot be queried. I added the integration tests in #17463.
Here is part of the error message:

Caused by: java.lang.UnsupportedOperationException: Not implemented by the HadoopExtendedFileSystem FileSystem implementation
at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:219)
at org.apache.hudi.common.fs.FSUtils.isGCSFileSystem(FSUtils.java:592)
at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStream(HoodieLogFileReader.java:119)
at org.apache.hudi.common.table.log.HoodieLogFileReader.&lt;init&gt;(HoodieLogFileReader.java:95)
at org.apache.hudi.common.table.log.HoodieLogFileReader.&lt;init&gt;(HoodieLogFileReader.java:86)
at org.apache.hudi.common.table.log.HoodieLogFormat.newReader(HoodieLogFormat.java:282)
at org.apache.hudi.common.table.log.LogReaderUtils.readSchemaFromLogFileInReverse(LogReaderUtils.java:49)
at org.apache.hudi.common.table.log.LogReaderUtils.readLatestSchemaFromLogFiles(LogReaderUtils.java:77)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:85)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.&lt;init&gt;(AbstractRealtimeRecordReader.java:67)
at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.&lt;init&gt;(RealtimeCompactedRecordReader.java:62)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:70)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.&lt;init&gt;(HoodieRealtimeRecordReader.java:47)
at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:323)
at com.facebook.presto.hive.HiveUtil.createRecordReader(HiveUtil.java:272)
at com.facebook.presto.hive.GenericHiveRecordCursorProvider.lambda$createRecordCursor$0(GenericHiveRecordCursorProvider.java:74)
at com.facebook.presto.hive.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:23)
at com.facebook.presto.hive.HdfsEnvironment.doAs(HdfsEnvironment.java:81)
at com.facebook.presto.hive.GenericHiveRecordCursorProvider.createRecordCursor(GenericHiveRecordCursorProvider.java:73)
at com.facebook.presto.hive.HivePageSourceProvider.getPageSourceFromCursorProvider(HivePageSourceProvider.java:571)

You can check the full error log at https://github.com/prestodb/presto/runs/5566744421?check_suite_focus=true.
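For context, a minimal sketch of the failure mode and the delegating fix. `BaseFileSystem` and `DelegatingFileSystem` are illustrative stand-ins, not the actual Hadoop `FileSystem` or Presto `HadoopExtendedFileSystem` classes; the exception message mirrors the one in the stack trace above:

```java
import java.net.URI;

// Stand-in for Hadoop's FileSystem: getScheme() throws unless a subclass
// overrides it, which is what the stack trace above shows for
// HadoopExtendedFileSystem.
abstract class BaseFileSystem
{
    public String getScheme()
    {
        throw new UnsupportedOperationException(
                "Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
    }

    public abstract URI getUri();
}

// Stand-in for the wrapper file system: the fix is to delegate getScheme()
// to the wrapped instance, mirroring the override added in this diff.
class DelegatingFileSystem extends BaseFileSystem
{
    private final BaseFileSystem fs;

    DelegatingFileSystem(BaseFileSystem fs)
    {
        this.fs = fs;
    }

    @Override
    public String getScheme()
    {
        return fs.getScheme();
    }

    @Override
    public URI getUri()
    {
        return fs.getUri();
    }
}
```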

Member Author

BTW, please review the PR at #17463

Contributor

Got it.

}

@Override
public URI getUri()
{
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive.util;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -21,7 +22,6 @@
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -37,6 +37,8 @@
public class HudiRealtimeSplitConverter
implements CustomSplitConverter
{
private static final Splitter SPLITTER = Splitter.on(",").omitEmptyStrings();

public static final String HUDI_DELTA_FILEPATHS_KEY = "hudi_delta_filepaths";
public static final String HUDI_BASEPATH_KEY = "hudi_basepath";
public static final String HUDI_MAX_COMMIT_TIME_KEY = "hudi_max_commit_time";
@@ -64,7 +66,7 @@ public Optional<FileSplit> recreateFileSplitWithCustomInfo(FileSplit split, Map<
String customSplitClass = customSplitInfo.get(CUSTOM_FILE_SPLIT_CLASS_KEY);
if (HoodieRealtimeFileSplit.class.getName().equals(customSplitClass)) {
requireNonNull(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY), "HUDI_DELTA_FILEPATHS_KEY is missing");
List<String> deltaLogPaths = Arrays.asList(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY).split(","));
List<String> deltaLogPaths = SPLITTER.splitToList(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY));
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
return Optional.of(new HoodieRealtimeFileSplit(
split,
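For reference, the practical difference between the old `Arrays.asList(value.split(","))` and the new Guava `Splitter`: on an empty input, `String.split` yields a single empty element, while `omitEmptyStrings()` yields an empty list. A small sketch, assuming Guava is on the classpath:

```java
import com.google.common.base.Splitter;

import java.util.Arrays;
import java.util.List;

public class SplitterBehavior
{
    public static void main(String[] args)
    {
        // Old behavior: "".split(",") returns a one-element array [""],
        // so a split with no delta log files would later produce a bogus
        // HoodieLogFile built from an empty path.
        List<String> viaSplit = Arrays.asList("".split(","));
        System.out.println(viaSplit); // prints []... actually [""], size 1
        System.out.println(viaSplit.size()); // 1

        // New behavior: omitEmptyStrings() drops the empty element,
        // yielding a genuinely empty list.
        List<String> viaSplitter = Splitter.on(",").omitEmptyStrings().splitToList("");
        System.out.println(viaSplitter.size()); // 0
    }
}
```

This appears to be what the new `testHudiRealtimeSplitConverterNoLogRoundTrip` test below exercises: a split with no delta log files round-trips to an empty list rather than a list containing one empty path.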
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive.util;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -64,6 +65,32 @@ public void testHudiRealtimeSplitConverterRoundTrip()
assertEquals(expectedMaxCommitTime, recreatedSplit.getMaxCommitTime());
}

@Test
public void testHudiRealtimeSplitConverterNoLogRoundTrip()
throws IOException
{
List<String> deltaLogPaths = ImmutableList.of();
List<HoodieLogFile> deltaLogFiles = ImmutableList.of();
String expectedMaxCommitTime = "max_commit_time";

FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, deltaLogFiles, expectedMaxCommitTime, Option.empty());

// Test conversion of HudiSplit -> customSplitInfo
Map<String, String> customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);

// Test conversion of (customSplitInfo + baseSplit) -> HudiSplit
HoodieRealtimeFileSplit recreatedSplit = (HoodieRealtimeFileSplit) CustomSplitConversionUtils.recreateSplitWithCustomInfo(baseSplit, customSplitInfo);

assertEquals(FILE_PATH, recreatedSplit.getPath());
assertEquals(SPLIT_START_POS, recreatedSplit.getStart());
assertEquals(SPLIT_LENGTH, recreatedSplit.getLength());
assertEquals(SPLIT_HOSTS, recreatedSplit.getLocations());
assertEquals(BASE_PATH, recreatedSplit.getBasePath());
assertEquals(deltaLogPaths, recreatedSplit.getDeltaLogPaths());
assertEquals(expectedMaxCommitTime, recreatedSplit.getMaxCommitTime());
}

@Test
public void testHudiBootstrapBaseFileSplitConverter()
throws IOException