diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml
index 9de46c96cf1d..cbab9902568f 100644
--- a/core/trino-server/src/main/provisio/trino.xml
+++ b/core/trino-server/src/main/provisio/trino.xml
@@ -74,6 +74,12 @@
+
+
+
+
+
+
diff --git a/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java b/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
index 7a493236ad2c..b36936b117da 100644
--- a/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
+++ b/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
@@ -350,6 +350,13 @@ public FSDataInputStream open(Path f, int bufferSize)
return new InputStreamWrapper(getRawFileSystem().open(f, bufferSize), this);
}
+ // Delegate scheme lookup to the wrapped (raw) file system so scheme-based dispatch works through the cache wrapper.
+ @Override
+ public String getScheme()
+ {
+ return getRawFileSystem().getScheme();
+ }
+
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 20b323b67f08..b96310e24f6b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -265,7 +265,7 @@
import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing;
-import static io.trino.plugin.hive.util.HiveUtil.columnExtraInfo;
+import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
@@ -3536,40 +3536,6 @@ else if (type instanceof RowType) {
}
}
- private Function columnMetadataGetter(Table table)
- {
- ImmutableList.Builder columnNames = ImmutableList.builder();
- table.getPartitionColumns().stream().map(Column::getName).forEach(columnNames::add);
- table.getDataColumns().stream().map(Column::getName).forEach(columnNames::add);
- List allColumnNames = columnNames.build();
- if (allColumnNames.size() > Sets.newHashSet(allColumnNames).size()) {
- throw new TrinoException(HIVE_INVALID_METADATA,
- format("Hive metadata for table %s is invalid: Table descriptor contains duplicate columns", table.getTableName()));
- }
-
- List tableColumns = table.getDataColumns();
- ImmutableMap.Builder> builder = ImmutableMap.builder();
- for (Column field : concat(tableColumns, table.getPartitionColumns())) {
- if (field.getComment().isPresent() && !field.getComment().get().equals("from deserializer")) {
- builder.put(field.getName(), field.getComment());
- }
- else {
- builder.put(field.getName(), Optional.empty());
- }
- }
-
- Map> columnComment = builder.buildOrThrow();
-
- return handle -> ColumnMetadata.builder()
- .setName(handle.getName())
- .setType(handle.getType())
- .setComment(handle.isHidden() ? Optional.empty() : columnComment.get(handle.getName()))
- .setExtraInfo(Optional.ofNullable(columnExtraInfo(handle.isPartitionKey())))
- .setHidden(handle.isHidden())
- .setProperties(partitionProjectionService.getPartitionProjectionTrinoColumnProperties(table, handle.getName()))
- .build();
- }
-
@Override
public void rollback()
{
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index 7eb069ac9f6b..2f804be82a9b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -211,17 +211,7 @@ public static ReaderPageSource createPageSource(
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
fileSchema = fileMetaData.getSchema();
- Optional message = projectSufficientColumns(columns)
- .map(projection -> projection.get().stream()
- .map(HiveColumnHandle.class::cast)
- .collect(toUnmodifiableList()))
- .orElse(columns).stream()
- .filter(column -> column.getColumnType() == REGULAR)
- .map(column -> getColumnType(column, fileSchema, useColumnNames))
- .filter(Optional::isPresent)
- .map(Optional::get)
- .map(type -> new MessageType(fileSchema.getName(), type))
- .reduce(MessageType::union);
+ Optional message = getParquetMessageType(columns, useColumnNames, fileSchema);
requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of()));
messageColumn = getColumnIO(fileSchema, requestedSchema);
@@ -301,6 +291,27 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
}
}
+ /**
+ * Builds the requested Parquet schema for a read: projects sufficient columns when a
+ * projection exists, resolves each REGULAR column against {@code fileSchema}, and unions
+ * the per-column schemas. Returns empty when no REGULAR column resolves against the file schema.
+ */
+ public static Optional getParquetMessageType(List columns, boolean useColumnNames, MessageType fileSchema)
+ {
+ Optional message = projectSufficientColumns(columns)
+ .map(projection -> projection.get().stream()
+ .map(HiveColumnHandle.class::cast)
+ .collect(toUnmodifiableList()))
+ .orElse(columns).stream()
+ .filter(column -> column.getColumnType() == REGULAR)
+ .map(column -> getColumnType(column, fileSchema, useColumnNames))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .map(type -> new MessageType(fileSchema.getName(), type))
+ .reduce(MessageType::union);
+ return message;
+ }
+
public static Optional getParquetType(GroupType groupType, boolean useParquetColumnNames, HiveColumnHandle column)
{
if (useParquetColumnNames) {
@@ -341,7 +347,7 @@ public static Optional getColumnType(HiveColumnH
return Optional.of(new GroupType(baseType.getRepetition(), baseType.getName(), ImmutableList.of(type)));
}
- private static Optional getColumnIndexStore(
+ public static Optional getColumnIndexStore(
ParquetDataSource dataSource,
BlockMetaData blockMetadata,
Map, ColumnDescriptor> descriptorsByPath,
@@ -416,7 +422,7 @@ public static TupleDomain getParquetTupleDomain(
return TupleDomain.withColumnDomains(predicate.buildOrThrow());
}
- private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
+ public static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
return getParquetTypeByName(column.getBaseColumnName(), messageType);
@@ -428,7 +434,7 @@ private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle co
return null;
}
- private static List createParquetReaderColumns(List baseColumns, MessageType fileSchema, MessageColumnIO messageColumn, boolean useColumnNames)
+ public static List createParquetReaderColumns(List baseColumns, MessageType fileSchema, MessageColumnIO messageColumn, boolean useColumnNames)
{
for (HiveColumnHandle column : baseColumns) {
checkArgument(column == PARQUET_ROW_INDEX_COLUMN || column.getColumnType() == REGULAR, "column type must be REGULAR: %s", column);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java
index ac5f232d297e..bcd5625fcc65 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java
@@ -361,6 +361,13 @@ private void closeSuper()
super.close();
}
+ // Report the scheme of the URI this file system was initialized with (e.g. "s3").
+ @Override
+ public String getScheme()
+ {
+ return uri.getScheme();
+ }
+
@Override
public URI getUri()
{
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
index 38f11ad8f515..f43632babc68 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
@@ -17,7 +17,9 @@
import com.google.common.base.Splitter;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import io.airlift.compress.lzo.LzoCodec;
import io.airlift.compress.lzo.LzopCodec;
import io.airlift.slice.Slice;
@@ -36,6 +38,7 @@
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.CharType;
@@ -96,12 +99,14 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
+import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Lists.newArrayList;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.hdfs.ConfigurationUtils.copy;
@@ -1155,4 +1160,44 @@ public static boolean isSparkBucketedTable(Table table)
return table.getParameters().containsKey(SPARK_TABLE_PROVIDER_KEY)
&& table.getParameters().containsKey(SPARK_TABLE_BUCKET_NUMBER_KEY);
}
+
+ /**
+ * Builds a mapping function from a column handle to its {@link ColumnMetadata}, validating
+ * that the table declares no duplicate column names and suppressing the Hive-generated
+ * "from deserializer" placeholder comment. Hidden columns get no comment.
+ */
+ public static Function columnMetadataGetter(Table table)
+ {
+ ImmutableList.Builder columnNames = ImmutableList.builder();
+ table.getPartitionColumns().stream().map(Column::getName).forEach(columnNames::add);
+ table.getDataColumns().stream().map(Column::getName).forEach(columnNames::add);
+ List allColumnNames = columnNames.build();
+ if (allColumnNames.size() > Sets.newHashSet(allColumnNames).size()) {
+ throw new TrinoException(HIVE_INVALID_METADATA,
+ format("Hive metadata for table %s is invalid: Table descriptor contains duplicate columns", table.getTableName()));
+ }
+
+ List tableColumns = table.getDataColumns();
+ ImmutableMap.Builder> builder = ImmutableMap.builder();
+ for (Column field : concat(tableColumns, table.getPartitionColumns())) {
+ if (field.getComment().isPresent() && !field.getComment().get().equals("from deserializer")) {
+ builder.put(field.getName(), field.getComment());
+ }
+ else {
+ builder.put(field.getName(), Optional.empty());
+ }
+ }
+
+ Map> columnComment = builder.buildOrThrow();
+
+ // NOTE(review): the instance method removed from HiveMetadata also set partition-projection
+ // column properties via partitionProjectionService; this static version drops them — confirm intentional.
+ return handle -> ColumnMetadata.builder()
+ .setName(handle.getName())
+ .setType(handle.getType())
+ .setComment(handle.isHidden() ? Optional.empty() : columnComment.get(handle.getName()))
+ .setExtraInfo(Optional.ofNullable(columnExtraInfo(handle.isPartitionKey())))
+ .setHidden(handle.isHidden())
+ .build();
+ }
}
diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml
new file mode 100644
index 000000000000..c3086fd5a57e
--- /dev/null
+++ b/plugin/trino-hudi/pom.xml
@@ -0,0 +1,474 @@
+
+
+ 4.0.0
+
+
+ trino-root
+ io.trino
+ 398-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-hudi
+ Trino - Hudi Connector
+ trino-plugin
+
+
+ ${project.parent.basedir}
+ 0.11.1
+
+
+
+
+ io.trino
+ trino-filesystem
+
+
+
+ io.trino
+ trino-hdfs
+
+
+
+ io.trino
+ trino-hive
+
+
+
+ io.trino
+ trino-memory-context
+
+
+
+ io.trino
+ trino-parquet
+
+
+
+ io.trino
+ trino-plugin-toolkit
+
+
+
+ io.trino.hadoop
+ hadoop-apache
+
+
+
+ io.trino.hive
+ hive-apache
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ concurrent
+
+
+
+ io.airlift
+ configuration
+
+
+
+ io.airlift
+ event
+
+
+
+ io.airlift
+ json
+
+
+
+ io.airlift
+ log
+
+
+
+ io.airlift
+ units
+
+
+
+ com.google.code.findbugs
+ jsr305
+ true
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+ javax.inject
+ javax.inject
+
+
+
+ javax.validation
+ validation-api
+
+
+
+ joda-time
+ joda-time
+
+
+
+ org.apache.hudi
+ hudi-common
+ ${dep.hudi.version}
+
+
+ org.apache.hbase
+ hbase-server
+
+
+ org.apache.hbase
+ hbase-client
+
+
+ org.apache.orc
+ orc-core
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ org.apache.httpcomponents
+ fluent-hc
+
+
+ org.rocksdb
+ rocksdbjni
+
+
+ com.esotericsoftware
+ kryo-shaded
+
+
+ org.apache.hadoop
+ hadoop-client
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+
+
+ org.apache.httpcomponents
+ httpcore
+
+
+ org.apache.hive
+ hive-exec
+
+
+ org.apache.hive
+ hive-jdbc
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+
+
+ org.lz4
+ lz4-java
+
+
+
+
+
+ org.apache.hudi
+ hudi-hadoop-mr
+ ${dep.hudi.version}
+
+
+ *
+ *
+
+
+
+
+
+ org.weakref
+ jmxutils
+
+
+
+
+ io.trino
+ trino-hadoop-toolkit
+ runtime
+
+
+
+ io.airlift
+ log-manager
+ runtime
+
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ org.openjdk.jol
+ jol-core
+ provided
+
+
+
+
+ io.trino
+ trino-hive
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-hive-hadoop2
+ test
+
+
+
+ io.trino
+ trino-main
+ test
+
+
+
+ io.trino
+ trino-main
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-parser
+ test
+
+
+
+ io.trino
+ trino-spi
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-testing
+ test
+
+
+
+ io.trino
+ trino-testing-containers
+ test
+
+
+
+ io.trino
+ trino-testing-services
+ test
+
+
+
+ io.trino
+ trino-tpch
+ test
+
+
+
+ io.trino.tpch
+ tpch
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ org.apache.hudi
+ hudi-client-common
+ ${dep.hudi.version}
+ test
+
+
+ com.beust
+ jcommander
+
+
+ commons-logging
+ commons-logging
+
+
+ log4j
+ log4j
+
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+ org.apache.curator
+ curator-framework
+
+
+ org.apache.hudi
+ hudi-common
+
+
+ org.apache.hudi
+ hudi-hive-sync
+
+
+ org.apache.hudi
+ hudi-timeline-service
+
+
+ org.apache.hive
+ hive-service
+
+
+ org.apache.parquet
+ parquet-avro
+
+
+ org.apache.curator
+ curator-client
+
+
+ org.apache.curator
+ curator-recipes
+
+
+ com.github.davidmoten
+ hilbert-curve
+
+
+ io.prometheus
+ *
+
+
+ io.dropwizard.metrics
+ *
+
+
+
+
+
+ org.apache.hudi
+ hudi-java-client
+ ${dep.hudi.version}
+ test
+
+
+ org.apache.hudi
+ *
+
+
+ org.apache.parquet
+ parquet-avro
+
+
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.jetbrains
+ annotations
+ test
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+
+
+
+ org.basepom.maven
+ duplicate-finder-maven-plugin
+
+
+
+ mime.types
+ about.html
+
+ log4j.properties
+ log4j-surefire.properties
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ --add-opens=java.base/java.lang=ALL-UNNAMED
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+
+
+
+
+
+
+
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitManager.java
new file mode 100644
index 000000000000..648f7f31fc7c
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitManager.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import javax.inject.Qualifier;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * JSR-330 qualifier annotation for injection bindings dedicated to the Hudi split manager.
+ */
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@Qualifier
+public @interface ForHudiSplitManager {}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
new file mode 100644
index 000000000000..ea323818bdf6
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.DataSize;
+
+import javax.validation.constraints.DecimalMax;
+import javax.validation.constraints.DecimalMin;
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import java.util.List;
+
+import static com.google.common.base.Strings.nullToEmpty;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static java.util.Locale.ENGLISH;
+
+/**
+ * Configuration for the Hudi connector ("hudi."-prefixed properties); validation annotations sit on getters, per airlift config convention.
+ */
+public class HudiConfig
+{
+ private static final Splitter COMMA_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();
+
+ private List columnsToHide = ImmutableList.of();
+ private boolean metadataEnabled;
+ private boolean shouldUseParquetColumnNames = true;
+ private int minPartitionBatchSize = 10;
+ private int maxPartitionBatchSize = 100;
+ private boolean sizeBasedSplitWeightsEnabled = true;
+ private DataSize standardSplitWeightSize = DataSize.of(128, MEGABYTE);
+ private double minimumAssignedSplitWeight = 0.05;
+ private int maxSplitsPerSecond = Integer.MAX_VALUE;
+ private int maxOutstandingSplits = 1000;
+
+ public List getColumnsToHide()
+ {
+ return columnsToHide;
+ }
+
+ @Config("hudi.columns-to-hide")
+ @ConfigDescription("List of column names that will be hidden from the query output. " +
+ "It can be used to hide Hudi meta fields. By default, no fields are hidden.")
+ public HudiConfig setColumnsToHide(String columnsToHide)
+ {
+ this.columnsToHide = COMMA_SPLITTER.splitToStream(nullToEmpty(columnsToHide))
+ .map(s -> s.toLowerCase(ENGLISH))
+ .collect(toImmutableList());
+ return this;
+ }
+
+ @Config("hudi.metadata-enabled")
+ @ConfigDescription("Fetch the list of file names and sizes from metadata rather than storage.")
+ public HudiConfig setMetadataEnabled(boolean metadataEnabled)
+ {
+ this.metadataEnabled = metadataEnabled;
+ return this;
+ }
+
+ public boolean isMetadataEnabled()
+ {
+ return this.metadataEnabled;
+ }
+
+ @Config("hudi.parquet.use-column-names")
+ @ConfigDescription("Access Parquet columns using names from the file. If disabled, then columns are accessed using index."
+ + "Only applicable to Parquet file format.")
+ public HudiConfig setUseParquetColumnNames(boolean shouldUseParquetColumnNames)
+ {
+ this.shouldUseParquetColumnNames = shouldUseParquetColumnNames;
+ return this;
+ }
+
+ public boolean getUseParquetColumnNames()
+ {
+ return this.shouldUseParquetColumnNames;
+ }
+
+ @Config("hudi.min-partition-batch-size")
+ @ConfigDescription("Minimum number of partitions returned in a single batch.")
+ public HudiConfig setMinPartitionBatchSize(int minPartitionBatchSize)
+ {
+ this.minPartitionBatchSize = minPartitionBatchSize;
+ return this;
+ }
+
+ @Min(1)
+ @Max(100)
+ public int getMinPartitionBatchSize()
+ {
+ return minPartitionBatchSize;
+ }
+
+ @Config("hudi.max-partition-batch-size")
+ @ConfigDescription("Maximum number of partitions returned in a single batch.")
+ public HudiConfig setMaxPartitionBatchSize(int maxPartitionBatchSize)
+ {
+ this.maxPartitionBatchSize = maxPartitionBatchSize;
+ return this;
+ }
+
+ @Min(1)
+ @Max(1000)
+ public int getMaxPartitionBatchSize()
+ {
+ return maxPartitionBatchSize;
+ }
+
+ @Config("hudi.size-based-split-weights-enabled")
+ @ConfigDescription("Unlike uniform splitting, size-based splitting ensures that each batch of splits has enough data to process. " +
+ "By default, it is enabled to improve performance.")
+ public HudiConfig setSizeBasedSplitWeightsEnabled(boolean sizeBasedSplitWeightsEnabled)
+ {
+ this.sizeBasedSplitWeightsEnabled = sizeBasedSplitWeightsEnabled;
+ return this;
+ }
+
+ public boolean isSizeBasedSplitWeightsEnabled()
+ {
+ return sizeBasedSplitWeightsEnabled;
+ }
+
+ @Config("hudi.standard-split-weight-size")
+ @ConfigDescription("The split size corresponding to the standard weight (1.0) "
+ + "when size based split weights are enabled.")
+ public HudiConfig setStandardSplitWeightSize(DataSize standardSplitWeightSize)
+ {
+ this.standardSplitWeightSize = standardSplitWeightSize;
+ return this;
+ }
+
+ @NotNull
+ public DataSize getStandardSplitWeightSize()
+ {
+ return standardSplitWeightSize;
+ }
+
+ @Config("hudi.minimum-assigned-split-weight")
+ @ConfigDescription("Minimum weight that a split can be assigned when size based split weights are enabled.")
+ public HudiConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight)
+ {
+ this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
+ return this;
+ }
+
+ @DecimalMax("1")
+ @DecimalMin(value = "0", inclusive = false)
+ public double getMinimumAssignedSplitWeight()
+ {
+ return minimumAssignedSplitWeight;
+ }
+
+ @Min(1)
+ public int getMaxSplitsPerSecond()
+ {
+ return maxSplitsPerSecond;
+ }
+
+ @Config("hudi.max-splits-per-second")
+ @ConfigDescription("Rate at which splits are enqueued for processing. The queue will throttle if this rate limit is breached.")
+ public HudiConfig setMaxSplitsPerSecond(int maxSplitsPerSecond)
+ {
+ this.maxSplitsPerSecond = maxSplitsPerSecond;
+ return this;
+ }
+
+ @Min(1)
+ public int getMaxOutstandingSplits()
+ {
+ return maxOutstandingSplits;
+ }
+
+ @Config("hudi.max-outstanding-splits")
+ @ConfigDescription("Maximum outstanding splits in a batch enqueued for processing.")
+ public HudiConfig setMaxOutstandingSplits(int maxOutstandingSplits)
+ {
+ this.maxOutstandingSplits = maxOutstandingSplits;
+ return this;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java
new file mode 100644
index 000000000000..a4da3a1eae4f
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SystemTable;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.spi.transaction.IsolationLevel.SERIALIZABLE;
+import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Hudi {@link Connector}: wires split manager, page source provider, partitioning provider
+ * and transaction-scoped metadata (wrapped classloader-safe) behind a single lifecycle.
+ */
+public class HudiConnector
+ implements Connector
+{
+ private final LifeCycleManager lifeCycleManager;
+ private final HudiTransactionManager transactionManager;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorPageSourceProvider pageSourceProvider;
+ private final ConnectorNodePartitioningProvider nodePartitioningProvider;
+ private final Set systemTables;
+ private final List> sessionProperties;
+ private final List> tableProperties;
+
+ public HudiConnector(
+ LifeCycleManager lifeCycleManager,
+ HudiTransactionManager transactionManager,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider,
+ ConnectorNodePartitioningProvider nodePartitioningProvider,
+ Set systemTables,
+ Set sessionPropertiesProviders,
+ List> tableProperties)
+ {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+ this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
+ this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
+ this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
+ .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
+ .collect(toImmutableList());
+ this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null"));
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
+ {
+ ConnectorMetadata metadata = transactionManager.get(transactionHandle, session.getIdentity());
+ return new ClassLoaderSafeConnectorMetadata(metadata, getClass().getClassLoader());
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+
+ @Override
+ public ConnectorNodePartitioningProvider getNodePartitioningProvider()
+ {
+ return nodePartitioningProvider;
+ }
+
+ @Override
+ public Set getSystemTables()
+ {
+ return systemTables;
+ }
+
+ @Override
+ public List> getSessionProperties()
+ {
+ return sessionProperties;
+ }
+
+ @Override
+ public List> getTableProperties()
+ {
+ return tableProperties;
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
+ {
+ checkConnectorSupports(SERIALIZABLE, isolationLevel);
+ ConnectorTransactionHandle transaction = new HiveTransactionHandle(true);
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+ transactionManager.put(transaction);
+ }
+ return transaction;
+ }
+
+ @Override
+ public void commit(ConnectorTransactionHandle transaction)
+ {
+ transactionManager.commit(transaction);
+ }
+
+ @Override
+ public void rollback(ConnectorTransactionHandle transaction)
+ {
+ transactionManager.rollback(transaction);
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ lifeCycleManager.stop();
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
new file mode 100644
index 000000000000..71133de3f492
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static io.trino.plugin.base.Versions.checkSpiVersion;
+
+/**
+ * Factory for the "hudi" connector. Loads {@code InternalHudiConnectorFactory} reflectively
+ * inside a duplicated plugin classloader so each catalog gets isolated connector classes.
+ */
+public class HudiConnectorFactory
+ implements ConnectorFactory
+{
+ public HudiConnectorFactory()
+ {}
+
+ @Override
+ public String getName()
+ {
+ return "hudi";
+ }
+
+ @Override
+ public Connector create(String catalogName, Map config, ConnectorContext context)
+ {
+ checkSpiVersion(context, this);
+
+ ClassLoader classLoader = context.duplicatePluginClassLoader();
+ try {
+ return (Connector) classLoader.loadClass(InternalHudiConnectorFactory.class.getName())
+ .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class)
+ .invoke(null, catalogName, config, context, Optional.empty());
+ }
+ catch (InvocationTargetException e) {
+ // Unwrap and rethrow the real failure raised inside createConnector.
+ Throwable targetException = e.getTargetException();
+ throwIfUnchecked(targetException);
+ throw new RuntimeException(targetException);
+ }
+ catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
new file mode 100644
index 000000000000..4d5686823665
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import io.trino.spi.ErrorCode;
+import io.trino.spi.ErrorCodeSupplier;
+import io.trino.spi.ErrorType;
+
+import static io.trino.spi.ErrorType.EXTERNAL;
+
+/**
+ * Error codes raised by the Hudi connector; all are EXTERNAL (caused by data/storage, not Trino).
+ */
+public enum HudiErrorCode
+ implements ErrorCodeSupplier
+{
+ HUDI_UNKNOWN_TABLE_TYPE(0, EXTERNAL),
+ HUDI_INVALID_PARTITION_VALUE(1, EXTERNAL),
+ HUDI_BAD_DATA(2, EXTERNAL),
+ HUDI_MISSING_DATA(3, EXTERNAL),
+ HUDI_CANNOT_OPEN_SPLIT(4, EXTERNAL),
+ HUDI_UNSUPPORTED_FILE_FORMAT(5, EXTERNAL),
+ HUDI_CURSOR_ERROR(6, EXTERNAL);
+
+ private final ErrorCode errorCode;
+
+ HudiErrorCode(int code, ErrorType type)
+ {
+ // 0x0507_0000 is this connector's error-code base offset — presumably the range assigned to Hudi; confirm against the connector error-code registry.
+ errorCode = new ErrorCode(code + 0x0507_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java
new file mode 100644
index 000000000000..9af9d7a92906
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+// Immutable value object reporting which partitions a Hudi table scan reads;
+// serialized via Jackson as connector input info.
+// NOTE(review): generic type parameters were stripped in this text (raw List);
+// restored here as List<String> — partition ids are path-like strings.
+public class HudiInputInfo
+{
+ private final List<String> partitionIds;
+
+ @JsonCreator
+ public HudiInputInfo(@JsonProperty("partitionIds") List<String> partitionIds)
+ {
+ // Defensive immutable snapshot; rejects null with a descriptive message.
+ this.partitionIds = ImmutableList.copyOf(requireNonNull(partitionIds, "partitionIds is null"));
+ }
+
+ @JsonProperty
+ public List<String> getPartitionIds()
+ {
+ return partitionIds;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
new file mode 100644
index 000000000000..7e191e7e0624
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.log.Logger;
+import io.trino.hdfs.HdfsContext;
+import io.trino.hdfs.HdfsEnvironment;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.Table;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableColumnsMetadata;
+import io.trino.spi.connector.TableNotFoundException;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.TypeManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieTableType;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
+import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
+import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
+import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
+import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNKNOWN_TABLE_TYPE;
+import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
+import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
+import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
+import static io.trino.spi.connector.SchemaTableName.schemaTableName;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static org.apache.hudi.common.fs.FSUtils.getFs;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.exception.TableNotFoundException.checkTableValidity;
+
+public class HudiMetadata
+ implements ConnectorMetadata
+{
+ public static final Logger log = Logger.get(HudiMetadata.class);
+
+ private final HiveMetastore metastore;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final TypeManager typeManager;
+
+ public HudiMetadata(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager)
+ {
+ this.metastore = requireNonNull(metastore, "metastore is null");
+ this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session)
+ {
+ return metastore.getAllDatabases();
+ }
+
+ @Override
+ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ if (isHiveSystemSchema(tableName.getSchemaName())) {
+ return null;
+ }
+ Optional table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
+ if (table.isEmpty()) {
+ return null;
+ }
+ if (!isHudiTable(session, table.get())) {
+ throw new TrinoException(HUDI_UNKNOWN_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
+ }
+ return new HudiTableHandle(
+ tableName.getSchemaName(),
+ tableName.getTableName(),
+ table.get().getStorage().getLocation(),
+ HoodieTableType.COPY_ON_WRITE,
+ TupleDomain.all(),
+ TupleDomain.all());
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ HudiTableHandle hudiTableHandle = (HudiTableHandle) table;
+ return getTableMetadata(hudiTableHandle.getSchemaTableName(), getColumnsToHide(session));
+ }
+
+ @Override
+ public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
+ {
+ HudiTableHandle handle = (HudiTableHandle) tableHandle;
+ HudiPredicates predicates = HudiPredicates.from(constraint.getSummary());
+ HudiTableHandle newHudiTableHandle = handle.applyPredicates(
+ predicates.getPartitionColumnPredicates(),
+ predicates.getRegularColumnPredicates());
+
+ if (handle.getPartitionPredicates().equals(newHudiTableHandle.getPartitionPredicates())
+ && handle.getRegularPredicates().equals(newHudiTableHandle.getRegularPredicates())) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new ConstraintApplicationResult<>(
+ newHudiTableHandle,
+ newHudiTableHandle.getRegularPredicates().transformKeys(ColumnHandle.class::cast),
+ false));
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ HudiTableHandle hudiTableHandle = (HudiTableHandle) tableHandle;
+ Table table = metastore.getTable(hudiTableHandle.getSchemaName(), hudiTableHandle.getTableName())
+ .orElseThrow(() -> new TableNotFoundException(schemaTableName(hudiTableHandle.getSchemaName(), hudiTableHandle.getTableName())));
+ return hiveColumnHandles(table, typeManager, NANOSECONDS).stream()
+ .collect(toImmutableMap(HiveColumnHandle::getName, identity()));
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ return ((HiveColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Optional