Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<dep.kafka.version>2.3.1</dep.kafka.version>
<dep.druid.version>0.19.0</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
<dep.hudi.version>0.5.3</dep.hudi.version>
<dep.hudi.version>0.7.0-rc1</dep.hudi.version>
<!--
America/Bahia_Banderas has:
- offset change since 1970 (offset Jan 1970: -08:00, offset Jan 2018: -06:00)
Expand Down Expand Up @@ -1029,11 +1029,51 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${dep.hudi.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
</exclusion>
<exclusion>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</exclusion>
<exclusion>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>${dep.hudi.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions presto-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${dep.hudi.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ public class HiveClientConfig
private boolean fileRenamingEnabled;
private boolean preferManifestToListFiles;
private boolean manifestVerificationEnabled;
private boolean preferMetadataToListHudiFiles;
private boolean hudiMetadataVerificationEnabled;

public int getMaxInitialSplits()
{
Expand Down Expand Up @@ -1552,4 +1554,30 @@ public boolean isManifestVerificationEnabled()
{
return this.manifestVerificationEnabled;
}

@Config("hive.prefer-metadata-to-list-hudi-files")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be just, hive.use.hudi.metadata.to.list.files

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the community will accept this, as the convention they follow is separation by `-`. In addition, I used the word "prefer" so it is along the lines of https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java#L1530

@ConfigDescription("For Hudi tables prefer to fetch the list of file names and sizes from metadata rather than storage")
// Fluent setter backing the "hive.prefer-metadata-to-list-hudi-files" config property;
// when enabled, file listings for Hudi tables are served from the Hudi metadata table
// rather than by listing storage (see HudiDirectoryLister).
public HiveClientConfig setPreferMetadataToListHudiFiles(boolean preferMetadataToListHudiFiles)
{
this.preferMetadataToListHudiFiles = preferMetadataToListHudiFiles;
return this;
}

// Returns whether Hudi file listings should be fetched from table metadata instead of storage.
public boolean isPreferMetadataToListHudiFiles()
{
return this.preferMetadataToListHudiFiles;
}

@Config("hive.hudi-metadata-verification-enabled")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hive.verify.hudi.metadata

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. This is not the naming convention presto community follows.

@ConfigDescription("Enable verification of file names and sizes in Hudi metadata")
// Fluent setter backing the "hive.hudi-metadata-verification-enabled" config property;
// when enabled, file names and sizes read from Hudi metadata are verified against the file system.
public HiveClientConfig setHudiMetadataVerificationEnabled(boolean hudiMetadataVerificationEnabled)
{
this.hudiMetadataVerificationEnabled = hudiMetadataVerificationEnabled;
return this;
}

// Returns whether Hudi metadata-based file listings are verified against the file system.
public boolean isHudiMetadataVerificationEnabled()
{
return this.hudiMetadataVerificationEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public final class HiveSessionProperties
public static final String FILE_RENAMING_ENABLED = "file_renaming_enabled";
public static final String PREFER_MANIFESTS_TO_LIST_FILES = "prefer_manifests_to_list_files";
public static final String MANIFEST_VERIFICATION_ENABLED = "manifest_verification_enabled";
public static final String PREFER_METADATA_TO_LIST_HUDI_FILES = "prefer_metadata_to_list_hudi_files";
public static final String HUDI_METADATA_VERIFICATION_ENABLED = "hudi_metadata_verification_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -532,6 +534,16 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
MANIFEST_VERIFICATION_ENABLED,
"Enable manifest verification",
hiveClientConfig.isManifestVerificationEnabled(),
false),
booleanProperty(
PREFER_METADATA_TO_LIST_HUDI_FILES,
"For Hudi tables prefer to fetch the list of files from its metadata",
hiveClientConfig.isPreferMetadataToListHudiFiles(),
false),
booleanProperty(
HUDI_METADATA_VERIFICATION_ENABLED,
"Verify file listing maintained in Hudi table metadata against the file system",
hiveClientConfig.isHudiMetadataVerificationEnabled(),
false));
}

Expand Down Expand Up @@ -932,4 +944,14 @@ public static boolean isManifestVerificationEnabled(ConnectorSession session)
{
return session.getProperty(MANIFEST_VERIFICATION_ENABLED, Boolean.class);
}

public static boolean isPreferMetadataToListHudiFiles(ConnectorSession session)
{
return session.getProperty(PREFER_METADATA_TO_LIST_HUDI_FILES, Boolean.class);
}

public static boolean isHudiMetadataVerificationEnabled(ConnectorSession session)
{
return session.getProperty(HUDI_METADATA_VERIFICATION_ENABLED, Boolean.class);
}
}
11 changes: 11 additions & 0 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
Expand Down Expand Up @@ -287,6 +288,16 @@ private static boolean isHudiRealtimeSplit(Map<String, String> customSplitInfo)
return HoodieRealtimeFileSplit.class.getName().equals(customSplitClass);
}

