Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 30 additions & 94 deletions plugin/trino-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@
<artifactId>trino-plugin-toolkit</artifactId>
</dependency>

<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
</dependency>

<dependency>
<groupId>io.trino.hive</groupId>
<artifactId>hive-apache</artifactId>
Expand Down Expand Up @@ -94,6 +89,11 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand All @@ -110,6 +110,12 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to add this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maven was complaining that it was missing

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this is due to the accidental usage of org.apache.commons.lang3.tuple.ImmutablePair from this dependency. You can replace it with Map.entry().

<groupId>com.linkedin.calcite</groupId>
<artifactId>calcite-core</artifactId>
<classifier>shaded</classifier>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
Expand All @@ -131,91 +137,8 @@
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${dep.hudi.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
</exclusion>
<exclusion>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</exclusion>
<exclusion>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</exclusion>
<exclusion>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>${dep.hudi.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>

<dependency>
Expand All @@ -231,14 +154,14 @@
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>

Expand Down Expand Up @@ -424,6 +347,19 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${dep.hudi.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
Comment thread
electrum marked this conversation as resolved.
Outdated
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-java-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public enum HudiErrorCode
HUDI_UNSUPPORTED_FILE_FORMAT(5, EXTERNAL),
HUDI_CURSOR_ERROR(6, EXTERNAL),
HUDI_FILESYSTEM_ERROR(7, EXTERNAL),
HUDI_PARTITION_NOT_FOUND(8, EXTERNAL);
HUDI_PARTITION_NOT_FOUND(8, EXTERNAL),
HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
*/
package io.trino.plugin.hudi;

import org.apache.hadoop.fs.Path;
import io.trino.filesystem.Location;

import static java.util.Objects.requireNonNull;

public record HudiFileStatus(Path path, boolean isDirectory, long length, long modificationTime, long blockSize)
public record HudiFileStatus(Location location, boolean isDirectory, long length, long modificationTime, long blockSize)
{
public HudiFileStatus
{
requireNonNull(path, "path is null");
requireNonNull(location, "location is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.metastore.Column;
Expand All @@ -43,7 +40,6 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -60,33 +56,30 @@
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR;
import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.hudi.model.HoodieTableType.COPY_ON_WRITE;
import static io.trino.plugin.hudi.HudiUtil.isHudiTable;
import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE;
import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;

public class HudiMetadata
implements ConnectorMetadata
{
public static final Logger log = Logger.get(HudiMetadata.class);

private final HiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final TrinoFileSystemFactory fileSystemFactory;
private final TypeManager typeManager;

public HudiMetadata(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager)
public HudiMetadata(HiveMetastore metastore, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}
Expand All @@ -109,7 +102,7 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
if (table.isEmpty()) {
return null;
}
if (!isHudiTable(session, table.get())) {
if (!isHudiTable(fileSystemFactory.create(session), Location.of(table.get().getStorage().getLocation()))) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
}
return new HudiTableHandle(
Expand All @@ -124,11 +117,11 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
return getRawSystemTable(tableName)
return getRawSystemTable(tableName, session)
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

private Optional<SystemTable> getRawSystemTable(SchemaTableName tableName)
private Optional<SystemTable> getRawSystemTable(SchemaTableName tableName, ConnectorSession session)
{
HudiTableName name = HudiTableName.from(tableName.getTableName());
if (name.getTableType() == TableType.DATA) {
Expand All @@ -144,7 +137,7 @@ private Optional<SystemTable> getRawSystemTable(SchemaTableName tableName)
break;
case TIMELINE:
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
return Optional.of(new TimelineTable(hdfsEnvironment, systemTableName, tableOptional.get()));
return Optional.of(new TimelineTable(fileSystemFactory.create(session), systemTableName, tableOptional.get()));
}
return Optional.empty();
}
Expand Down Expand Up @@ -227,31 +220,6 @@ HiveMetastore getMetastore()
return metastore;
}

private boolean isHudiTable(ConnectorSession session, Table table)
{
String basePath = table.getStorage().getLocation();
try {
Location baseLocation = Location.of(basePath);
Location metaLocation = baseLocation.appendPath(METAFOLDER_NAME);

TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
FileIterator iterator = trinoFileSystem.listFiles(metaLocation);
// If there is at least one file in the .hoodie directory, it's a valid Hudi table
if (!iterator.hasNext()) {
log.warn("Could not find Hudi table at path '%s'.", basePath);
return false;
}
}
catch (IllegalArgumentException e) {
log.warn("Could not find Hudi table at path '%s'. Error: %s", basePath, e.getMessage());
return false;
}
catch (IOException e) {
throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Could not check if %s is a valid table", basePath), e);
}
return true;
}

private Optional<TableColumnsMetadata> getTableColumnMetadata(ConnectorSession session, SchemaTableName table)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.plugin.hudi;

import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.spi.security.ConnectorIdentity;
Expand All @@ -29,22 +28,20 @@
public class HudiMetadataFactory
{
private final HiveMetastoreFactory metastoreFactory;
private final HdfsEnvironment hdfsEnvironment;
private final TrinoFileSystemFactory fileSystemFactory;
private final TypeManager typeManager;

@Inject
public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, HdfsEnvironment hdfsEnvironment, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager)
public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager)
{
this.metastoreFactory = requireNonNull(metastoreFactory, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

public HudiMetadata create(ConnectorIdentity identity)
{
HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(identity));
return new HudiMetadata(metastore, hdfsEnvironment, fileSystemFactory, typeManager);
return new HudiMetadata(metastore, fileSystemFactory, typeManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.trino.plugin.hive.ReaderColumns;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.plugin.hudi.model.HoodieFileFormat;
import io.trino.plugin.hudi.model.HudiFileFormat;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ColumnHandle;
Expand Down Expand Up @@ -146,9 +146,9 @@ public ConnectorPageSource createPageSource(
DynamicFilter dynamicFilter)
{
HudiSplit split = (HudiSplit) connectorSplit;
String path = split.getPath();
HoodieFileFormat hudiFileFormat = getHudiFileFormat(path);
if (!HoodieFileFormat.PARQUET.equals(hudiFileFormat)) {
String path = split.getLocation();
HudiFileFormat hudiFileFormat = getHudiFileFormat(path);
if (!HudiFileFormat.PARQUET.equals(hudiFileFormat)) {
throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat));
}

Expand Down Expand Up @@ -185,7 +185,7 @@ private static ConnectorPageSource createPageSource(
{
ParquetDataSource dataSource = null;
boolean useColumnNames = shouldUseParquetColumnNames(session);
String path = hudiSplit.getPath();
String path = hudiSplit.getLocation();
long start = hudiSplit.getStart();
long length = hudiSplit.getLength();
try {
Expand Down
Loading