Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@
<dep.pinot.version>0.11.0</dep.pinot.version>
<dep.druid.version>30.0.1</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
<dep.hudi.version>0.14.0</dep.hudi.version>
<dep.testcontainers.version>1.20.5</dep.testcontainers.version>
<dep.docker-java.version>3.4.1</dep.docker-java.version>
<dep.hudi.version>0.15.0</dep.hudi.version>
<dep.testcontainers.version>1.18.3</dep.testcontainers.version>
<dep.docker-java.version>3.3.0</dep.docker-java.version>
<dep.jayway.version>2.9.0</dep.jayway.version>
<dep.ratis.version>3.1.3</dep.ratis.version>
<dep.errorprone.version>2.36.0</dep.errorprone.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.hive;

import com.facebook.airlift.log.Logger;
Expand All @@ -30,7 +31,6 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieTableType;
Expand All @@ -40,17 +40,22 @@
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.hive.HiveFileInfo.createHiveFileInfo;
import static com.facebook.presto.hive.HiveSessionProperties.getHudiTablesUseMergedView;
import static com.facebook.presto.hive.HiveSessionProperties.isHudiMetadataEnabled;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopFileStatus;

public class HudiDirectoryLister
implements DirectoryLister
Expand Down Expand Up @@ -78,7 +83,7 @@ public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table t
actualConfig = ((CopyOnFirstWriteConfiguration) actualConfig).getConfig();
}
this.metaClient = HoodieTableMetaClient.builder()
.setConf(actualConfig)
.setConf(new HadoopStorageConfiguration(actualConfig))
.setBasePath(table.getStorage().getLocation())
.build();
this.latestInstant = metaClient.getActiveTimeline()
Expand All @@ -91,7 +96,7 @@ public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table t
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp).orElseThrow(() -> new RuntimeException("No active instant found")));
HoodieEngineContext engineContext = new HoodieLocalEngineContext(actualConfig);
HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(metadataEnabled)
.build();
Expand Down Expand Up @@ -142,9 +147,11 @@ public HudiFileInfoIterator(
String latestInstant,
boolean shouldUseMergedView)
{
String partition = FSUtils.getRelativePartitionPath(new Path(tablePath), directory);
String partition = HadoopFSUtils.getRelativePartitionPath(new Path(tablePath), directory);
if (fileStatuses.isPresent()) {
fileSystemView.addFilesToView(fileStatuses.get());
fileSystemView.addFilesToView(Arrays.stream(fileStatuses.get())
.map(HadoopFSUtils::convertToStoragePathInfo)
.collect(Collectors.toList()));
this.hoodieBaseFileIterator = fileSystemView.fetchLatestBaseFiles(partition).iterator();
}
else {
Expand All @@ -170,7 +177,7 @@ public boolean hasNext()
public HiveFileInfo next()
throws IOException
{
FileStatus fileStatus = hoodieBaseFileIterator.next().getFileStatus();
FileStatus fileStatus = convertToHadoopFileStatus(hoodieBaseFileIterator.next().getPathInfo());
String[] name = {"localhost:" + DFS_DATANODE_DEFAULT_PORT};
String[] host = {"localhost"};
LocatedFileStatus hoodieFileStatus = new LocatedFileStatus(fileStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.hive.util;

import com.google.common.collect.ImmutableMap;
Expand All @@ -19,6 +20,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.storage.StoragePath;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -71,7 +73,7 @@ public Optional<FileSplit> recreateFileSplitWithCustomInfo(FileSplit split, Map<
if (!isNullOrEmpty(customFileSplitClass) && HoodieRealtimeBootstrapBaseFileSplit.class.getName().equals(customFileSplitClass)) {
String deltaFilePaths = customSplitInfo.get(DELTA_FILE_PATHS_KEY);
List<String> deltaLogPaths = isNullOrEmpty(deltaFilePaths) ? Collections.emptyList() : Arrays.asList(deltaFilePaths.split(","));
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList());
FileSplit bootstrapFileSplit = new FileSplit(
new Path(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_PATH)),
parseLong(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_START)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.hive.util;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.storage.StoragePath;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -67,7 +68,7 @@ public Optional<FileSplit> recreateFileSplitWithCustomInfo(FileSplit split, Map<
if (HoodieRealtimeFileSplit.class.getName().equals(customSplitClass)) {
requireNonNull(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY), "HUDI_DELTA_FILEPATHS_KEY is missing");
List<String> deltaLogPaths = SPLITTER.splitToList(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY));
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList());
return Optional.of(new HoodieRealtimeFileSplit(
split,
requireNonNull(customSplitInfo.get(HUDI_BASEPATH_KEY), "HUDI_BASEPATH_KEY is missing"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.hive.util;

import com.google.common.collect.ImmutableList;
Expand All @@ -21,6 +22,7 @@
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.storage.StoragePath;
import org.testng.annotations.Test;

import java.io.IOException;
Expand All @@ -44,7 +46,7 @@ public void testHudiRealtimeSplitConverterRoundTrip()
throws IOException
{
List<String> deltaLogPaths = Arrays.asList("test1", "test2", "test3");
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList());
String expectedMaxCommitTime = "max_commit_time";

FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
Expand Down Expand Up @@ -125,7 +127,7 @@ public void testHudiRealtimeBootstrapBaseFileSplitConverter()
throws IOException
{
List<String> deltaLogPaths = Arrays.asList("test1", "test2", "test3");
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList());
String maxCommitTime = "max_commit_time";

Path bootstrapSourceFilePath = new Path("/test/source/test.parquet");
Expand Down
6 changes: 6 additions & 0 deletions presto-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@
<artifactId>units</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Loading
Loading