/**
 * Determines whether a table is backed by Hudi, based on the InputFormat
 * configured for it.
 *
 * @param inputFormat the InputFormat configured for the table
 * @return {@code true} when the InputFormat is Hudi's {@code HoodieParquetInputFormat}
 *         (or a subclass of it), {@code false} otherwise (including {@code null})
 */
static boolean isHudiTable(InputFormat<?, ?> inputFormat)
{
    return HoodieParquetInputFormat.class.isInstance(inputFormat);
}

public static void setReadColumns(Configuration configuration, List<Integer> readHiveColumnIndexes)
{
configuration.set(READ_COLUMN_IDS_CONF_STR, Joiner.on(',').join(readHiveColumnIndexes));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.util.HiveFileIterator;
import com.facebook.presto.spi.ConnectorSession;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;

import static com.facebook.presto.hive.HiveFileInfo.createHiveFileInfo;
import static com.facebook.presto.hive.HiveSessionProperties.isHudiMetadataVerificationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isPreferMetadataToListHudiFiles;

public final class HudiDirectoryLister
implements DirectoryLister
{
private HoodieTableFileSystemView fileSystemView;

public HudiDirectoryLister(Configuration conf,
ConnectorSession session,
Table table)
{
System.out.println("Uditt: Initializing Hudi Directory Lister...");
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, table.getStorage().getLocation());
HoodieEngineContext engineContext = new HoodieLocalEngineContext(conf);
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(isPreferMetadataToListHudiFiles(session))
.validate(isHudiMetadataVerificationEnabled(session))
.build();
this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient,
metadataConfig);
}

@Override
public Iterator<HiveFileInfo> list(ExtendedFileSystem fileSystem,
Table table,
Path path,
NamenodeStats namenodeStats,
PathFilter pathFilter,
HiveDirectoryContext hiveDirectoryContext)
{
return new HiveFileIterator(
path,
p -> new HudiFileInfoIterator(fileSystemView, table.getStorage().getLocation(), p),
namenodeStats,
hiveDirectoryContext.getNestedDirectoryPolicy(),
pathFilter);
}

