Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<dep.kafka.version>2.3.1</dep.kafka.version>
<dep.druid.version>0.19.0</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
<dep.hudi.version>0.9.0</dep.hudi.version>
<dep.hudi.version>0.10.0-SNAPSHOT</dep.hudi.version>
<dep.testcontainers.version>1.15.1</dep.testcontainers.version>
<!--
America/Bahia_Banderas has:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Iterator<HiveFileInfo> list(
p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)),
namenodeStats,
hiveDirectoryContext.getNestedDirectoryPolicy(),
pathFilter);
Optional.of(pathFilter));
}

public static class HadoopFileInfoIterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public class HiveClientConfig
private int materializedViewMissingPartitionsThreshold = 100;

private boolean verboseRuntimeStatsEnabled;
private boolean hudiPreferMetadataToListFiles;

public int getMaxInitialSplits()
{
Expand Down Expand Up @@ -1682,4 +1683,17 @@ public int getMaterializedViewMissingPartitionsThreshold()
{
return this.materializedViewMissingPartitionsThreshold;
}

@Config("hive.hudi-prefer-metadata-to-list-files")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hive.hudi-enable-metadata

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have addressed this in #17084
Closing this draft patch as there were some more changes.

// Config setter bound (via @Config) to hive.hudi-prefer-metadata-to-list-files;
// returns this so setters can be chained in the fluent style used by this class.
@ConfigDescription("For Hudi tables prefer to fetch the list of file names and sizes from metadata rather than storage")
public HiveClientConfig setHudiPreferMetadataToListFiles(boolean hudiPreferMetadataToListFiles)
{
    this.hudiPreferMetadataToListFiles = hudiPreferMetadataToListFiles;
    return this;
}

/**
 * @return whether Hudi file listings should be fetched from table metadata
 *         rather than by listing storage (see the corresponding setter)
 */
public boolean isHudiPreferMetadataToListFiles()
{
    return hudiPreferMetadataToListFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static com.facebook.presto.hive.HivePageSourceProvider.ColumnMapping.toColumnHandles;
import static com.facebook.presto.hive.HiveUtil.getPrefilledColumnValue;
import static com.facebook.presto.hive.HiveUtil.parsePartitionValue;
import static com.facebook.presto.hive.HiveUtil.shouldUseRecordReaderFromInputFormat;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
import static com.facebook.presto.hive.metastore.MetastoreUtil.reconstructPartitionSchema;
import static com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED;
Expand Down Expand Up @@ -408,42 +409,44 @@ public static Optional<ConnectorPageSource> createHivePageSource(

Optional<BucketAdaptation> bucketAdaptation = bucketConversion.map(conversion -> toBucketAdaptation(conversion, regularAndInterimColumnMappings, tableBucketNumber, ColumnMapping::getIndex));

for (HiveBatchPageSourceFactory pageSourceFactory : pageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
session,
path,
start,
length,
fileSize,
storage,
tableName,
tableParameters,
toColumnHandles(regularAndInterimColumnMappings, true),
effectivePredicate,
hiveStorageTimeZone,
hiveFileContext,
encryptionInformation);
if (pageSource.isPresent()) {
HivePageSource hivePageSource = new HivePageSource(
columnMappings,
bucketAdaptation,
if (!shouldUseRecordReaderFromInputFormat(configuration, storage, customSplitInfo)) {
for (HiveBatchPageSourceFactory pageSourceFactory : pageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
session,
path,
start,
length,
fileSize,
storage,
tableName,
tableParameters,
toColumnHandles(regularAndInterimColumnMappings, true),
effectivePredicate,
hiveStorageTimeZone,
typeManager,
pageSource.get());

if (isPushdownFilterEnabled) {
return Optional.of(new FilteringPageSource(
hiveFileContext,
encryptionInformation);
if (pageSource.isPresent()) {
HivePageSource hivePageSource = new HivePageSource(
columnMappings,
effectivePredicate,
remainingPredicate,
bucketAdaptation,
hiveStorageTimeZone,
typeManager,
rowExpressionService,
session,
outputIndices,
hivePageSource));
pageSource.get());

if (isPushdownFilterEnabled) {
return Optional.of(new FilteringPageSource(
columnMappings,
effectivePredicate,
remainingPredicate,
typeManager,
rowExpressionService,
session,
outputIndices,
hivePageSource));
}
return Optional.of(hivePageSource);
}
return Optional.of(hivePageSource);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public final class HiveSessionProperties
public static final String VERBOSE_RUNTIME_STATS_ENABLED = "verbose_runtime_stats_enabled";
private static final String DWRF_WRITER_STRIPE_CACHE_ENABLED = "dwrf_writer_stripe_cache_enabled";
private static final String DWRF_WRITER_STRIPE_CACHE_SIZE = "dwrf_writer_stripe_cache_size";
private static final String HUDI_PREFER_METADATA_TO_LIST_FILES = "hudi_prefer_metadata_to_list_files";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -617,6 +618,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
DWRF_WRITER_STRIPE_CACHE_SIZE,
"Maximum size of DWRF stripe cache to be held in memory",
orcFileWriterConfig.getDwrfStripeCacheMaxSize(),
false),
booleanProperty(
HUDI_PREFER_METADATA_TO_LIST_FILES,
"For Hudi tables prefer to fetch the list of files from its metadata",
hiveClientConfig.isHudiPreferMetadataToListFiles(),
false));
}

Expand Down Expand Up @@ -1077,4 +1083,9 @@ public static DataSize getDwrfWriterStripeCacheeMaxSize(ConnectorSession session
{
return session.getProperty(DWRF_WRITER_STRIPE_CACHE_SIZE, DataSize.class);
}

/**
 * Reads the {@code hudi_prefer_metadata_to_list_files} session property.
 *
 * @param session connector session carrying the property value
 * @return true when Hudi file listings should come from table metadata
 */
public static boolean isHudiPreferMetadataToListFiles(ConnectorSession session)
{
    Boolean preferMetadata = session.getProperty(HUDI_PREFER_METADATA_TO_LIST_FILES, Boolean.class);
    return preferMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.pagefile.PageInputFormat;
import com.facebook.presto.hive.util.FooterAwareRecordReader;
import com.facebook.presto.hive.util.HudiRealtimeSplitConverter;
import com.facebook.presto.orc.metadata.OrcType;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.PrestoException;
Expand Down Expand Up @@ -76,6 +75,8 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
Expand Down Expand Up @@ -149,6 +150,7 @@
import static com.facebook.presto.hive.util.ConfigurationUtils.copy;
import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
import static com.facebook.presto.hive.util.CustomSplitConversionUtils.recreateSplitWithCustomInfo;
import static com.facebook.presto.hive.util.HudiRealtimeSplitConverter.CUSTOM_SPLIT_CLASS_KEY;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.MoreObjects.firstNonNull;
Expand Down Expand Up @@ -196,6 +198,8 @@ public final class HiveUtil
private static final int DECIMAL_SCALE_GROUP = 2;

private static final String BIG_DECIMAL_POSTFIX = "BD";
private static final String USE_RECORD_READER_FROM_INPUT_FORMAT_ANNOTATION = "UseRecordReaderFromInputFormat";
private static final String USE_FILE_SPLITS_FROM_INPUT_FORMAT_ANNOTATION = "UseFileSplitsFromInputFormat";

static {
DateTimeParser[] timestampWithoutTimeZoneParser = {
Expand Down Expand Up @@ -295,7 +299,7 @@ private HiveUtil()

/**
 * Returns true when the custom split info identifies a Hudi realtime split,
 * i.e. the recorded split class is HoodieRealtimeFileSplit.
 *
 * @param customSplitInfo extra split properties keyed by CUSTOM_SPLIT_CLASS_KEY
 */
private static boolean isHudiRealtimeSplit(Map<String, String> customSplitInfo)
{
    // Defect fixed: the diff left both the old (qualified) and new (statically
    // imported) lookup lines in place, declaring customSplitClass twice.
    // Keep the statically-imported form, matching the import added at the top
    // of this file.
    String customSplitClass = customSplitInfo.get(CUSTOM_SPLIT_CLASS_KEY);
    return HoodieRealtimeFileSplit.class.getName().equals(customSplitClass);
}

Expand Down Expand Up @@ -366,13 +370,35 @@ static String getInputFormatName(Properties schema)
return name;
}

public static boolean shouldUseRecordReaderFromInputFormat(Configuration configuration, Storage storage)
public static boolean shouldUseRecordReaderFromInputFormat(Configuration configuration, Storage storage, Map<String, String> customSplitInfo)
{
if (customSplitInfo == null || !customSplitInfo.containsKey(CUSTOM_SPLIT_CLASS_KEY)) {
return false;
}

InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, storage.getStorageFormat().getInputFormat(), false);
return Arrays.stream(inputFormat.getClass().getAnnotations())
.map(Annotation::annotationType)
.map(Class::getSimpleName)
.anyMatch(name -> name.equals("UseRecordReaderFromInputFormat"));
.anyMatch(USE_RECORD_READER_FROM_INPUT_FORMAT_ANNOTATION::equals);
}

/**
 * Determines whether split generation should be delegated to the input format
 * (signalled by the UseFileSplitsFromInputFormat annotation). Hudi parquet
 * input formats are excluded even when they carry the annotation.
 */
static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
{
    if (isHudiParquetInputFormat(inputFormat)) {
        return false;
    }
    for (Annotation annotation : inputFormat.getClass().getAnnotations()) {
        // Match by simple name so no compile-time dependency on the annotation
        // class is required.
        if (USE_FILE_SPLITS_FROM_INPUT_FORMAT_ANNOTATION.equals(annotation.annotationType().getSimpleName())) {
            return true;
        }
    }
    return false;
}

/**
 * Returns true only for HoodieParquetInputFormat instances that are NOT the
 * realtime variant (HoodieParquetRealtimeInputFormat is handled differently).
 */
static boolean isHudiParquetInputFormat(InputFormat<?, ?> inputFormat)
{
    return (inputFormat instanceof HoodieParquetInputFormat)
            && !(inputFormat instanceof HoodieParquetRealtimeInputFormat);
}

public static long parseHiveDate(String value)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.util.HiveFileIterator;
import com.facebook.presto.spi.ConnectorSession;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;

import static com.facebook.presto.hive.HiveFileInfo.createHiveFileInfo;
import static com.facebook.presto.hive.HiveSessionProperties.isHudiPreferMetadataToListFiles;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT;

/**
 * DirectoryLister for Hudi tables that derives the file listing from Hudi's
 * table file-system view (optionally backed by the Hudi metadata table)
 * instead of returning the raw storage listing.
 */
public final class HudiDirectoryLister
        implements DirectoryLister
{
    private static final Logger log = Logger.get(HudiDirectoryLister.class);

    // Hudi's view of the table's data files; populated per directory by
    // HudiFileInfoIterator via addFilesToView.
    private final HoodieTableFileSystemView fileSystemView;

    /**
     * @param conf Hadoop configuration used to reach the table's storage
     * @param session connector session; its hudi_prefer_metadata_to_list_files
     *        property decides whether the Hudi metadata table is consulted
     * @param table Hive table whose storage location is the Hudi base path
     */
    public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table table)
    {
        log.info("Using Hudi Directory Lister.");
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(conf)
                .setBasePath(table.getStorage().getLocation())
                .build();
        HoodieEngineContext engineContext = new HoodieLocalEngineContext(conf);
        // When enabled, Hudi serves file listings from its metadata table
        // rather than listing storage directly.
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                .enable(isHudiPreferMetadataToListFiles(session))
                .build();
        // NOTE(review): the view is never explicitly closed; confirm whether it
        // holds resources that must be released when the lister is discarded.
        this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient,
                metadataConfig);
    }

    @Override
    public Iterator<HiveFileInfo> list(ExtendedFileSystem fileSystem,
            Table table,
            Path path,
            NamenodeStats namenodeStats,
            PathFilter pathFilter,
            HiveDirectoryContext hiveDirectoryContext)
    {
        log.debug("Listing path using Hudi directory lister: %s", path.toString());
        // Optional.empty(): the caller-supplied pathFilter is not applied here;
        // the Hudi file-system view already narrows the listing to the latest
        // base files. NOTE(review): confirm dropping the filter is intended.
        return new HiveFileIterator(
                path,
                p -> new HudiFileInfoIterator(fileSystemView, fileSystem.listStatus(p), table.getStorage().getLocation(), p),
                namenodeStats,
                hiveDirectoryContext.getNestedDirectoryPolicy(),
                Optional.empty());
    }

    /**
     * Iterates over the latest Hudi base files of one partition directory.
     */
    public static class HudiFileInfoIterator
            implements RemoteIterator<HiveFileInfo>
    {
        private final Iterator<HoodieBaseFile> hoodieBaseFileIterator;

        public HudiFileInfoIterator(
                HoodieTableFileSystemView fileSystemView,
                FileStatus[] fileStatuses,
                String tablePath,
                Path directory)
        {
            // Feed the raw directory listing into the shared view, then ask it
            // for the latest base file of each file group in this partition.
            fileSystemView.addFilesToView(fileStatuses);
            String partition = FSUtils.getRelativePartitionPath(new Path(tablePath), directory);
            this.hoodieBaseFileIterator = fileSystemView.fetchLatestBaseFiles(partition).iterator();
        }

        @Override
        public boolean hasNext()
        {
            return hoodieBaseFileIterator.hasNext();
        }

        @Override
        public HiveFileInfo next()
                throws IOException
        {
            FileStatus fileStatus = hoodieBaseFileIterator.next().getFileStatus();
            // Synthesize a single "localhost" block location spanning the whole
            // file; the file-system view does not supply real block locations.
            String[] name = new String[] {"localhost:" + DFS_DATANODE_DEFAULT_PORT};
            String[] host = new String[] {"localhost"};
            LocatedFileStatus hoodieFileStatus = new LocatedFileStatus(fileStatus,
                    new BlockLocation[] {new BlockLocation(name, host, 0L, fileStatus.getLen())});
            return createHiveFileInfo(hoodieFileStatus, Optional.empty());
        }
    }
}
Loading