public static class HudiFileInfoIterator
implements RemoteIterator<HiveFileInfo>
{
private final Iterator<HoodieBaseFile> hoodieBaseFileIterator;

public HudiFileInfoIterator(HoodieTableFileSystemView fileSystemView,
String tablePath,
Path directory)
{
String partition = FSUtils.getRelativePartitionPath(new Path(tablePath), directory);
this.hoodieBaseFileIterator = fileSystemView.getLatestBaseFiles(partition).iterator();
}

@Override
public boolean hasNext()
{
return hoodieBaseFileIterator.hasNext();
}

@Override
public HiveFileInfo next() throws IOException
{
FileStatus fileStatus = hoodieBaseFileIterator.next().getFileStatus();
String[] name = new String[]{"localhost:50010"};
String[] host = new String[]{"localhost"};
LocatedFileStatus hoodieFileStatus = new LocatedFileStatus(fileStatus,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this will do one additional RPC per base file? if so, would n't this be bad actually for listing performance?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@umehrot2 what I meant was if we can simply do something like

LocatedFileStatus hoodieFileStatus = new LocatedFileStatus(fileStatus, 
   new BlockLocation[] {new BlockLocation(name, host, 0, file.getLen())});

if so, would this work for hdfs.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@umehrot2 do you have an easy test environment for HDFS?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uber and Facebook both use hdfs a lot. So this will come up in the review upstream for sure.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I will make that change, and do some testing on EMR cluster. EMR clusters all have HDFS so I should be able to test.

new BlockLocation[]{new BlockLocation(name, host, 0L, fileStatus.getLen())});
return createHiveFileInfo(hoodieFileStatus, Optional.empty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.io.CharStreams;
Expand All @@ -40,7 +39,6 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

import java.io.BufferedReader;
Expand All @@ -57,7 +55,6 @@
import java.util.Optional;
import java.util.Properties;
import java.util.function.IntPredicate;
import java.util.function.Supplier;

import static com.facebook.presto.hive.HiveBucketing.getVirtualBucketNumber;
import static com.facebook.presto.hive.HiveColumnHandle.pathColumnHandle;
Expand Down Expand Up @@ -104,7 +101,6 @@ public class StoragePartitionLoader
private final ConnectorSession session;
private final Deque<Iterator<InternalHiveSplit>> fileIterators;
private final boolean schedulerUsesHostAddresses;
private final Supplier<HoodieROTablePathFilter> hoodiePathFilterSupplier;
private final boolean partialAggregationsPushedDown;

public StoragePartitionLoader(
Expand All @@ -126,13 +122,22 @@ public StoragePartitionLoader(
this.session = requireNonNull(session, "session is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
this.recursiveDirWalkerEnabled = recursiveDirWalkerEnabled;
this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName());
this.fileIterators = requireNonNull(fileIterators, "fileIterators is null");
this.schedulerUsesHostAddresses = schedulerUsesHostAddresses;
this.hoodiePathFilterSupplier = Suppliers.memoize(HoodieROTablePathFilter::new)::get;
this.partialAggregationsPushedDown = partialAggregationsPushedDown;

Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext,
new Path(table.getStorage().getLocation()));
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration,
table.getStorage().getStorageFormat().getInputFormat(), false);
if (HiveUtil.isHudiTable(inputFormat)) {
this.directoryLister = new HudiDirectoryLister(configuration, session, table);
}
else {
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
}
}

@Override
Expand Down Expand Up @@ -234,7 +239,7 @@ public ListenableFuture<?> loadPartition(HivePartitionMetadata partition, HiveSp

return addSplitsToSource(splits, splitFactory, hiveSplitSource, stopped);
}
PathFilter pathFilter = isHudiParquetInputFormat(inputFormat) ? hoodiePathFilterSupplier.get() : path1 -> true;
PathFilter pathFilter = path1 -> true;
// S3 Select pushdown works at the granularity of individual S3 objects,
// Partial aggregation pushdown works at the granularity of individual files
// therefore we must not split files when either is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ public void testDefaults()
.setPartialAggregationPushdownForVariableLengthDatatypesEnabled(false)
.setFileRenamingEnabled(false)
.setPreferManifestsToListFiles(false)
.setManifestVerificationEnabled(false));
.setManifestVerificationEnabled(false)
.setPreferMetadataToListHudiFiles(false)
.setHudiMetadataVerificationEnabled(false));
}

@Test
Expand Down Expand Up @@ -255,6 +257,8 @@ public void testExplicitPropertyMappings()
.put("hive.file_renaming_enabled", "true")
.put("hive.prefer-manifests-to-list-files", "true")
.put("hive.manifest-verification-enabled", "true")
.put("hive.prefer-metadata-to-list-hudi-files", "true")
.put("hive.hudi-metadata-verification-enabled", "true")
.build();

HiveClientConfig expected = new HiveClientConfig()
Expand Down Expand Up @@ -359,7 +363,9 @@ public void testExplicitPropertyMappings()
.setPartialAggregationPushdownForVariableLengthDatatypesEnabled(true)
.setFileRenamingEnabled(true)
.setPreferManifestsToListFiles(true)
.setManifestVerificationEnabled(true);
.setManifestVerificationEnabled(true)
.setPreferMetadataToListHudiFiles(true)
.setHudiMetadataVerificationEnabled(true);

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down